Compare commits

49 Commits

Author SHA1 Message Date
Paul Masurel
0928597a43 Bumping version 2022-10-20 10:32:14 +09:00
Paul Masurel
f72abe9b9c Bugfix: broken token positions.
For a Field with several FieldValues, when one value contained
no token at all, the token position was reinitialized to 0.

As a result, PhraseQueries could return false positives.
In addition, the computation of the position delta could
underflow u32 and end up with a gigantic delta.

We haven't been able to fully explain the bug in #1629, but it
is assumed that in some corner cases these deltas can cause a panic.

Closes #1629
2022-10-20 10:27:57 +09:00
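For context, a minimal reproduction of the fixed behaviour, adapted from the regression test added to the segment writer tests further down in this diff (standard tantivy 0.18 API; the 50 MB writer budget is arbitrary):

    // Sketch adapted from the regression test for #1629 in this changeset.
    use tantivy::schema::{IndexRecordOption, Schema, TEXT};
    use tantivy::{DocSet, Index, Postings, Term};

    fn main() -> tantivy::Result<()> {
        let mut schema_builder = Schema::builder();
        let text = schema_builder.add_text_field("text", TEXT);
        let schema = schema_builder.build();
        // The third value is empty: before the fix it reset the running token position.
        let doc = schema
            .parse_document(r#"{"text": ["bbb", "aaa", "", "aaa"]}"#)
            .expect("valid document");
        let index = Index::create_in_ram(schema);
        let mut writer = index.writer(50_000_000)?;
        writer.add_document(doc)?;
        writer.commit()?;
        let searcher = index.reader()?.searcher();
        let inv_index = searcher.segment_reader(0).inverted_index(text)?;
        let term = Term::from_field_text(text, "aaa");
        let mut postings = inv_index
            .read_postings(&term, IndexRecordOption::WithFreqsAndPositions)?
            .expect("term is present");
        assert_eq!(postings.doc(), 0);
        let mut positions = Vec::new();
        postings.positions(&mut positions);
        // Fixed: positions keep increasing across the field values.
        // The broken code produced decreasing positions ([2, 1]).
        assert_eq!(positions, &[2, 5]);
        Ok(())
    }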
Paul Masurel
f0a2b1cc44 Bumped tantivy and subcrate versions. 2022-05-25 22:50:33 +09:00
Paul Masurel
fcfdc44c61 Bumped tantivy-grammar version 2022-05-25 21:52:46 +09:00
Paul Masurel
3171f0b9ba Added ZSTD support in CHANGELOG 2022-05-25 21:51:46 +09:00
PSeitz
89e19f14b5 Merge pull request #1374 from kryesh/main
Add Zstd compression support, Make block size configurable via IndexSettings
2022-05-25 07:39:46 +02:00
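A rough configuration sketch (not part of this changeset): the IndexSettings fields and the zstd-compression Cargo feature come from the diff below, but the Compressor::Zstd variant name is an assumption:

    // Hedged sketch: requires the `zstd-compression` feature; assumes the
    // variant is exposed as tantivy::store::Compressor::Zstd.
    use tantivy::schema::{Schema, STORED, TEXT};
    use tantivy::store::Compressor;
    use tantivy::{Index, IndexSettings};

    fn main() -> tantivy::Result<()> {
        let mut schema_builder = Schema::builder();
        schema_builder.add_text_field("body", TEXT | STORED);
        let schema = schema_builder.build();
        let settings = IndexSettings {
            docstore_compression: Compressor::Zstd,
            // Bigger blocks tend to compress better on large documents,
            // at the cost of more work per doc-store access.
            docstore_blocksize: 65_536,
            ..IndexSettings::default()
        };
        let index = Index::builder()
            .schema(schema)
            .settings(settings)
            .create_in_ram()?;
        assert_eq!(index.settings().docstore_blocksize, 65_536);
        Ok(())
    }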
PSeitz
1a6a1396cd Merge pull request #1376 from saroh/json-example
Add examples to explain default field handling in the json example
2022-05-24 07:09:37 +02:00
saroh
e766375700 remove useless example 2022-05-23 19:49:31 +02:00
PSeitz
496b4a4fdb Update examples/json_field.rs 2022-05-23 12:24:36 +02:00
PSeitz
93cc8498b3 Update examples/json_field.rs 2022-05-23 11:59:42 +02:00
PSeitz
0aa3d63a9f Update examples/json_field.rs 2022-05-23 11:39:45 +02:00
PSeitz
4e2a053b69 Update examples/json_field.rs 2022-05-23 11:27:05 +02:00
Paul Masurel
71c4393ec4 Clippy 2022-05-23 10:20:37 +09:00
saroh
b2e97e266a more examples to explain default field handling 2022-05-21 17:36:39 +02:00
Antoine G
9ee4772140 Fix deps for unicode regex compiling (#1373)
* lint doc warning

* fix regex build
2022-05-20 10:18:44 +09:00
Kryesh
c95013b11e Add zstd-compression feature to github workflow tests 2022-05-19 22:15:18 +10:00
Kryesh
fc045e6bf9 Cleanup imports, remove unneeded error mapping 2022-05-19 10:34:02 +10:00
Kryesh
6837a4d468 Fix bench 2022-05-18 20:35:29 +10:00
Kryesh
0759bf9448 Cleanup zstd structure and serialise to u32 in line with lz4 2022-05-18 20:31:22 +10:00
Kryesh
152e8238d7 Fix silly errors from running tests without feature flag 2022-05-18 19:49:10 +10:00
Kryesh
d4e5b48437 Apply feedback - standardise on u64 and fix correct compression bounds 2022-05-18 19:37:28 +10:00
Kryesh
03040ed81d Add Zstd compression support 2022-05-18 14:04:43 +10:00
Kryesh
aaa22ad225 Make block size configurable to allow for better compression ratios on large documents 2022-05-18 11:13:15 +10:00
Antoine G
3223bdf254 Refactorize PhraseScorer::compute_phrase_match (#1364)
* Refactorize PhraseScorer::compute_phrase_match
* implem optim for slop
2022-05-13 09:57:21 +09:00
dependabot[bot]
cbd06ab189 Update pprof requirement from 0.8.0 to 0.9.0 (#1365)
Updates the requirements on [pprof](https://github.com/tikv/pprof-rs) to permit the latest version.
- [Release notes](https://github.com/tikv/pprof-rs/releases)
- [Changelog](https://github.com/tikv/pprof-rs/blob/master/CHANGELOG.md)
- [Commits](https://github.com/tikv/pprof-rs/commits)

---
updated-dependencies:
- dependency-name: pprof
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2022-05-11 11:42:04 +09:00
Paul Masurel
749395bbb8 Added rustdoc for MultiFruit extract function (#1369) 2022-05-11 11:41:39 +09:00
Paul Masurel
617ba1f0c0 Bugfix in the document deserialization. (#1368)
Deserializing a json field no longer expects the end of the document.

This behavior is well documented in serde_json.
https://docs.serde.rs/serde_json/fn.from_reader.html

Closes #1366
2022-05-11 11:38:10 +09:00
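The serde_json distinction the message refers to, illustrated outside of tantivy (plain serde/serde_json, not the actual tantivy deserialization code):

    // from_str/from_reader insist on consuming the whole input, while driving
    // a Deserializer manually stops at the end of the first JSON value.
    use serde::Deserialize;

    fn main() {
        // A JSON value followed by trailing data, as happens when a json field
        // is embedded inside a larger document.
        let input = r#"{"color": "red"} trailing bytes"#;

        // Strict parsing fails with a "trailing characters" error.
        let strict: Result<serde_json::Value, _> = serde_json::from_str(input);
        assert!(strict.is_err());

        // A manually driven Deserializer stops right after the value.
        let mut de = serde_json::Deserializer::from_str(input);
        let value = serde_json::Value::deserialize(&mut de).unwrap();
        assert_eq!(value["color"], "red");
    }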
Paul Masurel
2f1cd7e7f0 Bugfix in the document deserialization. (#1367)
Deserializing a json field no longer expects the end of the document.

This behavior is well documented in serde_json.
https://docs.serde.rs/serde_json/fn.from_reader.html

Closes #1366
2022-05-11 11:27:04 +09:00
PSeitz
58c0cb5fc4 Merge pull request #1357 from saroh/1302-json-term-writer-API
Expose helpers to generate json field writer terms
2022-05-10 11:02:05 +08:00
PSeitz
7f45a6ac96 allow setting tokenizer manager on index (#1362)
handle json in tokenizer_for_field
2022-05-09 18:15:45 +09:00
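A sketch of how the new hook might be used, based on the IndexBuilder::tokenizers setter added in the diff below (the tokenizer pipeline itself is the regular tantivy 0.18 API; the analyzer name "lowercase_only" is made up):

    // Hedged sketch: wiring a custom TokenizerManager into IndexBuilder.
    use tantivy::schema::{Schema, TEXT};
    use tantivy::tokenizer::{LowerCaser, SimpleTokenizer, TextAnalyzer, TokenizerManager};
    use tantivy::Index;

    fn main() -> tantivy::Result<()> {
        let mut schema_builder = Schema::builder();
        schema_builder.add_text_field("body", TEXT);
        let schema = schema_builder.build();

        // Start from the default manager and register an extra analyzer.
        let tokenizers = TokenizerManager::default();
        tokenizers.register(
            "lowercase_only",
            TextAnalyzer::from(SimpleTokenizer).filter(LowerCaser),
        );

        let index = Index::builder()
            .schema(schema)
            .tokenizers(tokenizers)
            .create_in_ram()?;
        assert!(index.tokenizers().get("lowercase_only").is_some());
        Ok(())
    }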
saroh
0ade871126 rename constructor to be more explicit 2022-05-06 13:29:07 +02:00
PSeitz
aab65490c9 Merge pull request #1358 from quickwit-oss/fix_docs
add alias shard_size to split_size for quickwit
2022-05-06 18:41:34 +08:00
Pascal Seitz
d77e8de36a flip alias variable name 2022-05-06 17:52:36 +08:00
Pascal Seitz
d11a8cce26 minor docs fix 2022-05-06 17:52:36 +08:00
Pascal Seitz
bc607a921b add alias shard_size split_size for quickwit
improve some docs
2022-05-06 17:52:36 +08:00
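Based on the serde alias added in this changeset, both spellings should deserialize to the same request. A sketch, assuming the aggregation request types are reachable as tantivy::aggregation::agg_req::Aggregations (as in tantivy 0.18):

    use tantivy::aggregation::agg_req::Aggregations;

    fn main() {
        // `split_size` is the native field name; `shard_size` is accepted as a
        // serde alias for elasticsearch-compatible requests.
        let native = r#"{ "terms_agg": { "terms": { "field": "string_id", "split_size": 2 } } }"#;
        let es_compatible = r#"{ "terms_agg": { "terms": { "field": "string_id", "shard_size": 2 } } }"#;
        let a: Aggregations = serde_json::from_str(native).unwrap();
        let b: Aggregations = serde_json::from_str(es_compatible).unwrap();
        assert_eq!(a, b);
    }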
Paul Masurel
1273f33338 Fixed comment. 2022-05-06 18:35:25 +09:00
Paul Masurel
e30449743c Shortens blocks' last_key in the SSTable block index. (#1361)
Right now we store the last key in the blocks of the SSTable index.
This PR replaces the last key with a shorter string that is greater
than or equal to it and still less than the next key.
This property is sufficient to ensure that the block index
works properly.

Related to quickwit#1366
2022-05-06 16:29:06 +08:00
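As an illustration of the idea only (not tantivy's actual SSTable code): any separator k with last_key <= k < next_key preserves block lookup, and the shared prefix plus one incremented byte is often enough.

    // Illustrative sketch, not tantivy's implementation.
    fn shortened_separator(last_key: &[u8], next_key: &[u8]) -> Vec<u8> {
        // Length of the common prefix of the two keys.
        let prefix_len = last_key
            .iter()
            .zip(next_key)
            .take_while(|(a, b)| a == b)
            .count();
        if let (Some(&last_byte), Some(&next_byte)) =
            (last_key.get(prefix_len), next_key.get(prefix_len))
        {
            // If there is room between the diverging bytes, the shared prefix
            // plus an incremented byte is >= last_key and < next_key.
            if last_byte < next_byte.saturating_sub(1) {
                let mut separator = last_key[..prefix_len].to_vec();
                separator.push(last_byte + 1);
                return separator;
            }
        }
        // Fallback: the full last key always satisfies the property.
        last_key.to_vec()
    }

    fn main() {
        let sep = shortened_separator(b"automobile", b"cat");
        assert_eq!(sep, b"b");
        assert!(sep.as_slice() >= b"automobile".as_slice());
        assert!(sep.as_slice() < b"cat".as_slice());
    }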
Paul Masurel
ed26552296 Minor changes in query parsing for quickwit#1334. (#1356)
Quickwit still relies heavily on generating field names
containing a '.' for nested objects, yet allows
user-defined field names to contain a dot.

In order to reuse the tantivy query parser, we will end up
using Quickwit field names directly in tantivy.
Only '.' will be escaped.

This PR makes minor changes to how the tantivy query parser parses
a field name and resolves it to a field.
Some of the new edge-case behavior is hacky.

Closes #1355
2022-05-06 13:20:10 +09:00
Saroh
65d129afbd better function names 2022-05-05 10:12:28 +02:00
Antoine G
386ffab76c Fix documentation regression (#1359)
This breaks the docs on docs.rs, as the type alias seems to shadow the struct: https://docs.rs/tantivy/latest/tantivy/termdict/type.TermDictionary.html
It was introduced by #1293, which may not have been up to date with what was done in #1242.
2022-05-05 14:59:25 +09:00
Pasha Podolsky
57a8d0359c Make FruitHandle and MultiFruit public (#1360)
* Make `FruitHandle` and `MultiFruit` public

* Add docs for `MultiFruit` and `FruitHandle`
2022-05-05 14:58:33 +09:00
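For context, the way these two types are typically used together (standard MultiCollector usage in tantivy 0.18; the schema and query below are made up):

    use tantivy::collector::{Count, MultiCollector, TopDocs};
    use tantivy::query::QueryParser;
    use tantivy::schema::{Schema, TEXT};
    use tantivy::{doc, Index};

    fn main() -> tantivy::Result<()> {
        let mut schema_builder = Schema::builder();
        let body = schema_builder.add_text_field("body", TEXT);
        let index = Index::create_in_ram(schema_builder.build());
        let mut writer = index.writer(50_000_000)?;
        writer.add_document(doc!(body => "hello multi fruit"))?;
        writer.commit()?;

        let searcher = index.reader()?.searcher();
        let query = QueryParser::for_index(&index, vec![body]).parse_query("fruit")?;

        // Register several collectors; each registration returns a typed handle.
        let mut collector = MultiCollector::new();
        let count_handle = collector.add_collector(Count);
        let top_docs_handle = collector.add_collector(TopDocs::with_limit(3));

        // One search pass produces a MultiFruit...
        let mut multi_fruit = searcher.search(&*query, &collector)?;
        // ...and each handle extracts its own typed fruit from it.
        let count = count_handle.extract(&mut multi_fruit);
        let top_docs = top_docs_handle.extract(&mut multi_fruit);
        assert_eq!(count, 1);
        assert_eq!(top_docs.len(), 1);
        Ok(())
    }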
Saroh
14cb66ee00 move helper to indexer module 2022-05-04 18:01:57 +02:00
Saroh
9e38343352 expose helpers for json field writer manipulation
closes #1302
2022-05-04 18:01:45 +02:00
PSeitz
944302ae2f Merge pull request #1350 from quickwit-oss/update_edition
update edition
2022-05-04 11:02:52 +02:00
Paul Masurel
be70804d17 Removed AtomicUsize. 2022-05-04 16:45:24 +09:00
PSeitz
a1afc80600 Update src/core/executor.rs
Co-authored-by: Paul Masurel <paul@quickwit.io>
2022-05-04 08:39:44 +02:00
Paul Masurel
02e24fda52 Clippy fix 2022-05-04 12:24:07 +09:00
Pascal Seitz
4db655ae82 update dependencies, update edition 2022-04-28 22:50:55 +08:00
Pascal Seitz
bb44cc84c4 update dependencies 2022-04-28 20:55:36 +08:00
55 changed files with 1175 additions and 531 deletions


@@ -33,7 +33,7 @@ jobs:
components: rustfmt, clippy
- name: Run tests
run: cargo +stable test --features mmap,brotli-compression,lz4-compression,snappy-compression,failpoints --verbose --workspace
run: cargo +stable test --features mmap,brotli-compression,lz4-compression,snappy-compression,zstd-compression,failpoints --verbose --workspace
- name: Run tests quickwit feature
run: cargo +stable test --features mmap,quickwit,failpoints --verbose --workspace


@@ -1,4 +1,8 @@
Unreleased
Tantivy 0.18.1
================================
- Hotfix: positions computation. #1629 (@fmassot, @fulmicoton, @PSeitz)
Tantivy 0.18
================================
- For date values `chrono` has been replaced with `time` (@uklotzde) #1304 :
- The `time` crate is re-exported as `tantivy::time` instead of `tantivy::chrono`.
@@ -11,6 +15,7 @@ Unreleased
- Add [histogram](https://github.com/quickwit-oss/tantivy/pull/1306) aggregation (@PSeitz)
- Add support for fastfield on text fields (@PSeitz)
- Add terms aggregation (@PSeitz)
- Add support for zstd compression (@kryesh)
Tantivy 0.17
================================


@@ -1,6 +1,6 @@
[package]
name = "tantivy"
version = "0.17.0"
version = "0.18.1"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]
@@ -10,71 +10,72 @@ homepage = "https://github.com/quickwit-oss/tantivy"
repository = "https://github.com/quickwit-oss/tantivy"
readme = "README.md"
keywords = ["search", "information", "retrieval"]
edition = "2018"
edition = "2021"
[dependencies]
oneshot = "0.1"
base64 = "0.13"
oneshot = "0.1.3"
base64 = "0.13.0"
byteorder = "1.4.3"
crc32fast = "1.2.1"
once_cell = "1.7.2"
regex ={ version = "1.5.4", default-features = false, features = ["std"] }
tantivy-fst = "0.3"
memmap2 = {version = "0.5", optional=true}
lz4_flex = { version = "0.9", default-features = false, features = ["checked-decode"], optional = true }
brotli = { version = "3.3", optional = true }
crc32fast = "1.3.2"
once_cell = "1.10.0"
regex = { version = "1.5.5", default-features = false, features = ["std", "unicode"] }
tantivy-fst = "0.3.0"
memmap2 = { version = "0.5.3", optional = true }
lz4_flex = { version = "0.9.2", default-features = false, features = ["checked-decode"], optional = true }
brotli = { version = "3.3.4", optional = true }
zstd = { version = "0.11", optional = true }
snap = { version = "1.0.5", optional = true }
tempfile = { version = "3.2", optional = true }
log = "0.4.14"
serde = { version = "1.0.126", features = ["derive"] }
serde_json = "1.0.64"
num_cpus = "1.13"
tempfile = { version = "3.3.0", optional = true }
log = "0.4.16"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.79"
num_cpus = "1.13.1"
fs2={ version = "0.4.3", optional = true }
levenshtein_automata = "0.2"
levenshtein_automata = "0.2.1"
uuid = { version = "1.0.0", features = ["v4", "serde"] }
crossbeam = "0.8.1"
tantivy-query-grammar = { version="0.15.0", path="./query-grammar" }
tantivy-bitpacker = { version="0.1", path="./bitpacker" }
common = { version = "0.2", path = "./common/", package = "tantivy-common" }
fastfield_codecs = { version="0.1", path="./fastfield_codecs", default-features = false }
ownedbytes = { version="0.2", path="./ownedbytes" }
stable_deref_trait = "1.2"
rust-stemmers = "1.2"
downcast-rs = "1.2"
crossbeam-channel = "0.5.4"
tantivy-query-grammar = { version="0.18.0", path="./query-grammar" }
tantivy-bitpacker = { version="0.2", path="./bitpacker" }
common = { version = "0.3", path = "./common/", package = "tantivy-common" }
fastfield_codecs = { version="0.2", path="./fastfield_codecs", default-features = false }
ownedbytes = { version="0.3", path="./ownedbytes" }
stable_deref_trait = "1.2.0"
rust-stemmers = "1.2.0"
downcast-rs = "1.2.0"
bitpacking = { version = "0.8.4", default-features = false, features = ["bitpacker4x"] }
census = "0.4"
census = "0.4.0"
fnv = "1.0.7"
thiserror = "1.0.24"
thiserror = "1.0.30"
htmlescape = "0.3.1"
fail = "0.5"
murmurhash32 = "0.2"
time = { version = "0.3.7", features = ["serde-well-known"] }
smallvec = "1.6.1"
rayon = "1.5"
lru = "0.7.0"
fastdivide = "0.4"
itertools = "0.10.0"
measure_time = "0.8.0"
pretty_assertions = "1.1.0"
serde_cbor = {version="0.11", optional=true}
async-trait = "0.1"
fail = "0.5.0"
murmurhash32 = "0.2.0"
time = { version = "0.3.9", features = ["serde-well-known"] }
smallvec = "1.8.0"
rayon = "1.5.2"
lru = "0.7.5"
fastdivide = "0.4.0"
itertools = "0.10.3"
measure_time = "0.8.2"
pretty_assertions = "1.2.1"
serde_cbor = { version = "0.11.2", optional = true }
async-trait = "0.1.53"
[target.'cfg(windows)'.dependencies]
winapi = "0.3.9"
[dev-dependencies]
rand = "0.8.3"
rand = "0.8.5"
maplit = "1.0.2"
matches = "0.1.8"
proptest = "1.0"
matches = "0.1.9"
proptest = "1.0.0"
criterion = "0.3.5"
test-log = "0.2.8"
test-log = "0.2.10"
env_logger = "0.9.0"
pprof = {version= "0.8", features=["flamegraph", "criterion"]}
futures = "0.3.15"
pprof = { version = "0.9.0", features = ["flamegraph", "criterion"] }
futures = "0.3.21"
[dev-dependencies.fail]
version = "0.5"
version = "0.5.0"
features = ["failpoints"]
[profile.release]
@@ -93,6 +94,7 @@ mmap = ["fs2", "tempfile", "memmap2"]
brotli-compression = ["brotli"]
lz4-compression = ["lz4_flex"]
snappy-compression = ["snap"]
zstd-compression = ["zstd"]
failpoints = ["fail/failpoints"]
unstable = [] # useful for benches.


@@ -1,6 +1,6 @@
[package]
name = "tantivy-bitpacker"
version = "0.1.1"
version = "0.2.0"
edition = "2018"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"


@@ -1,6 +1,6 @@
[package]
name = "tantivy-common"
version = "0.2.0"
version = "0.3.0"
authors = ["Paul Masurel <paul@quickwit.io>", "Pascal Seitz <pascal@quickwit.io>"]
license = "MIT"
edition = "2018"
@@ -10,7 +10,7 @@ description = "common traits and utility functions used by multiple tantivy subc
[dependencies]
byteorder = "1.4.3"
ownedbytes = { version="0.2", path="../ownedbytes" }
ownedbytes = { version="0.3", path="../ownedbytes" }
[dev-dependencies]
proptest = "1.0.0"


@@ -1,7 +1,8 @@
// # Json field example
//
// This example shows how the json field can be used
// to make tantivy partially schemaless.
// to make tantivy partially schemaless by setting it as
// default query parser field.
use tantivy::collector::{Count, TopDocs};
use tantivy::query::QueryParser;
@@ -10,10 +11,6 @@ use tantivy::Index;
fn main() -> tantivy::Result<()> {
// # Defining the schema
//
// We need two fields:
// - a timestamp
// - a json object field
let mut schema_builder = Schema::builder();
schema_builder.add_date_field("timestamp", FAST | STORED);
let event_type = schema_builder.add_text_field("event_type", STRING | STORED);
@@ -43,7 +40,8 @@ fn main() -> tantivy::Result<()> {
"attributes": {
"target": "submit-button",
"cart": {"product_id": 133},
"description": "das keyboard"
"description": "das keyboard",
"event_type": "holiday-sale"
}
}"#,
)?;
@@ -53,6 +51,9 @@ fn main() -> tantivy::Result<()> {
let reader = index.reader()?;
let searcher = reader.searcher();
// # Default fields: event_type and attributes
// By setting attributes as a default field it allows omitting attributes itself, e.g. "target",
// instead of "attributes.target"
let query_parser = QueryParser::for_index(&index, vec![event_type, attributes]);
{
let query = query_parser.parse_query("target:submit-button")?;
@@ -70,10 +71,34 @@ fn main() -> tantivy::Result<()> {
assert_eq!(count_docs, 1);
}
{
let query = query_parser
.parse_query("event_type:click AND cart.product_id:133")
.unwrap();
let hits = searcher.search(&*query, &TopDocs::with_limit(2)).unwrap();
let query = query_parser.parse_query("click AND cart.product_id:133")?;
let hits = searcher.search(&*query, &TopDocs::with_limit(2))?;
assert_eq!(hits.len(), 1);
}
{
// The sub-fields in the json field marked as default field still need to be explicitly
// addressed
let query = query_parser.parse_query("click AND 133")?;
let hits = searcher.search(&*query, &TopDocs::with_limit(2))?;
assert_eq!(hits.len(), 0);
}
{
// Default json fields are ignored if they collide with the schema
let query = query_parser.parse_query("event_type:holiday-sale")?;
let hits = searcher.search(&*query, &TopDocs::with_limit(2))?;
assert_eq!(hits.len(), 0);
}
// # Query via full attribute path
{
// This only searches in our schema's `event_type` field
let query = query_parser.parse_query("event_type:click")?;
let hits = searcher.search(&*query, &TopDocs::with_limit(2))?;
assert_eq!(hits.len(), 2);
}
{
// Default json fields can still be accessed by full path
let query = query_parser.parse_query("attributes.event_type:holiday-sale")?;
let hits = searcher.search(&*query, &TopDocs::with_limit(2))?;
assert_eq!(hits.len(), 1);
}
Ok(())


@@ -1,6 +1,6 @@
[package]
name = "fastfield_codecs"
version = "0.1.0"
version = "0.2.0"
authors = ["Pascal Seitz <pascal@quickwit.io>"]
license = "MIT"
edition = "2018"
@@ -9,8 +9,8 @@ description = "Fast field codecs used by tantivy"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
common = { version = "0.2", path = "../common/", package = "tantivy-common" }
tantivy-bitpacker = { version="0.1.1", path = "../bitpacker/" }
common = { version = "0.3", path = "../common/", package = "tantivy-common" }
tantivy-bitpacker = { version="0.2", path = "../bitpacker/" }
prettytable-rs = {version="0.8.0", optional= true}
rand = {version="0.8.3", optional= true}


@@ -1,7 +1,7 @@
[package]
authors = ["Paul Masurel <paul@quickwit.io>", "Pascal Seitz <pascal@quickwit.io>"]
name = "ownedbytes"
version = "0.2.0"
version = "0.3.0"
edition = "2018"
description = "Expose data as static slice"
license = "MIT"


@@ -1,6 +1,6 @@
[package]
name = "tantivy-query-grammar"
version = "0.15.0"
version = "0.18.0"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]


@@ -18,7 +18,7 @@ use crate::Occur;
const SPECIAL_CHARS: &[char] = &[
'+', '^', '`', ':', '{', '}', '"', '[', ']', '(', ')', '~', '!', '\\', '*', ' ',
];
const ESCAPED_SPECIAL_CHARS_PATTERN: &str = r#"\\(\+|\^|`|:|\{|\}|"|\[|\]|\(|\)|\~|!|\\|\*| )"#;
const ESCAPED_SPECIAL_CHARS_PATTERN: &str = r#"\\(\+|\^|`|:|\{|\}|"|\[|\]|\(|\)|\~|!|\\|\*|\s)"#;
/// Parses a field_name
/// A field name must have at least one character and be followed by a colon.
@@ -34,7 +34,8 @@ fn field_name<'a>() -> impl Parser<&'a str, Output = String> {
take_while(|c| !SPECIAL_CHARS.contains(&c)),
),
'\\',
satisfy(|c| SPECIAL_CHARS.contains(&c)),
satisfy(|_| true), /* if the next character is not a special char, the \ will be treated
* as the \ character. */
))
.skip(char(':'))
.map(|s| ESCAPED_SPECIAL_CHARS_RE.replace_all(&s, "$1").to_string())
@@ -516,15 +517,27 @@ mod test {
}
#[test]
fn test_field_name() -> TestParseResult {
fn test_field_name() {
assert_eq!(
super::field_name().parse(".my.field.name:a"),
Ok((".my.field.name".to_string(), "a"))
);
assert_eq!(
super::field_name().parse(r#"my\ field:a"#),
Ok(("my field".to_string(), "a"))
);
assert_eq!(
super::field_name().parse(r#"にんじん:a"#),
Ok(("にんじん".to_string(), "a"))
);
assert_eq!(
super::field_name().parse("my\\ field\\ name:a"),
Ok(("my field name".to_string(), "a"))
);
assert_eq!(
super::field_name().parse(r#"my\field:a"#),
Ok((r#"my\field"#.to_string(), "a"))
);
assert!(super::field_name().parse("my field:a").is_err());
assert_eq!(
super::field_name().parse("\\(1\\+1\\):2"),
@@ -534,14 +547,21 @@ mod test {
super::field_name().parse("my_field_name:a"),
Ok(("my_field_name".to_string(), "a"))
);
assert_eq!(
super::field_name().parse("myfield.b:hello").unwrap(),
("myfield.b".to_string(), "hello")
);
assert_eq!(
super::field_name().parse(r#"myfield\.b:hello"#).unwrap(),
(r#"myfield\.b"#.to_string(), "hello")
);
assert!(super::field_name().parse("my_field_name").is_err());
assert!(super::field_name().parse(":a").is_err());
assert!(super::field_name().parse("-my_field:a").is_err());
assert_eq!(
super::field_name().parse("_my_field:a")?,
("_my_field".to_string(), "a")
super::field_name().parse("_my_field:a"),
Ok(("_my_field".to_string(), "a"))
);
Ok(())
}
#[test]


@@ -230,8 +230,7 @@ pub enum BucketResult {
impl BucketResult {
pub(crate) fn empty_from_req(req: &BucketAggregationInternal) -> crate::Result<Self> {
let empty_bucket = IntermediateBucketResult::empty_from_req(&req.bucket_agg);
Ok(BucketResult::from_intermediate_and_req(empty_bucket, req)?)
BucketResult::from_intermediate_and_req(empty_bucket, req)
}
fn from_intermediate_and_req(


@@ -1364,4 +1364,29 @@ mod tests {
Ok(())
}
#[test]
fn histogram_invalid_request() -> crate::Result<()> {
let index = get_test_index_2_segments(true)?;
let agg_req: Aggregations = vec![(
"histogram".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Histogram(HistogramAggregation {
field: "score_f64".to_string(),
interval: 0.0,
..Default::default()
}),
sub_aggregation: Default::default(),
}),
)]
.into_iter()
.collect();
let agg_res = exec_request(agg_req, &index);
assert!(agg_res.is_err());
Ok(())
}
}


@@ -81,7 +81,8 @@ pub struct TermsAggregation {
///
/// Should never be smaller than size.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub shard_size: Option<u32>,
#[serde(alias = "shard_size")]
pub split_size: Option<u32>,
/// To get more accurate results, we fetch more than `size` from each segment.
///
@@ -96,11 +97,11 @@ pub struct TermsAggregation {
doc_count returned by each shard. It's the sum of the size of the largest bucket on
each segment that didn't fit into `shard_size`.
///
/// Defaults to true when ordering by counts desc.
/// Defaults to true when ordering by count desc.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub show_term_doc_count_error: Option<bool>,
/// Filter all terms than are lower `min_doc_count`. Defaults to 1.
/// Filter all terms that are lower than `min_doc_count`. Defaults to 1.
///
/// **Expensive**: When set to 0, this will return all terms in the field.
#[serde(skip_serializing_if = "Option::is_none", default)]
@@ -143,7 +144,7 @@ pub(crate) struct TermsAggregationInternal {
/// Increasing this value will increase the cost for more accuracy.
pub segment_size: u32,
/// Filter all terms than are lower `min_doc_count`. Defaults to 1.
/// Filter all terms that are lower than `min_doc_count`. Defaults to 1.
///
/// *Expensive*: When set to 0, this will return all terms in the field.
pub min_doc_count: u64,
@@ -572,7 +573,7 @@ mod tests {
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
field: "string_id".to_string(),
size: Some(2),
shard_size: Some(2),
split_size: Some(2),
..Default::default()
}),
sub_aggregation: Default::default(),
@@ -1210,6 +1211,51 @@ mod tests {
.unwrap();
assert_eq!(agg_req, agg_req_deser);
let elasticsearch_compatible_json = json!(
{
"term_agg_test":{
"terms": {
"field": "string_id",
"split_size": 2u64,
}
}
});
// test alias shard_size, split_size
let agg_req: Aggregations = vec![(
"term_agg_test".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
field: "string_id".to_string(),
split_size: Some(2),
..Default::default()
}),
sub_aggregation: Default::default(),
}),
)]
.into_iter()
.collect();
let agg_req_deser: Aggregations =
serde_json::from_str(&serde_json::to_string(&elasticsearch_compatible_json).unwrap())
.unwrap();
assert_eq!(agg_req, agg_req_deser);
let elasticsearch_compatible_json = json!(
{
"term_agg_test":{
"terms": {
"field": "string_id",
"shard_size": 2u64,
}
}
});
let agg_req_deser: Aggregations =
serde_json::from_str(&serde_json::to_string(&elasticsearch_compatible_json).unwrap())
.unwrap();
assert_eq!(agg_req, agg_req_deser);
Ok(())
}
}


@@ -24,7 +24,9 @@ use crate::aggregation::bucket::TermsAggregationInternal;
/// intermediate results.
#[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct IntermediateAggregationResults {
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) metrics: Option<VecWithNames<IntermediateMetricResult>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) buckets: Option<VecWithNames<IntermediateBucketResult>>,
}


@@ -20,7 +20,8 @@
//!
//! #### Limitations
//!
//! Currently aggregations work only on single value fast fields of type u64, f64 and i64.
//! Currently aggregations work only on single value fast fields of type u64, f64, i64 and
//! fast fields on text fields.
//!
//! # JSON Format
//! Aggregations request and result structures de/serialize into elasticsearch compatible JSON.


@@ -92,7 +92,7 @@ mod histogram_collector;
pub use histogram_collector::HistogramCollector;
mod multi_collector;
pub use self::multi_collector::MultiCollector;
pub use self::multi_collector::{FruitHandle, MultiCollector, MultiFruit};
mod top_collector;


@@ -5,6 +5,7 @@ use super::{Collector, SegmentCollector};
use crate::collector::Fruit;
use crate::{DocId, Score, SegmentOrdinal, SegmentReader, TantivyError};
/// MultiFruit keeps Fruits from every nested Collector
pub struct MultiFruit {
sub_fruits: Vec<Option<Box<dyn Fruit>>>,
}
@@ -79,12 +80,17 @@ impl<TSegmentCollector: SegmentCollector> BoxableSegmentCollector
}
}
/// FruitHandle stores reference to the corresponding collector inside MultiCollector
pub struct FruitHandle<TFruit: Fruit> {
pos: usize,
_phantom: PhantomData<TFruit>,
}
impl<TFruit: Fruit> FruitHandle<TFruit> {
/// Extract a typed fruit off a multifruit.
///
/// This function involves downcasting and can panic if the multifruit was
/// created using faulty code.
pub fn extract(self, fruits: &mut MultiFruit) -> TFruit {
let boxed_fruit = fruits.sub_fruits[self.pos].take().expect("");
*boxed_fruit


@@ -1,6 +1,7 @@
use crossbeam::channel;
use rayon::{ThreadPool, ThreadPoolBuilder};
use crate::TantivyError;
/// Search executor for running search requests, either single-threaded or multithreaded.
///
/// We don't expose Rayon thread pool directly here for several reasons.
@@ -47,16 +48,19 @@ impl Executor {
match self {
Executor::SingleThread => args.map(f).collect::<crate::Result<_>>(),
Executor::ThreadPool(pool) => {
let args_with_indices: Vec<(usize, A)> = args.enumerate().collect();
let num_fruits = args_with_indices.len();
let args: Vec<A> = args.collect();
let num_fruits = args.len();
let fruit_receiver = {
let (fruit_sender, fruit_receiver) = channel::unbounded();
let (fruit_sender, fruit_receiver) = crossbeam_channel::unbounded();
pool.scope(|scope| {
for arg_with_idx in args_with_indices {
scope.spawn(|_| {
let (idx, arg) = arg_with_idx;
let fruit = f(arg);
if let Err(err) = fruit_sender.send((idx, fruit)) {
for (idx, arg) in args.into_iter().enumerate() {
// We name references for f and fruit_sender_ref because we do not
// want these two to be moved into the closure.
let f_ref = &f;
let fruit_sender_ref = &fruit_sender;
scope.spawn(move |_| {
let fruit = f_ref(arg);
if let Err(err) = fruit_sender_ref.send((idx, fruit)) {
error!(
"Failed to send search task. It probably means all search \
threads have panicked. {:?}",
@@ -71,18 +75,19 @@ impl Executor {
// This is important as it makes it possible for the fruit_receiver iteration to
// terminate.
};
// This is lame, but safe.
let mut results_with_position = Vec::with_capacity(num_fruits);
let mut result_placeholders: Vec<Option<R>> =
std::iter::repeat_with(|| None).take(num_fruits).collect();
for (pos, fruit_res) in fruit_receiver {
let fruit = fruit_res?;
results_with_position.push((pos, fruit));
result_placeholders[pos] = Some(fruit);
}
results_with_position.sort_by_key(|(pos, _)| *pos);
assert_eq!(results_with_position.len(), num_fruits);
Ok(results_with_position
.into_iter()
.map(|(_, fruit)| fruit)
.collect::<Vec<_>>())
let results: Vec<R> = result_placeholders.into_iter().flatten().collect();
if results.len() != num_fruits {
return Err(TantivyError::InternalError(
"One of the mapped execution failed.".to_string(),
));
}
Ok(results)
}
}
}


@@ -74,6 +74,7 @@ fn load_metas(
pub struct IndexBuilder {
schema: Option<Schema>,
index_settings: IndexSettings,
tokenizer_manager: TokenizerManager,
}
impl Default for IndexBuilder {
fn default() -> Self {
@@ -86,6 +87,7 @@ impl IndexBuilder {
Self {
schema: None,
index_settings: IndexSettings::default(),
tokenizer_manager: TokenizerManager::default(),
}
}
@@ -103,6 +105,12 @@ impl IndexBuilder {
self
}
/// Set the tokenizers.
pub fn tokenizers(mut self, tokenizers: TokenizerManager) -> Self {
self.tokenizer_manager = tokenizers;
self
}
/// Creates a new index using the `RAMDirectory`.
///
/// The index will be allocated in anonymous memory.
@@ -154,7 +162,8 @@ impl IndexBuilder {
if !Index::exists(&*dir)? {
return self.create(dir);
}
let index = Index::open(dir)?;
let mut index = Index::open(dir)?;
index.set_tokenizers(self.tokenizer_manager.clone());
if index.schema() == self.get_expect_schema()? {
Ok(index)
} else {
@@ -176,7 +185,8 @@ impl IndexBuilder {
)?;
let mut metas = IndexMeta::with_schema(self.get_expect_schema()?);
metas.index_settings = self.index_settings;
let index = Index::open_from_metas(directory, &metas, SegmentMetaInventory::default());
let mut index = Index::open_from_metas(directory, &metas, SegmentMetaInventory::default());
index.set_tokenizers(self.tokenizer_manager);
Ok(index)
}
}
@@ -304,6 +314,11 @@ impl Index {
}
}
/// Setter for the tokenizer manager.
pub fn set_tokenizers(&mut self, tokenizers: TokenizerManager) {
self.tokenizers = tokenizers;
}
/// Accessor for the tokenizer manager.
pub fn tokenizers(&self) -> &TokenizerManager {
&self.tokenizers
@@ -314,20 +329,31 @@ impl Index {
let field_entry = self.schema.get_field_entry(field);
let field_type = field_entry.field_type();
let tokenizer_manager: &TokenizerManager = self.tokenizers();
let tokenizer_name_opt: Option<TextAnalyzer> = match field_type {
FieldType::Str(text_options) => text_options
.get_indexing_options()
.map(|text_indexing_options| text_indexing_options.tokenizer().to_string())
.and_then(|tokenizer_name| tokenizer_manager.get(&tokenizer_name)),
_ => None,
let indexing_options_opt = match field_type {
FieldType::JsonObject(options) => options.get_text_indexing_options(),
FieldType::Str(options) => options.get_indexing_options(),
_ => {
return Err(TantivyError::SchemaError(format!(
"{:?} is not a text field.",
field_entry.name()
)))
}
};
match tokenizer_name_opt {
Some(tokenizer) => Ok(tokenizer),
None => Err(TantivyError::SchemaError(format!(
"{:?} is not a text field.",
field_entry.name()
))),
}
let indexing_options = indexing_options_opt.ok_or_else(|| {
TantivyError::InvalidArgument(format!(
"No indexing options set for field {:?}",
field_entry
))
})?;
tokenizer_manager
.get(indexing_options.tokenizer())
.ok_or_else(|| {
TantivyError::InvalidArgument(format!(
"No Tokenizer found for field {:?}",
field_entry
))
})
}
/// Create a default `IndexReader` for the given index.
@@ -557,7 +583,8 @@ impl fmt::Debug for Index {
mod tests {
use crate::directory::{RamDirectory, WatchCallback};
use crate::schema::{Field, Schema, INDEXED, TEXT};
use crate::{Directory, Index, IndexReader, IndexSettings, ReloadPolicy};
use crate::tokenizer::TokenizerManager;
use crate::{Directory, Index, IndexBuilder, IndexReader, IndexSettings, ReloadPolicy};
#[test]
fn test_indexer_for_field() {
@@ -573,6 +600,21 @@ mod tests {
);
}
#[test]
fn test_set_tokenizer_manager() {
let mut schema_builder = Schema::builder();
schema_builder.add_u64_field("num_likes", INDEXED);
schema_builder.add_text_field("body", TEXT);
let schema = schema_builder.build();
let index = IndexBuilder::new()
// set empty tokenizer manager
.tokenizers(TokenizerManager::new())
.schema(schema)
.create_in_ram()
.unwrap();
assert!(index.tokenizers().get("raw").is_none());
}
#[test]
fn test_index_exists() {
let directory: Box<dyn Directory> = Box::new(RamDirectory::create());
@@ -702,7 +744,7 @@ mod tests {
.try_into()?;
assert_eq!(reader.searcher().num_docs(), 0);
writer.add_document(doc!(field=>1u64))?;
let (sender, receiver) = crossbeam::channel::unbounded();
let (sender, receiver) = crossbeam_channel::unbounded();
let _handle = index.directory_mut().watch(WatchCallback::new(move || {
let _ = sender.send(());
}));
@@ -737,7 +779,7 @@ mod tests {
reader: &IndexReader,
) -> crate::Result<()> {
let mut reader_index = reader.index();
let (sender, receiver) = crossbeam::channel::unbounded();
let (sender, receiver) = crossbeam_channel::unbounded();
let _watch_handle = reader_index
.directory_mut()
.watch(WatchCallback::new(move || {


@@ -239,7 +239,7 @@ impl InnerSegmentMeta {
///
/// Contains settings which are applied on the whole
/// index, like presort documents.
#[derive(Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct IndexSettings {
/// Sorts the documents by information
/// provided in `IndexSortByField`
@@ -248,7 +248,26 @@ pub struct IndexSettings {
/// The `Compressor` used to compress the doc store.
#[serde(default)]
pub docstore_compression: Compressor,
#[serde(default = "default_docstore_blocksize")]
/// The size of each block that will be compressed and written to disk
pub docstore_blocksize: usize,
}
/// Must be a function to be compatible with serde defaults
fn default_docstore_blocksize() -> usize {
16_384
}
impl Default for IndexSettings {
fn default() -> Self {
Self {
sort_by_field: None,
docstore_compression: Compressor::default(),
docstore_blocksize: default_docstore_blocksize(),
}
}
}
/// Settings to presort the documents in an index
///
/// Presorting documents can greatly performance
@@ -401,7 +420,7 @@ mod tests {
let json = serde_json::ser::to_string(&index_metas).expect("serialization failed");
assert_eq!(
json,
r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"lz4"},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#
r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"lz4","docstore_blocksize":16384},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#
);
let deser_meta: UntrackedIndexMeta = serde_json::from_str(&json).unwrap();


@@ -110,7 +110,7 @@ mod tests {
let tmp_file = tmp_dir.path().join("watched.txt");
let counter: Arc<AtomicUsize> = Default::default();
let (tx, rx) = crossbeam::channel::unbounded();
let (tx, rx) = crossbeam_channel::unbounded();
let timeout = Duration::from_millis(100);
let watcher = FileWatcher::new(&tmp_file);
@@ -153,7 +153,7 @@ mod tests {
let tmp_file = tmp_dir.path().join("watched.txt");
let counter: Arc<AtomicUsize> = Default::default();
let (tx, rx) = crossbeam::channel::unbounded();
let (tx, rx) = crossbeam_channel::unbounded();
let timeout = Duration::from_millis(100);
let watcher = FileWatcher::new(&tmp_file);


@@ -181,7 +181,7 @@ fn test_directory_delete(directory: &dyn Directory) -> crate::Result<()> {
fn test_watch(directory: &dyn Directory) {
let counter: Arc<AtomicUsize> = Default::default();
let (tx, rx) = crossbeam::channel::unbounded();
let (tx, rx) = crossbeam_channel::unbounded();
let timeout = Duration::from_millis(500);
let handle = directory


@@ -300,7 +300,7 @@ impl IntFastFieldWriter {
/// If the document has more than one value for the given field,
/// only the first one is taken in account.
///
/// Values for string fast fields are skipped.
/// Values on text fast fields are skipped.
pub fn add_document(&mut self, doc: &Document) {
match doc.get_first(self.field) {
Some(v) => {


@@ -4,7 +4,6 @@ use std::thread;
use std::thread::JoinHandle;
use common::BitSet;
use crossbeam::channel;
use smallvec::smallvec;
use super::operation::{AddOperation, UserOperation};
@@ -289,7 +288,7 @@ impl IndexWriter {
return Err(TantivyError::InvalidArgument(err_msg));
}
let (document_sender, document_receiver): (AddBatchSender, AddBatchReceiver) =
channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
crossbeam_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
let delete_queue = DeleteQueue::new();
@@ -326,7 +325,7 @@ impl IndexWriter {
}
fn drop_sender(&mut self) {
let (sender, _receiver) = channel::bounded(1);
let (sender, _receiver) = crossbeam_channel::bounded(1);
self.operation_sender = sender;
}
@@ -532,7 +531,7 @@ impl IndexWriter {
/// Returns the former segment_ready channel.
fn recreate_document_channel(&mut self) {
let (document_sender, document_receiver): (AddBatchSender, AddBatchReceiver) =
channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
crossbeam_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
self.operation_sender = document_sender;
self.index_writer_status = IndexWriterStatus::from(document_receiver);
}


@@ -92,7 +92,7 @@ impl Drop for IndexWriterBomb {
mod tests {
use std::mem;
use crossbeam::channel;
use crossbeam_channel as channel;
use super::IndexWriterStatus;


@@ -4,7 +4,7 @@ use murmurhash32::murmurhash2;
use crate::fastfield::FastValue;
use crate::postings::{IndexingContext, IndexingPosition, PostingsWriter};
use crate::schema::term::{JSON_END_OF_PATH, JSON_PATH_SEGMENT_SEP};
use crate::schema::Type;
use crate::schema::{Field, Type};
use crate::time::format_description::well_known::Rfc3339;
use crate::time::{OffsetDateTime, UtcOffset};
use crate::tokenizer::TextAnalyzer;
@@ -57,7 +57,7 @@ struct IndexingPositionsPerPath {
impl IndexingPositionsPerPath {
fn get_position(&mut self, term: &Term) -> &mut IndexingPosition {
self.positions_per_path
.entry(murmurhash2(term.value_bytes()))
.entry(murmurhash2(term.as_slice()))
.or_insert_with(Default::default)
}
}
@@ -199,16 +199,81 @@ fn infer_type_from_str(text: &str) -> TextOrDateTime {
}
}
// Tries to infer a JSON type from a string
pub(crate) fn convert_to_fast_value_and_get_term(
json_term_writer: &mut JsonTermWriter,
phrase: &str,
) -> Option<Term> {
if let Ok(dt) = OffsetDateTime::parse(phrase, &Rfc3339) {
let dt_utc = dt.to_offset(UtcOffset::UTC);
return Some(set_fastvalue_and_get_term(
json_term_writer,
DateTime::from_utc(dt_utc),
));
}
if let Ok(u64_val) = str::parse::<u64>(phrase) {
return Some(set_fastvalue_and_get_term(json_term_writer, u64_val));
}
if let Ok(i64_val) = str::parse::<i64>(phrase) {
return Some(set_fastvalue_and_get_term(json_term_writer, i64_val));
}
if let Ok(f64_val) = str::parse::<f64>(phrase) {
return Some(set_fastvalue_and_get_term(json_term_writer, f64_val));
}
None
}
// helper function to generate a Term from a json fastvalue
pub(crate) fn set_fastvalue_and_get_term<T: FastValue>(
json_term_writer: &mut JsonTermWriter,
value: T,
) -> Term {
json_term_writer.set_fast_value(value);
json_term_writer.term().clone()
}
// helper function to generate a list of terms with their positions from a textual json value
pub(crate) fn set_string_and_get_terms(
json_term_writer: &mut JsonTermWriter,
value: &str,
text_analyzer: &TextAnalyzer,
) -> Vec<(usize, Term)> {
let mut positions_and_terms = Vec::<(usize, Term)>::new();
json_term_writer.close_path_and_set_type(Type::Str);
let term_num_bytes = json_term_writer.term_buffer.as_slice().len();
let mut token_stream = text_analyzer.token_stream(value);
token_stream.process(&mut |token| {
json_term_writer.term_buffer.truncate(term_num_bytes);
json_term_writer
.term_buffer
.append_bytes(token.text.as_bytes());
positions_and_terms.push((token.position, json_term_writer.term().clone()));
});
positions_and_terms
}
pub struct JsonTermWriter<'a> {
term_buffer: &'a mut Term,
path_stack: Vec<usize>,
}
impl<'a> JsonTermWriter<'a> {
pub fn from_field_and_json_path(
field: Field,
json_path: &str,
term_buffer: &'a mut Term,
) -> Self {
term_buffer.set_field(Type::Json, field);
let mut json_term_writer = Self::wrap(term_buffer);
for segment in json_path.split('.') {
json_term_writer.push_path_segment(segment);
}
json_term_writer
}
pub fn wrap(term_buffer: &'a mut Term) -> Self {
term_buffer.clear_with_type(Type::Json);
let mut path_stack = Vec::with_capacity(10);
path_stack.push(5); // magic number?
path_stack.push(5);
Self {
term_buffer,
path_stack,
@@ -250,8 +315,8 @@ impl<'a> JsonTermWriter<'a> {
/// Returns the json path of the term being currently built.
#[cfg(test)]
pub(crate) fn path(&self) -> &[u8] {
let end_of_path = self.path_stack.last().cloned().unwrap_or(6); // TODO remove magic number
&self.term().value_bytes()[..end_of_path - 1]
let end_of_path = self.path_stack.last().cloned().unwrap_or(6);
&self.term().as_slice()[5..end_of_path - 1]
}
pub fn set_fast_value<T: FastValue>(&mut self, val: T) {
@@ -321,7 +386,10 @@ mod tests {
let mut json_writer = JsonTermWriter::wrap(&mut term);
json_writer.push_path_segment("color");
json_writer.set_str("red");
assert_eq!(json_writer.term().value_bytes(), b"color\x00sred")
assert_eq!(
json_writer.term().as_slice(),
b"\x00\x00\x00\x01jcolor\x00sred"
)
}
#[test]
@@ -333,8 +401,8 @@ mod tests {
json_writer.push_path_segment("color");
json_writer.set_fast_value(-4i64);
assert_eq!(
json_writer.term().value_bytes(),
b"color\x00i\x7f\xff\xff\xff\xff\xff\xff\xfc"
json_writer.term().as_slice(),
b"\x00\x00\x00\x01jcolor\x00i\x7f\xff\xff\xff\xff\xff\xff\xfc"
)
}
@@ -347,8 +415,8 @@ mod tests {
json_writer.push_path_segment("color");
json_writer.set_fast_value(4u64);
assert_eq!(
json_writer.term().value_bytes(),
b"color\x00u\x00\x00\x00\x00\x00\x00\x00\x04"
json_writer.term().as_slice(),
b"\x00\x00\x00\x01jcolor\x00u\x00\x00\x00\x00\x00\x00\x00\x04"
)
}
@@ -361,8 +429,8 @@ mod tests {
json_writer.push_path_segment("color");
json_writer.set_fast_value(4.0f64);
assert_eq!(
json_writer.term().value_bytes(),
b"color\x00f\xc0\x10\x00\x00\x00\x00\x00\x00"
json_writer.term().as_slice(),
b"\x00\x00\x00\x01jcolor\x00f\xc0\x10\x00\x00\x00\x00\x00\x00"
)
}
@@ -377,8 +445,8 @@ mod tests {
json_writer.push_path_segment("color");
json_writer.set_str("red");
assert_eq!(
json_writer.term().value_bytes(),
b"attribute\x01color\x00sred"
json_writer.term().as_slice(),
b"\x00\x00\x00\x01jattribute\x01color\x00sred"
)
}
@@ -392,7 +460,10 @@ mod tests {
json_writer.push_path_segment("hue");
json_writer.pop_path_segment();
json_writer.set_str("red");
assert_eq!(json_writer.term().value_bytes(), b"color\x00sred")
assert_eq!(
json_writer.term().as_slice(),
b"\x00\x00\x00\x01jcolor\x00sred"
)
}
#[test]


@@ -21,11 +21,13 @@ pub mod segment_updater;
mod segment_writer;
mod stamper;
use crossbeam::channel;
use crossbeam_channel as channel;
use smallvec::SmallVec;
pub use self::index_writer::IndexWriter;
pub(crate) use self::json_term_writer::JsonTermWriter;
pub(crate) use self::json_term_writer::{
convert_to_fast_value_and_get_term, set_string_and_get_terms, JsonTermWriter,
};
pub use self::log_merge_policy::LogMergePolicy;
pub use self::merge_operation::MergeOperation;
pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};


@@ -39,9 +39,10 @@ impl SegmentSerializer {
let postings_serializer = InvertedIndexSerializer::open(&mut segment)?;
let compressor = segment.index().settings().docstore_compression;
let blocksize = segment.index().settings().docstore_blocksize;
Ok(SegmentSerializer {
segment,
store_writer: StoreWriter::new(store_write, compressor),
store_writer: StoreWriter::new(store_write, compressor, blocksize),
fast_field_serializer,
fieldnorms_serializer: Some(fieldnorms_serializer),
postings_serializer,


@@ -1,6 +1,5 @@
use std::borrow::BorrowMut;
use std::collections::HashSet;
use std::io;
use std::io::Write;
use std::ops::Deref;
use std::path::PathBuf;
@@ -27,7 +26,7 @@ use crate::indexer::{
SegmentSerializer,
};
use crate::schema::Schema;
use crate::{FutureResult, Opstamp, TantivyError};
use crate::{FutureResult, Opstamp};
const NUM_MERGE_THREADS: usize = 4;
@@ -73,10 +72,12 @@ fn save_metas(metas: &IndexMeta, directory: &dyn Directory) -> crate::Result<()>
let mut buffer = serde_json::to_vec_pretty(metas)?;
// Just adding a new line at the end of the buffer.
writeln!(&mut buffer)?;
fail_point!("save_metas", |msg| Err(TantivyError::from(io::Error::new(
io::ErrorKind::Other,
msg.unwrap_or_else(|| "Undefined".to_string())
))));
fail_point!("save_metas", |msg| Err(crate::TantivyError::from(
std::io::Error::new(
std::io::ErrorKind::Other,
msg.unwrap_or_else(|| "Undefined".to_string())
)
)));
directory.sync_directory()?;
directory.atomic_write(&META_FILEPATH, &buffer[..])?;
debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas));


@@ -6,7 +6,8 @@ use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter};
use crate::indexer::json_term_writer::index_json_values;
use crate::indexer::segment_serializer::SegmentSerializer;
use crate::postings::{
serialize_postings, IndexingContext, IndexingPosition, PerFieldPostingsWriter, PostingsWriter,
compute_table_size, serialize_postings, IndexingContext, IndexingPosition,
PerFieldPostingsWriter, PostingsWriter,
};
use crate::schema::{FieldEntry, FieldType, FieldValue, Schema, Term, Value};
use crate::store::{StoreReader, StoreWriter};
@@ -15,6 +16,25 @@ use crate::tokenizer::{
};
use crate::{DocId, Document, Opstamp, SegmentComponent};
/// Computes the initial size of the hash table.
///
/// Returns the recommended initial table size as a power of 2.
///
/// Note this is a very dumb way to compute log2, but it is easier to proofread that way.
fn compute_initial_table_size(per_thread_memory_budget: usize) -> crate::Result<usize> {
let table_memory_upper_bound = per_thread_memory_budget / 3;
(10..20) // We cap it at 2^19 = 512K capacity.
.map(|power| 1 << power)
.take_while(|capacity| compute_table_size(*capacity) < table_memory_upper_bound)
.last()
.ok_or_else(|| {
crate::TantivyError::InvalidArgument(format!(
"per thread memory budget (={per_thread_memory_budget}) is too small. Raise the \
memory budget or lower the number of threads."
))
})
}
fn remap_doc_opstamps(
opstamps: Vec<Opstamp>,
doc_id_mapping_opt: Option<&DocIdMapping>,
@@ -58,11 +78,12 @@ impl SegmentWriter {
/// - segment: The segment being written
/// - schema
pub fn for_segment(
_memory_budget_in_bytes: usize,
memory_budget_in_bytes: usize,
segment: Segment,
schema: Schema,
) -> crate::Result<SegmentWriter> {
let tokenizer_manager = segment.index().tokenizers().clone();
let table_size = compute_initial_table_size(memory_budget_in_bytes)?;
let segment_serializer = SegmentSerializer::for_segment(segment, false)?;
let per_field_postings_writers = PerFieldPostingsWriter::for_schema(&schema);
let per_field_text_analyzers = schema
@@ -85,7 +106,7 @@ impl SegmentWriter {
.collect();
Ok(SegmentWriter {
max_doc: 0,
ctx: IndexingContext::new(),
ctx: IndexingContext::new(table_size),
per_field_postings_writers,
fieldnorms_writer: FieldNormsWriter::for_schema(&schema),
segment_serializer,
@@ -128,7 +149,6 @@ impl SegmentWriter {
pub fn mem_usage(&self) -> usize {
self.ctx.mem_usage()
+ self.fieldnorms_writer.mem_usage()
+ self.per_field_postings_writers.mem_usage()
+ self.fast_field_writers.mem_usage()
+ self.segment_serializer.mem_usage()
}
@@ -203,7 +223,7 @@ impl SegmentWriter {
let mut indexing_position = IndexingPosition::default();
for mut token_stream in token_streams {
// assert_eq!(term_buffer.as_slice().len(), 5);
assert_eq!(term_buffer.as_slice().len(), 5);
postings_writer.index_text(
doc_id,
&mut *token_stream,
@@ -352,9 +372,10 @@ fn remap_and_write(
.segment_mut()
.open_write(SegmentComponent::Store)?;
let compressor = serializer.segment().index().settings().docstore_compression;
let block_size = serializer.segment().index().settings().docstore_blocksize;
let old_store_writer = std::mem::replace(
&mut serializer.store_writer,
StoreWriter::new(store_write, compressor),
StoreWriter::new(store_write, compressor, block_size),
);
old_store_writer.close()?;
let store_read = StoreReader::open(
@@ -398,6 +419,7 @@ pub fn prepare_doc_for_store(doc: Document, schema: &Schema) -> Document {
#[cfg(test)]
mod tests {
use super::compute_initial_table_size;
use crate::collector::Count;
use crate::indexer::json_term_writer::JsonTermWriter;
use crate::postings::TermInfo;
@@ -408,6 +430,15 @@ mod tests {
use crate::tokenizer::{PreTokenizedString, Token};
use crate::{DateTime, DocAddress, DocSet, Document, Index, Postings, Term, TERMINATED};
#[test]
fn test_hashmap_size() {
assert_eq!(compute_initial_table_size(100_000).unwrap(), 1 << 11);
assert_eq!(compute_initial_table_size(1_000_000).unwrap(), 1 << 14);
assert_eq!(compute_initial_table_size(10_000_000).unwrap(), 1 << 17);
assert_eq!(compute_initial_table_size(1_000_000_000).unwrap(), 1 << 19);
assert_eq!(compute_initial_table_size(4_000_000_000).unwrap(), 1 << 19);
}
#[test]
fn test_prepare_for_store() {
let mut schema_builder = Schema::builder();
@@ -677,4 +708,38 @@ mod tests {
let phrase_query = PhraseQuery::new(vec![nothello_term, happy_term]);
assert_eq!(searcher.search(&phrase_query, &Count).unwrap(), 0);
}
#[test]
fn test_bug_regression_1629_position_when_array_with_a_field_value_that_does_not_contain_any_token(
) {
// We experienced a bug where we would have a position underflow when computing position
// delta in a horrible corner case.
//
// See the commit with this unit test if you want the details.
let mut schema_builder = Schema::builder();
let text = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let doc = schema
.parse_document(r#"{"text": [ "bbb", "aaa", "", "aaa"]}"#)
.unwrap();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_for_tests().unwrap();
index_writer.add_document(doc).unwrap();
// On debug this did panic on the underflow
index_writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let seg_reader = searcher.segment_reader(0);
let inv_index = seg_reader.inverted_index(text).unwrap();
let term = Term::from_field_text(text, "aaa");
let mut postings = inv_index
.read_postings(&term, IndexRecordOption::WithFreqsAndPositions)
.unwrap()
.unwrap();
assert_eq!(postings.doc(), 0u32);
let mut positions = Vec::new();
postings.positions(&mut positions);
// On release this was [2, 1]. (< note the decreasing values)
assert_eq!(positions, &[2, 5]);
}
}


@@ -1,24 +1,27 @@
use crate::postings::stacker::MemoryArena;
use crate::postings::stacker::{MemoryArena, TermHashMap};
/// IndexingContext contains all of the transient memory arenas
/// required for building the inverted index.
pub(crate) struct IndexingContext {
/// The term index is an adhoc hashmap,
/// itself backed by a dedicated memory arena.
pub term_index: TermHashMap,
/// Arena is a memory arena that stores posting lists / term frequencies / positions.
pub arena: MemoryArena,
pub arena_terms: MemoryArena,
}
impl IndexingContext {
/// Create a new IndexingContext given the size of the term hash map.
pub(crate) fn new() -> IndexingContext {
pub(crate) fn new(table_size: usize) -> IndexingContext {
let term_index = TermHashMap::new(table_size);
IndexingContext {
arena: MemoryArena::new(),
arena_terms: MemoryArena::new(),
term_index,
}
}
/// Returns the memory usage for the inverted index memory arenas, in bytes.
pub(crate) fn mem_usage(&self) -> usize {
self.arena.mem_usage() + self.arena_terms.mem_usage()
self.term_index.mem_usage() + self.arena.mem_usage()
}
}


@@ -1,6 +1,5 @@
use std::io;
use super::stacker::TermHashMap;
use crate::fastfield::MultiValuedFastFieldWriter;
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::postings::postings_writer::SpecializedPostingsWriter;
@@ -27,14 +26,6 @@ impl<Rec: Recorder> From<JsonPostingsWriter<Rec>> for Box<dyn PostingsWriter> {
}
impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
fn mem_usage(&self) -> usize {
self.str_posting_writer.mem_usage() + self.non_str_posting_writer.mem_usage()
}
fn term_map(&self) -> &TermHashMap {
self.str_posting_writer.term_map()
}
fn subscribe(
&mut self,
doc: crate::DocId,
@@ -83,7 +74,6 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
doc_id_map,
&mut buffer_lender,
ctx,
&self.str_posting_writer.term_map,
serializer,
)?;
} else {
@@ -93,7 +83,6 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
doc_id_map,
&mut buffer_lender,
ctx,
&self.str_posting_writer.term_map,
serializer,
)?;
}


@@ -26,6 +26,7 @@ pub(crate) use self::postings_writer::{serialize_postings, IndexingPosition, Pos
pub use self::segment_postings::SegmentPostings;
pub use self::serializer::{FieldSerializer, InvertedIndexSerializer};
pub(crate) use self::skip::{BlockInfo, SkipReader};
pub(crate) use self::stacker::compute_table_size;
pub use self::term_info::TermInfo;
pub(crate) type UnorderedTermId = u64;


@@ -10,10 +10,9 @@ pub(crate) struct PerFieldPostingsWriter {
impl PerFieldPostingsWriter {
pub fn for_schema(schema: &Schema) -> Self {
let num_fields = schema.num_fields();
let per_field_postings_writers = schema
.fields()
.map(|(_, field_entry)| posting_writer_from_field_entry(field_entry, num_fields))
.map(|(_, field_entry)| posting_writer_from_field_entry(field_entry))
.collect();
PerFieldPostingsWriter {
per_field_postings_writers,
@@ -27,19 +26,9 @@ impl PerFieldPostingsWriter {
pub(crate) fn get_for_field_mut(&mut self, field: Field) -> &mut dyn PostingsWriter {
self.per_field_postings_writers[field.field_id() as usize].as_mut()
}
pub(crate) fn mem_usage(&self) -> usize {
self.per_field_postings_writers
.iter()
.map(|postings_writer| postings_writer.mem_usage())
.sum()
}
}
fn posting_writer_from_field_entry(
field_entry: &FieldEntry,
_num_fields: usize,
) -> Box<dyn PostingsWriter> {
fn posting_writer_from_field_entry(field_entry: &FieldEntry) -> Box<dyn PostingsWriter> {
match *field_entry.field_type() {
FieldType::Str(ref text_options) => text_options
.get_indexing_options()


@@ -1,10 +1,11 @@
use std::collections::HashMap;
use std::io;
use std::marker::PhantomData;
use std::ops::Range;
use fnv::FnvHashMap;
use super::stacker::{Addr, TermHashMap};
use super::stacker::Addr;
use crate::fastfield::MultiValuedFastFieldWriter;
use crate::fieldnorm::FieldNormReaders;
use crate::indexer::doc_id_mapping::DocIdMapping;
@@ -20,6 +21,31 @@ use crate::DocId;
const POSITION_GAP: u32 = 1;
fn make_field_partition(
term_offsets: &[(Term<&[u8]>, Addr, UnorderedTermId)],
) -> Vec<(Field, Range<usize>)> {
let term_offsets_it = term_offsets
.iter()
.map(|(term, _, _)| term.field())
.enumerate();
let mut prev_field_opt = None;
let mut fields = vec![];
let mut offsets = vec![];
for (offset, field) in term_offsets_it {
if Some(field) != prev_field_opt {
prev_field_opt = Some(field);
fields.push(field);
offsets.push(offset);
}
}
offsets.push(term_offsets.len());
let mut field_offsets = vec![];
for i in 0..fields.len() {
field_offsets.push((fields[i], offsets[i]..offsets[i + 1]));
}
field_offsets
}
/// Serialize the inverted index.
/// It pushes all term, one field at a time, towards the
/// postings serializer.
@@ -31,23 +57,23 @@ pub(crate) fn serialize_postings(
schema: &Schema,
serializer: &mut InvertedIndexSerializer,
) -> crate::Result<HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>>> {
let mut term_offsets: Vec<(Term<&[u8]>, Addr, UnorderedTermId)> =
Vec::with_capacity(ctx.term_index.len());
term_offsets.extend(ctx.term_index.iter());
term_offsets.sort_unstable_by_key(|(k, _, _)| k.clone());
let mut unordered_term_mappings: HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>> =
HashMap::new();
for (field, _) in schema.fields() {
let postings_writer = per_field_postings_writers.get_for_field(field);
let mut term_offsets: Vec<(Term<&[u8]>, Addr, UnorderedTermId)> =
Vec::with_capacity(postings_writer.term_map().len());
term_offsets.extend(postings_writer.term_map().iter(&ctx.arena_terms));
term_offsets.sort_unstable_by_key(|(k, _, _)| k.clone());
let field_offsets = make_field_partition(&term_offsets);
for (field, byte_offsets) in field_offsets {
let field_entry = schema.get_field_entry(field);
match *field_entry.field_type() {
FieldType::Str(_) | FieldType::Facet(_) => {
// populating the (unordered term ord) -> (ordered term ord) mapping
// for the field.
let unordered_term_ids = term_offsets.iter().map(|&(_, _, bucket)| bucket);
let unordered_term_ids = term_offsets[byte_offsets.clone()]
.iter()
.map(|&(_, _, bucket)| bucket);
let mapping: FnvHashMap<UnorderedTermId, TermOrdinal> = unordered_term_ids
.enumerate()
.map(|(term_ord, unord_term_id)| {
@@ -61,10 +87,16 @@ pub(crate) fn serialize_postings(
FieldType::JsonObject(_) => {}
}
let postings_writer = per_field_postings_writers.get_for_field(field);
let fieldnorm_reader = fieldnorm_readers.get_field(field)?;
let mut field_serializer =
serializer.new_field(field, postings_writer.total_num_tokens(), fieldnorm_reader)?;
postings_writer.serialize(&term_offsets, doc_id_map, &ctx, &mut field_serializer)?;
postings_writer.serialize(
&term_offsets[byte_offsets],
doc_id_map,
&ctx,
&mut field_serializer,
)?;
field_serializer.close()?;
}
Ok(unordered_term_mappings)
@@ -96,10 +128,6 @@ pub(crate) trait PostingsWriter {
ctx: &mut IndexingContext,
) -> UnorderedTermId;
fn mem_usage(&self) -> usize;
fn term_map(&self) -> &TermHashMap;
/// Serializes the postings on disk.
/// The actual serialization format is handled by the `PostingsSerializer`.
fn serialize(
@@ -120,9 +148,9 @@ pub(crate) trait PostingsWriter {
indexing_position: &mut IndexingPosition,
mut term_id_fast_field_writer_opt: Option<&mut MultiValuedFastFieldWriter>,
) {
let end_of_path_idx = term_buffer.value_bytes().len();
let end_of_path_idx = term_buffer.as_slice().len();
let mut num_tokens = 0;
let mut end_position = 0;
let mut end_position = indexing_position.end_position;
token_stream.process(&mut |token: &Token| {
// We skip all tokens with a len greater than u16.
if token.text.len() > MAX_TOKEN_LEN {
@@ -160,7 +188,6 @@ pub(crate) trait PostingsWriter {
pub(crate) struct SpecializedPostingsWriter<Rec: Recorder> {
total_num_tokens: u64,
_recorder_type: PhantomData<Rec>,
pub(crate) term_map: TermHashMap,
}
impl<Rec: Recorder> From<SpecializedPostingsWriter<Rec>> for Box<dyn PostingsWriter> {
@@ -179,10 +206,9 @@ impl<Rec: Recorder> SpecializedPostingsWriter<Rec> {
doc_id_map: Option<&DocIdMapping>,
buffer_lender: &mut BufferLender,
ctx: &IndexingContext,
term_index: &TermHashMap,
serializer: &mut FieldSerializer,
) -> io::Result<()> {
let recorder: Rec = term_index.read(addr, &ctx.arena_terms);
let recorder: Rec = ctx.term_index.read(addr);
let term_doc_freq = recorder.term_doc_freq().unwrap_or(0u32);
serializer.new_term(term.value_bytes(), term_doc_freq)?;
recorder.serialize(&ctx.arena, doc_id_map, serializer, buffer_lender);
@@ -192,14 +218,6 @@ impl<Rec: Recorder> SpecializedPostingsWriter<Rec> {
}
impl<Rec: Recorder> PostingsWriter for SpecializedPostingsWriter<Rec> {
fn mem_usage(&self) -> usize {
self.term_map.mem_usage()
}
fn term_map(&self) -> &TermHashMap {
&self.term_map
}
fn subscribe(
&mut self,
doc: DocId,
@@ -207,30 +225,25 @@ impl<Rec: Recorder> PostingsWriter for SpecializedPostingsWriter<Rec> {
term: &Term,
ctx: &mut IndexingContext,
) -> UnorderedTermId {
//debug_assert!(term.value_bytes().len() >= 1);
debug_assert!(term.as_slice().len() >= 4);
self.total_num_tokens += 1;
let arena = &mut ctx.arena;
let arena_terms = &mut ctx.arena_terms;
self.term_map.mutate_or_create(
term.value_bytes(),
arena_terms,
|opt_recorder: Option<Rec>| {
if let Some(mut recorder) = opt_recorder {
let current_doc = recorder.current_doc();
if current_doc != doc {
recorder.close_doc(arena);
recorder.new_doc(doc, arena);
}
recorder.record_position(position, arena);
recorder
} else {
let mut recorder = Rec::default();
let (term_index, arena) = (&mut ctx.term_index, &mut ctx.arena);
term_index.mutate_or_create(term.as_slice(), |opt_recorder: Option<Rec>| {
if let Some(mut recorder) = opt_recorder {
let current_doc = recorder.current_doc();
if current_doc != doc {
recorder.close_doc(arena);
recorder.new_doc(doc, arena);
recorder.record_position(position, arena);
recorder
}
},
) as UnorderedTermId
recorder.record_position(position, arena);
recorder
} else {
let mut recorder = Rec::default();
recorder.new_doc(doc, arena);
recorder.record_position(position, arena);
recorder
}
}) as UnorderedTermId
}
fn serialize(
@@ -242,15 +255,7 @@ impl<Rec: Recorder> PostingsWriter for SpecializedPostingsWriter<Rec> {
) -> io::Result<()> {
let mut buffer_lender = BufferLender::default();
for (term, addr, _) in term_addrs {
Self::serialize_one_term(
term,
*addr,
doc_id_map,
&mut buffer_lender,
ctx,
&self.term_map,
serializer,
)?;
Self::serialize_one_term(term, *addr, doc_id_map, &mut buffer_lender, ctx, serializer)?;
}
Ok(())
}

View File

@@ -46,7 +46,6 @@ impl Addr {
}
/// Returns the `Addr` object for `addr + offset`
#[inline]
pub fn offset(self, offset: u32) -> Addr {
Addr(self.0.wrapping_add(offset))
}
@@ -55,24 +54,20 @@ impl Addr {
Addr((page_id << NUM_BITS_PAGE_ADDR | local_addr) as u32)
}
#[inline]
fn page_id(self) -> usize {
(self.0 as usize) >> NUM_BITS_PAGE_ADDR
}
#[inline]
fn page_local_addr(self) -> usize {
(self.0 as usize) & (PAGE_SIZE - 1)
}
/// Returns true if and only if the `Addr` is null.
#[inline]
pub fn is_null(self) -> bool {
self.0 == u32::max_value()
}
}
#[inline]
pub fn store<Item: Copy + 'static>(dest: &mut [u8], val: Item) {
assert_eq!(dest.len(), std::mem::size_of::<Item>());
unsafe {
@@ -80,7 +75,6 @@ pub fn store<Item: Copy + 'static>(dest: &mut [u8], val: Item) {
}
}
#[inline]
pub fn load<Item: Copy + 'static>(data: &[u8]) -> Item {
assert_eq!(data.len(), std::mem::size_of::<Item>());
unsafe { ptr::read_unaligned(data.as_ptr() as *const Item) }
@@ -116,7 +110,6 @@ impl MemoryArena {
self.pages.len() * PAGE_SIZE
}
#[inline]
pub fn write_at<Item: Copy + 'static>(&mut self, addr: Addr, val: Item) {
let dest = self.slice_mut(addr, std::mem::size_of::<Item>());
store(dest, val);
@@ -127,7 +120,6 @@ impl MemoryArena {
/// # Panics
///
/// If the address is erroneous
#[inline]
pub fn read<Item: Copy + 'static>(&self, addr: Addr) -> Item {
load(self.slice(addr, mem::size_of::<Item>()))
}
@@ -136,7 +128,6 @@ impl MemoryArena {
self.pages[addr.page_id()].slice(addr.page_local_addr(), len)
}
#[inline]
pub fn slice_from(&self, addr: Addr) -> &[u8] {
self.pages[addr.page_id()].slice_from(addr.page_local_addr())
}

View File

@@ -4,4 +4,4 @@ mod term_hashmap;
pub(crate) use self::expull::ExpUnrolledLinkedList;
pub(crate) use self::memory_arena::{Addr, MemoryArena};
pub(crate) use self::term_hashmap::TermHashMap;
pub(crate) use self::term_hashmap::{compute_table_size, TermHashMap};

View File

@@ -1,6 +1,6 @@
use std::convert::TryInto;
use std::{iter, mem, slice};
use byteorder::{ByteOrder, NativeEndian};
use murmurhash32::murmurhash2;
use super::{Addr, MemoryArena};
@@ -8,6 +8,13 @@ use crate::postings::stacker::memory_arena::store;
use crate::postings::UnorderedTermId;
use crate::Term;
/// Returns the actual memory size in bytes
/// required to create a table with a given capacity.
/// required to create a table of size
pub(crate) fn compute_table_size(capacity: usize) -> usize {
capacity * mem::size_of::<KeyValue>()
}
/// `KeyValue` is the item stored in the hash table.
/// The key is actually a `BytesRef` object stored in an external memory arena.
/// The `value_addr` also points to an address in the memory arena.
@@ -29,7 +36,6 @@ impl Default for KeyValue {
}
impl KeyValue {
#[inline]
fn is_empty(self) -> bool {
self.key_value_addr.is_null()
}
@@ -45,17 +51,12 @@ impl KeyValue {
/// or copying the key as long as there is no insert.
pub struct TermHashMap {
table: Box<[KeyValue]>,
memory_arena: MemoryArena,
mask: usize,
occupied: Vec<usize>,
len: usize,
}
impl Default for TermHashMap {
fn default() -> Self {
Self::new(1 << 10)
}
}
struct QuadraticProbing {
hash: usize,
i: usize,
@@ -74,21 +75,18 @@ impl QuadraticProbing {
}
}
pub struct Iter<'a, 'm> {
pub struct Iter<'a> {
hashmap: &'a TermHashMap,
memory_arena: &'m MemoryArena,
inner: slice::Iter<'a, usize>,
}
impl<'a, 'm> Iterator for Iter<'a, 'm> {
type Item = (Term<&'m [u8]>, Addr, UnorderedTermId);
impl<'a> Iterator for Iter<'a> {
type Item = (Term<&'a [u8]>, Addr, UnorderedTermId);
fn next(&mut self) -> Option<Self::Item> {
self.inner.next().cloned().map(move |bucket: usize| {
let kv = self.hashmap.table[bucket];
let (key, offset): (&'m [u8], Addr) = self
.hashmap
.get_key_value(kv.key_value_addr, self.memory_arena);
let (key, offset): (&'a [u8], Addr) = self.hashmap.get_key_value(kv.key_value_addr);
(Term::wrap(key), offset, kv.unordered_term_id)
})
}
@@ -108,19 +106,21 @@ impl TermHashMap {
pub(crate) fn new(table_size: usize) -> TermHashMap {
assert!(table_size > 0);
let table_size_power_of_2 = compute_previous_power_of_two(table_size);
let memory_arena = MemoryArena::new();
let table: Vec<KeyValue> = iter::repeat(KeyValue::default())
.take(table_size_power_of_2)
.collect();
TermHashMap {
table: table.into_boxed_slice(),
memory_arena,
mask: table_size_power_of_2 - 1,
occupied: Vec::with_capacity(table_size_power_of_2 / 2),
len: 0,
}
}
pub fn read<Item: Copy + 'static>(&self, addr: Addr, memory_arena: &MemoryArena) -> Item {
memory_arena.read(addr)
pub fn read<Item: Copy + 'static>(&self, addr: Addr) -> Item {
self.memory_arena.read(addr)
}
fn probe(&self, hash: u32) -> QuadraticProbing {
@@ -129,8 +129,6 @@ impl TermHashMap {
pub fn mem_usage(&self) -> usize {
self.table.len() * mem::size_of::<KeyValue>()
+ self.occupied.len()
* std::mem::size_of_val(&self.occupied.get(0).cloned().unwrap_or_default())
}
fn is_saturated(&self) -> bool {
@@ -138,22 +136,16 @@ impl TermHashMap {
}
#[inline]
fn get_key_value<'m>(&self, addr: Addr, memory_arena: &'m MemoryArena) -> (&'m [u8], Addr) {
let data = memory_arena.slice_from(addr);
let (key_bytes_len_enc, data) = data.split_at(2);
let key_bytes_len: u16 = u16::from_ne_bytes(key_bytes_len_enc.try_into().unwrap());
let key_bytes: &[u8] = &data[..key_bytes_len as usize];
fn get_key_value(&self, addr: Addr) -> (&[u8], Addr) {
let data = self.memory_arena.slice_from(addr);
let key_bytes_len = NativeEndian::read_u16(data) as usize;
let key_bytes: &[u8] = &data[2..][..key_bytes_len];
(key_bytes, addr.offset(2u32 + key_bytes_len as u32))
}
#[inline]
fn get_value_addr_if_key_match(
&self,
target_key: &[u8],
addr: Addr,
memory_arena: &mut MemoryArena,
) -> Option<Addr> {
let (stored_key, value_addr) = self.get_key_value(addr, memory_arena);
fn get_value_addr_if_key_match(&self, target_key: &[u8], addr: Addr) -> Option<Addr> {
let (stored_key, value_addr) = self.get_key_value(addr);
if stored_key == target_key {
Some(value_addr)
} else {
@@ -177,11 +169,10 @@ impl TermHashMap {
self.len
}
pub fn iter<'a, 'm>(&'a self, memory_arena: &'m MemoryArena) -> Iter<'a, 'm> {
pub fn iter(&self) -> Iter<'_> {
Iter {
inner: self.occupied.iter(),
hashmap: self,
memory_arena,
}
}
@@ -218,7 +209,6 @@ impl TermHashMap {
pub fn mutate_or_create<V, TMutator>(
&mut self,
key: &[u8],
memory_arena: &mut MemoryArena,
mut updater: TMutator,
) -> UnorderedTermId
where
@@ -229,33 +219,28 @@ impl TermHashMap {
self.resize();
}
let hash = murmurhash2(key);
let mut probe = self.probe(hash);
loop {
let bucket = probe.next_probe();
let kv: KeyValue = self.table[bucket];
if kv.is_empty() {
// The key does not exist yet.
let val = updater(None);
let num_bytes = std::mem::size_of::<u16>() + key.len() + std::mem::size_of::<V>();
let key_addr = memory_arena.allocate_space(num_bytes);
let key_addr = self.memory_arena.allocate_space(num_bytes);
{
let data = memory_arena.slice_mut(key_addr, num_bytes);
let (key_len, data) = data.split_at_mut(2);
key_len.copy_from_slice(&(key.len() as u16).to_le_bytes());
let stop = key.len();
data[..key.len()].copy_from_slice(key);
let data = self.memory_arena.slice_mut(key_addr, num_bytes);
NativeEndian::write_u16(data, key.len() as u16);
let stop = 2 + key.len();
data[2..stop].copy_from_slice(key);
store(&mut data[stop..], val);
}
return self.set_bucket(hash, key_addr, bucket);
} else if kv.hash == hash {
if let Some(val_addr) =
self.get_value_addr_if_key_match(key, kv.key_value_addr, memory_arena)
{
let v = memory_arena.read(val_addr);
if let Some(val_addr) = self.get_value_addr_if_key_match(key, kv.key_value_addr) {
let v = self.memory_arena.read(val_addr);
let new_v = updater(Some(v));
memory_arena.write_at(val_addr, new_v);
self.memory_arena.write_at(val_addr, new_v);
return kv.unordered_term_id;
}
}
@@ -269,28 +254,26 @@ mod tests {
use std::collections::HashMap;
use super::{compute_previous_power_of_two, TermHashMap};
use crate::postings::stacker::MemoryArena;
#[test]
fn test_hash_map() {
let mut arena = MemoryArena::new();
let mut hash_map: TermHashMap = TermHashMap::new(1 << 18);
hash_map.mutate_or_create(b"abc", &mut arena, |opt_val: Option<u32>| {
hash_map.mutate_or_create(b"abc", |opt_val: Option<u32>| {
assert_eq!(opt_val, None);
3u32
});
hash_map.mutate_or_create(b"abcd", &mut arena, |opt_val: Option<u32>| {
hash_map.mutate_or_create(b"abcd", |opt_val: Option<u32>| {
assert_eq!(opt_val, None);
4u32
});
hash_map.mutate_or_create(b"abc", &mut arena, |opt_val: Option<u32>| {
hash_map.mutate_or_create(b"abc", |opt_val: Option<u32>| {
assert_eq!(opt_val, Some(3u32));
5u32
});
let mut vanilla_hash_map = HashMap::new();
let iter_values = hash_map.iter(&arena);
let iter_values = hash_map.iter();
for (key, addr, _) in iter_values {
let val: u32 = arena.read(addr);
let val: u32 = hash_map.memory_arena.read(addr);
vanilla_hash_map.insert(key.to_owned(), val);
}
assert_eq!(vanilla_hash_map.len(), 2);

View File

@@ -184,6 +184,66 @@ fn intersection_with_slop(left: &mut [u32], right: &[u32], slop: u32) -> usize {
count
}
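// Counts matches between `left` and `right` where some left position lies in
// `[right - slop, right]`. All left positions up to the matched right value are
// consumed, so each right position contributes at most one match.
// `intersection_exists_with_slop` below is the early-exit variant of the same scan.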
fn intersection_count_with_slop(left: &[u32], right: &[u32], slop: u32) -> usize {
let mut left_index = 0;
let mut right_index = 0;
let mut count = 0;
let left_len = left.len();
let right_len = right.len();
while left_index < left_len && right_index < right_len {
let left_val = left[left_index];
let right_val = right[right_index];
let right_slop = if right_val >= slop {
right_val - slop
} else {
0
};
if left_val < right_slop {
left_index += 1;
} else if right_slop <= left_val && left_val <= right_val {
while left_index + 1 < left_len {
let next_left_val = left[left_index + 1];
if next_left_val > right_val {
break;
}
left_index += 1;
}
count += 1;
left_index += 1;
right_index += 1;
} else if left_val > right_val {
right_index += 1;
}
}
count
}
fn intersection_exists_with_slop(left: &[u32], right: &[u32], slop: u32) -> bool {
let mut left_index = 0;
let mut right_index = 0;
let left_len = left.len();
let right_len = right.len();
while left_index < left_len && right_index < right_len {
let left_val = left[left_index];
let right_val = right[right_index];
let right_slop = if right_val >= slop {
right_val - slop
} else {
0
};
if left_val < right_slop {
left_index += 1;
} else if right_slop <= left_val && left_val <= right_val {
return true;
} else if left_val > right_val {
right_index += 1;
}
}
false
}
impl<TPostings: Postings> PhraseScorer<TPostings> {
pub fn new(
term_postings: Vec<(usize, TPostings)>,
@@ -237,11 +297,25 @@ impl<TPostings: Postings> PhraseScorer<TPostings> {
fn phrase_exists(&mut self) -> bool {
let intersection_len = self.compute_phrase_match();
if self.has_slop() {
return intersection_exists_with_slop(
&self.left[..intersection_len],
&self.right[..],
self.slop,
);
}
intersection_exists(&self.left[..intersection_len], &self.right[..])
}
fn compute_phrase_count(&mut self) -> u32 {
let intersection_len = self.compute_phrase_match();
if self.has_slop() {
return intersection_count_with_slop(
&self.left[..intersection_len],
&self.right[..],
self.slop,
) as u32;
}
intersection_count(&self.left[..intersection_len], &self.right[..]) as u32
}
@@ -252,12 +326,7 @@ impl<TPostings: Postings> PhraseScorer<TPostings> {
.positions(&mut self.left);
}
let mut intersection_len = self.left.len();
let end_term = if self.has_slop() {
self.num_terms
} else {
self.num_terms - 1
};
for i in 1..end_term {
for i in 1..self.num_terms - 1 {
{
self.intersection_docset
.docset_mut_specialized(i)

View File

@@ -1,4 +1,4 @@
use std::collections::{BTreeSet, HashMap};
use std::collections::HashMap;
use std::num::{ParseFloatError, ParseIntError};
use std::ops::Bound;
use std::str::FromStr;
@@ -7,7 +7,9 @@ use tantivy_query_grammar::{UserInputAst, UserInputBound, UserInputLeaf, UserInp
use super::logical_ast::*;
use crate::core::Index;
use crate::indexer::JsonTermWriter;
use crate::indexer::{
convert_to_fast_value_and_get_term, set_string_and_get_terms, JsonTermWriter,
};
use crate::query::{
AllQuery, BooleanQuery, BoostQuery, EmptyQuery, Occur, PhraseQuery, Query, RangeQuery,
TermQuery,
@@ -16,7 +18,7 @@ use crate::schema::{
Facet, FacetParseError, Field, FieldType, IndexRecordOption, Schema, Term, Type,
};
use crate::time::format_description::well_known::Rfc3339;
use crate::time::{OffsetDateTime, UtcOffset};
use crate::time::OffsetDateTime;
use crate::tokenizer::{TextAnalyzer, TokenizerManager};
use crate::{DateTime, Score};
@@ -30,7 +32,7 @@ pub enum QueryParserError {
#[error("Unsupported query: {0}")]
UnsupportedQuery(String),
/// The query references a field that is not in the schema
#[error("Field does not exists: '{0:?}'")]
#[error("Field does not exists: '{0}'")]
FieldDoesNotExist(String),
/// The query contains a term for a `u64` or `i64`-field, but the value
/// is neither.
@@ -53,11 +55,11 @@ pub enum QueryParserError {
NoDefaultFieldDeclared,
/// The field searched for is not declared
/// as indexed in the schema.
#[error("The field '{0:?}' is not declared as indexed")]
#[error("The field '{0}' is not declared as indexed")]
FieldNotIndexed(String),
/// A phrase query was requested for a field that does not
/// have any positions indexed.
#[error("The field '{0:?}' does not have positions indexed")]
#[error("The field '{0}' does not have positions indexed")]
FieldDoesNotHavePositionsIndexed(String),
/// The tokenizer for the given field is unknown
/// The two argument strings are the name of the field, the name of the tokenizer
@@ -169,7 +171,7 @@ pub struct QueryParser {
conjunction_by_default: bool,
tokenizer_manager: TokenizerManager,
boost: HashMap<Field, Score>,
field_names: BTreeSet<String>,
field_names: HashMap<String, Field>,
}
fn all_negative(ast: &LogicalAst) -> bool {
@@ -182,6 +184,31 @@ fn all_negative(ast: &LogicalAst) -> bool {
}
}
// Returns the positions (in byte offsets) of the unescaped '.' characters in the `field_path`.
//
// This function operates directly on bytes (as opposed to codepoints), relying
// on an encoding property of UTF-8 for its correctness.
fn locate_splitting_dots(field_path: &str) -> Vec<usize> {
let mut splitting_dots_pos = Vec::new();
let mut escape_state = false;
for (pos, b) in field_path.bytes().enumerate() {
if escape_state {
escape_state = false;
continue;
}
match b {
b'\\' => {
escape_state = true;
}
b'.' => {
splitting_dots_pos.push(pos);
}
_ => {}
}
}
splitting_dots_pos
}
impl QueryParser {
/// Creates a `QueryParser`, given
/// * schema - index Schema
@@ -193,7 +220,7 @@ impl QueryParser {
) -> QueryParser {
let field_names = schema
.fields()
.map(|(_, field_entry)| field_entry.name().to_string())
.map(|(field, field_entry)| (field_entry.name().to_string(), field))
.collect();
QueryParser {
schema,
@@ -207,25 +234,18 @@ impl QueryParser {
// Splits a full_path as written in a query, into a field name and a
// json path.
pub(crate) fn split_full_path<'a>(&self, full_path: &'a str) -> (&'a str, &'a str) {
if full_path.is_empty() {
return ("", "");
pub(crate) fn split_full_path<'a>(&self, full_path: &'a str) -> Option<(Field, &'a str)> {
if let Some(field) = self.field_names.get(full_path) {
return Some((*field, ""));
}
if self.field_names.contains(full_path) {
return (full_path, "");
}
let mut result = ("", full_path);
let mut cursor = 0;
while let Some(pos) = full_path[cursor..].find('.') {
cursor += pos;
let prefix = &full_path[..cursor];
let suffix = &full_path[cursor + 1..];
if self.field_names.contains(prefix) {
result = (prefix, suffix);
let mut splitting_period_pos: Vec<usize> = locate_splitting_dots(full_path);
while let Some(pos) = splitting_period_pos.pop() {
let (prefix, suffix) = full_path.split_at(pos);
if let Some(field) = self.field_names.get(prefix) {
return Some((*field, &suffix[1..]));
}
cursor += 1;
}
result
None
}
/// Creates a `QueryParser`, given
@@ -278,12 +298,6 @@ impl QueryParser {
self.compute_logical_ast(user_input_ast)
}
fn resolve_field_name(&self, field_name: &str) -> Result<Field, QueryParserError> {
self.schema
.get_field(field_name)
.ok_or_else(|| QueryParserError::FieldDoesNotExist(String::from(field_name)))
}
fn compute_logical_ast(
&self,
user_input_ast: UserInputAst,
@@ -390,6 +404,12 @@ impl QueryParser {
if !field_type.is_indexed() {
return Err(QueryParserError::FieldNotIndexed(field_name.to_string()));
}
if field_type.value_type() != Type::Json && !json_path.is_empty() {
let field_name = self.schema.get_field_name(field);
return Err(QueryParserError::FieldDoesNotExist(format!(
"{field_name}.{json_path}"
)));
}
match *field_type {
FieldType::U64(_) => {
let val: u64 = u64::from_str(phrase)?;
@@ -531,37 +551,56 @@ impl QueryParser {
})
}
fn compute_path_triplet_for_literal<'a>(
/// Given a literal, returns the list of terms that should be searched.
///
/// The terms are identified by a triplet:
/// - tantivy field
/// - field_path: tantivy has JSON fields. It is possible to target a member of a JSON
/// object by naturally extending the json field name with a "." separated field_path
/// - field_phrase: the phrase that is being searched.
///
/// The literal identifies the targeted field by a so-called *full field path*,
/// specified before the ":" (e.g. identity.username:fulmicoton).
///
/// The way we split the full field path into (field_name, field_path) can be ambiguous,
/// because field_names can contain "." themselves.
/// For instance, if a field is named `one.two` and another one is named `one`,
/// should `one.two:three` target `one.two` with field path `` or `one` with
/// the field path `two`?
///
/// In this case, tantivy just picks the solution with the longest field name
/// (see the standalone sketch after this function).
///
/// Quirk: As a hack for quickwit, we do not split over a dot that appears escaped '\.'.
fn compute_path_triplets_for_literal<'a>(
&self,
literal: &'a UserInputLiteral,
) -> Result<Vec<(Field, &'a str, &'a str)>, QueryParserError> {
match &literal.field_name {
Some(ref full_path) => {
// We need to add terms associated to json default fields.
let (field_name, path) = self.split_full_path(full_path);
if let Ok(field) = self.resolve_field_name(field_name) {
return Ok(vec![(field, path, literal.phrase.as_str())]);
}
let triplets: Vec<(Field, &str, &str)> = self
.default_indexed_json_fields()
.map(|json_field| (json_field, full_path.as_str(), literal.phrase.as_str()))
.collect();
if triplets.is_empty() {
return Err(QueryParserError::FieldDoesNotExist(full_path.to_string()));
}
Ok(triplets)
}
None => {
if self.default_fields.is_empty() {
return Err(QueryParserError::NoDefaultFieldDeclared);
}
Ok(self
.default_fields
.iter()
.map(|default_field| (*default_field, "", literal.phrase.as_str()))
.collect::<Vec<(Field, &str, &str)>>())
let full_path = if let Some(full_path) = &literal.field_name {
full_path
} else {
// The user did not specify any path...
// We simply target default fields.
if self.default_fields.is_empty() {
return Err(QueryParserError::NoDefaultFieldDeclared);
}
return Ok(self
.default_fields
.iter()
.map(|default_field| (*default_field, "", literal.phrase.as_str()))
.collect::<Vec<(Field, &str, &str)>>());
};
if let Some((field, path)) = self.split_full_path(full_path) {
return Ok(vec![(field, path, literal.phrase.as_str())]);
}
// We need to add terms associated to json default fields.
let triplets: Vec<(Field, &str, &str)> = self
.default_indexed_json_fields()
.map(|json_field| (json_field, full_path.as_str(), literal.phrase.as_str()))
.collect();
if triplets.is_empty() {
return Err(QueryParserError::FieldDoesNotExist(full_path.to_string()));
}
Ok(triplets)
}
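As a standalone illustration of the longest-field-name rule described in the doc comment above, a hedged sketch over a plain `HashMap` (hypothetical `split_full_path` helper; it ignores the escaped-dot quirk and is not tantivy's actual implementation):
use std::collections::HashMap;
// Prefer an exact field-name match, then the longest dotted prefix.
fn split_full_path<'a>(fields: &HashMap<&str, u32>, full_path: &'a str) -> Option<(u32, &'a str)> {
    if let Some(&field) = fields.get(full_path) {
        return Some((field, ""));
    }
    // Try prefixes from the longest to the shortest, splitting at '.'.
    let mut dots: Vec<usize> = full_path.match_indices('.').map(|(pos, _)| pos).collect();
    while let Some(pos) = dots.pop() {
        let (prefix, suffix) = full_path.split_at(pos);
        if let Some(&field) = fields.get(prefix) {
            return Some((field, &suffix[1..]));
        }
    }
    None
}
fn main() {
    let mut fields = HashMap::new();
    fields.insert("one", 0u32);
    fields.insert("one.two", 1u32);
    // The longest field name wins: `one.two` with an empty json path...
    assert_eq!(split_full_path(&fields, "one.two"), Some((1, "")));
    // ...and `one.two.three` targets `one.two` with json path `three`.
    assert_eq!(split_full_path(&fields, "one.two.three"), Some((1, "three")));
    // An unknown prefix resolves to nothing.
    assert_eq!(split_full_path(&fields, "unknown.path"), None);
}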
fn compute_logical_ast_from_leaf(
@@ -571,7 +610,7 @@ impl QueryParser {
match leaf {
UserInputLeaf::Literal(literal) => {
let term_phrases: Vec<(Field, &str, &str)> =
self.compute_path_triplet_for_literal(&literal)?;
self.compute_path_triplets_for_literal(&literal)?;
let mut asts: Vec<LogicalAst> = Vec::new();
for (field, json_path, phrase) in term_phrases {
for ast in self.compute_logical_ast_for_leaf(field, json_path, phrase)? {
@@ -598,8 +637,9 @@ impl QueryParser {
"Range query need to target a specific field.".to_string(),
)
})?;
let (field_name, json_path) = self.split_full_path(&full_path);
let field = self.resolve_field_name(field_name)?;
let (field, json_path) = self
.split_full_path(&full_path)
.ok_or_else(|| QueryParserError::FieldDoesNotExist(full_path.clone()))?;
let field_entry = self.schema.get_field_entry(field);
let value_type = field_entry.field_type().value_type();
let logical_ast = LogicalAst::Leaf(Box::new(LogicalLiteral::Range {
@@ -660,30 +700,6 @@ fn generate_literals_for_str(
Ok(Some(LogicalLiteral::Phrase(terms)))
}
enum NumValue {
U64(u64),
I64(i64),
F64(f64),
DateTime(OffsetDateTime),
}
fn infer_type_num(phrase: &str) -> Option<NumValue> {
if let Ok(dt) = OffsetDateTime::parse(phrase, &Rfc3339) {
let dt_utc = dt.to_offset(UtcOffset::UTC);
return Some(NumValue::DateTime(dt_utc));
}
if let Ok(u64_val) = str::parse::<u64>(phrase) {
return Some(NumValue::U64(u64_val));
}
if let Ok(i64_val) = str::parse::<i64>(phrase) {
return Some(NumValue::I64(i64_val));
}
if let Ok(f64_val) = str::parse::<f64>(phrase) {
return Some(NumValue::F64(f64_val));
}
None
}
fn generate_literals_for_json_object(
field_name: &str,
field: Field,
@@ -694,38 +710,13 @@ fn generate_literals_for_json_object(
) -> Result<Vec<LogicalLiteral>, QueryParserError> {
let mut logical_literals = Vec::new();
let mut term = Term::new();
term.set_field(Type::Json, field);
let mut json_term_writer = JsonTermWriter::wrap(&mut term);
for segment in json_path.split('.') {
json_term_writer.push_path_segment(segment);
let mut json_term_writer =
JsonTermWriter::from_field_and_json_path(field, json_path, &mut term);
if let Some(term) = convert_to_fast_value_and_get_term(&mut json_term_writer, phrase) {
logical_literals.push(LogicalLiteral::Term(term));
}
if let Some(num_value) = infer_type_num(phrase) {
match num_value {
NumValue::U64(u64_val) => {
json_term_writer.set_fast_value(u64_val);
}
NumValue::I64(i64_val) => {
json_term_writer.set_fast_value(i64_val);
}
NumValue::F64(f64_val) => {
json_term_writer.set_fast_value(f64_val);
}
NumValue::DateTime(dt_val) => {
json_term_writer.set_fast_value(DateTime::from_utc(dt_val));
}
}
logical_literals.push(LogicalLiteral::Term(json_term_writer.term().clone()));
}
json_term_writer.close_path_and_set_type(Type::Str);
let terms = set_string_and_get_terms(&mut json_term_writer, phrase, text_analyzer);
drop(json_term_writer);
let term_num_bytes = term.value_bytes().len();
let mut token_stream = text_analyzer.token_stream(phrase);
let mut terms: Vec<(usize, Term)> = Vec::new();
token_stream.process(&mut |token| {
term.truncate(term_num_bytes);
term.append_bytes(token.text.as_bytes());
terms.push((token.position, term.clone()));
});
if terms.len() <= 1 {
for (_, term) in terms {
logical_literals.push(LogicalLiteral::Term(term));
@@ -1399,29 +1390,56 @@ mod test {
}
}
#[test]
fn test_escaped_field() {
let mut schema_builder = Schema::builder();
schema_builder.add_text_field(r#"a\.b"#, STRING);
let schema = schema_builder.build();
let query_parser = QueryParser::new(schema, Vec::new(), TokenizerManager::default());
let query = query_parser.parse_query(r#"a\.b:hello"#).unwrap();
assert_eq!(
format!("{:?}", query),
"TermQuery(Term(type=Str, field=0, \"hello\"))"
);
}
#[test]
fn test_split_full_path() {
let mut schema_builder = Schema::builder();
schema_builder.add_text_field("second", STRING);
schema_builder.add_text_field("first", STRING);
schema_builder.add_text_field("first.toto", STRING);
schema_builder.add_text_field("first.toto.titi", STRING);
schema_builder.add_text_field("third.a.b.c", STRING);
let schema = schema_builder.build();
let query_parser = QueryParser::new(schema, Vec::new(), TokenizerManager::default());
let query_parser =
QueryParser::new(schema.clone(), Vec::new(), TokenizerManager::default());
assert_eq!(
query_parser.split_full_path("first.toto"),
("first.toto", "")
Some((schema.get_field("first.toto").unwrap(), ""))
);
assert_eq!(
query_parser.split_full_path("first.toto.bubu"),
Some((schema.get_field("first.toto").unwrap(), "bubu"))
);
assert_eq!(
query_parser.split_full_path("first.toto.titi"),
Some((schema.get_field("first.toto.titi").unwrap(), ""))
);
assert_eq!(
query_parser.split_full_path("first.titi"),
("first", "titi")
Some((schema.get_field("first").unwrap(), "titi"))
);
assert_eq!(query_parser.split_full_path("third"), ("", "third"));
assert_eq!(
query_parser.split_full_path("hello.toto"),
("", "hello.toto")
);
assert_eq!(query_parser.split_full_path(""), ("", ""));
assert_eq!(query_parser.split_full_path("firsty"), ("", "firsty"));
assert_eq!(query_parser.split_full_path("third"), None);
assert_eq!(query_parser.split_full_path("hello.toto"), None);
assert_eq!(query_parser.split_full_path(""), None);
assert_eq!(query_parser.split_full_path("firsty"), None);
}
#[test]
fn test_locate_splitting_dots() {
assert_eq!(&super::locate_splitting_dots("a.b.c"), &[1, 3]);
assert_eq!(&super::locate_splitting_dots(r#"a\.b.c"#), &[4]);
assert_eq!(&super::locate_splitting_dots(r#"a\..b.c"#), &[3, 5]);
}
}

View File

@@ -2,7 +2,7 @@ use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use crossbeam::channel::{unbounded, Receiver, RecvError, Sender};
use crossbeam_channel::{unbounded, Receiver, RecvError, Sender};
pub struct GenerationItem<T> {
generation: usize,
@@ -197,7 +197,7 @@ mod tests {
use std::{iter, mem};
use crossbeam::channel;
use crossbeam_channel as channel;
use super::{Pool, Queue};

View File

@@ -147,7 +147,7 @@ impl WarmingStateInner {
/// Every [GC_INTERVAL] attempt to GC, with panics caught and logged using
/// [std::panic::catch_unwind].
fn gc_loop(inner: Weak<Mutex<WarmingStateInner>>) {
for _ in crossbeam::channel::tick(GC_INTERVAL) {
for _ in crossbeam_channel::tick(GC_INTERVAL) {
if let Some(inner) = inner.upgrade() {
// rely on deterministic gc in tests
#[cfg(not(test))]

View File

@@ -213,6 +213,8 @@ impl BinarySerializable for Document {
#[cfg(test)]
mod tests {
use common::BinarySerializable;
use crate::schema::*;
#[test]
@@ -223,4 +225,22 @@ mod tests {
doc.add_text(text_field, "My title");
assert_eq!(doc.field_values().len(), 1);
}
#[test]
fn test_doc_serialization_issue() {
let mut doc = Document::default();
doc.add_json_object(
Field::from_field_id(0),
serde_json::json!({"key": 2u64})
.as_object()
.unwrap()
.clone(),
);
doc.add_text(Field::from_field_id(1), "hello");
assert_eq!(doc.field_values().len(), 2);
let mut payload: Vec<u8> = Vec::new();
doc.serialize(&mut payload).unwrap();
assert_eq!(payload.len(), 26);
Document::deserialize(&mut &payload[..]).unwrap();
}
}

View File

@@ -17,7 +17,7 @@ use crate::DateTime;
///
/// - <value> is, if this is not a JSON term, a binary representation specific to the type.
/// If it is a JSON term, then it is prepended with the path that leads to this leaf value.
const FAST_VALUE_TERM_LEN: usize = 8;
const FAST_VALUE_TERM_LEN: usize = 4 + 1 + 8;
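A standalone sketch of the byte layout this change gives `Term`: [field id: 4 bytes, big endian][type code: 1 byte][value bytes]. The concrete type-code values come from `Type::to_code()` and are not reproduced here; `0xFF` below is only a placeholder.
fn term_bytes(field_id: u32, type_code: u8, value: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(4 + 1 + value.len());
    buf.extend_from_slice(&field_id.to_be_bytes());
    buf.push(type_code);
    buf.extend_from_slice(value);
    buf
}
fn main() {
    // A u64 fast value is stored as 8 big-endian bytes, so the whole term is
    // 4 + 1 + 8 = 13 bytes, i.e. FAST_VALUE_TERM_LEN.
    let term = term_bytes(7, 0xFF /* placeholder type code */, &983u64.to_be_bytes());
    assert_eq!(term.len(), 13);
    assert_eq!(&term[..4], &7u32.to_be_bytes()[..]);
    assert_eq!(&term[5..], &983u64.to_be_bytes()[..]);
}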
/// Separates the different segments of
/// the json path.
@@ -33,33 +33,22 @@ pub const JSON_END_OF_PATH: u8 = 0u8;
///
/// It actually wraps a `Vec<u8>`.
#[derive(Clone)]
pub struct Term<B = Vec<u8>> {
data: B,
field: Field,
field_type: Type,
}
pub struct Term<B = Vec<u8>>(B)
where B: AsRef<[u8]>;
impl AsMut<Vec<u8>> for Term {
fn as_mut(&mut self) -> &mut Vec<u8> {
&mut self.data
&mut self.0
}
}
impl Term {
pub(crate) fn new() -> Term {
Self::with_capacity(32)
}
pub(crate) fn with_capacity(cap: usize) -> Term {
Term {
data: Vec::with_capacity(cap),
field: Field::from_field_id(0),
field_type: Type::Str,
}
Term(Vec::with_capacity(100))
}
fn from_fast_value<T: FastValue>(field: Field, val: &T) -> Term {
let mut term = Term::with_capacity(FAST_VALUE_TERM_LEN);
let mut term = Term(vec![0u8; FAST_VALUE_TERM_LEN]);
term.set_field(T::to_type(), field);
term.set_u64(val.to_u64());
term
@@ -97,9 +86,9 @@ impl Term {
}
fn create_bytes_term(typ: Type, field: Field, bytes: &[u8]) -> Term {
let mut term = Term::with_capacity(bytes.len());
let mut term = Term(vec![0u8; 5 + bytes.len()]);
term.set_field(typ, field);
term.data.extend_from_slice(bytes);
term.0.extend_from_slice(bytes);
term
}
@@ -109,9 +98,10 @@ impl Term {
}
pub(crate) fn set_field(&mut self, typ: Type, field: Field) {
self.field = field;
self.field_type = typ;
self.data.clear();
self.0.clear();
self.0
.extend_from_slice(field.field_id().to_be_bytes().as_ref());
self.0.push(typ.to_code());
}
/// Sets a u64 value in the term.
@@ -122,9 +112,11 @@ impl Term {
/// the natural order of the values.
pub fn set_u64(&mut self, val: u64) {
self.set_fast_value(val);
self.set_bytes(val.to_be_bytes().as_ref());
}
fn set_fast_value<T: FastValue>(&mut self, val: T) {
self.0.resize(FAST_VALUE_TERM_LEN, 0u8);
self.set_bytes(val.to_u64().to_be_bytes().as_ref());
}
@@ -145,8 +137,8 @@ impl Term {
/// Sets the value of a `Bytes` field.
pub fn set_bytes(&mut self, bytes: &[u8]) {
self.data.clear();
self.data.extend(bytes);
self.0.resize(5, 0u8);
self.0.extend(bytes);
}
/// Set the texts only, keeping the field untouched.
@@ -156,18 +148,18 @@ impl Term {
/// Removes the value_bytes and set the type code.
pub fn clear_with_type(&mut self, typ: Type) {
self.data.clear();
self.field_type = typ;
self.truncate(5);
self.0[4] = typ.to_code();
}
/// Truncate the term right after the field and the type code.
pub fn truncate(&mut self, len: usize) {
self.data.truncate(len);
self.0.truncate(len);
}
/// Truncate the term right after the field and the type code.
pub fn append_bytes(&mut self, bytes: &[u8]) {
self.data.extend_from_slice(bytes);
self.0.extend_from_slice(bytes);
}
}
@@ -175,7 +167,7 @@ impl<B> Ord for Term<B>
where B: AsRef<[u8]>
{
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.value_bytes().cmp(other.value_bytes())
self.as_slice().cmp(other.as_slice())
}
}
@@ -191,7 +183,7 @@ impl<B> PartialEq for Term<B>
where B: AsRef<[u8]>
{
fn eq(&self, other: &Self) -> bool {
self.value_bytes() == other.value_bytes()
self.as_slice() == other.as_slice()
}
}
@@ -201,7 +193,7 @@ impl<B> Hash for Term<B>
where B: AsRef<[u8]>
{
fn hash<H: Hasher>(&self, state: &mut H) {
self.data.as_ref().hash(state)
self.0.as_ref().hash(state)
}
}
@@ -210,15 +202,14 @@ where B: AsRef<[u8]>
{
/// Wraps an object holding bytes
pub fn wrap(data: B) -> Term<B> {
Term {
data,
field: Field::from_field_id(0),
field_type: Type::Str,
}
Term(data)
}
fn typ_code(&self) -> u8 {
self.field_type as u8
*self
.as_slice()
.get(4)
.expect("the byte representation is too short")
}
/// Return the type of the term.
@@ -228,7 +219,55 @@ where B: AsRef<[u8]>
/// Returns the field.
pub fn field(&self) -> Field {
self.field
let mut field_id_bytes = [0u8; 4];
field_id_bytes.copy_from_slice(&self.0.as_ref()[..4]);
Field::from_field_id(u32::from_be_bytes(field_id_bytes))
}
/// Returns the `u64` value stored in a term.
///
/// Returns None if the term is not of the u64 type, or if the term byte representation
/// is invalid.
pub fn as_u64(&self) -> Option<u64> {
self.get_fast_type::<u64>()
}
fn get_fast_type<T: FastValue>(&self) -> Option<T> {
if self.typ() != T::to_type() {
return None;
}
let mut value_bytes = [0u8; 8];
let bytes = self.value_bytes();
if bytes.len() != 8 {
return None;
}
value_bytes.copy_from_slice(self.value_bytes());
let value_u64 = u64::from_be_bytes(value_bytes);
Some(FastValue::from_u64(value_u64))
}
/// Returns the `i64` value stored in a term.
///
/// Returns None if the term is not of the i64 type, or if the term byte representation
/// is invalid.
pub fn as_i64(&self) -> Option<i64> {
self.get_fast_type::<i64>()
}
/// Returns the `f64` value stored in a term.
///
/// Returns None if the term is not of the f64 type, or if the term byte representation
/// is invalid.
pub fn as_f64(&self) -> Option<f64> {
self.get_fast_type::<f64>()
}
/// Returns the `Date` value stored in a term.
///
/// Returns None if the term is not of the Date type, or if the term byte representation
/// is invalid.
pub fn as_date(&self) -> Option<DateTime> {
self.get_fast_type::<DateTime>()
}
/// Returns the text associated with the term.
@@ -236,12 +275,43 @@ where B: AsRef<[u8]>
/// Returns None if the field is not of string type
/// or if the bytes are not valid utf-8.
pub fn as_str(&self) -> Option<&str> {
if self.as_slice().len() < 5 {
return None;
}
if self.typ() != Type::Str {
return None;
}
str::from_utf8(self.value_bytes()).ok()
}
/// Returns the facet associated with the term.
///
/// Returns None if the field is not of facet type
/// or if the bytes are not valid utf-8.
pub fn as_facet(&self) -> Option<Facet> {
if self.as_slice().len() < 5 {
return None;
}
if self.typ() != Type::Facet {
return None;
}
let facet_encode_str = str::from_utf8(self.value_bytes()).ok()?;
Some(Facet::from_encoded_string(facet_encode_str.to_string()))
}
/// Returns the bytes associated with the term.
///
/// Returns None if the field is not of bytes type.
pub fn as_bytes(&self) -> Option<&[u8]> {
if self.as_slice().len() < 5 {
return None;
}
if self.typ() != Type::Bytes {
return None;
}
Some(self.value_bytes())
}
/// Returns the serialized value of the term.
/// (this does not include the field.)
///
@@ -249,7 +319,15 @@ where B: AsRef<[u8]>
/// If the term is a u64, its value is encoded
/// in big-endian byte order, so that the byte order preserves the natural order of the values.
pub fn value_bytes(&self) -> &[u8] {
&self.data.as_ref()
&self.0.as_ref()[5..]
}
/// Returns the underlying `&[u8]`.
///
/// Do NOT rely on this byte representation in the index.
/// This value is likely to change in the future.
pub(crate) fn as_slice(&self) -> &[u8] {
self.0.as_ref()
}
}
@@ -356,6 +434,7 @@ mod tests {
let term = Term::from_field_u64(count_field, 983u64);
assert_eq!(term.field(), count_field);
assert_eq!(term.typ(), Type::U64);
assert_eq!(term.value_bytes().len(), super::FAST_VALUE_TERM_LEN);
assert_eq!(term.as_slice().len(), super::FAST_VALUE_TERM_LEN);
assert_eq!(term.as_u64(), Some(983u64))
}
}

View File

@@ -42,6 +42,11 @@ impl TextOptions {
/// Text fast fields will have the term ids stored in the fast field.
/// The fast field will be a multivalued fast field.
///
/// The effective cardinality depends on the tokenizer. When creating fast fields on text
/// fields, it is recommended to use the "raw" tokenizer, since it stores the original text
/// unchanged. The "default" tokenizer will store the terms in lower case, and this will be
/// reflected in the dictionary.
///
/// The original text can be retrieved via `ord_to_term` from the dictionary.
#[must_use]
pub fn set_fast(mut self) -> TextOptions {
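A short usage sketch of the recommendation above, assuming this version's `TextOptions`/`TextFieldIndexing` API: index the text with the "raw" tokenizer and enable the fast field.
use tantivy::schema::{IndexRecordOption, Schema, TextFieldIndexing, TextOptions};
fn main() {
    let mut schema_builder = Schema::builder();
    // The "raw" tokenizer keeps the original text unchanged in the dictionary,
    // and `set_fast()` adds the multivalued term-ordinal fast field.
    let text_options = TextOptions::default()
        .set_indexing_options(
            TextFieldIndexing::default()
                .set_tokenizer("raw")
                .set_index_option(IndexRecordOption::Basic),
        )
        .set_fast();
    schema_builder.add_text_field("category", text_options);
    let _schema = schema_builder.build();
}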

View File

@@ -388,8 +388,16 @@ mod binary_serialize {
}
}
JSON_OBJ_CODE => {
let map = serde_json::from_reader(reader)?;
Ok(Value::JsonObject(map))
// As explained in
// https://docs.serde.rs/serde_json/fn.from_reader.html
//
// `T::from_reader(..)` expects EOF after reading the object,
// which is not what we want here.
//
// For this reason we need to create our own `Deserializer`.
let mut de = serde_json::Deserializer::from_reader(reader);
let json_map = <serde_json::Map::<String, serde_json::Value> as serde::Deserialize>::deserialize(&mut de)?;
Ok(Value::JsonObject(json_map))
}
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
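A small self-contained illustration of the difference explained in the comment above (assuming only the `serde` and `serde_json` crates): `from_reader` rejects trailing bytes, while an explicit `Deserializer` stops right after the value.
use std::io::Cursor;
fn main() {
    // A JSON object followed by trailing bytes, as in a document store block
    // where more values follow the serialized JSON object.
    let payload = br#"{"key": 2}trailing"#;
    // `from_reader` expects EOF right after the value, so it fails here.
    let direct: Result<serde_json::Map<String, serde_json::Value>, _> =
        serde_json::from_reader(Cursor::new(&payload[..]));
    assert!(direct.is_err());
    // Building a `Deserializer` explicitly stops after the value instead.
    let mut de = serde_json::Deserializer::from_reader(Cursor::new(&payload[..]));
    let map = <serde_json::Map<String, serde_json::Value> as serde::Deserialize>::deserialize(&mut de)
        .unwrap();
    assert_eq!(map["key"], 2);
}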

View File

@@ -0,0 +1,50 @@
use std::io;
use zstd::bulk::{compress_to_buffer, decompress_to_buffer};
use zstd::DEFAULT_COMPRESSION_LEVEL;
#[inline]
pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
let count_size = std::mem::size_of::<u32>();
let max_size = zstd::zstd_safe::compress_bound(uncompressed.len()) + count_size;
compressed.clear();
compressed.resize(max_size, 0);
let compressed_size = compress_to_buffer(
uncompressed,
&mut compressed[count_size..],
DEFAULT_COMPRESSION_LEVEL,
)?;
compressed[0..count_size].copy_from_slice(&(uncompressed.len() as u32).to_le_bytes());
compressed.resize(compressed_size + count_size, 0);
Ok(())
}
#[inline]
pub fn decompress(compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<()> {
let count_size = std::mem::size_of::<u32>();
let uncompressed_size = u32::from_le_bytes(
compressed
.get(..count_size)
.ok_or(io::ErrorKind::InvalidData)?
.try_into()
.unwrap(),
) as usize;
decompressed.clear();
decompressed.resize(uncompressed_size, 0);
let decompressed_size = decompress_to_buffer(&compressed[count_size..], decompressed)?;
if decompressed_size != uncompressed_size {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"doc store block not completely decompressed, data corruption".to_string(),
));
}
Ok(())
}
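A hedged round-trip check for the two helpers above (hypothetical test name, `zstd-compression` feature enabled):
#[test]
fn zstd_block_roundtrip() -> std::io::Result<()> {
    let original: Vec<u8> = b"some block of stored documents ".repeat(100);
    let mut compressed = Vec::new();
    let mut decompressed = Vec::new();
    // `compress` prepends the uncompressed length as a little-endian u32,
    // which `decompress` uses to size its output buffer.
    compress(&original, &mut compressed)?;
    decompress(&compressed, &mut decompressed)?;
    assert_eq!(original, decompressed);
    Ok(())
}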

View File

@@ -26,6 +26,9 @@ pub enum Compressor {
#[serde(rename = "snappy")]
/// Use the snap compressor
Snappy,
#[serde(rename = "zstd")]
/// Use the zstd compressor
Zstd,
}
impl Default for Compressor {
@@ -36,6 +39,8 @@ impl Default for Compressor {
Compressor::Brotli
} else if cfg!(feature = "snappy-compression") {
Compressor::Snappy
} else if cfg!(feature = "zstd-compression") {
Compressor::Zstd
} else {
Compressor::None
}
@@ -49,6 +54,7 @@ impl Compressor {
1 => Compressor::Lz4,
2 => Compressor::Brotli,
3 => Compressor::Snappy,
4 => Compressor::Zstd,
_ => panic!("unknown compressor id {:?}", id),
}
}
@@ -58,6 +64,7 @@ impl Compressor {
Self::Lz4 => 1,
Self::Brotli => 2,
Self::Snappy => 3,
Self::Zstd => 4,
}
}
#[inline]
@@ -98,6 +105,16 @@ impl Compressor {
panic!("snappy-compression feature flag not activated");
}
}
Self::Zstd => {
#[cfg(feature = "zstd-compression")]
{
super::compression_zstd_block::compress(uncompressed, compressed)
}
#[cfg(not(feature = "zstd-compression"))]
{
panic!("zstd-compression feature flag not activated");
}
}
}
}
@@ -143,6 +160,16 @@ impl Compressor {
panic!("snappy-compression feature flag not activated");
}
}
Self::Zstd => {
#[cfg(feature = "zstd-compression")]
{
super::compression_zstd_block::decompress(compressed, decompressed)
}
#[cfg(not(feature = "zstd-compression"))]
{
panic!("zstd-compression feature flag not activated");
}
}
}
}
}
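A sketch of selecting the new compressor and block size at index creation time, assuming the `zstd-compression` feature is enabled. The `docstore_compression` and `docstore_blocksize` field names on `IndexSettings` are assumptions for illustration, not confirmed by this diff.
use tantivy::schema::{Schema, TEXT};
use tantivy::store::Compressor;
use tantivy::{Index, IndexSettings};
fn main() {
    let mut schema_builder = Schema::builder();
    schema_builder.add_text_field("body", TEXT);
    let schema = schema_builder.build();
    // Field names below are assumptions about the settings struct.
    let settings = IndexSettings {
        docstore_compression: Compressor::Zstd,
        docstore_blocksize: 32_768,
        ..Default::default()
    };
    let _index = Index::builder()
        .schema(schema)
        .settings(settings)
        .create_in_ram()
        .expect("failed to create index");
}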

View File

@@ -50,6 +50,9 @@ mod compression_brotli;
#[cfg(feature = "snappy-compression")]
mod compression_snap;
#[cfg(feature = "zstd-compression")]
mod compression_zstd_block;
#[cfg(test)]
pub mod tests {
@@ -69,10 +72,13 @@ pub mod tests {
sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt \
mollit anim id est laborum.";
const BLOCK_SIZE: usize = 16_384;
pub fn write_lorem_ipsum_store(
writer: WritePtr,
num_docs: usize,
compressor: Compressor,
blocksize: usize,
) -> Schema {
let mut schema_builder = Schema::builder();
let field_body = schema_builder.add_text_field("body", TextOptions::default().set_stored());
@@ -80,7 +86,7 @@ pub mod tests {
schema_builder.add_text_field("title", TextOptions::default().set_stored());
let schema = schema_builder.build();
{
let mut store_writer = StoreWriter::new(writer, compressor);
let mut store_writer = StoreWriter::new(writer, compressor, blocksize);
for i in 0..num_docs {
let mut doc = Document::default();
doc.add_field_value(field_body, LOREM.to_string());
@@ -103,7 +109,7 @@ pub mod tests {
let path = Path::new("store");
let directory = RamDirectory::create();
let store_wrt = directory.open_write(path)?;
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4);
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE);
let field_title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;
@@ -139,11 +145,11 @@ pub mod tests {
Ok(())
}
fn test_store(compressor: Compressor) -> crate::Result<()> {
fn test_store(compressor: Compressor, blocksize: usize) -> crate::Result<()> {
let path = Path::new("store");
let directory = RamDirectory::create();
let store_wrt = directory.open_write(path)?;
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor);
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize);
let field_title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;
@@ -169,22 +175,28 @@ pub mod tests {
#[test]
fn test_store_noop() -> crate::Result<()> {
test_store(Compressor::None)
test_store(Compressor::None, BLOCK_SIZE)
}
#[cfg(feature = "lz4-compression")]
#[test]
fn test_store_lz4_block() -> crate::Result<()> {
test_store(Compressor::Lz4)
test_store(Compressor::Lz4, BLOCK_SIZE)
}
#[cfg(feature = "snappy-compression")]
#[test]
fn test_store_snap() -> crate::Result<()> {
test_store(Compressor::Snappy)
test_store(Compressor::Snappy, BLOCK_SIZE)
}
#[cfg(feature = "brotli-compression")]
#[test]
fn test_store_brotli() -> crate::Result<()> {
test_store(Compressor::Brotli)
test_store(Compressor::Brotli, BLOCK_SIZE)
}
#[cfg(feature = "zstd-compression")]
#[test]
fn test_store_zstd() -> crate::Result<()> {
test_store(Compressor::Zstd, BLOCK_SIZE)
}
#[test]
@@ -348,6 +360,7 @@ mod bench {
directory.open_write(path).unwrap(),
1_000,
Compressor::default(),
16_384,
);
directory.delete(path).unwrap();
});
@@ -361,6 +374,7 @@ mod bench {
directory.open_write(path).unwrap(),
1_000,
Compressor::default(),
16_384,
);
let store_file = directory.open_read(path).unwrap();
let store = StoreReader::open(store_file).unwrap();

View File

@@ -304,6 +304,8 @@ mod tests {
use crate::store::tests::write_lorem_ipsum_store;
use crate::Directory;
const BLOCK_SIZE: usize = 16_384;
fn get_text_field<'a>(doc: &'a Document, field: &'a Field) -> Option<&'a str> {
doc.get_first(*field).and_then(|f| f.as_text())
}
@@ -313,7 +315,7 @@ mod tests {
let directory = RamDirectory::create();
let path = Path::new("store");
let writer = directory.open_write(path)?;
let schema = write_lorem_ipsum_store(writer, 500, Compressor::default());
let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE);
let title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;

View File

@@ -11,8 +11,6 @@ use crate::schema::Document;
use crate::store::index::Checkpoint;
use crate::DocId;
const BLOCK_SIZE: usize = 16_384;
/// Write tantivy's [`Store`](./index.html)
///
/// Contrary to the other components of `tantivy`,
@@ -22,6 +20,7 @@ const BLOCK_SIZE: usize = 16_384;
/// The skip list index on the other hand, is built in memory.
pub struct StoreWriter {
compressor: Compressor,
block_size: usize,
doc: DocId,
first_doc_in_block: DocId,
offset_index_writer: SkipIndexBuilder,
@@ -35,9 +34,10 @@ impl StoreWriter {
///
/// The store writer will write blocks on disk as
/// documents are added.
pub fn new(writer: WritePtr, compressor: Compressor) -> StoreWriter {
pub fn new(writer: WritePtr, compressor: Compressor, block_size: usize) -> StoreWriter {
StoreWriter {
compressor,
block_size,
doc: 0,
first_doc_in_block: 0,
offset_index_writer: SkipIndexBuilder::new(),
@@ -65,7 +65,7 @@ impl StoreWriter {
VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?;
self.current_block.write_all(serialized_document)?;
self.doc += 1;
if self.current_block.len() > BLOCK_SIZE {
if self.current_block.len() > self.block_size {
self.write_and_compress_block()?;
}
Ok(())
@@ -86,7 +86,7 @@ impl StoreWriter {
self.current_block
.write_all(&self.intermediary_buffer[..])?;
self.doc += 1;
if self.current_block.len() > BLOCK_SIZE {
if self.current_block.len() > self.block_size {
self.write_and_compress_block()?;
}
Ok(())

View File

@@ -28,7 +28,6 @@ use fst_termdict as termdict;
mod sstable_termdict;
#[cfg(feature = "quickwit")]
use sstable_termdict as termdict;
use tantivy_fst::automaton::AlwaysMatch;
#[cfg(test)]
mod tests;
@@ -36,24 +35,4 @@ mod tests;
/// Position of the term in the sorted list of terms.
pub type TermOrdinal = u64;
/// The term dictionary contains all of the terms in
/// `tantivy index` in a sorted manner.
pub type TermDictionary = self::termdict::TermDictionary;
/// Builder for the new term dictionary.
///
/// Inserting must be done in the order of the `keys`.
pub type TermDictionaryBuilder<W> = self::termdict::TermDictionaryBuilder<W>;
/// Given a list of sorted term streams,
/// returns an iterator over sorted unique terms.
///
/// The item yield is actually a pair with
/// - the term
/// - a slice with the ordinal of the segments containing
/// the terms.
pub type TermMerger<'a> = self::termdict::TermMerger<'a>;
/// `TermStreamer` acts as a cursor over a range of terms of a segment.
/// Terms are guaranteed to be sorted.
pub type TermStreamer<'a, A = AlwaysMatch> = self::termdict::TermStreamer<'a, A>;
pub use self::termdict::{TermDictionary, TermDictionaryBuilder, TermMerger, TermStreamer};

View File

@@ -145,6 +145,12 @@ where
}
pub fn write_key(&mut self, key: &[u8]) {
// If this is the first key in the block, we use it to
// shorten the last term in the last block.
if self.first_ordinal_of_the_block == self.num_terms {
self.index_builder
.shorten_last_block_key_given_next_key(key);
}
let keep_len = common_prefix_len(&self.previous_key, key);
let add_len = key.len() - keep_len;
let increasing_keys = add_len > 0 && (self.previous_key.len() == keep_len)
@@ -273,11 +279,12 @@ mod test {
33u8, 18u8, 19u8, // keep 1 push 1 | 20
17u8, 20u8, 0u8, 0u8, 0u8, 0u8, // no more blocks
// index
161, 102, 98, 108, 111, 99, 107, 115, 129, 162, 104, 108, 97, 115, 116, 95, 107,
101, 121, 130, 17, 20, 106, 98, 108, 111, 99, 107, 95, 97, 100, 100, 114, 162, 106,
98, 121, 116, 101, 95, 114, 97, 110, 103, 101, 162, 101, 115, 116, 97, 114, 116, 0,
99, 101, 110, 100, 11, 109, 102, 105, 114, 115, 116, 95, 111, 114, 100, 105, 110,
97, 108, 0, 15, 0, 0, 0, 0, 0, 0, 0, // offset for the index
161, 102, 98, 108, 111, 99, 107, 115, 129, 162, 115, 108, 97, 115, 116, 95, 107,
101, 121, 95, 111, 114, 95, 103, 114, 101, 97, 116, 101, 114, 130, 17, 20, 106, 98,
108, 111, 99, 107, 95, 97, 100, 100, 114, 162, 106, 98, 121, 116, 101, 95, 114, 97,
110, 103, 101, 162, 101, 115, 116, 97, 114, 116, 0, 99, 101, 110, 100, 11, 109,
102, 105, 114, 115, 116, 95, 111, 114, 100, 105, 110, 97, 108, 0, 15, 0, 0, 0, 0,
0, 0, 0, // offset for the index
3u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8 // num terms
]
);

View File

@@ -4,6 +4,7 @@ use std::ops::Range;
use serde::{Deserialize, Serialize};
use crate::error::DataCorruption;
use crate::termdict::sstable_termdict::sstable::common_prefix_len;
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct SSTableIndex {
@@ -19,7 +20,7 @@ impl SSTableIndex {
pub fn search(&self, key: &[u8]) -> Option<BlockAddr> {
self.blocks
.iter()
.find(|block| &block.last_key[..] >= key)
.find(|block| &block.last_key_or_greater[..] >= key)
.map(|block| block.block_addr.clone())
}
}
@@ -32,7 +33,10 @@ pub struct BlockAddr {
#[derive(Debug, Serialize, Deserialize)]
struct BlockMeta {
pub last_key: Vec<u8>,
/// Any byte string that is lexicographically greater than or equal to
/// the last key in the block,
/// and yet strictly smaller than the first key in the next block.
pub last_key_or_greater: Vec<u8>,
pub block_addr: BlockAddr,
}
@@ -41,10 +45,39 @@ pub struct SSTableIndexBuilder {
index: SSTableIndex,
}
/// Given that left < right,
/// mutates `left` into a shorter byte string `left'` that
/// matches `left <= left' < right`.
fn find_shorter_str_in_between(left: &mut Vec<u8>, right: &[u8]) {
assert!(&left[..] < right);
let common_len = common_prefix_len(&left, right);
if left.len() == common_len {
return;
}
// It is possible to do one character shorter in some cases,
// but it is not worth the extra complexity.
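// Worked example: with left = b"automobile" and right = b"automotive", the
// common prefix is "automo" (6 bytes). The loop starts one byte past it, bumps
// left[7] ('i' -> 'j') and truncates, yielding "automobj", which satisfies
// b"automobile" <= b"automobj" < b"automotive".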
for pos in (common_len + 1)..left.len() {
if left[pos] != u8::MAX {
left[pos] += 1;
left.truncate(pos + 1);
return;
}
}
}
impl SSTableIndexBuilder {
/// In order to make the index as light as possible, we
/// try to find a shorter alternative to the last key of the last block
/// that is still smaller than the next key.
pub(crate) fn shorten_last_block_key_given_next_key(&mut self, next_key: &[u8]) {
if let Some(last_block) = self.index.blocks.last_mut() {
find_shorter_str_in_between(&mut last_block.last_key_or_greater, next_key);
}
}
pub fn add_block(&mut self, last_key: &[u8], byte_range: Range<usize>, first_ordinal: u64) {
self.index.blocks.push(BlockMeta {
last_key: last_key.to_vec(),
last_key_or_greater: last_key.to_vec(),
block_addr: BlockAddr {
byte_range,
first_ordinal,
@@ -97,4 +130,35 @@ mod tests {
"Data corruption: SSTable index is corrupted."
);
}
#[track_caller]
fn test_find_shorter_str_in_between_aux(left: &[u8], right: &[u8]) {
let mut left_buf = left.to_vec();
super::find_shorter_str_in_between(&mut left_buf, right);
assert!(left_buf.len() <= left.len());
assert!(left <= &left_buf);
assert!(&left_buf[..] < &right);
}
#[test]
fn test_find_shorter_str_in_between() {
test_find_shorter_str_in_between_aux(b"", b"hello");
test_find_shorter_str_in_between_aux(b"abc", b"abcd");
test_find_shorter_str_in_between_aux(b"abcd", b"abd");
test_find_shorter_str_in_between_aux(&[0, 0, 0], &[1]);
test_find_shorter_str_in_between_aux(&[0, 0, 0], &[0, 0, 1]);
test_find_shorter_str_in_between_aux(&[0, 0, 255, 255, 255, 0u8], &[0, 1]);
}
use proptest::prelude::*;
proptest! {
#![proptest_config(ProptestConfig::with_cases(100))]
#[test]
fn test_proptest_find_shorter_str(left in any::<Vec<u8>>(), right in any::<Vec<u8>>()) {
if left < right {
test_find_shorter_str_in_between_aux(&left, &right);
}
}
}
}

View File

@@ -25,6 +25,13 @@ pub struct TokenizerManager {
}
impl TokenizerManager {
/// Creates an empty tokenizer manager.
pub fn new() -> Self {
Self {
tokenizers: Arc::new(RwLock::new(HashMap::new())),
}
}
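A short usage sketch of the new constructor, assuming this version's tokenizer API: start from an empty manager and register a custom analyzer.
use tantivy::tokenizer::{LowerCaser, SimpleTokenizer, TextAnalyzer, TokenizerManager};
fn main() {
    // Start from an empty manager instead of the default set of tokenizers.
    let manager = TokenizerManager::new();
    manager.register(
        "lowercase",
        TextAnalyzer::from(SimpleTokenizer).filter(LowerCaser),
    );
    assert!(manager.get("lowercase").is_some());
    assert!(manager.get("default").is_none());
}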
/// Registers a new tokenizer associated with a given name.
pub fn register<T>(&self, tokenizer_name: &str, tokenizer: T)
where TextAnalyzer: From<T> {
@@ -52,9 +59,7 @@ impl Default for TokenizerManager {
/// - en_stem
/// - ja
fn default() -> TokenizerManager {
let manager = TokenizerManager {
tokenizers: Arc::new(RwLock::new(HashMap::new())),
};
let manager = TokenizerManager::new();
manager.register("raw", RawTokenizer);
manager.register(
"default",