Mirror of https://github.com/quickwit-oss/tantivy.git (synced 2026-01-12 20:12:54 +00:00)

Compare commits: termmap_pe...0.18 (47 commits)
| SHA1 |
|---|
| f0a2b1cc44 |
| fcfdc44c61 |
| 3171f0b9ba |
| 89e19f14b5 |
| 1a6a1396cd |
| e766375700 |
| 496b4a4fdb |
| 93cc8498b3 |
| 0aa3d63a9f |
| 4e2a053b69 |
| 71c4393ec4 |
| b2e97e266a |
| 9ee4772140 |
| c95013b11e |
| fc045e6bf9 |
| 6837a4d468 |
| 0759bf9448 |
| 152e8238d7 |
| d4e5b48437 |
| 03040ed81d |
| aaa22ad225 |
| 3223bdf254 |
| cbd06ab189 |
| 749395bbb8 |
| 617ba1f0c0 |
| 2f1cd7e7f0 |
| 58c0cb5fc4 |
| 7f45a6ac96 |
| 0ade871126 |
| aab65490c9 |
| d77e8de36a |
| d11a8cce26 |
| bc607a921b |
| 1273f33338 |
| e30449743c |
| ed26552296 |
| 65d129afbd |
| 386ffab76c |
| 57a8d0359c |
| 14cb66ee00 |
| 9e38343352 |
| 944302ae2f |
| be70804d17 |
| a1afc80600 |
| 02e24fda52 |
| 4db655ae82 |
| bb44cc84c4 |
.github/workflows/test.yml (vendored): 2 changed lines
@@ -33,7 +33,7 @@ jobs:
           components: rustfmt, clippy

       - name: Run tests
-        run: cargo +stable test --features mmap,brotli-compression,lz4-compression,snappy-compression,failpoints --verbose --workspace
+        run: cargo +stable test --features mmap,brotli-compression,lz4-compression,snappy-compression,zstd-compression,failpoints --verbose --workspace

       - name: Run tests quickwit feature
         run: cargo +stable test --features mmap,quickwit,failpoints --verbose --workspace
@@ -1,4 +1,4 @@
-Unreleased
+Tantivy 0.18
 ================================
 - For date values `chrono` has been replaced with `time` (@uklotzde) #1304 :
   - The `time` crate is re-exported as `tantivy::time` instead of `tantivy::chrono`.
@@ -11,6 +11,7 @@ Unreleased
 - Add [histogram](https://github.com/quickwit-oss/tantivy/pull/1306) aggregation (@PSeitz)
 - Add support for fastfield on text fields (@PSeitz)
 - Add terms aggregation (@PSeitz)
+- Add support for zstd compression (@kryesh)

 Tantivy 0.17
 ================================
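
To illustrate the `chrono`-to-`time` migration noted above, here is a small sketch, not taken from the release itself, assuming the `tantivy::time` re-export and the `DateTime::from_utc` constructor that appear later in this comparison; `parse_timestamp` is an illustrative name.

```rust
use tantivy::time::format_description::well_known::Rfc3339;
use tantivy::time::{OffsetDateTime, UtcOffset};
use tantivy::DateTime;

// Parse an RFC 3339 string into tantivy's DateTime via the re-exported `time` crate.
fn parse_timestamp(raw: &str) -> Option<DateTime> {
    let dt = OffsetDateTime::parse(raw, &Rfc3339).ok()?;
    // Normalize to UTC before handing the value to tantivy, mirroring the diff below.
    Some(DateTime::from_utc(dt.to_offset(UtcOffset::UTC)))
}

fn main() {
    assert!(parse_timestamp("1985-04-12T23:20:50.52Z").is_some());
    assert!(parse_timestamp("not a date").is_none());
}
```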
Cargo.toml: 96 changed lines
@@ -1,6 +1,6 @@
 [package]
 name = "tantivy"
-version = "0.17.0"
+version = "0.18.0"
 authors = ["Paul Masurel <paul.masurel@gmail.com>"]
 license = "MIT"
 categories = ["database-implementations", "data-structures"]
@@ -10,71 +10,72 @@ homepage = "https://github.com/quickwit-oss/tantivy"
 repository = "https://github.com/quickwit-oss/tantivy"
 readme = "README.md"
 keywords = ["search", "information", "retrieval"]
-edition = "2018"
+edition = "2021"

 [dependencies]
-oneshot = "0.1"
-base64 = "0.13"
+oneshot = "0.1.3"
+base64 = "0.13.0"
 byteorder = "1.4.3"
-crc32fast = "1.2.1"
-once_cell = "1.7.2"
-regex ={ version = "1.5.4", default-features = false, features = ["std"] }
-tantivy-fst = "0.3"
-memmap2 = {version = "0.5", optional=true}
-lz4_flex = { version = "0.9", default-features = false, features = ["checked-decode"], optional = true }
-brotli = { version = "3.3", optional = true }
+crc32fast = "1.3.2"
+once_cell = "1.10.0"
+regex = { version = "1.5.5", default-features = false, features = ["std", "unicode"] }
+tantivy-fst = "0.3.0"
+memmap2 = { version = "0.5.3", optional = true }
+lz4_flex = { version = "0.9.2", default-features = false, features = ["checked-decode"], optional = true }
+brotli = { version = "3.3.4", optional = true }
+zstd = { version = "0.11", optional = true }
 snap = { version = "1.0.5", optional = true }
-tempfile = { version = "3.2", optional = true }
-log = "0.4.14"
-serde = { version = "1.0.126", features = ["derive"] }
-serde_json = "1.0.64"
-num_cpus = "1.13"
+tempfile = { version = "3.3.0", optional = true }
+log = "0.4.16"
+serde = { version = "1.0.136", features = ["derive"] }
+serde_json = "1.0.79"
+num_cpus = "1.13.1"
 fs2={ version = "0.4.3", optional = true }
-levenshtein_automata = "0.2"
+levenshtein_automata = "0.2.1"
 uuid = { version = "1.0.0", features = ["v4", "serde"] }
-crossbeam = "0.8.1"
-tantivy-query-grammar = { version="0.15.0", path="./query-grammar" }
-tantivy-bitpacker = { version="0.1", path="./bitpacker" }
-common = { version = "0.2", path = "./common/", package = "tantivy-common" }
-fastfield_codecs = { version="0.1", path="./fastfield_codecs", default-features = false }
-ownedbytes = { version="0.2", path="./ownedbytes" }
-stable_deref_trait = "1.2"
-rust-stemmers = "1.2"
-downcast-rs = "1.2"
+crossbeam-channel = "0.5.4"
+tantivy-query-grammar = { version="0.18.0", path="./query-grammar" }
+tantivy-bitpacker = { version="0.2", path="./bitpacker" }
+common = { version = "0.3", path = "./common/", package = "tantivy-common" }
+fastfield_codecs = { version="0.2", path="./fastfield_codecs", default-features = false }
+ownedbytes = { version="0.3", path="./ownedbytes" }
+stable_deref_trait = "1.2.0"
+rust-stemmers = "1.2.0"
+downcast-rs = "1.2.0"
 bitpacking = { version = "0.8.4", default-features = false, features = ["bitpacker4x"] }
-census = "0.4"
+census = "0.4.0"
 fnv = "1.0.7"
-thiserror = "1.0.24"
+thiserror = "1.0.30"
 htmlescape = "0.3.1"
-fail = "0.5"
-murmurhash32 = "0.2"
-time = { version = "0.3.7", features = ["serde-well-known"] }
-smallvec = "1.6.1"
-rayon = "1.5"
-lru = "0.7.0"
-fastdivide = "0.4"
-itertools = "0.10.0"
-measure_time = "0.8.0"
-pretty_assertions = "1.1.0"
-serde_cbor = {version="0.11", optional=true}
-async-trait = "0.1"
+fail = "0.5.0"
+murmurhash32 = "0.2.0"
+time = { version = "0.3.9", features = ["serde-well-known"] }
+smallvec = "1.8.0"
+rayon = "1.5.2"
+lru = "0.7.5"
+fastdivide = "0.4.0"
+itertools = "0.10.3"
+measure_time = "0.8.2"
+pretty_assertions = "1.2.1"
+serde_cbor = { version = "0.11.2", optional = true }
+async-trait = "0.1.53"

 [target.'cfg(windows)'.dependencies]
 winapi = "0.3.9"

 [dev-dependencies]
-rand = "0.8.3"
+rand = "0.8.5"
 maplit = "1.0.2"
-matches = "0.1.8"
-proptest = "1.0"
+matches = "0.1.9"
+proptest = "1.0.0"
 criterion = "0.3.5"
-test-log = "0.2.8"
+test-log = "0.2.10"
 env_logger = "0.9.0"
-pprof = {version= "0.8", features=["flamegraph", "criterion"]}
-futures = "0.3.15"
+pprof = { version = "0.9.0", features = ["flamegraph", "criterion"] }
+futures = "0.3.21"

 [dev-dependencies.fail]
-version = "0.5"
+version = "0.5.0"
 features = ["failpoints"]

 [profile.release]
@@ -93,6 +94,7 @@ mmap = ["fs2", "tempfile", "memmap2"]
 brotli-compression = ["brotli"]
 lz4-compression = ["lz4_flex"]
 snappy-compression = ["snap"]
+zstd-compression = ["zstd"]

 failpoints = ["fail/failpoints"]
 unstable = [] # useful for benches.
@@ -1,6 +1,6 @@
 [package]
 name = "tantivy-bitpacker"
-version = "0.1.1"
+version = "0.2.0"
 edition = "2018"
 authors = ["Paul Masurel <paul.masurel@gmail.com>"]
 license = "MIT"
@@ -1,6 +1,6 @@
 [package]
 name = "tantivy-common"
-version = "0.2.0"
+version = "0.3.0"
 authors = ["Paul Masurel <paul@quickwit.io>", "Pascal Seitz <pascal@quickwit.io>"]
 license = "MIT"
 edition = "2018"
@@ -10,7 +10,7 @@ description = "common traits and utility functions used by multiple tantivy subc

 [dependencies]
 byteorder = "1.4.3"
-ownedbytes = { version="0.2", path="../ownedbytes" }
+ownedbytes = { version="0.3", path="../ownedbytes" }

 [dev-dependencies]
 proptest = "1.0.0"
@@ -1,7 +1,8 @@
 // # Json field example
 //
 // This example shows how the json field can be used
-// to make tantivy partially schemaless.
+// to make tantivy partially schemaless by setting it as
+// default query parser field.

 use tantivy::collector::{Count, TopDocs};
 use tantivy::query::QueryParser;
@@ -10,10 +11,6 @@ use tantivy::Index;

 fn main() -> tantivy::Result<()> {
     // # Defining the schema
-    //
-    // We need two fields:
-    // - a timestamp
-    // - a json object field
     let mut schema_builder = Schema::builder();
     schema_builder.add_date_field("timestamp", FAST | STORED);
     let event_type = schema_builder.add_text_field("event_type", STRING | STORED);
@@ -43,7 +40,8 @@ fn main() -> tantivy::Result<()> {
         "attributes": {
             "target": "submit-button",
             "cart": {"product_id": 133},
-            "description": "das keyboard"
+            "description": "das keyboard",
+            "event_type": "holiday-sale"
         }
     }"#,
     )?;
@@ -53,6 +51,9 @@ fn main() -> tantivy::Result<()> {
     let reader = index.reader()?;
     let searcher = reader.searcher();

+    // # Default fields: event_type and attributes
+    // By setting attributes as a default field it allows omitting attributes itself, e.g. "target",
+    // instead of "attributes.target"
     let query_parser = QueryParser::for_index(&index, vec![event_type, attributes]);
     {
         let query = query_parser.parse_query("target:submit-button")?;
@@ -70,10 +71,34 @@ fn main() -> tantivy::Result<()> {
         assert_eq!(count_docs, 1);
     }
     {
-        let query = query_parser
-            .parse_query("event_type:click AND cart.product_id:133")
-            .unwrap();
-        let hits = searcher.search(&*query, &TopDocs::with_limit(2)).unwrap();
+        let query = query_parser.parse_query("click AND cart.product_id:133")?;
+        let hits = searcher.search(&*query, &TopDocs::with_limit(2))?;
         assert_eq!(hits.len(), 1);
     }
+    {
+        // The sub-fields in the json field marked as default field still need to be explicitly
+        // addressed
+        let query = query_parser.parse_query("click AND 133")?;
+        let hits = searcher.search(&*query, &TopDocs::with_limit(2))?;
+        assert_eq!(hits.len(), 0);
+    }
+    {
+        // Default json fields are ignored if they collide with the schema
+        let query = query_parser.parse_query("event_type:holiday-sale")?;
+        let hits = searcher.search(&*query, &TopDocs::with_limit(2))?;
+        assert_eq!(hits.len(), 0);
+    }
+    // # Query via full attribute path
+    {
+        // This only searches in our schema's `event_type` field
+        let query = query_parser.parse_query("event_type:click")?;
+        let hits = searcher.search(&*query, &TopDocs::with_limit(2))?;
+        assert_eq!(hits.len(), 2);
+    }
+    {
+        // Default json fields can still be accessed by full path
+        let query = query_parser.parse_query("attributes.event_type:holiday-sale")?;
+        let hits = searcher.search(&*query, &TopDocs::with_limit(2))?;
+        assert_eq!(hits.len(), 1);
+    }
     Ok(())
@@ -1,6 +1,6 @@
 [package]
 name = "fastfield_codecs"
-version = "0.1.0"
+version = "0.2.0"
 authors = ["Pascal Seitz <pascal@quickwit.io>"]
 license = "MIT"
 edition = "2018"
@@ -9,8 +9,8 @@ description = "Fast field codecs used by tantivy"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

 [dependencies]
-common = { version = "0.2", path = "../common/", package = "tantivy-common" }
-tantivy-bitpacker = { version="0.1.1", path = "../bitpacker/" }
+common = { version = "0.3", path = "../common/", package = "tantivy-common" }
+tantivy-bitpacker = { version="0.2", path = "../bitpacker/" }
 prettytable-rs = {version="0.8.0", optional= true}
 rand = {version="0.8.3", optional= true}
@@ -1,7 +1,7 @@
 [package]
 authors = ["Paul Masurel <paul@quickwit.io>", "Pascal Seitz <pascal@quickwit.io>"]
 name = "ownedbytes"
-version = "0.2.0"
+version = "0.3.0"
 edition = "2018"
 description = "Expose data as static slice"
 license = "MIT"
@@ -1,6 +1,6 @@
 [package]
 name = "tantivy-query-grammar"
-version = "0.15.0"
+version = "0.18.0"
 authors = ["Paul Masurel <paul.masurel@gmail.com>"]
 license = "MIT"
 categories = ["database-implementations", "data-structures"]
@@ -18,7 +18,7 @@ use crate::Occur;
 const SPECIAL_CHARS: &[char] = &[
     '+', '^', '`', ':', '{', '}', '"', '[', ']', '(', ')', '~', '!', '\\', '*', ' ',
 ];
-const ESCAPED_SPECIAL_CHARS_PATTERN: &str = r#"\\(\+|\^|`|:|\{|\}|"|\[|\]|\(|\)|\~|!|\\|\*| )"#;
+const ESCAPED_SPECIAL_CHARS_PATTERN: &str = r#"\\(\+|\^|`|:|\{|\}|"|\[|\]|\(|\)|\~|!|\\|\*|\s)"#;

 /// Parses a field_name
 /// A field name must have at least one character and be followed by a colon.
@@ -34,7 +34,8 @@ fn field_name<'a>() -> impl Parser<&'a str, Output = String> {
         take_while(|c| !SPECIAL_CHARS.contains(&c)),
     ),
     '\\',
-    satisfy(|c| SPECIAL_CHARS.contains(&c)),
+    satisfy(|_| true), /* if the next character is not a special char, the \ will be treated
+                        * as the \ character. */
 ))
 .skip(char(':'))
 .map(|s| ESCAPED_SPECIAL_CHARS_RE.replace_all(&s, "$1").to_string())
@@ -516,15 +517,27 @@ mod test {
     }

     #[test]
-    fn test_field_name() -> TestParseResult {
+    fn test_field_name() {
         assert_eq!(
             super::field_name().parse(".my.field.name:a"),
             Ok((".my.field.name".to_string(), "a"))
         );
         assert_eq!(
             super::field_name().parse(r#"my\ field:a"#),
             Ok(("my field".to_string(), "a"))
         );
         assert_eq!(
             super::field_name().parse(r#"にんじん:a"#),
             Ok(("にんじん".to_string(), "a"))
         );
         assert_eq!(
             super::field_name().parse("my\\ field\\ name:a"),
             Ok(("my field name".to_string(), "a"))
         );
         assert_eq!(
             super::field_name().parse(r#"my\field:a"#),
             Ok((r#"my\field"#.to_string(), "a"))
         );
         assert!(super::field_name().parse("my field:a").is_err());
         assert_eq!(
             super::field_name().parse("\\(1\\+1\\):2"),
@@ -534,14 +547,21 @@ mod test {
             super::field_name().parse("my_field_name:a"),
             Ok(("my_field_name".to_string(), "a"))
         );
         assert_eq!(
             super::field_name().parse("myfield.b:hello").unwrap(),
             ("myfield.b".to_string(), "hello")
         );
         assert_eq!(
             super::field_name().parse(r#"myfield\.b:hello"#).unwrap(),
             (r#"myfield\.b"#.to_string(), "hello")
         );
         assert!(super::field_name().parse("my_field_name").is_err());
         assert!(super::field_name().parse(":a").is_err());
         assert!(super::field_name().parse("-my_field:a").is_err());
         assert_eq!(
-            super::field_name().parse("_my_field:a")?,
-            ("_my_field".to_string(), "a")
+            super::field_name().parse("_my_field:a"),
+            Ok(("_my_field".to_string(), "a"))
         );
-        Ok(())
     }

     #[test]
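
A small standalone check of the whitespace change in `ESCAPED_SPECIAL_CHARS_PATTERN`, using the `regex` crate already listed in Cargo.toml; this is a sketch for illustration, not part of the commit.

```rust
use regex::Regex;

fn main() {
    // The pattern now ends with `\s` instead of a literal space, so any escaped
    // whitespace character in a field name is unescaped, not just ' '.
    let re = Regex::new(r#"\\(\+|\^|`|:|\{|\}|"|\[|\]|\(|\)|\~|!|\\|\*|\s)"#).unwrap();
    assert_eq!(re.replace_all(r"my\ field", "$1"), "my field");
    assert_eq!(re.replace_all("my\\\tfield", "$1"), "my\tfield");
}
```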
@@ -230,8 +230,7 @@ pub enum BucketResult {
 impl BucketResult {
     pub(crate) fn empty_from_req(req: &BucketAggregationInternal) -> crate::Result<Self> {
         let empty_bucket = IntermediateBucketResult::empty_from_req(&req.bucket_agg);

-        Ok(BucketResult::from_intermediate_and_req(empty_bucket, req)?)
+        BucketResult::from_intermediate_and_req(empty_bucket, req)
     }

     fn from_intermediate_and_req(
@@ -1364,4 +1364,29 @@ mod tests {

         Ok(())
     }

+    #[test]
+    fn histogram_invalid_request() -> crate::Result<()> {
+        let index = get_test_index_2_segments(true)?;
+
+        let agg_req: Aggregations = vec![(
+            "histogram".to_string(),
+            Aggregation::Bucket(BucketAggregation {
+                bucket_agg: BucketAggregationType::Histogram(HistogramAggregation {
+                    field: "score_f64".to_string(),
+                    interval: 0.0,
+                    ..Default::default()
+                }),
+                sub_aggregation: Default::default(),
+            }),
+        )]
+        .into_iter()
+        .collect();
+
+        let agg_res = exec_request(agg_req, &index);
+
+        assert!(agg_res.is_err());
+
+        Ok(())
+    }
 }
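
For contrast with the invalid request above, a sketch of a well-formed histogram request in the elasticsearch-compatible JSON the aggregation module accepts; the aggregation name `score_histogram` is illustrative, and it is assumed that `Aggregations` deserializes this shape the same way the terms aggregation tests later in this comparison do.

```rust
use serde_json::json;
use tantivy::aggregation::agg_req::Aggregations;

fn main() -> serde_json::Result<()> {
    // A non-zero interval makes the same kind of histogram request valid.
    let agg_req: Aggregations = serde_json::from_value(json!({
        "score_histogram": {
            "histogram": { "field": "score_f64", "interval": 10.0 }
        }
    }))?;
    assert_eq!(agg_req.len(), 1);
    Ok(())
}
```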
@@ -81,7 +81,8 @@ pub struct TermsAggregation {
     ///
     /// Should never be smaller than size.
     #[serde(skip_serializing_if = "Option::is_none", default)]
-    pub shard_size: Option<u32>,
+    #[serde(alias = "shard_size")]
+    pub split_size: Option<u32>,

     /// The get more accurate results, we fetch more than `size` from each segment.
     ///
@@ -96,11 +97,11 @@ pub struct TermsAggregation {
     /// doc_count returned by each shard. It’s the sum of the size of the largest bucket on
     /// each segment that didn’t fit into `shard_size`.
     ///
-    /// Defaults to true when ordering by counts desc.
+    /// Defaults to true when ordering by count desc.
     #[serde(skip_serializing_if = "Option::is_none", default)]
     pub show_term_doc_count_error: Option<bool>,

-    /// Filter all terms than are lower `min_doc_count`. Defaults to 1.
+    /// Filter all terms that are lower than `min_doc_count`. Defaults to 1.
     ///
     /// **Expensive**: When set to 0, this will return all terms in the field.
     #[serde(skip_serializing_if = "Option::is_none", default)]
@@ -143,7 +144,7 @@ pub(crate) struct TermsAggregationInternal {
     /// Increasing this value is will increase the cost for more accuracy.
     pub segment_size: u32,

-    /// Filter all terms than are lower `min_doc_count`. Defaults to 1.
+    /// Filter all terms that are lower than `min_doc_count`. Defaults to 1.
     ///
     /// *Expensive*: When set to 0, this will return all terms in the field.
     pub min_doc_count: u64,
@@ -572,7 +573,7 @@ mod tests {
             bucket_agg: BucketAggregationType::Terms(TermsAggregation {
                 field: "string_id".to_string(),
                 size: Some(2),
-                shard_size: Some(2),
+                split_size: Some(2),
                 ..Default::default()
             }),
             sub_aggregation: Default::default(),
@@ -1210,6 +1211,51 @@ mod tests {
             .unwrap();
         assert_eq!(agg_req, agg_req_deser);

+        let elasticsearch_compatible_json = json!(
+        {
+            "term_agg_test":{
+                "terms": {
+                    "field": "string_id",
+                    "split_size": 2u64,
+                }
+            }
+        });
+
+        // test alias shard_size, split_size
+        let agg_req: Aggregations = vec![(
+            "term_agg_test".to_string(),
+            Aggregation::Bucket(BucketAggregation {
+                bucket_agg: BucketAggregationType::Terms(TermsAggregation {
+                    field: "string_id".to_string(),
+                    split_size: Some(2),
+                    ..Default::default()
+                }),
+                sub_aggregation: Default::default(),
+            }),
+        )]
+        .into_iter()
+        .collect();
+
+        let agg_req_deser: Aggregations =
+            serde_json::from_str(&serde_json::to_string(&elasticsearch_compatible_json).unwrap())
+                .unwrap();
+        assert_eq!(agg_req, agg_req_deser);
+
+        let elasticsearch_compatible_json = json!(
+        {
+            "term_agg_test":{
+                "terms": {
+                    "field": "string_id",
+                    "shard_size": 2u64,
+                }
+            }
+        });
+
+        let agg_req_deser: Aggregations =
+            serde_json::from_str(&serde_json::to_string(&elasticsearch_compatible_json).unwrap())
+                .unwrap();
+        assert_eq!(agg_req, agg_req_deser);
+
         Ok(())
     }
 }
@@ -24,7 +24,9 @@ use crate::aggregation::bucket::TermsAggregationInternal;
 /// intermediate results.
 #[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)]
 pub struct IntermediateAggregationResults {
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub(crate) metrics: Option<VecWithNames<IntermediateMetricResult>>,
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub(crate) buckets: Option<VecWithNames<IntermediateBucketResult>>,
 }
@@ -20,7 +20,8 @@
 //!
 //! #### Limitations
 //!
-//! Currently aggregations work only on single value fast fields of type u64, f64 and i64.
+//! Currently aggregations work only on single value fast fields of type u64, f64, i64 and
+//! fast fields on text fields.
 //!
 //! # JSON Format
 //! Aggregations request and result structures de/serialize into elasticsearch compatible JSON.
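
A hedged sketch of driving an aggregation from this JSON format; it assumes the `AggregationCollector::from_aggs` constructor and the `agg_req::Aggregations` path, and the field name `event_type` is only an example of a text fast field.

```rust
use serde_json::json;
use tantivy::aggregation::agg_req::Aggregations;
use tantivy::aggregation::AggregationCollector;

fn main() -> serde_json::Result<()> {
    // Terms aggregation over a text fast field, which the limitation note above
    // says is now supported.
    let agg_req: Aggregations = serde_json::from_value(json!({
        "by_event_type": { "terms": { "field": "event_type" } }
    }))?;
    // The collector would then be passed to `searcher.search(&query, &collector)`.
    let _collector = AggregationCollector::from_aggs(agg_req);
    Ok(())
}
```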
@@ -92,7 +92,7 @@ mod histogram_collector;
 pub use histogram_collector::HistogramCollector;

 mod multi_collector;
-pub use self::multi_collector::MultiCollector;
+pub use self::multi_collector::{FruitHandle, MultiCollector, MultiFruit};

 mod top_collector;
@@ -5,6 +5,7 @@ use super::{Collector, SegmentCollector};
 use crate::collector::Fruit;
 use crate::{DocId, Score, SegmentOrdinal, SegmentReader, TantivyError};

+/// MultiFruit keeps Fruits from every nested Collector
 pub struct MultiFruit {
     sub_fruits: Vec<Option<Box<dyn Fruit>>>,
 }
@@ -79,12 +80,17 @@ impl<TSegmentCollector: SegmentCollector> BoxableSegmentCollector
     }
 }

+/// FruitHandle stores reference to the corresponding collector inside MultiCollector
 pub struct FruitHandle<TFruit: Fruit> {
     pos: usize,
     _phantom: PhantomData<TFruit>,
 }

 impl<TFruit: Fruit> FruitHandle<TFruit> {
+    /// Extract a typed fruit off a multifruit.
+    ///
+    /// This function involves downcasting and can panic if the multifruit was
+    /// created using faulty code.
     pub fn extract(self, fruits: &mut MultiFruit) -> TFruit {
         let boxed_fruit = fruits.sub_fruits[self.pos].take().expect("");
         *boxed_fruit
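
Since `FruitHandle` and `MultiFruit` are now re-exported from the collector module, a usage sketch follows; it assumes the `MultiCollector::new`/`add_collector` API and is illustrative rather than taken from this diff.

```rust
use tantivy::collector::{Count, MultiCollector, TopDocs};
use tantivy::query::Query;
use tantivy::{DocAddress, Score, Searcher};

// Run one search and collect both a hit count and the top 10 documents.
fn count_and_top10(
    searcher: &Searcher,
    query: &dyn Query,
) -> tantivy::Result<(usize, Vec<(Score, DocAddress)>)> {
    let mut multi = MultiCollector::new();
    // Each add_collector returns a typed FruitHandle for later extraction.
    let count_handle = multi.add_collector(Count);
    let top_docs_handle = multi.add_collector(TopDocs::with_limit(10));
    let mut fruits = searcher.search(query, &multi)?;
    // extract() downcasts the handle's slot of the MultiFruit back to its concrete type.
    Ok((
        count_handle.extract(&mut fruits),
        top_docs_handle.extract(&mut fruits),
    ))
}
```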
@@ -1,6 +1,7 @@
-use crossbeam::channel;
 use rayon::{ThreadPool, ThreadPoolBuilder};

+use crate::TantivyError;

 /// Search executor whether search request are single thread or multithread.
 ///
 /// We don't expose Rayon thread pool directly here for several reasons.
@@ -47,16 +48,19 @@ impl Executor {
         match self {
             Executor::SingleThread => args.map(f).collect::<crate::Result<_>>(),
             Executor::ThreadPool(pool) => {
-                let args_with_indices: Vec<(usize, A)> = args.enumerate().collect();
-                let num_fruits = args_with_indices.len();
+                let args: Vec<A> = args.collect();
+                let num_fruits = args.len();
                 let fruit_receiver = {
-                    let (fruit_sender, fruit_receiver) = channel::unbounded();
+                    let (fruit_sender, fruit_receiver) = crossbeam_channel::unbounded();
                     pool.scope(|scope| {
-                        for arg_with_idx in args_with_indices {
-                            scope.spawn(|_| {
-                                let (idx, arg) = arg_with_idx;
-                                let fruit = f(arg);
-                                if let Err(err) = fruit_sender.send((idx, fruit)) {
+                        for (idx, arg) in args.into_iter().enumerate() {
+                            // We name references for f and fruit_sender_ref because we do not
+                            // want these two to be moved into the closure.
+                            let f_ref = &f;
+                            let fruit_sender_ref = &fruit_sender;
+                            scope.spawn(move |_| {
+                                let fruit = f_ref(arg);
+                                if let Err(err) = fruit_sender_ref.send((idx, fruit)) {
                                     error!(
                                         "Failed to send search task. It probably means all search \
                                          threads have panicked. {:?}",
@@ -71,18 +75,19 @@ impl Executor {
                 // This is important as it makes it possible for the fruit_receiver iteration to
                 // terminate.
                 };
-                // This is lame, but safe.
-                let mut results_with_position = Vec::with_capacity(num_fruits);
+                let mut result_placeholders: Vec<Option<R>> =
+                    std::iter::repeat_with(|| None).take(num_fruits).collect();
                 for (pos, fruit_res) in fruit_receiver {
                     let fruit = fruit_res?;
-                    results_with_position.push((pos, fruit));
+                    result_placeholders[pos] = Some(fruit);
                 }
-                results_with_position.sort_by_key(|(pos, _)| *pos);
-                assert_eq!(results_with_position.len(), num_fruits);
-                Ok(results_with_position
-                    .into_iter()
-                    .map(|(_, fruit)| fruit)
-                    .collect::<Vec<_>>())
+                let results: Vec<R> = result_placeholders.into_iter().flatten().collect();
+                if results.len() != num_fruits {
+                    return Err(TantivyError::InternalError(
+                        "One of the mapped execution failed.".to_string(),
+                    ));
+                }
+                Ok(results)
             }
         }
     }
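
A standalone sketch of the same order-preserving scatter/gather pattern the reworked `Executor::map` uses: results arrive on a channel in arbitrary order and are slotted back into place by index. It assumes the `rayon` and `crossbeam-channel` dependencies from Cargo.toml above; `parallel_map` is an illustrative name, not tantivy API.

```rust
use rayon::ThreadPoolBuilder;

fn parallel_map<A: Send, R: Send>(args: Vec<A>, f: impl Fn(A) -> R + Sync) -> Vec<R> {
    let pool = ThreadPoolBuilder::new().num_threads(4).build().unwrap();
    let num = args.len();
    let (tx, rx) = crossbeam_channel::unbounded();
    pool.scope(|scope| {
        for (idx, arg) in args.into_iter().enumerate() {
            // Borrow f and the sender so they are not moved into the spawned closure.
            let f_ref = &f;
            let tx_ref = &tx;
            scope.spawn(move |_| {
                let _ = tx_ref.send((idx, f_ref(arg)));
            });
        }
    });
    // Drop the last sender so the receiver iteration below terminates.
    drop(tx);
    // Slot each result back into its original position instead of sorting afterwards.
    let mut slots: Vec<Option<R>> = std::iter::repeat_with(|| None).take(num).collect();
    for (idx, result) in rx {
        slots[idx] = Some(result);
    }
    slots.into_iter().flatten().collect()
}

fn main() {
    let doubled = parallel_map(vec![1, 2, 3, 4], |x| x * 2);
    assert_eq!(doubled, vec![2, 4, 6, 8]);
}
```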
@@ -74,6 +74,7 @@ fn load_metas(
 pub struct IndexBuilder {
     schema: Option<Schema>,
     index_settings: IndexSettings,
+    tokenizer_manager: TokenizerManager,
 }
 impl Default for IndexBuilder {
     fn default() -> Self {
@@ -86,6 +87,7 @@ impl IndexBuilder {
         Self {
             schema: None,
             index_settings: IndexSettings::default(),
+            tokenizer_manager: TokenizerManager::default(),
         }
     }

@@ -103,6 +105,12 @@ impl IndexBuilder {
         self
     }

+    /// Set the tokenizers .
+    pub fn tokenizers(mut self, tokenizers: TokenizerManager) -> Self {
+        self.tokenizer_manager = tokenizers;
+        self
+    }
+
     /// Creates a new index using the `RAMDirectory`.
     ///
     /// The index will be allocated in anonymous memory.
@@ -154,7 +162,8 @@ impl IndexBuilder {
         if !Index::exists(&*dir)? {
             return self.create(dir);
         }
-        let index = Index::open(dir)?;
+        let mut index = Index::open(dir)?;
+        index.set_tokenizers(self.tokenizer_manager.clone());
         if index.schema() == self.get_expect_schema()? {
             Ok(index)
         } else {
@@ -176,7 +185,8 @@ impl IndexBuilder {
         )?;
         let mut metas = IndexMeta::with_schema(self.get_expect_schema()?);
         metas.index_settings = self.index_settings;
-        let index = Index::open_from_metas(directory, &metas, SegmentMetaInventory::default());
+        let mut index = Index::open_from_metas(directory, &metas, SegmentMetaInventory::default());
+        index.set_tokenizers(self.tokenizer_manager);
         Ok(index)
     }
 }
@@ -304,6 +314,11 @@ impl Index {
         }
     }

+    /// Setter for the tokenizer manager.
+    pub fn set_tokenizers(&mut self, tokenizers: TokenizerManager) {
+        self.tokenizers = tokenizers;
+    }
+
     /// Accessor for the tokenizer manager.
     pub fn tokenizers(&self) -> &TokenizerManager {
         &self.tokenizers
@@ -314,20 +329,31 @@ impl Index {
         let field_entry = self.schema.get_field_entry(field);
         let field_type = field_entry.field_type();
         let tokenizer_manager: &TokenizerManager = self.tokenizers();
-        let tokenizer_name_opt: Option<TextAnalyzer> = match field_type {
-            FieldType::Str(text_options) => text_options
-                .get_indexing_options()
-                .map(|text_indexing_options| text_indexing_options.tokenizer().to_string())
-                .and_then(|tokenizer_name| tokenizer_manager.get(&tokenizer_name)),
-            _ => None,
+        let indexing_options_opt = match field_type {
+            FieldType::JsonObject(options) => options.get_text_indexing_options(),
+            FieldType::Str(options) => options.get_indexing_options(),
+            _ => {
+                return Err(TantivyError::SchemaError(format!(
+                    "{:?} is not a text field.",
+                    field_entry.name()
+                )))
+            }
         };
-        match tokenizer_name_opt {
-            Some(tokenizer) => Ok(tokenizer),
-            None => Err(TantivyError::SchemaError(format!(
-                "{:?} is not a text field.",
-                field_entry.name()
-            ))),
-        }
+        let indexing_options = indexing_options_opt.ok_or_else(|| {
+            TantivyError::InvalidArgument(format!(
+                "No indexing options set for field {:?}",
+                field_entry
+            ))
+        })?;
+
+        tokenizer_manager
+            .get(indexing_options.tokenizer())
+            .ok_or_else(|| {
+                TantivyError::InvalidArgument(format!(
+                    "No Tokenizer found for field {:?}",
+                    field_entry
+                ))
+            })
     }

     /// Create a default `IndexReader` for the given index.
@@ -557,7 +583,8 @@ impl fmt::Debug for Index {
 mod tests {
     use crate::directory::{RamDirectory, WatchCallback};
     use crate::schema::{Field, Schema, INDEXED, TEXT};
-    use crate::{Directory, Index, IndexReader, IndexSettings, ReloadPolicy};
+    use crate::tokenizer::TokenizerManager;
+    use crate::{Directory, Index, IndexBuilder, IndexReader, IndexSettings, ReloadPolicy};

     #[test]
     fn test_indexer_for_field() {
@@ -573,6 +600,21 @@ mod tests {
         );
     }

+    #[test]
+    fn test_set_tokenizer_manager() {
+        let mut schema_builder = Schema::builder();
+        schema_builder.add_u64_field("num_likes", INDEXED);
+        schema_builder.add_text_field("body", TEXT);
+        let schema = schema_builder.build();
+        let index = IndexBuilder::new()
+            // set empty tokenizer manager
+            .tokenizers(TokenizerManager::new())
+            .schema(schema)
+            .create_in_ram()
+            .unwrap();
+        assert!(index.tokenizers().get("raw").is_none());
+    }
+
     #[test]
     fn test_index_exists() {
         let directory: Box<dyn Directory> = Box::new(RamDirectory::create());
@@ -702,7 +744,7 @@ mod tests {
             .try_into()?;
         assert_eq!(reader.searcher().num_docs(), 0);
         writer.add_document(doc!(field=>1u64))?;
-        let (sender, receiver) = crossbeam::channel::unbounded();
+        let (sender, receiver) = crossbeam_channel::unbounded();
         let _handle = index.directory_mut().watch(WatchCallback::new(move || {
             let _ = sender.send(());
         }));
@@ -737,7 +779,7 @@ mod tests {
         reader: &IndexReader,
     ) -> crate::Result<()> {
         let mut reader_index = reader.index();
-        let (sender, receiver) = crossbeam::channel::unbounded();
+        let (sender, receiver) = crossbeam_channel::unbounded();
         let _watch_handle = reader_index
             .directory_mut()
             .watch(WatchCallback::new(move || {
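
A sketch of the new `tokenizers` builder hook with a non-default manager; it assumes the `TextAnalyzer`/`SimpleTokenizer`/`LowerCaser` tokenizer APIs, and the analyzer name `lowercase_only` is made up for the example.

```rust
use tantivy::schema::{Schema, TEXT};
use tantivy::tokenizer::{LowerCaser, SimpleTokenizer, TextAnalyzer, TokenizerManager};
use tantivy::IndexBuilder;

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    schema_builder.add_text_field("body", TEXT);
    let schema = schema_builder.build();

    // Start from the default manager so "default", "raw", etc. stay available,
    // then add a custom analyzer under our own name.
    let tokenizers = TokenizerManager::default();
    tokenizers.register(
        "lowercase_only",
        TextAnalyzer::from(SimpleTokenizer).filter(LowerCaser),
    );

    let index = IndexBuilder::new()
        .tokenizers(tokenizers)
        .schema(schema)
        .create_in_ram()?;
    assert!(index.tokenizers().get("lowercase_only").is_some());
    Ok(())
}
```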
@@ -239,7 +239,7 @@ impl InnerSegmentMeta {
 ///
 /// Contains settings which are applied on the whole
 /// index, like presort documents.
-#[derive(Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
 pub struct IndexSettings {
     /// Sorts the documents by information
     /// provided in `IndexSortByField`
@@ -248,7 +248,26 @@ pub struct IndexSettings {
     /// The `Compressor` used to compress the doc store.
     #[serde(default)]
     pub docstore_compression: Compressor,
+    #[serde(default = "default_docstore_blocksize")]
+    /// The size of each block that will be compressed and written to disk
+    pub docstore_blocksize: usize,
 }

+/// Must be a function to be compatible with serde defaults
+fn default_docstore_blocksize() -> usize {
+    16_384
+}
+
+impl Default for IndexSettings {
+    fn default() -> Self {
+        Self {
+            sort_by_field: None,
+            docstore_compression: Compressor::default(),
+            docstore_blocksize: default_docstore_blocksize(),
+        }
+    }
+}
+
 /// Settings to presort the documents in an index
 ///
 /// Presorting documents can greatly performance
@@ -401,7 +420,7 @@ mod tests {
         let json = serde_json::ser::to_string(&index_metas).expect("serialization failed");
         assert_eq!(
             json,
-            r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"lz4"},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#
+            r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"lz4","docstore_blocksize":16384},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#
         );

         let deser_meta: UntrackedIndexMeta = serde_json::from_str(&json).unwrap();
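
A sketch of setting the new `docstore_blocksize` alongside a doc-store compressor at index creation time; it assumes the builder's `settings` method and the `Compressor::Lz4` variant name, and the 32 KiB value is arbitrary.

```rust
use tantivy::schema::{Schema, TEXT};
use tantivy::store::Compressor;
use tantivy::{IndexBuilder, IndexSettings};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    schema_builder.add_text_field("body", TEXT);
    let schema = schema_builder.build();

    // docstore_blocksize is new in this release; 16_384 is the serde default shown above.
    let settings = IndexSettings {
        docstore_compression: Compressor::Lz4,
        docstore_blocksize: 32_768,
        ..Default::default()
    };
    let _index = IndexBuilder::new()
        .schema(schema)
        .settings(settings)
        .create_in_ram()?;
    Ok(())
}
```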
@@ -110,7 +110,7 @@ mod tests {
         let tmp_file = tmp_dir.path().join("watched.txt");

         let counter: Arc<AtomicUsize> = Default::default();
-        let (tx, rx) = crossbeam::channel::unbounded();
+        let (tx, rx) = crossbeam_channel::unbounded();
         let timeout = Duration::from_millis(100);

         let watcher = FileWatcher::new(&tmp_file);
@@ -153,7 +153,7 @@ mod tests {
         let tmp_file = tmp_dir.path().join("watched.txt");

         let counter: Arc<AtomicUsize> = Default::default();
-        let (tx, rx) = crossbeam::channel::unbounded();
+        let (tx, rx) = crossbeam_channel::unbounded();
         let timeout = Duration::from_millis(100);

         let watcher = FileWatcher::new(&tmp_file);
@@ -181,7 +181,7 @@ fn test_directory_delete(directory: &dyn Directory) -> crate::Result<()> {

 fn test_watch(directory: &dyn Directory) {
     let counter: Arc<AtomicUsize> = Default::default();
-    let (tx, rx) = crossbeam::channel::unbounded();
+    let (tx, rx) = crossbeam_channel::unbounded();
     let timeout = Duration::from_millis(500);

     let handle = directory
@@ -300,7 +300,7 @@ impl IntFastFieldWriter {
     /// If the document has more than one value for the given field,
     /// only the first one is taken in account.
     ///
-    /// Values for string fast fields are skipped.
+    /// Values on text fast fields are skipped.
     pub fn add_document(&mut self, doc: &Document) {
         match doc.get_first(self.field) {
             Some(v) => {
@@ -4,7 +4,6 @@ use std::thread;
 use std::thread::JoinHandle;

 use common::BitSet;
-use crossbeam::channel;
 use smallvec::smallvec;

 use super::operation::{AddOperation, UserOperation};
@@ -289,7 +288,7 @@ impl IndexWriter {
             return Err(TantivyError::InvalidArgument(err_msg));
         }
         let (document_sender, document_receiver): (AddBatchSender, AddBatchReceiver) =
-            channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
+            crossbeam_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);

         let delete_queue = DeleteQueue::new();

@@ -326,7 +325,7 @@ impl IndexWriter {
     }

     fn drop_sender(&mut self) {
-        let (sender, _receiver) = channel::bounded(1);
+        let (sender, _receiver) = crossbeam_channel::bounded(1);
         self.operation_sender = sender;
     }

@@ -532,7 +531,7 @@ impl IndexWriter {
     /// Returns the former segment_ready channel.
     fn recreate_document_channel(&mut self) {
         let (document_sender, document_receiver): (AddBatchSender, AddBatchReceiver) =
-            channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
+            crossbeam_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
         self.operation_sender = document_sender;
         self.index_writer_status = IndexWriterStatus::from(document_receiver);
     }
@@ -92,7 +92,7 @@ impl Drop for IndexWriterBomb {
 mod tests {
     use std::mem;

-    use crossbeam::channel;
+    use crossbeam_channel as channel;

     use super::IndexWriterStatus;
@@ -4,7 +4,7 @@ use murmurhash32::murmurhash2;
 use crate::fastfield::FastValue;
 use crate::postings::{IndexingContext, IndexingPosition, PostingsWriter};
 use crate::schema::term::{JSON_END_OF_PATH, JSON_PATH_SEGMENT_SEP};
-use crate::schema::Type;
+use crate::schema::{Field, Type};
 use crate::time::format_description::well_known::Rfc3339;
 use crate::time::{OffsetDateTime, UtcOffset};
 use crate::tokenizer::TextAnalyzer;
@@ -199,12 +199,77 @@ fn infer_type_from_str(text: &str) -> TextOrDateTime {
     }
 }

+// Tries to infer a JSON type from a string
+pub(crate) fn convert_to_fast_value_and_get_term(
+    json_term_writer: &mut JsonTermWriter,
+    phrase: &str,
+) -> Option<Term> {
+    if let Ok(dt) = OffsetDateTime::parse(phrase, &Rfc3339) {
+        let dt_utc = dt.to_offset(UtcOffset::UTC);
+        return Some(set_fastvalue_and_get_term(
+            json_term_writer,
+            DateTime::from_utc(dt_utc),
+        ));
+    }
+    if let Ok(u64_val) = str::parse::<u64>(phrase) {
+        return Some(set_fastvalue_and_get_term(json_term_writer, u64_val));
+    }
+    if let Ok(i64_val) = str::parse::<i64>(phrase) {
+        return Some(set_fastvalue_and_get_term(json_term_writer, i64_val));
+    }
+    if let Ok(f64_val) = str::parse::<f64>(phrase) {
+        return Some(set_fastvalue_and_get_term(json_term_writer, f64_val));
+    }
+    None
+}
+// helper function to generate a Term from a json fastvalue
+pub(crate) fn set_fastvalue_and_get_term<T: FastValue>(
+    json_term_writer: &mut JsonTermWriter,
+    value: T,
+) -> Term {
+    json_term_writer.set_fast_value(value);
+    json_term_writer.term().clone()
+}
+
+// helper function to generate a list of terms with their positions from a textual json value
+pub(crate) fn set_string_and_get_terms(
+    json_term_writer: &mut JsonTermWriter,
+    value: &str,
+    text_analyzer: &TextAnalyzer,
+) -> Vec<(usize, Term)> {
+    let mut positions_and_terms = Vec::<(usize, Term)>::new();
+    json_term_writer.close_path_and_set_type(Type::Str);
+    let term_num_bytes = json_term_writer.term_buffer.as_slice().len();
+    let mut token_stream = text_analyzer.token_stream(value);
+    token_stream.process(&mut |token| {
+        json_term_writer.term_buffer.truncate(term_num_bytes);
+        json_term_writer
+            .term_buffer
+            .append_bytes(token.text.as_bytes());
+        positions_and_terms.push((token.position, json_term_writer.term().clone()));
+    });
+    positions_and_terms
+}
+
 pub struct JsonTermWriter<'a> {
     term_buffer: &'a mut Term,
     path_stack: Vec<usize>,
 }

 impl<'a> JsonTermWriter<'a> {
+    pub fn from_field_and_json_path(
+        field: Field,
+        json_path: &str,
+        term_buffer: &'a mut Term,
+    ) -> Self {
+        term_buffer.set_field(Type::Json, field);
+        let mut json_term_writer = Self::wrap(term_buffer);
+        for segment in json_path.split('.') {
+            json_term_writer.push_path_segment(segment);
+        }
+        json_term_writer
+    }
+
     pub fn wrap(term_buffer: &'a mut Term) -> Self {
         term_buffer.clear_with_type(Type::Json);
         let mut path_stack = Vec::with_capacity(10);
@@ -21,11 +21,13 @@ pub mod segment_updater;
 mod segment_writer;
 mod stamper;

-use crossbeam::channel;
+use crossbeam_channel as channel;
 use smallvec::SmallVec;

 pub use self::index_writer::IndexWriter;
-pub(crate) use self::json_term_writer::JsonTermWriter;
+pub(crate) use self::json_term_writer::{
+    convert_to_fast_value_and_get_term, set_string_and_get_terms, JsonTermWriter,
+};
 pub use self::log_merge_policy::LogMergePolicy;
 pub use self::merge_operation::MergeOperation;
 pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};
@@ -39,9 +39,10 @@ impl SegmentSerializer {

         let postings_serializer = InvertedIndexSerializer::open(&mut segment)?;
         let compressor = segment.index().settings().docstore_compression;
+        let blocksize = segment.index().settings().docstore_blocksize;
         Ok(SegmentSerializer {
             segment,
-            store_writer: StoreWriter::new(store_write, compressor),
+            store_writer: StoreWriter::new(store_write, compressor, blocksize),
             fast_field_serializer,
             fieldnorms_serializer: Some(fieldnorms_serializer),
             postings_serializer,
@@ -1,6 +1,5 @@
 use std::borrow::BorrowMut;
 use std::collections::HashSet;
-use std::io;
 use std::io::Write;
 use std::ops::Deref;
 use std::path::PathBuf;
@@ -27,7 +26,7 @@ use crate::indexer::{
     SegmentSerializer,
 };
 use crate::schema::Schema;
-use crate::{FutureResult, Opstamp, TantivyError};
+use crate::{FutureResult, Opstamp};

 const NUM_MERGE_THREADS: usize = 4;
@@ -73,10 +72,12 @@ fn save_metas(metas: &IndexMeta, directory: &dyn Directory) -> crate::Result<()>
     let mut buffer = serde_json::to_vec_pretty(metas)?;
     // Just adding a new line at the end of the buffer.
     writeln!(&mut buffer)?;
-    fail_point!("save_metas", |msg| Err(TantivyError::from(io::Error::new(
-        io::ErrorKind::Other,
-        msg.unwrap_or_else(|| "Undefined".to_string())
-    ))));
+    fail_point!("save_metas", |msg| Err(crate::TantivyError::from(
+        std::io::Error::new(
+            std::io::ErrorKind::Other,
+            msg.unwrap_or_else(|| "Undefined".to_string())
+        )
+    )));
     directory.sync_directory()?;
     directory.atomic_write(&META_FILEPATH, &buffer[..])?;
     debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas));
@@ -372,9 +372,10 @@ fn remap_and_write(
         .segment_mut()
         .open_write(SegmentComponent::Store)?;
     let compressor = serializer.segment().index().settings().docstore_compression;
+    let block_size = serializer.segment().index().settings().docstore_blocksize;
     let old_store_writer = std::mem::replace(
         &mut serializer.store_writer,
-        StoreWriter::new(store_write, compressor),
+        StoreWriter::new(store_write, compressor, block_size),
     );
     old_store_writer.close()?;
     let store_read = StoreReader::open(
@@ -184,6 +184,66 @@ fn intersection_with_slop(left: &mut [u32], right: &[u32], slop: u32) -> usize {
     count
 }

+fn intersection_count_with_slop(left: &[u32], right: &[u32], slop: u32) -> usize {
+    let mut left_index = 0;
+    let mut right_index = 0;
+    let mut count = 0;
+    let left_len = left.len();
+    let right_len = right.len();
+    while left_index < left_len && right_index < right_len {
+        let left_val = left[left_index];
+        let right_val = right[right_index];
+        let right_slop = if right_val >= slop {
+            right_val - slop
+        } else {
+            0
+        };
+
+        if left_val < right_slop {
+            left_index += 1;
+        } else if right_slop <= left_val && left_val <= right_val {
+            while left_index + 1 < left_len {
+                let next_left_val = left[left_index + 1];
+                if next_left_val > right_val {
+                    break;
+                }
+                left_index += 1;
+            }
+            count += 1;
+            left_index += 1;
+            right_index += 1;
+        } else if left_val > right_val {
+            right_index += 1;
+        }
+    }
+    count
+}
+
+fn intersection_exists_with_slop(left: &[u32], right: &[u32], slop: u32) -> bool {
+    let mut left_index = 0;
+    let mut right_index = 0;
+    let left_len = left.len();
+    let right_len = right.len();
+    while left_index < left_len && right_index < right_len {
+        let left_val = left[left_index];
+        let right_val = right[right_index];
+        let right_slop = if right_val >= slop {
+            right_val - slop
+        } else {
+            0
+        };
+
+        if left_val < right_slop {
+            left_index += 1;
+        } else if right_slop <= left_val && left_val <= right_val {
+            return true;
+        } else if left_val > right_val {
+            right_index += 1;
+        }
+    }
+    false
+}
+
 impl<TPostings: Postings> PhraseScorer<TPostings> {
     pub fn new(
         term_postings: Vec<(usize, TPostings)>,
@@ -237,11 +297,25 @@ impl<TPostings: Postings> PhraseScorer<TPostings> {

     fn phrase_exists(&mut self) -> bool {
         let intersection_len = self.compute_phrase_match();
+        if self.has_slop() {
+            return intersection_exists_with_slop(
+                &self.left[..intersection_len],
+                &self.right[..],
+                self.slop,
+            );
+        }
         intersection_exists(&self.left[..intersection_len], &self.right[..])
     }

     fn compute_phrase_count(&mut self) -> u32 {
         let intersection_len = self.compute_phrase_match();
+        if self.has_slop() {
+            return intersection_count_with_slop(
+                &self.left[..intersection_len],
+                &self.right[..],
+                self.slop,
+            ) as u32;
+        }
         intersection_count(&self.left[..intersection_len], &self.right[..]) as u32
     }

@@ -252,12 +326,7 @@ impl<TPostings: Postings> PhraseScorer<TPostings> {
                 .positions(&mut self.left);
         }
         let mut intersection_len = self.left.len();
-        let end_term = if self.has_slop() {
-            self.num_terms
-        } else {
-            self.num_terms - 1
-        };
-        for i in 1..end_term {
+        for i in 1..self.num_terms - 1 {
         {
             self.intersection_docset
                 .docset_mut_specialized(i)
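
A standalone restatement of the slop check added above, with a tiny usage example; `exists_with_slop` mirrors `intersection_exists_with_slop` but is not the tantivy function itself.

```rust
// Does any position in `left` fall within `slop` positions at or before a position in `right`?
fn exists_with_slop(left: &[u32], right: &[u32], slop: u32) -> bool {
    let (mut i, mut j) = (0, 0);
    while i < left.len() && j < right.len() {
        // Lowest left position that `right[j]` can still reach with the given slop.
        let lo = right[j].saturating_sub(slop);
        if left[i] < lo {
            i += 1;
        } else if left[i] <= right[j] {
            return true;
        } else {
            j += 1;
        }
    }
    false
}

fn main() {
    // Left term at position 3, right term at position 5: a slop of 2 bridges the gap.
    assert!(exists_with_slop(&[3], &[5], 2));
    assert!(!exists_with_slop(&[3], &[5], 1));
}
```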
@@ -1,4 +1,4 @@
-use std::collections::{BTreeSet, HashMap};
+use std::collections::HashMap;
 use std::num::{ParseFloatError, ParseIntError};
 use std::ops::Bound;
 use std::str::FromStr;
@@ -7,7 +7,9 @@ use tantivy_query_grammar::{UserInputAst, UserInputBound, UserInputLeaf, UserInp

 use super::logical_ast::*;
 use crate::core::Index;
-use crate::indexer::JsonTermWriter;
+use crate::indexer::{
+    convert_to_fast_value_and_get_term, set_string_and_get_terms, JsonTermWriter,
+};
 use crate::query::{
     AllQuery, BooleanQuery, BoostQuery, EmptyQuery, Occur, PhraseQuery, Query, RangeQuery,
     TermQuery,
@@ -16,7 +18,7 @@ use crate::schema::{
     Facet, FacetParseError, Field, FieldType, IndexRecordOption, Schema, Term, Type,
 };
 use crate::time::format_description::well_known::Rfc3339;
-use crate::time::{OffsetDateTime, UtcOffset};
+use crate::time::OffsetDateTime;
 use crate::tokenizer::{TextAnalyzer, TokenizerManager};
 use crate::{DateTime, Score};

@@ -30,7 +32,7 @@ pub enum QueryParserError {
     #[error("Unsupported query: {0}")]
     UnsupportedQuery(String),
     /// The query references a field that is not in the schema
-    #[error("Field does not exists: '{0:?}'")]
+    #[error("Field does not exists: '{0}'")]
     FieldDoesNotExist(String),
     /// The query contains a term for a `u64` or `i64`-field, but the value
     /// is neither.
@@ -53,11 +55,11 @@ pub enum QueryParserError {
     NoDefaultFieldDeclared,
     /// The field searched for is not declared
     /// as indexed in the schema.
-    #[error("The field '{0:?}' is not declared as indexed")]
+    #[error("The field '{0}' is not declared as indexed")]
     FieldNotIndexed(String),
     /// A phrase query was requested for a field that does not
     /// have any positions indexed.
-    #[error("The field '{0:?}' does not have positions indexed")]
+    #[error("The field '{0}' does not have positions indexed")]
     FieldDoesNotHavePositionsIndexed(String),
     /// The tokenizer for the given field is unknown
     /// The two argument strings are the name of the field, the name of the tokenizer
@@ -169,7 +171,7 @@ pub struct QueryParser {
     conjunction_by_default: bool,
     tokenizer_manager: TokenizerManager,
     boost: HashMap<Field, Score>,
-    field_names: BTreeSet<String>,
+    field_names: HashMap<String, Field>,
 }

 fn all_negative(ast: &LogicalAst) -> bool {
@@ -182,6 +184,31 @@ fn all_negative(ast: &LogicalAst) -> bool {
     }
 }

+// Returns the position (in byte offsets) of the unescaped '.' in the `field_path`.
+//
+// This function operates directly on bytes (as opposed to codepoint), relying
+// on a encoding property of utf-8 for its correctness.
+fn locate_splitting_dots(field_path: &str) -> Vec<usize> {
+    let mut splitting_dots_pos = Vec::new();
+    let mut escape_state = false;
+    for (pos, b) in field_path.bytes().enumerate() {
+        if escape_state {
+            escape_state = false;
+            continue;
+        }
+        match b {
+            b'\\' => {
+                escape_state = true;
+            }
+            b'.' => {
+                splitting_dots_pos.push(pos);
+            }
+            _ => {}
+        }
+    }
+    splitting_dots_pos
+}
+
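A standalone restatement of `locate_splitting_dots` with a usage example showing the escaped-dot quirk; this is a sketch for illustration, not the crate's internal function.

```rust
// Byte offsets of unescaped '.' characters in a full field path such as "attributes.cart.product_id".
fn locate_splitting_dots(field_path: &str) -> Vec<usize> {
    let mut dots = Vec::new();
    let mut escaped = false;
    for (pos, b) in field_path.bytes().enumerate() {
        if escaped {
            escaped = false;
        } else if b == b'\\' {
            escaped = true;
        } else if b == b'.' {
            dots.push(pos);
        }
    }
    dots
}

fn main() {
    assert_eq!(locate_splitting_dots("one.two.three"), vec![3, 7]);
    // An escaped dot (the quickwit quirk mentioned in the doc comment below) is not a split point.
    assert_eq!(locate_splitting_dots(r"a\.b.c"), vec![4]);
}
```
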
impl QueryParser {
|
||||
/// Creates a `QueryParser`, given
|
||||
/// * schema - index Schema
|
||||
@@ -193,7 +220,7 @@ impl QueryParser {
|
||||
) -> QueryParser {
|
||||
let field_names = schema
|
||||
.fields()
|
||||
.map(|(_, field_entry)| field_entry.name().to_string())
|
||||
.map(|(field, field_entry)| (field_entry.name().to_string(), field))
|
||||
.collect();
|
||||
QueryParser {
|
||||
schema,
|
||||
@@ -207,25 +234,18 @@ impl QueryParser {
|
||||
|
||||
// Splits a full_path as written in a query, into a field name and a
|
||||
// json path.
|
||||
pub(crate) fn split_full_path<'a>(&self, full_path: &'a str) -> (&'a str, &'a str) {
|
||||
if full_path.is_empty() {
|
||||
return ("", "");
|
||||
pub(crate) fn split_full_path<'a>(&self, full_path: &'a str) -> Option<(Field, &'a str)> {
|
||||
if let Some(field) = self.field_names.get(full_path) {
|
||||
return Some((*field, ""));
|
||||
}
|
||||
if self.field_names.contains(full_path) {
|
||||
return (full_path, "");
|
||||
}
|
||||
let mut result = ("", full_path);
|
||||
let mut cursor = 0;
|
||||
while let Some(pos) = full_path[cursor..].find('.') {
|
||||
cursor += pos;
|
||||
let prefix = &full_path[..cursor];
|
||||
let suffix = &full_path[cursor + 1..];
|
||||
if self.field_names.contains(prefix) {
|
||||
result = (prefix, suffix);
|
||||
let mut splitting_period_pos: Vec<usize> = locate_splitting_dots(full_path);
|
||||
while let Some(pos) = splitting_period_pos.pop() {
|
||||
let (prefix, suffix) = full_path.split_at(pos);
|
||||
if let Some(field) = self.field_names.get(prefix) {
|
||||
return Some((*field, &suffix[1..]));
|
||||
}
|
||||
cursor += 1;
|
||||
}
|
||||
result
|
||||
None
|
||||
}
|
||||
|
||||
/// Creates a `QueryParser`, given
|
||||
@@ -278,12 +298,6 @@ impl QueryParser {
|
||||
self.compute_logical_ast(user_input_ast)
|
||||
}
|
||||
|
||||
fn resolve_field_name(&self, field_name: &str) -> Result<Field, QueryParserError> {
|
||||
self.schema
|
||||
.get_field(field_name)
|
||||
.ok_or_else(|| QueryParserError::FieldDoesNotExist(String::from(field_name)))
|
||||
}
|
||||
|
||||
fn compute_logical_ast(
|
||||
&self,
|
||||
user_input_ast: UserInputAst,
|
||||
@@ -390,6 +404,12 @@ impl QueryParser {
|
||||
if !field_type.is_indexed() {
|
||||
return Err(QueryParserError::FieldNotIndexed(field_name.to_string()));
|
||||
}
|
||||
if field_type.value_type() != Type::Json && !json_path.is_empty() {
|
||||
let field_name = self.schema.get_field_name(field);
|
||||
return Err(QueryParserError::FieldDoesNotExist(format!(
|
||||
"{field_name}.{json_path}"
|
||||
)));
|
||||
}
|
||||
match *field_type {
|
||||
FieldType::U64(_) => {
|
||||
let val: u64 = u64::from_str(phrase)?;
|
||||
@@ -531,37 +551,56 @@ impl QueryParser {
|
||||
})
|
||||
}
|
||||
|
||||
fn compute_path_triplet_for_literal<'a>(
|
||||
/// Given a literal, returns the list of terms that should be searched.
|
||||
///
|
||||
/// The terms are identified by a triplet:
|
||||
/// - tantivy field
|
||||
/// - field_path: tantivy has JSON fields. It is possible to target a member of a JSON
|
||||
/// object by naturally extending the json field name with a "." separated field_path
|
||||
/// - field_phrase: the phrase that is being searched.
|
||||
///
|
||||
/// The literal identifies the targetted field by a so-called *full field path*,
|
||||
/// specified before the ":". (e.g. identity.username:fulmicoton).
|
||||
///
|
||||
/// The way we split the full field path into (field_name, field_path) can be ambiguous,
|
||||
/// because field_names can contain "." themselves.
|
||||
// For instance if a field is named `one.two` and another one is named `one`,
|
||||
/// should `one.two:three` target `one.two` with field path `` or or `one` with
|
||||
/// the field path `two`.
|
||||
///
|
||||
/// In this case tantivy, just picks the solution with the longest field name.
|
||||
///
|
||||
/// Quirk: As a hack for quickwit, we do not split over a dot that appear escaped '\.'.
|
||||
fn compute_path_triplets_for_literal<'a>(
|
||||
&self,
|
||||
literal: &'a UserInputLiteral,
|
||||
) -> Result<Vec<(Field, &'a str, &'a str)>, QueryParserError> {
|
||||
match &literal.field_name {
|
||||
Some(ref full_path) => {
|
||||
// We need to add terms associated to json default fields.
|
||||
let (field_name, path) = self.split_full_path(full_path);
|
||||
if let Ok(field) = self.resolve_field_name(field_name) {
|
||||
return Ok(vec![(field, path, literal.phrase.as_str())]);
|
||||
}
|
||||
let triplets: Vec<(Field, &str, &str)> = self
|
||||
.default_indexed_json_fields()
|
||||
.map(|json_field| (json_field, full_path.as_str(), literal.phrase.as_str()))
|
||||
.collect();
|
||||
if triplets.is_empty() {
|
||||
return Err(QueryParserError::FieldDoesNotExist(full_path.to_string()));
|
||||
}
|
||||
Ok(triplets)
|
||||
}
|
||||
None => {
|
||||
if self.default_fields.is_empty() {
|
||||
return Err(QueryParserError::NoDefaultFieldDeclared);
|
||||
}
|
||||
Ok(self
|
||||
.default_fields
|
||||
.iter()
|
||||
.map(|default_field| (*default_field, "", literal.phrase.as_str()))
|
||||
.collect::<Vec<(Field, &str, &str)>>())
|
||||
let full_path = if let Some(full_path) = &literal.field_name {
|
||||
full_path
|
||||
} else {
|
||||
// The user did not specify any path...
|
||||
// We simply target default fields.
|
||||
if self.default_fields.is_empty() {
|
||||
return Err(QueryParserError::NoDefaultFieldDeclared);
|
||||
}
|
||||
return Ok(self
|
||||
.default_fields
|
||||
.iter()
|
||||
.map(|default_field| (*default_field, "", literal.phrase.as_str()))
|
||||
.collect::<Vec<(Field, &str, &str)>>());
|
||||
};
|
||||
if let Some((field, path)) = self.split_full_path(full_path) {
|
||||
return Ok(vec![(field, path, literal.phrase.as_str())]);
|
||||
}
|
||||
// We need to add terms associated to json default fields.
|
||||
let triplets: Vec<(Field, &str, &str)> = self
|
||||
.default_indexed_json_fields()
|
||||
.map(|json_field| (json_field, full_path.as_str(), literal.phrase.as_str()))
|
||||
.collect();
|
||||
if triplets.is_empty() {
|
||||
return Err(QueryParserError::FieldDoesNotExist(full_path.to_string()));
|
||||
}
|
||||
Ok(triplets)
|
||||
}
|
||||
|
||||
fn compute_logical_ast_from_leaf(
@@ -571,7 +610,7 @@ impl QueryParser {
        match leaf {
            UserInputLeaf::Literal(literal) => {
                let term_phrases: Vec<(Field, &str, &str)> =
                    self.compute_path_triplet_for_literal(&literal)?;
                    self.compute_path_triplets_for_literal(&literal)?;
                let mut asts: Vec<LogicalAst> = Vec::new();
                for (field, json_path, phrase) in term_phrases {
                    for ast in self.compute_logical_ast_for_leaf(field, json_path, phrase)? {
@@ -598,8 +637,9 @@ impl QueryParser {
                    "Range query need to target a specific field.".to_string(),
                )
            })?;
            let (field_name, json_path) = self.split_full_path(&full_path);
            let field = self.resolve_field_name(field_name)?;
            let (field, json_path) = self
                .split_full_path(&full_path)
                .ok_or_else(|| QueryParserError::FieldDoesNotExist(full_path.clone()))?;
            let field_entry = self.schema.get_field_entry(field);
            let value_type = field_entry.field_type().value_type();
            let logical_ast = LogicalAst::Leaf(Box::new(LogicalLiteral::Range {
@@ -660,30 +700,6 @@ fn generate_literals_for_str(
    Ok(Some(LogicalLiteral::Phrase(terms)))
}

enum NumValue {
    U64(u64),
    I64(i64),
    F64(f64),
    DateTime(OffsetDateTime),
}

fn infer_type_num(phrase: &str) -> Option<NumValue> {
    if let Ok(dt) = OffsetDateTime::parse(phrase, &Rfc3339) {
        let dt_utc = dt.to_offset(UtcOffset::UTC);
        return Some(NumValue::DateTime(dt_utc));
    }
    if let Ok(u64_val) = str::parse::<u64>(phrase) {
        return Some(NumValue::U64(u64_val));
    }
    if let Ok(i64_val) = str::parse::<i64>(phrase) {
        return Some(NumValue::I64(i64_val));
    }
    if let Ok(f64_val) = str::parse::<f64>(phrase) {
        return Some(NumValue::F64(f64_val));
    }
    None
}

fn generate_literals_for_json_object(
    field_name: &str,
    field: Field,
@@ -694,38 +710,13 @@ fn generate_literals_for_json_object(
) -> Result<Vec<LogicalLiteral>, QueryParserError> {
    let mut logical_literals = Vec::new();
    let mut term = Term::new();
    term.set_field(Type::Json, field);
    let mut json_term_writer = JsonTermWriter::wrap(&mut term);
    for segment in json_path.split('.') {
        json_term_writer.push_path_segment(segment);
    let mut json_term_writer =
        JsonTermWriter::from_field_and_json_path(field, json_path, &mut term);
    if let Some(term) = convert_to_fast_value_and_get_term(&mut json_term_writer, phrase) {
        logical_literals.push(LogicalLiteral::Term(term));
    }
    if let Some(num_value) = infer_type_num(phrase) {
        match num_value {
            NumValue::U64(u64_val) => {
                json_term_writer.set_fast_value(u64_val);
            }
            NumValue::I64(i64_val) => {
                json_term_writer.set_fast_value(i64_val);
            }
            NumValue::F64(f64_val) => {
                json_term_writer.set_fast_value(f64_val);
            }
            NumValue::DateTime(dt_val) => {
                json_term_writer.set_fast_value(DateTime::from_utc(dt_val));
            }
        }
        logical_literals.push(LogicalLiteral::Term(json_term_writer.term().clone()));
    }
    json_term_writer.close_path_and_set_type(Type::Str);
    let terms = set_string_and_get_terms(&mut json_term_writer, phrase, text_analyzer);
    drop(json_term_writer);
    let term_num_bytes = term.as_slice().len();
    let mut token_stream = text_analyzer.token_stream(phrase);
    let mut terms: Vec<(usize, Term)> = Vec::new();
    token_stream.process(&mut |token| {
        term.truncate(term_num_bytes);
        term.append_bytes(token.text.as_bytes());
        terms.push((token.position, term.clone()));
    });
    if terms.len() <= 1 {
        for (_, term) in terms {
            logical_literals.push(LogicalLiteral::Term(term));
@@ -1399,29 +1390,56 @@ mod test {
        }
    }

    #[test]
    fn test_escaped_field() {
        let mut schema_builder = Schema::builder();
        schema_builder.add_text_field(r#"a\.b"#, STRING);
        let schema = schema_builder.build();
        let query_parser = QueryParser::new(schema, Vec::new(), TokenizerManager::default());
        let query = query_parser.parse_query(r#"a\.b:hello"#).unwrap();
        assert_eq!(
            format!("{:?}", query),
            "TermQuery(Term(type=Str, field=0, \"hello\"))"
        );
    }

    #[test]
    fn test_split_full_path() {
        let mut schema_builder = Schema::builder();
        schema_builder.add_text_field("second", STRING);
        schema_builder.add_text_field("first", STRING);
        schema_builder.add_text_field("first.toto", STRING);
        schema_builder.add_text_field("first.toto.titi", STRING);
        schema_builder.add_text_field("third.a.b.c", STRING);
        let schema = schema_builder.build();
        let query_parser = QueryParser::new(schema, Vec::new(), TokenizerManager::default());
        let query_parser =
            QueryParser::new(schema.clone(), Vec::new(), TokenizerManager::default());
        assert_eq!(
            query_parser.split_full_path("first.toto"),
            ("first.toto", "")
            Some((schema.get_field("first.toto").unwrap(), ""))
        );
        assert_eq!(
            query_parser.split_full_path("first.toto.bubu"),
            Some((schema.get_field("first.toto").unwrap(), "bubu"))
        );
        assert_eq!(
            query_parser.split_full_path("first.toto.titi"),
            Some((schema.get_field("first.toto.titi").unwrap(), ""))
        );
        assert_eq!(
            query_parser.split_full_path("first.titi"),
            ("first", "titi")
            Some((schema.get_field("first").unwrap(), "titi"))
        );
        assert_eq!(query_parser.split_full_path("third"), ("", "third"));
        assert_eq!(
            query_parser.split_full_path("hello.toto"),
            ("", "hello.toto")
        );
        assert_eq!(query_parser.split_full_path(""), ("", ""));
        assert_eq!(query_parser.split_full_path("firsty"), ("", "firsty"));
        assert_eq!(query_parser.split_full_path("third"), None);
        assert_eq!(query_parser.split_full_path("hello.toto"), None);
        assert_eq!(query_parser.split_full_path(""), None);
        assert_eq!(query_parser.split_full_path("firsty"), None);
    }

    #[test]
    fn test_locate_splitting_dots() {
        assert_eq!(&super::locate_splitting_dots("a.b.c"), &[1, 3]);
        assert_eq!(&super::locate_splitting_dots(r#"a\.b.c"#), &[4]);
        assert_eq!(&super::locate_splitting_dots(r#"a\..b.c"#), &[3, 5]);
    }
}

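The escaped-dot quirk documented above can be illustrated outside of the diff. The following is a minimal, hypothetical re-implementation of `locate_splitting_dots`, written only to match the expectations of `test_locate_splitting_dots`; it returns the byte offsets of every dot that is not preceded by a backslash, and it is not necessarily identical to tantivy's own helper.

fn locate_splitting_dots(field_path: &str) -> Vec<usize> {
    let mut splitting_dots = Vec::new();
    let mut escaped = false;
    for (pos, c) in field_path.char_indices() {
        if escaped {
            // The previous character was a backslash: this character is escaped.
            escaped = false;
            continue;
        }
        match c {
            '\\' => escaped = true,
            '.' => splitting_dots.push(pos),
            _ => {}
        }
    }
    splitting_dots
}

fn main() {
    assert_eq!(locate_splitting_dots("a.b.c"), vec![1, 3]);
    assert_eq!(locate_splitting_dots(r"a\.b.c"), vec![4]);
    assert_eq!(locate_splitting_dots(r"a\..b.c"), vec![3, 5]);
}
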
@@ -2,7 +2,7 @@ use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use crossbeam::channel::{unbounded, Receiver, RecvError, Sender};
use crossbeam_channel::{unbounded, Receiver, RecvError, Sender};

pub struct GenerationItem<T> {
    generation: usize,
@@ -197,7 +197,7 @@ mod tests {

    use std::{iter, mem};

    use crossbeam::channel;
    use crossbeam_channel as channel;

    use super::{Pool, Queue};

@@ -147,7 +147,7 @@ impl WarmingStateInner {
    /// Every [GC_INTERVAL] attempt to GC, with panics caught and logged using
    /// [std::panic::catch_unwind].
    fn gc_loop(inner: Weak<Mutex<WarmingStateInner>>) {
        for _ in crossbeam::channel::tick(GC_INTERVAL) {
        for _ in crossbeam_channel::tick(GC_INTERVAL) {
            if let Some(inner) = inner.upgrade() {
                // rely on deterministic gc in tests
                #[cfg(not(test))]

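A side note on the `crossbeam_channel::tick` call above: `tick` hands back a receiver that delivers one message per interval, so iterating over it runs the loop body periodically. A minimal sketch of that pattern, assuming the `crossbeam-channel` crate as a dependency and using a short interval plus `take(3)` so the example terminates:

use std::time::Duration;

fn main() {
    // One message every 10ms; `.take(3)` keeps the example finite.
    let ticker = crossbeam_channel::tick(Duration::from_millis(10));
    for (i, instant) in ticker.iter().take(3).enumerate() {
        println!("periodic pass {} at {:?}", i, instant);
    }
}
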
@@ -213,6 +213,8 @@ impl BinarySerializable for Document {
#[cfg(test)]
mod tests {

    use common::BinarySerializable;

    use crate::schema::*;

    #[test]
@@ -223,4 +225,22 @@ mod tests {
        doc.add_text(text_field, "My title");
        assert_eq!(doc.field_values().len(), 1);
    }

    #[test]
    fn test_doc_serialization_issue() {
        let mut doc = Document::default();
        doc.add_json_object(
            Field::from_field_id(0),
            serde_json::json!({"key": 2u64})
                .as_object()
                .unwrap()
                .clone(),
        );
        doc.add_text(Field::from_field_id(1), "hello");
        assert_eq!(doc.field_values().len(), 2);
        let mut payload: Vec<u8> = Vec::new();
        doc.serialize(&mut payload).unwrap();
        assert_eq!(payload.len(), 26);
        Document::deserialize(&mut &payload[..]).unwrap();
    }
}

@@ -42,6 +42,11 @@ impl TextOptions {
    /// Text fast fields will have the term ids stored in the fast field.
    /// The fast field will be a multivalued fast field.
    ///
    /// The effective cardinality depends on the tokenizer. When creating fast fields on text
    /// fields it is recommended to use the "raw" tokenizer, since it will store the original text
    /// unchanged. The "default" tokenizer will store the terms as lower case and this will be
    /// reflected in the dictionary.
    ///
    /// The original text can be retrieved via `ord_to_term` from the dictionary.
    #[must_use]
    pub fn set_fast(mut self) -> TextOptions {

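A sketch of how a text fast field could be declared following the advice above, with the "raw" tokenizer configured through the field's indexing options. The builder calls other than `set_fast()` are assumptions about tantivy's schema API rather than part of this change:

use tantivy::schema::{IndexRecordOption, Schema, TextFieldIndexing, TextOptions};

fn main() {
    let mut schema_builder = Schema::builder();
    // "raw" keeps the original text unchanged, so the fast-field dictionary
    // stores the value as-is (retrievable later via ord_to_term).
    let text_options = TextOptions::default()
        .set_indexing_options(
            TextFieldIndexing::default()
                .set_tokenizer("raw")
                .set_index_option(IndexRecordOption::Basic),
        )
        .set_fast()
        .set_stored();
    schema_builder.add_text_field("category", text_options);
    let _schema = schema_builder.build();
}
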
@@ -388,8 +388,16 @@ mod binary_serialize {
                }
            }
            JSON_OBJ_CODE => {
                let map = serde_json::from_reader(reader)?;
                Ok(Value::JsonObject(map))
                // As explained in
                // https://docs.serde.rs/serde_json/fn.from_reader.html
                //
                // `T::from_reader(..)` expects EOF after reading the object,
                // which is not what we want here.
                //
                // For this reason we need to create our own `Deserializer`.
                let mut de = serde_json::Deserializer::from_reader(reader);
                let json_map = <serde_json::Map::<String, serde_json::Value> as serde::Deserialize>::deserialize(&mut de)?;
                Ok(Value::JsonObject(json_map))
            }
            _ => Err(io::Error::new(
                io::ErrorKind::InvalidData,

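The distinction matters because the JSON object sits in the middle of a serialized document, with more bytes following it. A standalone sketch of the two behaviours, assuming `serde` and `serde_json` as dependencies (the payload here is made up for illustration):

fn main() {
    // A JSON object with bytes following it, as happens inside a serialized
    // Document where other field values come after the JSON object.
    let payload: &[u8] = br#"{"key": 2}xyz"#;

    // `serde_json::from_reader` insists on EOF after the value and fails here.
    let eof_strict: serde_json::Result<serde_json::Map<String, serde_json::Value>> =
        serde_json::from_reader(payload);
    assert!(eof_strict.is_err());

    // Driving a Deserializer manually, as the fix above does, stops after the
    // closing brace and succeeds despite the trailing bytes.
    let mut reader: &[u8] = payload;
    let mut de = serde_json::Deserializer::from_reader(&mut reader);
    let map = <serde_json::Map<String, serde_json::Value> as serde::Deserialize>::deserialize(&mut de)
        .expect("object parses even with trailing bytes");
    assert_eq!(map.get("key"), Some(&serde_json::Value::from(2u64)));
}
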
50
src/store/compression_zstd_block.rs
Normal file
@@ -0,0 +1,50 @@
use std::io;

use zstd::bulk::{compress_to_buffer, decompress_to_buffer};
use zstd::DEFAULT_COMPRESSION_LEVEL;

#[inline]
pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
    let count_size = std::mem::size_of::<u32>();
    let max_size = zstd::zstd_safe::compress_bound(uncompressed.len()) + count_size;

    compressed.clear();
    compressed.resize(max_size, 0);

    let compressed_size = compress_to_buffer(
        uncompressed,
        &mut compressed[count_size..],
        DEFAULT_COMPRESSION_LEVEL,
    )?;

    compressed[0..count_size].copy_from_slice(&(uncompressed.len() as u32).to_le_bytes());
    compressed.resize(compressed_size + count_size, 0);

    Ok(())
}

#[inline]
pub fn decompress(compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<()> {
    let count_size = std::mem::size_of::<u32>();
    let uncompressed_size = u32::from_le_bytes(
        compressed
            .get(..count_size)
            .ok_or(io::ErrorKind::InvalidData)?
            .try_into()
            .unwrap(),
    ) as usize;

    decompressed.clear();
    decompressed.resize(uncompressed_size, 0);

    let decompressed_size = decompress_to_buffer(&compressed[count_size..], decompressed)?;

    if decompressed_size != uncompressed_size {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "doc store block not completely decompressed, data corruption".to_string(),
        ));
    }

    Ok(())
}
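A round-trip sketch of the block format defined above: a 4-byte little-endian uncompressed length followed by the zstd frame. It assumes the `zstd-compression` feature is enabled and that the `compress`/`decompress` functions from this file are in scope, e.g. from a test module placed next to them:

#[cfg(all(test, feature = "zstd-compression"))]
mod zstd_block_roundtrip {
    use super::{compress, decompress};

    #[test]
    fn roundtrip() {
        let original: Vec<u8> = (0..10_000u32).flat_map(|i| i.to_le_bytes()).collect();
        let mut compressed = Vec::new();
        compress(&original, &mut compressed).unwrap();
        // The first four bytes store the uncompressed length.
        assert_eq!(
            u32::from_le_bytes(compressed[..4].try_into().unwrap()) as usize,
            original.len()
        );
        let mut decompressed = Vec::new();
        decompress(&compressed, &mut decompressed).unwrap();
        assert_eq!(decompressed, original);
    }
}
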
@@ -26,6 +26,9 @@ pub enum Compressor {
    #[serde(rename = "snappy")]
    /// Use the snap compressor
    Snappy,
    #[serde(rename = "zstd")]
    /// Use the zstd compressor
    Zstd,
}

impl Default for Compressor {
@@ -36,6 +39,8 @@ impl Default for Compressor {
            Compressor::Brotli
        } else if cfg!(feature = "snappy-compression") {
            Compressor::Snappy
        } else if cfg!(feature = "zstd-compression") {
            Compressor::Zstd
        } else {
            Compressor::None
        }
@@ -49,6 +54,7 @@ impl Compressor {
            1 => Compressor::Lz4,
            2 => Compressor::Brotli,
            3 => Compressor::Snappy,
            4 => Compressor::Zstd,
            _ => panic!("unknown compressor id {:?}", id),
        }
    }
@@ -58,6 +64,7 @@ impl Compressor {
            Self::Lz4 => 1,
            Self::Brotli => 2,
            Self::Snappy => 3,
            Self::Zstd => 4,
        }
    }
    #[inline]
@@ -98,6 +105,16 @@ impl Compressor {
                    panic!("snappy-compression feature flag not activated");
                }
            }
            Self::Zstd => {
                #[cfg(feature = "zstd-compression")]
                {
                    super::compression_zstd_block::compress(uncompressed, compressed)
                }
                #[cfg(not(feature = "zstd-compression"))]
                {
                    panic!("zstd-compression feature flag not activated");
                }
            }
        }
    }

@@ -143,6 +160,16 @@ impl Compressor {
                    panic!("snappy-compression feature flag not activated");
                }
            }
            Self::Zstd => {
                #[cfg(feature = "zstd-compression")]
                {
                    super::compression_zstd_block::decompress(compressed, decompressed)
                }
                #[cfg(not(feature = "zstd-compression"))]
                {
                    panic!("zstd-compression feature flag not activated");
                }
            }
        }
    }
}

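For users, the compressor is picked per index through the index settings. The snippet below is only a sketch from memory of the tantivy API: the `IndexSettings` field name `docstore_compression` and the builder methods are assumptions and not part of this diff, and the `zstd-compression` feature must be enabled.

use tantivy::schema::{Schema, STORED, TEXT};
use tantivy::store::Compressor;
use tantivy::{Index, IndexSettings};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    schema_builder.add_text_field("body", TEXT | STORED);
    let schema = schema_builder.build();

    // Select zstd for the doc store (assumed field name: docstore_compression).
    let settings = IndexSettings {
        docstore_compression: Compressor::Zstd,
        ..Default::default()
    };
    let _index = Index::builder()
        .schema(schema)
        .settings(settings)
        .create_in_ram()?;
    Ok(())
}
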
@@ -50,6 +50,9 @@ mod compression_brotli;
#[cfg(feature = "snappy-compression")]
mod compression_snap;

#[cfg(feature = "zstd-compression")]
mod compression_zstd_block;

#[cfg(test)]
pub mod tests {

@@ -69,10 +72,13 @@ pub mod tests {
        sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt \
        mollit anim id est laborum.";

    const BLOCK_SIZE: usize = 16_384;

    pub fn write_lorem_ipsum_store(
        writer: WritePtr,
        num_docs: usize,
        compressor: Compressor,
        blocksize: usize,
    ) -> Schema {
        let mut schema_builder = Schema::builder();
        let field_body = schema_builder.add_text_field("body", TextOptions::default().set_stored());
@@ -80,7 +86,7 @@ pub mod tests {
        schema_builder.add_text_field("title", TextOptions::default().set_stored());
        let schema = schema_builder.build();
        {
            let mut store_writer = StoreWriter::new(writer, compressor);
            let mut store_writer = StoreWriter::new(writer, compressor, blocksize);
            for i in 0..num_docs {
                let mut doc = Document::default();
                doc.add_field_value(field_body, LOREM.to_string());
@@ -103,7 +109,7 @@ pub mod tests {
        let path = Path::new("store");
        let directory = RamDirectory::create();
        let store_wrt = directory.open_write(path)?;
        let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4);
        let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE);
        let field_title = schema.get_field("title").unwrap();
        let store_file = directory.open_read(path)?;
        let store = StoreReader::open(store_file)?;
@@ -139,11 +145,11 @@ pub mod tests {
        Ok(())
    }

    fn test_store(compressor: Compressor) -> crate::Result<()> {
    fn test_store(compressor: Compressor, blocksize: usize) -> crate::Result<()> {
        let path = Path::new("store");
        let directory = RamDirectory::create();
        let store_wrt = directory.open_write(path)?;
        let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor);
        let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize);
        let field_title = schema.get_field("title").unwrap();
        let store_file = directory.open_read(path)?;
        let store = StoreReader::open(store_file)?;
@@ -169,22 +175,28 @@ pub mod tests {

    #[test]
    fn test_store_noop() -> crate::Result<()> {
        test_store(Compressor::None)
        test_store(Compressor::None, BLOCK_SIZE)
    }
    #[cfg(feature = "lz4-compression")]
    #[test]
    fn test_store_lz4_block() -> crate::Result<()> {
        test_store(Compressor::Lz4)
        test_store(Compressor::Lz4, BLOCK_SIZE)
    }
    #[cfg(feature = "snappy-compression")]
    #[test]
    fn test_store_snap() -> crate::Result<()> {
        test_store(Compressor::Snappy)
        test_store(Compressor::Snappy, BLOCK_SIZE)
    }
    #[cfg(feature = "brotli-compression")]
    #[test]
    fn test_store_brotli() -> crate::Result<()> {
        test_store(Compressor::Brotli)
        test_store(Compressor::Brotli, BLOCK_SIZE)
    }

    #[cfg(feature = "zstd-compression")]
    #[test]
    fn test_store_zstd() -> crate::Result<()> {
        test_store(Compressor::Zstd, BLOCK_SIZE)
    }

    #[test]
@@ -348,6 +360,7 @@ mod bench {
                directory.open_write(path).unwrap(),
                1_000,
                Compressor::default(),
                16_384,
            );
            directory.delete(path).unwrap();
        });
@@ -361,6 +374,7 @@ mod bench {
            directory.open_write(path).unwrap(),
            1_000,
            Compressor::default(),
            16_384,
        );
        let store_file = directory.open_read(path).unwrap();
        let store = StoreReader::open(store_file).unwrap();

@@ -304,6 +304,8 @@ mod tests {
    use crate::store::tests::write_lorem_ipsum_store;
    use crate::Directory;

    const BLOCK_SIZE: usize = 16_384;

    fn get_text_field<'a>(doc: &'a Document, field: &'a Field) -> Option<&'a str> {
        doc.get_first(*field).and_then(|f| f.as_text())
    }
@@ -313,7 +315,7 @@ mod tests {
        let directory = RamDirectory::create();
        let path = Path::new("store");
        let writer = directory.open_write(path)?;
        let schema = write_lorem_ipsum_store(writer, 500, Compressor::default());
        let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE);
        let title = schema.get_field("title").unwrap();
        let store_file = directory.open_read(path)?;
        let store = StoreReader::open(store_file)?;

@@ -11,8 +11,6 @@ use crate::schema::Document;
use crate::store::index::Checkpoint;
use crate::DocId;

const BLOCK_SIZE: usize = 16_384;

/// Write tantivy's [`Store`](./index.html)
///
/// Contrary to the other components of `tantivy`,
@@ -22,6 +20,7 @@ const BLOCK_SIZE: usize = 16_384;
/// The skip list index on the other hand, is built in memory.
pub struct StoreWriter {
    compressor: Compressor,
    block_size: usize,
    doc: DocId,
    first_doc_in_block: DocId,
    offset_index_writer: SkipIndexBuilder,
@@ -35,9 +34,10 @@ impl StoreWriter {
    ///
    /// The store writer will write blocks to disk as
    /// documents are added.
    pub fn new(writer: WritePtr, compressor: Compressor) -> StoreWriter {
    pub fn new(writer: WritePtr, compressor: Compressor, block_size: usize) -> StoreWriter {
        StoreWriter {
            compressor,
            block_size,
            doc: 0,
            first_doc_in_block: 0,
            offset_index_writer: SkipIndexBuilder::new(),
@@ -65,7 +65,7 @@ impl StoreWriter {
        VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?;
        self.current_block.write_all(serialized_document)?;
        self.doc += 1;
        if self.current_block.len() > BLOCK_SIZE {
        if self.current_block.len() > self.block_size {
            self.write_and_compress_block()?;
        }
        Ok(())
@@ -86,7 +86,7 @@ impl StoreWriter {
        self.current_block
            .write_all(&self.intermediary_buffer[..])?;
        self.doc += 1;
        if self.current_block.len() > BLOCK_SIZE {
        if self.current_block.len() > self.block_size {
            self.write_and_compress_block()?;
        }
        Ok(())

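The `block_size` parameter controls the same buffering rule in both code paths above: documents are appended to `current_block`, and the block is compressed and flushed once it grows past `block_size`. A minimal, hypothetical sketch of that pattern (names are illustrative, not tantivy's):

struct BlockBuffer {
    block_size: usize,
    current_block: Vec<u8>,
    flushed_blocks: usize,
}

impl BlockBuffer {
    fn add(&mut self, serialized_doc: &[u8]) {
        self.current_block.extend_from_slice(serialized_doc);
        if self.current_block.len() > self.block_size {
            // In tantivy this is where the block would be compressed and written out.
            self.current_block.clear();
            self.flushed_blocks += 1;
        }
    }
}

fn main() {
    let mut buf = BlockBuffer {
        block_size: 16_384,
        current_block: Vec::new(),
        flushed_blocks: 0,
    };
    let doc = [0u8; 100];
    for _ in 0..1_000 {
        buf.add(&doc);
    }
    // 1_000 * 100 bytes comfortably exceeds one 16 KB block.
    assert!(buf.flushed_blocks > 0);
}
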
@@ -28,7 +28,6 @@ use fst_termdict as termdict;
mod sstable_termdict;
#[cfg(feature = "quickwit")]
use sstable_termdict as termdict;
use tantivy_fst::automaton::AlwaysMatch;

#[cfg(test)]
mod tests;
@@ -36,24 +35,4 @@ mod tests;
/// Position of the term in the sorted list of terms.
pub type TermOrdinal = u64;

/// The term dictionary contains all of the terms in
/// `tantivy index` in a sorted manner.
pub type TermDictionary = self::termdict::TermDictionary;

/// Builder for the new term dictionary.
///
/// Inserting must be done in the order of the `keys`.
pub type TermDictionaryBuilder<W> = self::termdict::TermDictionaryBuilder<W>;

/// Given a list of sorted term streams,
/// returns an iterator over sorted unique terms.
///
/// The item yield is actually a pair with
/// - the term
/// - a slice with the ordinal of the segments containing
///   the terms.
pub type TermMerger<'a> = self::termdict::TermMerger<'a>;

/// `TermStreamer` acts as a cursor over a range of terms of a segment.
/// Terms are guaranteed to be sorted.
pub type TermStreamer<'a, A = AlwaysMatch> = self::termdict::TermStreamer<'a, A>;
pub use self::termdict::{TermDictionary, TermDictionaryBuilder, TermMerger, TermStreamer};

@@ -145,6 +145,12 @@ where
    }

    pub fn write_key(&mut self, key: &[u8]) {
        // If this is the first key in the block, we use it to
        // shorten the last term in the last block.
        if self.first_ordinal_of_the_block == self.num_terms {
            self.index_builder
                .shorten_last_block_key_given_next_key(key);
        }
        let keep_len = common_prefix_len(&self.previous_key, key);
        let add_len = key.len() - keep_len;
        let increasing_keys = add_len > 0 && (self.previous_key.len() == keep_len)
@@ -273,11 +279,12 @@ mod test {
                33u8, 18u8, 19u8, // keep 1 push 1 | 20
                17u8, 20u8, 0u8, 0u8, 0u8, 0u8, // no more blocks
                // index
                161, 102, 98, 108, 111, 99, 107, 115, 129, 162, 104, 108, 97, 115, 116, 95, 107,
                101, 121, 130, 17, 20, 106, 98, 108, 111, 99, 107, 95, 97, 100, 100, 114, 162, 106,
                98, 121, 116, 101, 95, 114, 97, 110, 103, 101, 162, 101, 115, 116, 97, 114, 116, 0,
                99, 101, 110, 100, 11, 109, 102, 105, 114, 115, 116, 95, 111, 114, 100, 105, 110,
                97, 108, 0, 15, 0, 0, 0, 0, 0, 0, 0, // offset for the index
                161, 102, 98, 108, 111, 99, 107, 115, 129, 162, 115, 108, 97, 115, 116, 95, 107,
                101, 121, 95, 111, 114, 95, 103, 114, 101, 97, 116, 101, 114, 130, 17, 20, 106, 98,
                108, 111, 99, 107, 95, 97, 100, 100, 114, 162, 106, 98, 121, 116, 101, 95, 114, 97,
                110, 103, 101, 162, 101, 115, 116, 97, 114, 116, 0, 99, 101, 110, 100, 11, 109,
                102, 105, 114, 115, 116, 95, 111, 114, 100, 105, 110, 97, 108, 0, 15, 0, 0, 0, 0,
                0, 0, 0, // offset for the index
                3u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8 // num terms
            ]
        );

@@ -4,6 +4,7 @@ use std::ops::Range;
use serde::{Deserialize, Serialize};

use crate::error::DataCorruption;
use crate::termdict::sstable_termdict::sstable::common_prefix_len;

#[derive(Default, Debug, Serialize, Deserialize)]
pub struct SSTableIndex {
@@ -19,7 +20,7 @@ impl SSTableIndex {
    pub fn search(&self, key: &[u8]) -> Option<BlockAddr> {
        self.blocks
            .iter()
            .find(|block| &block.last_key[..] >= key)
            .find(|block| &block.last_key_or_greater[..] >= key)
            .map(|block| block.block_addr.clone())
    }
}
@@ -32,7 +33,10 @@ pub struct BlockAddr {

#[derive(Debug, Serialize, Deserialize)]
struct BlockMeta {
    pub last_key: Vec<u8>,
    /// Any byte string that is lexicographically greater or equal to
    /// the last key in the block,
    /// and yet strictly smaller than the first key in the next block.
    pub last_key_or_greater: Vec<u8>,
    pub block_addr: BlockAddr,
}

@@ -41,10 +45,39 @@ pub struct SSTableIndexBuilder {
    index: SSTableIndex,
}

/// Given that left < right,
/// mutates `left` into a shorter byte string `left'` that
/// matches `left <= left' < right`.
fn find_shorter_str_in_between(left: &mut Vec<u8>, right: &[u8]) {
    assert!(&left[..] < right);
    let common_len = common_prefix_len(&left, right);
    if left.len() == common_len {
        return;
    }
    // It is possible to do one character shorter in some case,
    // but it is not worth the extra complexity
    for pos in (common_len + 1)..left.len() {
        if left[pos] != u8::MAX {
            left[pos] += 1;
            left.truncate(pos + 1);
            return;
        }
    }
}

impl SSTableIndexBuilder {
    /// In order to make the index as light as possible, we
    /// try to find a shorter alternative to the last key of the last block
    /// that is still smaller than the next key.
    pub(crate) fn shorten_last_block_key_given_next_key(&mut self, next_key: &[u8]) {
        if let Some(last_block) = self.index.blocks.last_mut() {
            find_shorter_str_in_between(&mut last_block.last_key_or_greater, next_key);
        }
    }

    pub fn add_block(&mut self, last_key: &[u8], byte_range: Range<usize>, first_ordinal: u64) {
        self.index.blocks.push(BlockMeta {
            last_key: last_key.to_vec(),
            last_key_or_greater: last_key.to_vec(),
            block_addr: BlockAddr {
                byte_range,
                first_ordinal,
@@ -97,4 +130,35 @@ mod tests {
            "Data corruption: SSTable index is corrupted."
        );
    }

    #[track_caller]
    fn test_find_shorter_str_in_between_aux(left: &[u8], right: &[u8]) {
        let mut left_buf = left.to_vec();
        super::find_shorter_str_in_between(&mut left_buf, right);
        assert!(left_buf.len() <= left.len());
        assert!(left <= &left_buf);
        assert!(&left_buf[..] < &right);
    }

    #[test]
    fn test_find_shorter_str_in_between() {
        test_find_shorter_str_in_between_aux(b"", b"hello");
        test_find_shorter_str_in_between_aux(b"abc", b"abcd");
        test_find_shorter_str_in_between_aux(b"abcd", b"abd");
        test_find_shorter_str_in_between_aux(&[0, 0, 0], &[1]);
        test_find_shorter_str_in_between_aux(&[0, 0, 0], &[0, 0, 1]);
        test_find_shorter_str_in_between_aux(&[0, 0, 255, 255, 255, 0u8], &[0, 1]);
    }

    use proptest::prelude::*;

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(100))]
        #[test]
        fn test_proptest_find_shorter_str(left in any::<Vec<u8>>(), right in any::<Vec<u8>>()) {
            if left < right {
                test_find_shorter_str_in_between_aux(&left, &right);
            }
        }
    }
}

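A worked example of the key-shortening trick: the snippet below copies `find_shorter_str_in_between` from the diff above and adds a hypothetical `common_prefix_len` so it runs standalone. With a block ending at "automobile" and the next block starting at "autumn", the index only needs to remember "auton".

// Hypothetical helper for this standalone example; tantivy has its own.
fn common_prefix_len(left: &[u8], right: &[u8]) -> usize {
    left.iter().zip(right).take_while(|(a, b)| a == b).count()
}

fn find_shorter_str_in_between(left: &mut Vec<u8>, right: &[u8]) {
    assert!(&left[..] < right);
    let common_len = common_prefix_len(&left[..], right);
    if left.len() == common_len {
        return;
    }
    // Skip the first differing byte and bump the next one that is not 0xFF.
    for pos in (common_len + 1)..left.len() {
        if left[pos] != u8::MAX {
            left[pos] += 1;
            left.truncate(pos + 1);
            return;
        }
    }
}

fn main() {
    // Last key of a block: "automobile"; first key of the next block: "autumn".
    let mut last_key = b"automobile".to_vec();
    find_shorter_str_in_between(&mut last_key, b"autumn");
    // "auton" is shorter yet still sits between the two keys.
    assert_eq!(&last_key[..], b"auton");
    assert!(&b"automobile"[..] <= &last_key[..] && &last_key[..] < &b"autumn"[..]);
}
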
@@ -25,6 +25,13 @@ pub struct TokenizerManager {
}

impl TokenizerManager {
    /// Creates an empty tokenizer manager.
    pub fn new() -> Self {
        Self {
            tokenizers: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Registers a new tokenizer associated with a given name.
    pub fn register<T>(&self, tokenizer_name: &str, tokenizer: T)
    where TextAnalyzer: From<T> {
@@ -52,9 +59,7 @@ impl Default for TokenizerManager {
    /// - en_stem
    /// - ja
    fn default() -> TokenizerManager {
        let manager = TokenizerManager {
            tokenizers: Arc::new(RwLock::new(HashMap::new())),
        };
        let manager = TokenizerManager::new();
        manager.register("raw", RawTokenizer);
        manager.register(
            "default",
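A small usage sketch for the new `TokenizerManager::new()`: start from an empty manager and register only what is needed. It assumes `TokenizerManager::get` returns an `Option` and that `TextAnalyzer: From<SimpleTokenizer>`, as in tantivy's tokenizer module:

use tantivy::tokenizer::{LowerCaser, SimpleTokenizer, TextAnalyzer, TokenizerManager};

fn main() {
    // An empty manager, in contrast to TokenizerManager::default(),
    // which pre-registers "raw", "default", "en_stem", ...
    let manager = TokenizerManager::new();
    manager.register(
        "lowercase_only",
        TextAnalyzer::from(SimpleTokenizer).filter(LowerCaser),
    );
    assert!(manager.get("lowercase_only").is_some());
    assert!(manager.get("default").is_none());
}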