Mirror of https://github.com/quickwit-oss/tantivy.git, synced 2025-12-28 04:52:55 +00:00

Compare commits: 10 commits (0.21.1 ... index_writ)
| Author | SHA1 | Date |
|---|---|---|
| | dc90e4f8e9 | |
| | 34920d31f5 | |
| | 0241a05b90 | |
| | e125f3b041 | |
| | c520ac46fc | |
| | 2d7390341c | |
| | 03fcdce016 | |
| | e4e416ac42 | |
| | 19325132b7 | |
| | 389d36f760 | |
.github/workflows/coverage.yml (vendored, 6 changes)

```diff
@@ -15,13 +15,13 @@ jobs:
   coverage:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
       - name: Install Rust
-        run: rustup toolchain install nightly --profile minimal --component llvm-tools-preview
+        run: rustup toolchain install nightly-2023-09-10 --profile minimal --component llvm-tools-preview
       - uses: Swatinem/rust-cache@v2
       - uses: taiki-e/install-action@cargo-llvm-cov
      - name: Generate code coverage
-        run: cargo +nightly llvm-cov --all-features --workspace --doctests --lcov --output-path lcov.info
+        run: cargo +nightly-2023-09-10 llvm-cov --all-features --workspace --doctests --lcov --output-path lcov.info
      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v3
        continue-on-error: true
```
.github/workflows/long_running.yml (vendored, 2 changes)

```diff
@@ -19,7 +19,7 @@ jobs:
     runs-on: ubuntu-latest

     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
       - name: Install stable
         uses: actions-rs/toolchain@v1
         with:
```
.github/workflows/test.yml (vendored, 4 changes)

```diff
@@ -20,7 +20,7 @@ jobs:
     runs-on: ubuntu-latest

     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4

       - name: Install nightly
         uses: actions-rs/toolchain@v1
@@ -60,7 +60,7 @@ jobs:
     name: test-${{ matrix.features.label}}

     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4

       - name: Install stable
         uses: actions-rs/toolchain@v1
```
```diff
@@ -128,7 +128,7 @@ members = ["query-grammar", "bitpacker", "common", "ownedbytes", "stacker", "sst
 [[test]]
 name = "failpoints"
 path = "tests/failpoints/mod.rs"
-required-features = ["fail/failpoints"]
+required-features = ["failpoints"]

 [[bench]]
 name = "analyzer"
```
```diff
@@ -24,7 +24,7 @@ const SPECIAL_CHARS: &[char] = &[

 /// consume a field name followed by colon. Return the field name with escape sequence
 /// already interpreted
-fn field_name(i: &str) -> IResult<&str, String> {
-    let simple_char = none_of(SPECIAL_CHARS);
+fn field_name(inp: &str) -> IResult<&str, String> {
+    let first_char = verify(none_of(SPECIAL_CHARS), |c| *c != '-');
     let escape_sequence = || preceded(char('\\'), one_of(SPECIAL_CHARS));
@@ -38,12 +38,12 @@ fn field_name(i: &str) -> IResult<&str, String> {
             char(':'),
         ),
         |(first_char, next)| once(first_char).chain(next).collect(),
-    )(i)
+    )(inp)
 }

 /// Consume a word outside of any context.
 // TODO should support escape sequences
-fn word(i: &str) -> IResult<&str, &str> {
+fn word(inp: &str) -> IResult<&str, &str> {
     map_res(
         recognize(tuple((
             satisfy(|c| {
@@ -55,14 +55,14 @@ fn word(i: &str) -> IResult<&str, &str> {
             })),
         ))),
         |s| match s {
-            "OR" | "AND" | "NOT" | "IN" => Err(Error::new(i, ErrorKind::Tag)),
+            "OR" | "AND" | "NOT" | "IN" => Err(Error::new(inp, ErrorKind::Tag)),
             _ => Ok(s),
         },
-    )(i)
+    )(inp)
 }

 fn word_infallible(delimiter: &str) -> impl Fn(&str) -> JResult<&str, Option<&str>> + '_ {
-    |i| {
+    |inp| {
         opt_i_err(
             preceded(
                 space0,
@@ -71,29 +71,29 @@ fn word_infallible(delimiter: &str) -> impl Fn(&str) -> JResult<&str, Option<&st
                 }))),
             ),
             "expected word",
-        )(i)
+        )(inp)
     }
 }

 /// Consume a word inside a Range context. More values are allowed as they are
 /// not ambiguous in this context.
-fn relaxed_word(i: &str) -> IResult<&str, &str> {
+fn relaxed_word(inp: &str) -> IResult<&str, &str> {
     recognize(tuple((
         satisfy(|c| !c.is_whitespace() && !['`', '{', '}', '"', '[', ']', '(', ')'].contains(&c)),
         many0(satisfy(|c: char| {
             !c.is_whitespace() && !['{', '}', '"', '[', ']', '(', ')'].contains(&c)
         })),
-    )))(i)
+    )))(inp)
 }

-fn negative_number(i: &str) -> IResult<&str, &str> {
+fn negative_number(inp: &str) -> IResult<&str, &str> {
     recognize(preceded(
         char('-'),
         tuple((digit1, opt(tuple((char('.'), digit1))))),
-    ))(i)
+    ))(inp)
 }
```
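The `first_char` guard above is the behavioral change hidden inside the rename: a field name may no longer begin with `-`. A hypothetical check of that contract, using the names and signatures shown in the hunks above (the assertions are illustrative, not taken from the repo's test suite):

```rust
#[test]
fn field_name_rejects_leading_dash() {
    // `field_name` consumes `name:` and yields the name...
    assert_eq!(
        field_name("title:wind").unwrap(),
        ("wind", "title".to_string())
    );
    // ...but with the new `verify(..., |c| *c != '-')` guard, a leading '-'
    // makes it fail, leaving the '-' for the occur/NOT machinery instead of
    // swallowing it into the field name.
    assert!(field_name("-title:wind").is_err());
}
```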
```diff
-fn simple_term(i: &str) -> IResult<&str, (Delimiter, String)> {
+fn simple_term(inp: &str) -> IResult<&str, (Delimiter, String)> {
     let escaped_string = |delimiter| {
         // we need this because none_of can't accept an owned array of char.
         let not_delimiter = verify(anychar, move |parsed| *parsed != delimiter);
@@ -123,13 +123,13 @@ fn simple_term(i: &str) -> IResult<&str, (Delimiter, String)> {
         simple_quotes,
         double_quotes,
         text_no_delimiter,
-    ))(i)
+    ))(inp)
 }

 fn simple_term_infallible(
     delimiter: &str,
 ) -> impl Fn(&str) -> JResult<&str, Option<(Delimiter, String)>> + '_ {
-    |i| {
+    |inp| {
         let escaped_string = |delimiter| {
             // we need this because none_of can't accept an owned array of char.
             let not_delimiter = verify(anychar, move |parsed| *parsed != delimiter);
@@ -162,11 +162,11 @@ fn simple_term_infallible(
             map(word_infallible(delimiter), |(text, errors)| {
                 (text.map(|text| (Delimiter::None, text.to_string())), errors)
             }),
-        )(i)
+        )(inp)
     }
 }

-fn term_or_phrase(i: &str) -> IResult<&str, UserInputLeaf> {
+fn term_or_phrase(inp: &str) -> IResult<&str, UserInputLeaf> {
     map(
         tuple((simple_term, fallible(slop_or_prefix_val))),
         |((delimiter, phrase), (slop, prefix))| {
@@ -179,10 +179,10 @@ fn term_or_phrase(i: &str) -> IResult<&str, UserInputLeaf> {
             }
             .into()
         },
-    )(i)
+    )(inp)
 }

-fn term_or_phrase_infallible(i: &str) -> JResult<&str, Option<UserInputLeaf>> {
+fn term_or_phrase_infallible(inp: &str) -> JResult<&str, Option<UserInputLeaf>> {
     map(
         // ~* for slop/prefix, ) inside group or ast tree, ^ if boost
         tuple_infallible((simple_term_infallible("*)^"), slop_or_prefix_val)),
@@ -214,10 +214,10 @@ fn term_or_phrase_infallible(i: &str) -> JResult<&str, Option<UserInputLeaf>> {
             };
             (leaf, errors)
         },
-    )(i)
+    )(inp)
 }

-fn term_group(i: &str) -> IResult<&str, UserInputAst> {
+fn term_group(inp: &str) -> IResult<&str, UserInputAst> {
     let occur_symbol = alt((
         value(Occur::MustNot, char('-')),
         value(Occur::Must, char('+')),
@@ -240,12 +240,12 @@ fn term_group(i: &str) -> IResult<&str, UserInputAst> {
                 .collect(),
             )
         },
-    )(i)
+    )(inp)
 }

 // this is a precondition for term_group_infallible. Without it, term_group_infallible can fail
 // with a panic. It does not consume its input.
-fn term_group_precond(i: &str) -> IResult<&str, (), ()> {
+fn term_group_precond(inp: &str) -> IResult<&str, (), ()> {
     value(
         (),
         peek(tuple((
@@ -253,13 +253,13 @@ fn term_group_precond(i: &str) -> IResult<&str, (), ()> {
             space0,
             char('('), // when we are here, we know it can't be anything but a term group
         ))),
-    )(i)
+    )(inp)
     .map_err(|e| e.map(|_| ()))
 }

-fn term_group_infallible(i: &str) -> JResult<&str, UserInputAst> {
-    let (mut i, (field_name, _, _, _)) =
-        tuple((field_name, space0, char('('), space0))(i).expect("precondition failed");
+fn term_group_infallible(inp: &str) -> JResult<&str, UserInputAst> {
+    let (mut inp, (field_name, _, _, _)) =
+        tuple((field_name, space0, char('('), space0))(inp).expect("precondition failed");

     let mut terms = Vec::new();
     let mut errs = Vec::new();
@@ -270,19 +270,19 @@ fn term_group_infallible(i: &str) -> JResult<&str, UserInputAst> {
             first_round = false;
             Vec::new()
         } else {
-            let (rest, (_, err)) = space1_infallible(i)?;
-            i = rest;
+            let (rest, (_, err)) = space1_infallible(inp)?;
+            inp = rest;
             err
         };
-        if i.is_empty() {
+        if inp.is_empty() {
             errs.push(LenientErrorInternal {
-                pos: i.len(),
+                pos: inp.len(),
                 message: "missing )".to_string(),
             });
-            break Ok((i, (UserInputAst::Clause(terms), errs)));
+            break Ok((inp, (UserInputAst::Clause(terms), errs)));
         }
-        if let Some(i) = i.strip_prefix(')') {
-            break Ok((i, (UserInputAst::Clause(terms), errs)));
+        if let Some(inp) = inp.strip_prefix(')') {
+            break Ok((inp, (UserInputAst::Clause(terms), errs)));
         }
         // only append missing space error if we did not reach the end of group
         errs.append(&mut space_error);
@@ -291,26 +291,57 @@ fn term_group_infallible(i: &str) -> JResult<&str, UserInputAst> {
         // first byte is not `)` or ' '. If it did not, we would end up looping.

         let (rest, ((occur, leaf), mut err)) =
-            tuple_infallible((occur_symbol, term_or_phrase_infallible))(i)?;
+            tuple_infallible((occur_symbol, term_or_phrase_infallible))(inp)?;
         errs.append(&mut err);
         if let Some(leaf) = leaf {
             terms.push((occur, leaf.set_field(Some(field_name.clone())).into()));
         }
-        i = rest;
+        inp = rest;
     }
 }
```
```diff
-fn literal(i: &str) -> IResult<&str, UserInputAst> {
+fn exists(inp: &str) -> IResult<&str, UserInputLeaf> {
+    value(
+        UserInputLeaf::Exists {
+            field: String::new(),
+        },
+        tuple((space0, char('*'))),
+    )(inp)
+}
+
+fn exists_precond(inp: &str) -> IResult<&str, (), ()> {
+    value(
+        (),
+        peek(tuple((
+            field_name,
+            space0,
+            char('*'), // when we are here, we know it can't be anything but an exists
+        ))),
+    )(inp)
+    .map_err(|e| e.map(|_| ()))
+}
+
+fn exists_infallible(inp: &str) -> JResult<&str, UserInputAst> {
+    let (inp, (field_name, _, _)) =
+        tuple((field_name, space0, char('*')))(inp).expect("precondition failed");
+
+    let exists = UserInputLeaf::Exists { field: field_name }.into();
+    Ok((inp, (exists, Vec::new())))
+}
+
+fn literal(inp: &str) -> IResult<&str, UserInputAst> {
+    // * alone is already parsed by our caller, so if `exists` succeeds, we can be confident
+    // something (a field name) got parsed before
     alt((
         map(
-            tuple((opt(field_name), alt((range, set, term_or_phrase)))),
+            tuple((opt(field_name), alt((range, set, exists, term_or_phrase)))),
             |(field_name, leaf): (Option<String>, UserInputLeaf)| leaf.set_field(field_name).into(),
         ),
         term_group,
-    ))(i)
+    ))(inp)
 }

-fn literal_no_group_infallible(i: &str) -> JResult<&str, Option<UserInputAst>> {
+fn literal_no_group_infallible(inp: &str) -> JResult<&str, Option<UserInputAst>> {
     map(
         tuple_infallible((
             opt_i(field_name),
@@ -337,7 +368,7 @@ fn literal_no_group_infallible(i: &str) -> JResult<&str, Option<UserInputAst>> {
                 && field_name.is_none()
             {
                 errors.push(LenientErrorInternal {
-                    pos: i.len(),
+                    pos: inp.len(),
                     message: "parsed possible invalid field as term".to_string(),
                 });
             }
@@ -346,7 +377,7 @@ fn literal_no_group_infallible(i: &str) -> JResult<&str, Option<UserInputAst>> {
                 && field_name.is_none()
             {
                 errors.push(LenientErrorInternal {
-                    pos: i.len(),
+                    pos: inp.len(),
                     message: "parsed keyword NOT as term. It should be quoted".to_string(),
                 });
             }
@@ -355,34 +386,40 @@ fn literal_no_group_infallible(i: &str) -> JResult<&str, Option<UserInputAst>> {
                 errors,
             )
         },
-    )(i)
+    )(inp)
 }

-fn literal_infallible(i: &str) -> JResult<&str, Option<UserInputAst>> {
+fn literal_infallible(inp: &str) -> JResult<&str, Option<UserInputAst>> {
     alt_infallible(
-        ((
-            term_group_precond,
-            map(term_group_infallible, |(group, errs)| (Some(group), errs)),
-        ),),
+        (
+            (
+                term_group_precond,
+                map(term_group_infallible, |(group, errs)| (Some(group), errs)),
+            ),
+            (
+                exists_precond,
+                map(exists_infallible, |(exists, errs)| (Some(exists), errs)),
+            ),
+        ),
         literal_no_group_infallible,
-    )(i)
+    )(inp)
 }
```
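Taken together, `exists`, `exists_precond`, and `exists_infallible` make `field:*` parse as an exists query in both the strict and the lenient paths. A sketch of the expected behavior, modeled on the `test_exist_query` cases added further down (illustrative, not executed):

```rust
#[test]
fn exists_query_smoke() {
    // "a:*" should now parse to UserInputLeaf::Exists { field: "a" },
    // which the Debug impl added below renders back as "a":*.
    let (rest, ast) = parse_to_ast("a:*").unwrap();
    assert!(rest.is_empty());
    assert_eq!(format!("{ast:?}"), "\"a\":*");
    // "a:b*" remains a term / phrase-prefix query, not an exists query.
}
```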
```diff
-fn slop_or_prefix_val(i: &str) -> JResult<&str, (u32, bool)> {
+fn slop_or_prefix_val(inp: &str) -> JResult<&str, (u32, bool)> {
     map(
         opt_i(alt((
             value((0, true), char('*')),
             map(preceded(char('~'), u32), |slop| (slop, false)),
         ))),
         |(slop_or_prefix_opt, err)| (slop_or_prefix_opt.unwrap_or_default(), err),
-    )(i)
+    )(inp)
 }

 /// Function that parses a range out of a Stream
 /// Supports ranges like:
 /// [5 TO 10], {5 TO 10}, [* TO 10], [10 TO *], {10 TO *], >5, <=10
 /// [a TO *], [a TO c], [abc TO bcd}
-fn range(i: &str) -> IResult<&str, UserInputLeaf> {
+fn range(inp: &str) -> IResult<&str, UserInputLeaf> {
     let range_term_val = || {
         map(
             alt((negative_number, relaxed_word, tag("*"))),
@@ -442,10 +479,10 @@ fn range(i: &str) -> IResult<&str, UserInputLeaf> {
             lower,
             upper,
         },
-    )(i)
+    )(inp)
 }

-fn range_infallible(i: &str) -> JResult<&str, UserInputLeaf> {
+fn range_infallible(inp: &str) -> JResult<&str, UserInputLeaf> {
     let lower_to_upper = map(
         tuple_infallible((
             opt_i(anychar),
@@ -553,10 +590,10 @@ fn range_infallible(i: &str) -> JResult<&str, UserInputLeaf> {
                 errors,
             )
         },
-    )(i)
+    )(inp)
 }

-fn set(i: &str) -> IResult<&str, UserInputLeaf> {
+fn set(inp: &str) -> IResult<&str, UserInputLeaf> {
     map(
         preceded(
             tuple((space0, tag("IN"), space1)),
@@ -570,10 +607,10 @@ fn set(i: &str) -> IResult<&str, UserInputLeaf> {
             field: None,
             elements,
         },
-    )(i)
+    )(inp)
 }

-fn set_infallible(mut i: &str) -> JResult<&str, UserInputLeaf> {
+fn set_infallible(mut inp: &str) -> JResult<&str, UserInputLeaf> {
     // `IN [` has already been parsed when we enter, we only need to parse simple terms until we
     // find a `]`
     let mut elements = Vec::new();
@@ -584,41 +621,41 @@ fn set_infallible(mut i: &str) -> JResult<&str, UserInputLeaf> {
             first_round = false;
             Vec::new()
         } else {
-            let (rest, (_, err)) = space1_infallible(i)?;
-            i = rest;
+            let (rest, (_, err)) = space1_infallible(inp)?;
+            inp = rest;
             err
         };
-        if i.is_empty() {
+        if inp.is_empty() {
             // TODO push error about missing ]
             errs.push(LenientErrorInternal {
-                pos: i.len(),
+                pos: inp.len(),
                 message: "missing ]".to_string(),
             });
             let res = UserInputLeaf::Set {
                 field: None,
                 elements,
             };
-            return Ok((i, (res, errs)));
+            return Ok((inp, (res, errs)));
         }
-        if let Some(i) = i.strip_prefix(']') {
+        if let Some(inp) = inp.strip_prefix(']') {
             let res = UserInputLeaf::Set {
                 field: None,
                 elements,
             };
-            return Ok((i, (res, errs)));
+            return Ok((inp, (res, errs)));
         }
         errs.append(&mut space_error);
         // TODO
         // here we do the assumption term_or_phrase_infallible always consume something if the
         // first byte is not `)` or ' '. If it did not, we would end up looping.

-        let (rest, (delim_term, mut err)) = simple_term_infallible("]")(i)?;
+        let (rest, (delim_term, mut err)) = simple_term_infallible("]")(inp)?;
         errs.append(&mut err);
         if let Some((_, term)) = delim_term {
             elements.push(term);
         }
-        i = rest;
+        inp = rest;
     }
 }

@@ -626,16 +663,16 @@ fn negate(expr: UserInputAst) -> UserInputAst {
     expr.unary(Occur::MustNot)
 }

-fn leaf(i: &str) -> IResult<&str, UserInputAst> {
+fn leaf(inp: &str) -> IResult<&str, UserInputAst> {
     alt((
         delimited(char('('), ast, char(')')),
         map(char('*'), |_| UserInputAst::from(UserInputLeaf::All)),
         map(preceded(tuple((tag("NOT"), space1)), leaf), negate),
         literal,
-    ))(i)
+    ))(inp)
 }

-fn leaf_infallible(i: &str) -> JResult<&str, Option<UserInputAst>> {
+fn leaf_infallible(inp: &str) -> JResult<&str, Option<UserInputAst>> {
     alt_infallible(
         (
             (
@@ -665,23 +702,23 @@ fn leaf_infallible(i: &str) -> JResult<&str, Option<UserInputAst>> {
             ),
         ),
         literal_infallible,
-    )(i)
+    )(inp)
 }

-fn positive_float_number(i: &str) -> IResult<&str, f64> {
+fn positive_float_number(inp: &str) -> IResult<&str, f64> {
     map(
         recognize(tuple((digit1, opt(tuple((char('.'), digit1)))))),
         // TODO this is actually dangerous if the number is actually not representable as a f64
         // (too big for instance)
         |float_str: &str| float_str.parse::<f64>().unwrap(),
-    )(i)
+    )(inp)
 }

-fn boost(i: &str) -> JResult<&str, Option<f64>> {
-    opt_i(preceded(char('^'), positive_float_number))(i)
+fn boost(inp: &str) -> JResult<&str, Option<f64>> {
+    opt_i(preceded(char('^'), positive_float_number))(inp)
 }

-fn boosted_leaf(i: &str) -> IResult<&str, UserInputAst> {
+fn boosted_leaf(inp: &str) -> IResult<&str, UserInputAst> {
     map(
         tuple((leaf, fallible(boost))),
         |(leaf, boost_opt)| match boost_opt {
@@ -690,10 +727,10 @@ fn boosted_leaf(i: &str) -> IResult<&str, UserInputAst> {
             }
             _ => leaf,
         },
-    )(i)
+    )(inp)
 }

-fn boosted_leaf_infallible(i: &str) -> JResult<&str, Option<UserInputAst>> {
+fn boosted_leaf_infallible(inp: &str) -> JResult<&str, Option<UserInputAst>> {
     map(
         tuple_infallible((leaf_infallible, boost)),
         |((leaf, boost_opt), error)| match boost_opt {
@@ -703,30 +740,30 @@ fn boosted_leaf_infallible(i: &str) -> JResult<&str, Option<UserInputAst>> {
             ),
             _ => (leaf, error),
         },
-    )(i)
+    )(inp)
 }

-fn occur_symbol(i: &str) -> JResult<&str, Option<Occur>> {
+fn occur_symbol(inp: &str) -> JResult<&str, Option<Occur>> {
     opt_i(alt((
         value(Occur::MustNot, char('-')),
         value(Occur::Must, char('+')),
-    )))(i)
+    )))(inp)
 }

-fn occur_leaf(i: &str) -> IResult<&str, (Option<Occur>, UserInputAst)> {
-    tuple((fallible(occur_symbol), boosted_leaf))(i)
+fn occur_leaf(inp: &str) -> IResult<&str, (Option<Occur>, UserInputAst)> {
+    tuple((fallible(occur_symbol), boosted_leaf))(inp)
 }

 #[allow(clippy::type_complexity)]
 fn operand_occur_leaf_infallible(
-    i: &str,
+    inp: &str,
 ) -> JResult<&str, (Option<BinaryOperand>, Option<Occur>, Option<UserInputAst>)> {
     // TODO maybe this should support multiple chained AND/OR, and "fuse" them?
     tuple_infallible((
         delimited_infallible(nothing, opt_i(binary_operand), space0_infallible),
         occur_symbol,
         boosted_leaf_infallible,
-    ))(i)
+    ))(inp)
 }

 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
@@ -735,11 +772,11 @@ enum BinaryOperand {
     And,
 }

-fn binary_operand(i: &str) -> IResult<&str, BinaryOperand> {
+fn binary_operand(inp: &str) -> IResult<&str, BinaryOperand> {
     alt((
         value(BinaryOperand::And, tag("AND ")),
         value(BinaryOperand::Or, tag("OR ")),
-    ))(i)
+    ))(inp)
 }

 fn aggregate_binary_expressions(
@@ -880,14 +917,14 @@ fn aggregate_infallible_expressions(
     }
 }

-fn operand_leaf(i: &str) -> IResult<&str, (BinaryOperand, UserInputAst)> {
+fn operand_leaf(inp: &str) -> IResult<&str, (BinaryOperand, UserInputAst)> {
     tuple((
         terminated(binary_operand, space0),
         terminated(boosted_leaf, space0),
-    ))(i)
+    ))(inp)
 }

-fn ast(i: &str) -> IResult<&str, UserInputAst> {
+fn ast(inp: &str) -> IResult<&str, UserInputAst> {
     let boolean_expr = map(
         separated_pair(boosted_leaf, space1, many1(operand_leaf)),
         |(left, right)| aggregate_binary_expressions(left, right),
@@ -908,10 +945,10 @@ fn ast(i: &str) -> IResult<&str, UserInputAst> {
         space0,
         alt((boolean_expr, whitespace_separated_leaves)),
         space0,
-    )(i)
+    )(inp)
 }

-fn ast_infallible(i: &str) -> JResult<&str, UserInputAst> {
+fn ast_infallible(inp: &str) -> JResult<&str, UserInputAst> {
     // ast() parse either `term AND term OR term` or `+term term -term`
     // both are locally ambiguous, and as we allow error, it's hard to permit backtracking.
     // Instead, we allow a mix of both syntaxes, trying to make sense of what a user meant.
@@ -928,13 +965,13 @@ fn ast_infallible(i: &str) -> JResult<&str, UserInputAst> {
         },
     );

-    delimited_infallible(space0_infallible, expression, space0_infallible)(i)
+    delimited_infallible(space0_infallible, expression, space0_infallible)(inp)
 }

-pub fn parse_to_ast(i: &str) -> IResult<&str, UserInputAst> {
+pub fn parse_to_ast(inp: &str) -> IResult<&str, UserInputAst> {
     map(delimited(space0, opt(ast), eof), |opt_ast| {
         rewrite_ast(opt_ast.unwrap_or_else(UserInputAst::empty_query))
-    })(i)
+    })(inp)
 }

 pub fn parse_to_ast_lenient(query_str: &str) -> (UserInputAst, Vec<LenientError>) {
@@ -1538,6 +1575,17 @@ mod test {
         test_parse_query_to_ast_helper("foo:\"\"*", "\"foo\":\"\"*");
     }

+    #[test]
+    fn test_exist_query() {
+        test_parse_query_to_ast_helper("a:*", "\"a\":*");
+        test_parse_query_to_ast_helper("a: *", "\"a\":*");
+        // an exist followed by default term being b
+        test_is_parse_err("a:*b", "(*\"a\":* *b)");
+
+        // this is a term query (not a phrase prefix)
+        test_parse_query_to_ast_helper("a:b*", "\"a\":b*");
+    }
+
     #[test]
     fn test_not_queries_are_consistent() {
         test_parse_query_to_ast_helper("tata -toto", "(*tata -toto)");
```
```diff
@@ -16,6 +16,9 @@ pub enum UserInputLeaf {
         field: Option<String>,
         elements: Vec<String>,
     },
+    Exists {
+        field: String,
+    },
 }

 impl UserInputLeaf {
@@ -36,6 +39,9 @@ impl UserInputLeaf {
                 upper,
             },
             UserInputLeaf::Set { field: _, elements } => UserInputLeaf::Set { field, elements },
+            UserInputLeaf::Exists { field: _ } => UserInputLeaf::Exists {
+                field: field.expect("Exist query without a field isn't allowed"),
+            },
         }
     }
 }
@@ -74,6 +80,9 @@ impl Debug for UserInputLeaf {
                 write!(formatter, "]")
             }
             UserInputLeaf::All => write!(formatter, "*"),
+            UserInputLeaf::Exists { field } => {
+                write!(formatter, "\"{field}\":*")
+            }
         }
     }
 }
```
```diff
@@ -134,3 +134,142 @@ impl Drop for ResourceLimitGuard {
             .fetch_sub(self.allocated_with_the_guard, Ordering::Relaxed);
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::aggregation::tests::exec_request_with_query;
+
+    // https://github.com/quickwit-oss/quickwit/issues/3837
+    #[test]
+    fn test_agg_limits_with_empty_merge() {
+        use crate::aggregation::agg_req::Aggregations;
+        use crate::aggregation::bucket::tests::get_test_index_from_docs;
+
+        let docs = vec![
+            vec![r#"{ "date": "2015-01-02T00:00:00Z", "text": "bbb", "text2": "bbb" }"#],
+            vec![r#"{ "text": "aaa", "text2": "bbb" }"#],
+        ];
+        let index = get_test_index_from_docs(false, &docs).unwrap();
+
+        {
+            let elasticsearch_compatible_json = json!(
+                {
+                    "1": {
+                        "terms": {"field": "text2", "min_doc_count": 0},
+                        "aggs": {
+                            "2":{
+                                "date_histogram": {
+                                    "field": "date",
+                                    "fixed_interval": "1d",
+                                    "extended_bounds": {
+                                        "min": "2015-01-01T00:00:00Z",
+                                        "max": "2015-01-10T00:00:00Z"
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            );
+
+            let agg_req: Aggregations = serde_json::from_str(
+                &serde_json::to_string(&elasticsearch_compatible_json).unwrap(),
+            )
+            .unwrap();
+            let res = exec_request_with_query(agg_req, &index, Some(("text", "bbb"))).unwrap();
+            let expected_res = json!({
+                "1": {
+                    "buckets": [
+                        {
+                            "2": {
+                                "buckets": [
+                                    { "doc_count": 0, "key": 1420070400000.0, "key_as_string": "2015-01-01T00:00:00Z" },
+                                    { "doc_count": 1, "key": 1420156800000.0, "key_as_string": "2015-01-02T00:00:00Z" },
+                                    { "doc_count": 0, "key": 1420243200000.0, "key_as_string": "2015-01-03T00:00:00Z" },
+                                    { "doc_count": 0, "key": 1420329600000.0, "key_as_string": "2015-01-04T00:00:00Z" },
+                                    { "doc_count": 0, "key": 1420416000000.0, "key_as_string": "2015-01-05T00:00:00Z" },
+                                    { "doc_count": 0, "key": 1420502400000.0, "key_as_string": "2015-01-06T00:00:00Z" },
+                                    { "doc_count": 0, "key": 1420588800000.0, "key_as_string": "2015-01-07T00:00:00Z" },
+                                    { "doc_count": 0, "key": 1420675200000.0, "key_as_string": "2015-01-08T00:00:00Z" },
+                                    { "doc_count": 0, "key": 1420761600000.0, "key_as_string": "2015-01-09T00:00:00Z" },
+                                    { "doc_count": 0, "key": 1420848000000.0, "key_as_string": "2015-01-10T00:00:00Z" }
+                                ]
+                            },
+                            "doc_count": 1,
+                            "key": "bbb"
+                        }
+                    ],
+                    "doc_count_error_upper_bound": 0,
+                    "sum_other_doc_count": 0
+                }
+            });
+            assert_eq!(res, expected_res);
+        }
+    }
+
+    // https://github.com/quickwit-oss/quickwit/issues/3837
+    #[test]
+    fn test_agg_limits_with_empty_data() {
+        use crate::aggregation::agg_req::Aggregations;
+        use crate::aggregation::bucket::tests::get_test_index_from_docs;
+
+        let docs = vec![vec![r#"{ "text": "aaa", "text2": "bbb" }"#]];
+        let index = get_test_index_from_docs(false, &docs).unwrap();
+
+        {
+            // Empty result since there is no doc with dates
+            let elasticsearch_compatible_json = json!(
+                {
+                    "1": {
+                        "terms": {"field": "text2", "min_doc_count": 0},
+                        "aggs": {
+                            "2":{
+                                "date_histogram": {
+                                    "field": "date",
+                                    "fixed_interval": "1d",
+                                    "extended_bounds": {
+                                        "min": "2015-01-01T00:00:00Z",
+                                        "max": "2015-01-10T00:00:00Z"
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            );
+
+            let agg_req: Aggregations = serde_json::from_str(
+                &serde_json::to_string(&elasticsearch_compatible_json).unwrap(),
+            )
+            .unwrap();
+            let res = exec_request_with_query(agg_req, &index, Some(("text", "bbb"))).unwrap();
+            let expected_res = json!({
+                "1": {
+                    "buckets": [
+                        {
+                            "2": {
+                                "buckets": [
+                                    { "doc_count": 0, "key": 1420070400000.0, "key_as_string": "2015-01-01T00:00:00Z" },
+                                    { "doc_count": 0, "key": 1420156800000.0, "key_as_string": "2015-01-02T00:00:00Z" },
+                                    { "doc_count": 0, "key": 1420243200000.0, "key_as_string": "2015-01-03T00:00:00Z" },
+                                    { "doc_count": 0, "key": 1420329600000.0, "key_as_string": "2015-01-04T00:00:00Z" },
+                                    { "doc_count": 0, "key": 1420416000000.0, "key_as_string": "2015-01-05T00:00:00Z" },
+                                    { "doc_count": 0, "key": 1420502400000.0, "key_as_string": "2015-01-06T00:00:00Z" },
+                                    { "doc_count": 0, "key": 1420588800000.0, "key_as_string": "2015-01-07T00:00:00Z" },
+                                    { "doc_count": 0, "key": 1420675200000.0, "key_as_string": "2015-01-08T00:00:00Z" },
+                                    { "doc_count": 0, "key": 1420761600000.0, "key_as_string": "2015-01-09T00:00:00Z" },
+                                    { "doc_count": 0, "key": 1420848000000.0, "key_as_string": "2015-01-10T00:00:00Z" }
+                                ]
+                            },
+                            "doc_count": 0,
+                            "key": "bbb"
+                        }
+                    ],
+                    "doc_count_error_upper_bound": 0,
+                    "sum_other_doc_count": 0
+                }
+            });
+            assert_eq!(res, expected_res);
+        }
+    }
+}
```
```diff
@@ -103,7 +103,8 @@ impl AggregationWithAccessor {
                 field: field_name, ..
             }) => {
                 let (accessor, column_type) =
-                    get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;
+                    // Only DateTime is supported for DateHistogram
+                    get_ff_reader(reader, field_name, Some(&[ColumnType::DateTime]))?;
                 add_agg_with_accessor(accessor, column_type, &mut res)?;
             }
             Terms(TermsAggregation {
@@ -117,10 +118,10 @@ impl AggregationWithAccessor {
                     ColumnType::U64,
                     ColumnType::F64,
                     ColumnType::Str,
-                    ColumnType::DateTime,
                     // ColumnType::Bytes Unsupported
                     // ColumnType::Bool Unsupported
                     // ColumnType::IpAddr Unsupported
+                    // ColumnType::DateTime Unsupported
                 ];

                 // In case the column is empty we want the shim column to match the missing type
@@ -145,7 +146,18 @@ impl AggregationWithAccessor {
                     .map(|m| matches!(m, Key::Str(_)))
                     .unwrap_or(false);

-                let use_special_missing_agg = missing_and_more_than_one_col || text_on_non_text_col;
+                // Actually we could convert the text to a number and have the fast path, if it is
+                // provided in Rfc3339 format. But this use case is probably not common
+                // enough to justify the effort.
+                let text_on_date_col = column_and_types.len() == 1
+                    && column_and_types[0].1 == ColumnType::DateTime
+                    && missing
+                        .as_ref()
+                        .map(|m| matches!(m, Key::Str(_)))
+                        .unwrap_or(false);
+
+                let use_special_missing_agg =
+                    missing_and_more_than_one_col || text_on_non_text_col || text_on_date_col;
                 if use_special_missing_agg {
                     let column_and_types =
                         get_all_ff_reader_or_empty(reader, field_name, None, fallback_type)?;
```
```diff
@@ -132,6 +132,7 @@ impl DateHistogramAggregationReq {
             hard_bounds: self.hard_bounds,
             extended_bounds: self.extended_bounds,
             keyed: self.keyed,
+            is_normalized_to_ns: false,
         })
     }

@@ -243,14 +244,14 @@ fn parse_into_milliseconds(input: &str) -> Result<i64, AggregationError> {
 }

 #[cfg(test)]
-mod tests {
+pub mod tests {
     use pretty_assertions::assert_eq;

     use super::*;
     use crate::aggregation::agg_req::Aggregations;
     use crate::aggregation::tests::exec_request;
     use crate::indexer::NoMergePolicy;
-    use crate::schema::{Schema, FAST};
+    use crate::schema::{Schema, FAST, STRING};
     use crate::Index;

     #[test]
@@ -306,7 +307,8 @@ mod tests {
     ) -> crate::Result<Index> {
         let mut schema_builder = Schema::builder();
         schema_builder.add_date_field("date", FAST);
-        schema_builder.add_text_field("text", FAST);
+        schema_builder.add_text_field("text", FAST | STRING);
+        schema_builder.add_text_field("text2", FAST | STRING);
         let schema = schema_builder.build();
         let index = Index::create_in_ram(schema.clone());
         {
```
```diff
@@ -122,11 +122,14 @@ pub struct HistogramAggregation {
     /// Whether to return the buckets as a hash map
     #[serde(default)]
     pub keyed: bool,
+    /// Whether the values are normalized to ns for date time values. Defaults to false.
+    #[serde(default)]
+    pub is_normalized_to_ns: bool,
 }

 impl HistogramAggregation {
-    pub(crate) fn normalize(&mut self, column_type: ColumnType) {
-        if column_type.is_date_time() {
+    pub(crate) fn normalize_date_time(&mut self) {
+        if !self.is_normalized_to_ns {
             // values are provided in ms, but the fastfield is in nano seconds
             self.interval *= 1_000_000.0;
             self.offset = self.offset.map(|off| off * 1_000_000.0);
@@ -138,6 +141,7 @@ impl HistogramAggregation {
                 min: bounds.min * 1_000_000.0,
                 max: bounds.max * 1_000_000.0,
             });
+            self.is_normalized_to_ns = true;
         }
     }

@@ -370,7 +374,7 @@ impl SegmentHistogramCollector {

         Ok(IntermediateBucketResult::Histogram {
             buckets,
-            column_type: Some(self.column_type),
+            is_date_agg: self.column_type == ColumnType::DateTime,
         })
     }

@@ -381,7 +385,9 @@ impl SegmentHistogramCollector {
         accessor_idx: usize,
     ) -> crate::Result<Self> {
         req.validate()?;
-        req.normalize(field_type);
+        if field_type == ColumnType::DateTime {
+            req.normalize_date_time();
+        }

         let sub_aggregation_blueprint = if sub_aggregation.is_empty() {
             None
@@ -439,6 +445,7 @@ fn intermediate_buckets_to_final_buckets_fill_gaps(
     // memory check upfront
     let (_, first_bucket_num, last_bucket_num) =
         generate_bucket_pos_with_opt_minmax(histogram_req, min_max);

+    // It's based on user input, so we need to account for overflows
     let added_buckets = ((last_bucket_num.saturating_sub(first_bucket_num)).max(0) as u64)
         .saturating_sub(buckets.len() as u64);
@@ -482,7 +489,7 @@ fn intermediate_buckets_to_final_buckets_fill_gaps(
 // Convert to BucketEntry
 pub(crate) fn intermediate_histogram_buckets_to_final_buckets(
     buckets: Vec<IntermediateHistogramBucketEntry>,
-    column_type: Option<ColumnType>,
+    is_date_agg: bool,
     histogram_req: &HistogramAggregation,
     sub_aggregation: &Aggregations,
     limits: &AggregationLimits,
@@ -491,8 +498,8 @@ pub(crate) fn intermediate_histogram_buckets_to_final_buckets(
     // The request used in the call to final is not yet normalized.
     // Normalization is changing the precision from milliseconds to nanoseconds.
     let mut histogram_req = histogram_req.clone();
-    if let Some(column_type) = column_type {
-        histogram_req.normalize(column_type);
+    if is_date_agg {
+        histogram_req.normalize_date_time();
     }
     let mut buckets = if histogram_req.min_doc_count() == 0 {
         // With min_doc_count != 0, we may need to add buckets, so that there are no
@@ -516,7 +523,7 @@ pub(crate) fn intermediate_histogram_buckets_to_final_buckets(

     // If we have a date type on the histogram buckets, we add the `key_as_string` field as rfc339
     // and normalize from nanoseconds to milliseconds
-    if column_type == Some(ColumnType::DateTime) {
+    if is_date_agg {
         for bucket in buckets.iter_mut() {
             if let crate::aggregation::Key::F64(ref mut val) = bucket.key {
                 let key_as_string = format_date(*val as i64)?;
```
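The `is_normalized_to_ns` flag exists so that `normalize_date_time` is idempotent: the request can be normalized when the segment collector is built and again when intermediate results are finalized, and the interval must only be scaled once. A self-contained sketch of that invariant (values assumed for illustration, not taken from a test):

```rust
fn main() {
    let mut interval = 86_400_000.0_f64; // a "1d" fixed_interval, in milliseconds
    let mut is_normalized_to_ns = false;
    for _ in 0..2 {
        // mirrors normalize_date_time(): scale ms -> ns exactly once
        if !is_normalized_to_ns {
            interval *= 1_000_000.0; // date fast fields store nanoseconds
            is_normalized_to_ns = true;
        }
    }
    assert_eq!(interval, 8.64e13); // still exactly one day in ns after two calls
}
```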
```diff
@@ -1,6 +1,6 @@
 use std::fmt::Debug;

-use columnar::{BytesColumn, ColumnType, StrColumn};
+use columnar::{BytesColumn, ColumnType, MonotonicallyMappableToU64, StrColumn};
 use rustc_hash::FxHashMap;
 use serde::{Deserialize, Serialize};

@@ -16,7 +16,7 @@ use crate::aggregation::intermediate_agg_result::{
 use crate::aggregation::segment_agg_result::{
     build_segment_agg_collector, SegmentAggregationCollector,
 };
-use crate::aggregation::{f64_from_fastfield_u64, Key};
+use crate::aggregation::{f64_from_fastfield_u64, format_date, Key};
 use crate::error::DataCorruption;
 use crate::TantivyError;

@@ -531,6 +531,13 @@ impl SegmentTermCollector {
                 });
             }
         }
+        } else if self.field_type == ColumnType::DateTime {
+            for (val, doc_count) in entries {
+                let intermediate_entry = into_intermediate_bucket_entry(val, doc_count)?;
+                let val = i64::from_u64(val);
+                let date = format_date(val)?;
+                dict.insert(IntermediateKey::Str(date), intermediate_entry);
+            }
         } else {
             for (val, doc_count) in entries {
                 let intermediate_entry = into_intermediate_bucket_entry(val, doc_count)?;
@@ -583,6 +590,9 @@ pub(crate) fn cut_off_buckets<T: GetDocCount + Debug>(

 #[cfg(test)]
 mod tests {
+    use common::DateTime;
+    use time::{Date, Month};
+
     use crate::aggregation::agg_req::Aggregations;
     use crate::aggregation::tests::{
         exec_request, exec_request_with_query, exec_request_with_query_and_memory_limit,
@@ -1813,4 +1823,75 @@ mod tests {

         Ok(())
     }
+
+    #[test]
+    fn terms_aggregation_date() -> crate::Result<()> {
+        let mut schema_builder = Schema::builder();
+        let date_field = schema_builder.add_date_field("date_field", FAST);
+        let schema = schema_builder.build();
+        let index = Index::create_in_ram(schema);
+        {
+            let mut writer = index.writer_with_num_threads(1, 15_000_000)?;
+            writer.add_document(doc!(date_field=>DateTime::from_primitive(Date::from_calendar_date(1982, Month::September, 17)?.with_hms(0, 0, 0)?)))?;
+            writer.add_document(doc!(date_field=>DateTime::from_primitive(Date::from_calendar_date(1982, Month::September, 17)?.with_hms(0, 0, 0)?)))?;
+            writer.add_document(doc!(date_field=>DateTime::from_primitive(Date::from_calendar_date(1983, Month::September, 27)?.with_hms(0, 0, 0)?)))?;
+            writer.commit()?;
+        }
+
+        let agg_req: Aggregations = serde_json::from_value(json!({
+            "my_date": {
+                "terms": {
+                    "field": "date_field"
+                },
+            }
+        }))
+        .unwrap();
+
+        let res = exec_request_with_query(agg_req, &index, None)?;
+
+        // date_field field
+        assert_eq!(res["my_date"]["buckets"][0]["key"], "1982-09-17T00:00:00Z");
+        assert_eq!(res["my_date"]["buckets"][0]["doc_count"], 2);
+        assert_eq!(res["my_date"]["buckets"][1]["key"], "1983-09-27T00:00:00Z");
+        assert_eq!(res["my_date"]["buckets"][1]["doc_count"], 1);
+        assert_eq!(res["my_date"]["buckets"][2]["key"], serde_json::Value::Null);
+
+        Ok(())
+    }
+
+    #[test]
+    fn terms_aggregation_date_missing() -> crate::Result<()> {
+        let mut schema_builder = Schema::builder();
+        let date_field = schema_builder.add_date_field("date_field", FAST);
+        let schema = schema_builder.build();
+        let index = Index::create_in_ram(schema);
+        {
+            let mut writer = index.writer_with_num_threads(1, 15_000_000)?;
+            writer.add_document(doc!(date_field=>DateTime::from_primitive(Date::from_calendar_date(1982, Month::September, 17)?.with_hms(0, 0, 0)?)))?;
+            writer.add_document(doc!(date_field=>DateTime::from_primitive(Date::from_calendar_date(1982, Month::September, 17)?.with_hms(0, 0, 0)?)))?;
+            writer.add_document(doc!(date_field=>DateTime::from_primitive(Date::from_calendar_date(1983, Month::September, 27)?.with_hms(0, 0, 0)?)))?;
+            writer.add_document(doc!())?;
+            writer.commit()?;
+        }
+
+        let agg_req: Aggregations = serde_json::from_value(json!({
+            "my_date": {
+                "terms": {
+                    "field": "date_field",
+                    "missing": "1982-09-17T00:00:00Z"
+                },
+            }
+        }))
+        .unwrap();
+
+        let res = exec_request_with_query(agg_req, &index, None)?;
+
+        // date_field field
+        assert_eq!(res["my_date"]["buckets"][0]["key"], "1982-09-17T00:00:00Z");
+        assert_eq!(res["my_date"]["buckets"][0]["doc_count"], 3);
+        assert_eq!(res["my_date"]["buckets"][1]["key"], "1983-09-27T00:00:00Z");
+        assert_eq!(res["my_date"]["buckets"][1]["doc_count"], 1);
+        assert_eq!(res["my_date"]["buckets"][2]["key"], serde_json::Value::Null);
+
+        Ok(())
+    }
 }
```
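In the new `ColumnType::DateTime` branch above, the u64 fast-field value is mapped back to an i64 nanosecond timestamp via `MonotonicallyMappableToU64` and then rendered with `format_date`. Assuming the usual sign-bit-flip encoding for that trait (an assumption; the mapping is not spelled out in this diff), the round trip looks like:

```rust
fn main() {
    // Sign-bit flip: the monotonic u64 encoding of the i64 timestamp 0.
    let stored: u64 = 1u64 << 63;
    let ns = (stored ^ (1u64 << 63)) as i64; // i64::from_u64 equivalent (assumed)
    assert_eq!(ns, 0);
    // format_date(0) would then yield the RFC 3339 bucket key
    // "1970-01-01T00:00:00Z", matching the string keys asserted in the
    // terms_aggregation_date test above.
}
```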
```diff
@@ -172,10 +172,16 @@ pub(crate) fn empty_from_req(req: &Aggregation) -> IntermediateAggregationResult
         Range(_) => IntermediateAggregationResult::Bucket(IntermediateBucketResult::Range(
             Default::default(),
         )),
-        Histogram(_) | DateHistogram(_) => {
+        Histogram(_) => {
             IntermediateAggregationResult::Bucket(IntermediateBucketResult::Histogram {
                 buckets: Vec::new(),
-                column_type: None,
+                is_date_agg: false,
             })
         }
+        DateHistogram(_) => {
+            IntermediateAggregationResult::Bucket(IntermediateBucketResult::Histogram {
+                buckets: Vec::new(),
+                is_date_agg: true,
+            })
+        }
         Average(_) => IntermediateAggregationResult::Metric(IntermediateMetricResult::Average(
@@ -343,8 +349,8 @@ pub enum IntermediateBucketResult {
     /// This is the histogram entry for a bucket, which contains a key, count, and optionally
     /// sub_aggregations.
     Histogram {
-        /// The column_type of the underlying `Column`
-        column_type: Option<ColumnType>,
+        /// Whether the column_type of the underlying `Column` is DateTime
+        is_date_agg: bool,
         /// The buckets
         buckets: Vec<IntermediateHistogramBucketEntry>,
     },
@@ -399,7 +405,7 @@ impl IntermediateBucketResult {
                 Ok(BucketResult::Range { buckets })
             }
             IntermediateBucketResult::Histogram {
-                column_type,
+                is_date_agg,
                 buckets,
             } => {
                 let histogram_req = &req
@@ -408,7 +414,7 @@ impl IntermediateBucketResult {
                     .expect("unexpected aggregation, expected histogram aggregation");
                 let buckets = intermediate_histogram_buckets_to_final_buckets(
                     buckets,
-                    column_type,
+                    is_date_agg,
                     histogram_req,
                     req.sub_aggregation(),
                     limits,
@@ -457,11 +463,11 @@ impl IntermediateBucketResult {
             (
                 IntermediateBucketResult::Histogram {
                     buckets: buckets_left,
-                    ..
+                    is_date_agg: _,
                 },
                 IntermediateBucketResult::Histogram {
                     buckets: buckets_right,
-                    ..
+                    is_date_agg: _,
                 },
             ) => {
                 let buckets: Result<Vec<IntermediateHistogramBucketEntry>, TantivyError> =
```
```diff
@@ -16,7 +16,7 @@ use crate::{DocId, Score, SegmentOrdinal, SegmentReader};
 /// let schema = schema_builder.build();
 /// let index = Index::create_in_ram(schema);
 ///
-/// let mut index_writer = index.writer(3_000_000).unwrap();
+/// let mut index_writer = index.writer(15_000_000).unwrap();
 /// index_writer.add_document(doc!(title => "The Name of the Wind")).unwrap();
 /// index_writer.add_document(doc!(title => "The Diary of Muadib")).unwrap();
 /// index_writer.add_document(doc!(title => "A Dairy Cow")).unwrap();
```
```diff
@@ -89,7 +89,7 @@ fn facet_depth(facet_bytes: &[u8]) -> usize {
 /// let schema = schema_builder.build();
 /// let index = Index::create_in_ram(schema);
 /// {
-///     let mut index_writer = index.writer(3_000_000)?;
+///     let mut index_writer = index.writer(15_000_000)?;
 ///     // a document can be associated with any number of facets
 ///     index_writer.add_document(doc!(
 ///         title => "The Name of the Wind",
```
```diff
@@ -233,7 +233,7 @@ mod tests {
         let val_field = schema_builder.add_i64_field("val_field", FAST);
         let schema = schema_builder.build();
         let index = Index::create_in_ram(schema);
-        let mut writer = index.writer_with_num_threads(1, 4_000_000)?;
+        let mut writer = index.writer_for_tests()?;
         writer.add_document(doc!(val_field=>12i64))?;
         writer.add_document(doc!(val_field=>-30i64))?;
         writer.add_document(doc!(val_field=>-12i64))?;
@@ -255,7 +255,7 @@ mod tests {
         let val_field = schema_builder.add_i64_field("val_field", FAST);
         let schema = schema_builder.build();
         let index = Index::create_in_ram(schema);
-        let mut writer = index.writer_with_num_threads(1, 4_000_000)?;
+        let mut writer = index.writer_for_tests()?;
         writer.add_document(doc!(val_field=>12i64))?;
         writer.commit()?;
         writer.add_document(doc!(val_field=>-30i64))?;
@@ -280,7 +280,7 @@ mod tests {
         let date_field = schema_builder.add_date_field("date_field", FAST);
         let schema = schema_builder.build();
         let index = Index::create_in_ram(schema);
-        let mut writer = index.writer_with_num_threads(1, 4_000_000)?;
+        let mut writer = index.writer_for_tests()?;
         writer.add_document(doc!(date_field=>DateTime::from_primitive(Date::from_calendar_date(1982, Month::September, 17)?.with_hms(0, 0, 0)?)))?;
         writer.add_document(
             doc!(date_field=>DateTime::from_primitive(Date::from_calendar_date(1986, Month::March, 9)?.with_hms(0, 0, 0)?)),
```
```diff
@@ -44,7 +44,7 @@
 //! # let title = schema_builder.add_text_field("title", TEXT);
 //! # let schema = schema_builder.build();
 //! # let index = Index::create_in_ram(schema);
-//! # let mut index_writer = index.writer(3_000_000)?;
+//! # let mut index_writer = index.writer(15_000_000)?;
 //! # index_writer.add_document(doc!(
 //! #     title => "The Name of the Wind",
 //! # ))?;

@@ -120,7 +120,7 @@ impl<TFruit: Fruit> FruitHandle<TFruit> {
 /// let title = schema_builder.add_text_field("title", TEXT);
 /// let schema = schema_builder.build();
 /// let index = Index::create_in_ram(schema);
-/// let mut index_writer = index.writer(3_000_000)?;
+/// let mut index_writer = index.writer(15_000_000)?;
 /// index_writer.add_document(doc!(title => "The Name of the Wind"))?;
 /// index_writer.add_document(doc!(title => "The Diary of Muadib"))?;
 /// index_writer.add_document(doc!(title => "A Dairy Cow"))?;
```
```diff
@@ -16,7 +16,7 @@ use crate::directory::error::OpenReadError;
 use crate::directory::MmapDirectory;
 use crate::directory::{Directory, ManagedDirectory, RamDirectory, INDEX_WRITER_LOCK};
 use crate::error::{DataCorruption, TantivyError};
-use crate::indexer::index_writer::{MAX_NUM_THREAD, MEMORY_ARENA_NUM_BYTES_MIN};
+use crate::indexer::index_writer::{MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN};
 use crate::indexer::segment_updater::save_metas;
 use crate::reader::{IndexReader, IndexReaderBuilder};
 use crate::schema::{Field, FieldType, Schema};
@@ -523,9 +523,9 @@ impl Index {
     /// - `num_threads` defines the number of indexing workers that
     ///   should work at the same time.
     ///
-    /// - `overall_memory_arena_in_bytes` sets the amount of memory
+    /// - `overall_memory_budget_in_bytes` sets the amount of memory
     ///   allocated for all indexing threads.
-    ///   Each thread will receive a budget of `overall_memory_arena_in_bytes / num_threads`.
+    ///   Each thread will receive a budget of `overall_memory_budget_in_bytes / num_threads`.
     ///
     /// # Errors
     /// If the lockfile already exists, returns `Error::DirectoryLockBusy` or an `Error::IoError`.
@@ -534,7 +534,7 @@ impl Index {
     pub fn writer_with_num_threads(
         &self,
         num_threads: usize,
-        overall_memory_arena_in_bytes: usize,
+        overall_memory_budget_in_bytes: usize,
     ) -> crate::Result<IndexWriter> {
         let directory_lock = self
             .directory
@@ -550,7 +550,7 @@ impl Index {
                 ),
             )
         })?;
-        let memory_arena_in_bytes_per_thread = overall_memory_arena_in_bytes / num_threads;
+        let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads;
         IndexWriter::new(
             self,
             num_threads,
@@ -561,7 +561,7 @@ impl Index {

     /// Helper to create an index writer for tests.
     ///
-    /// That index writer only simply has a single thread and a memory arena of 10 MB.
+    /// That index writer simply has a single thread and a memory budget of 15 MB.
     /// Using a single thread gives us a deterministic allocation of DocId.
     #[cfg(test)]
     pub fn writer_for_tests(&self) -> crate::Result<IndexWriter> {
@@ -579,13 +579,13 @@ impl Index {
     /// If the lockfile already exists, returns `Error::FileAlreadyExists`.
     /// If the memory arena per thread is too small or too big, returns
     /// `TantivyError::InvalidArgument`
-    pub fn writer(&self, memory_arena_num_bytes: usize) -> crate::Result<IndexWriter> {
+    pub fn writer(&self, memory_budget_in_bytes: usize) -> crate::Result<IndexWriter> {
         let mut num_threads = std::cmp::min(num_cpus::get(), MAX_NUM_THREAD);
-        let memory_arena_num_bytes_per_thread = memory_arena_num_bytes / num_threads;
-        if memory_arena_num_bytes_per_thread < MEMORY_ARENA_NUM_BYTES_MIN {
-            num_threads = (memory_arena_num_bytes / MEMORY_ARENA_NUM_BYTES_MIN).max(1);
+        let memory_budget_num_bytes_per_thread = memory_budget_in_bytes / num_threads;
+        if memory_budget_num_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
+            num_threads = (memory_budget_in_bytes / MEMORY_BUDGET_NUM_BYTES_MIN).max(1);
         }
-        self.writer_with_num_threads(num_threads, memory_arena_num_bytes)
+        self.writer_with_num_threads(num_threads, memory_budget_in_bytes)
     }

     /// Accessor to the index settings
```
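The renamed parameter keeps the same arithmetic: the overall budget is divided evenly across workers. A quick sketch of how the new 15 MB per-thread floor interacts with that split (numbers chosen for illustration only):

```rust
fn main() {
    let overall_memory_budget_in_bytes = 30_000_000usize;
    let num_threads = 2usize;
    // Each worker gets overall_memory_budget_in_bytes / num_threads.
    let per_thread = overall_memory_budget_in_bytes / num_threads;
    assert_eq!(per_thread, 15_000_000); // exactly the new per-thread minimum
}
```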
```diff
@@ -234,6 +234,22 @@ impl FastFieldReaders {
         Ok(dynamic_column_handle_opt)
     }

+    /// Returns all `dynamic_column_handle`s for a field.
+    pub fn dynamic_column_handles(
+        &self,
+        field_name: &str,
+    ) -> crate::Result<Vec<DynamicColumnHandle>> {
+        let Some(resolved_field_name) = self.resolve_field(field_name)? else {
+            return Ok(Vec::new());
+        };
+        let dynamic_column_handles = self
+            .columnar
+            .read_columns(&resolved_field_name)?
+            .into_iter()
+            .collect();
+        Ok(dynamic_column_handles)
+    }
+
     #[doc(hidden)]
     pub async fn list_dynamic_column_handles(
         &self,
@@ -338,6 +354,8 @@ impl FastFieldReaders {

 #[cfg(test)]
 mod tests {
+    use columnar::ColumnType;
+
     use crate::schema::{JsonObjectOptions, Schema, FAST};
     use crate::{Document, Index};

@@ -417,4 +435,45 @@ mod tests {
             Some("_dyna\u{1}notinschema\u{1}attr\u{1}color".to_string())
         );
     }
+
+    #[test]
+    fn test_fast_field_reader_dynamic_column_handles() {
+        let mut schema_builder = Schema::builder();
+        let id = schema_builder.add_u64_field("id", FAST);
+        let json = schema_builder.add_json_field("json", FAST);
+        let schema = schema_builder.build();
+        let index = Index::create_in_ram(schema);
+        let mut index_writer = index.writer_for_tests().unwrap();
+        index_writer
+            .add_document(doc!(id=> 1u64, json => json!({"foo": 42})))
+            .unwrap();
+        index_writer
+            .add_document(doc!(id=> 2u64, json => json!({"foo": true})))
+            .unwrap();
+        index_writer
+            .add_document(doc!(id=> 3u64, json => json!({"foo": "bar"})))
+            .unwrap();
+        index_writer.commit().unwrap();
+        let reader = index.reader().unwrap();
+        let searcher = reader.searcher();
+        let reader = searcher.segment_reader(0u32);
+        let fast_fields = reader.fast_fields();
+        let id_columns = fast_fields.dynamic_column_handles("id").unwrap();
+        assert_eq!(id_columns.len(), 1);
+        assert_eq!(id_columns.first().unwrap().column_type(), ColumnType::U64);
+
+        let foo_columns = fast_fields.dynamic_column_handles("json.foo").unwrap();
+        assert_eq!(foo_columns.len(), 3);
+        assert!(foo_columns
+            .iter()
+            .any(|column| column.column_type() == ColumnType::I64));
+        assert!(foo_columns
+            .iter()
+            .any(|column| column.column_type() == ColumnType::Bool));
+        assert!(foo_columns
+            .iter()
+            .any(|column| column.column_type() == ColumnType::Str));
+
+        println!("*** {:?}", fast_fields.columnar().list_columns());
+    }
 }
```
```diff
@@ -2,6 +2,7 @@ use std::collections::HashSet;

 use rand::{thread_rng, Rng};

+use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
 use crate::schema::*;
 use crate::{doc, schema, Index, IndexSettings, IndexSortByField, Order, Searcher};

@@ -30,7 +31,7 @@ fn test_functional_store() -> crate::Result<()> {

     let mut rng = thread_rng();

-    let mut index_writer = index.writer_with_num_threads(3, 12_000_000)?;
+    let mut index_writer = index.writer_with_num_threads(3, MEMORY_BUDGET_NUM_BYTES_MIN)?;

     let mut doc_set: Vec<u64> = Vec::new();
```
@@ -27,9 +27,9 @@ use crate::{FutureResult, Opstamp};
|
||||
// in the `memory_arena` goes below MARGIN_IN_BYTES.
|
||||
pub const MARGIN_IN_BYTES: usize = 1_000_000;
|
||||
|
||||
// We impose the memory per thread to be at least 3 MB.
|
||||
pub const MEMORY_ARENA_NUM_BYTES_MIN: usize = ((MARGIN_IN_BYTES as u32) * 3u32) as usize;
|
||||
pub const MEMORY_ARENA_NUM_BYTES_MAX: usize = u32::MAX as usize - MARGIN_IN_BYTES;
|
||||
// We impose the memory per thread to be at least 15 MB, as the baseline consumption is 12MB.
|
||||
pub const MEMORY_BUDGET_NUM_BYTES_MIN: usize = ((MARGIN_IN_BYTES as u32) * 15u32) as usize;
|
||||
pub const MEMORY_BUDGET_NUM_BYTES_MAX: usize = u32::MAX as usize - MARGIN_IN_BYTES;
|
||||
|
||||
// We impose the number of index writer threads to be at most this.
|
||||
pub const MAX_NUM_THREAD: usize = 8;
|
||||
@@ -57,7 +57,8 @@ pub struct IndexWriter {
|
||||
|
||||
index: Index,
|
||||
|
||||
memory_arena_in_bytes_per_thread: usize,
|
||||
// The memory budget per thread, after which a commit is triggered.
|
||||
memory_budget_in_bytes_per_thread: usize,
|
||||
|
||||
workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,
|
||||
|
||||
@@ -264,19 +265,19 @@ impl IndexWriter {
|
||||
pub(crate) fn new(
|
||||
index: &Index,
|
||||
num_threads: usize,
|
||||
memory_arena_in_bytes_per_thread: usize,
|
||||
memory_budget_in_bytes_per_thread: usize,
|
||||
directory_lock: DirectoryLock,
|
||||
) -> crate::Result<IndexWriter> {
|
||||
if memory_arena_in_bytes_per_thread < MEMORY_ARENA_NUM_BYTES_MIN {
|
||||
if memory_budget_in_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
|
||||
let err_msg = format!(
|
||||
"The memory arena in bytes per thread needs to be at least \
|
||||
{MEMORY_ARENA_NUM_BYTES_MIN}."
|
||||
{MEMORY_BUDGET_NUM_BYTES_MIN}."
|
||||
);
|
||||
return Err(TantivyError::InvalidArgument(err_msg));
|
||||
}
|
||||
if memory_arena_in_bytes_per_thread >= MEMORY_ARENA_NUM_BYTES_MAX {
|
||||
if memory_budget_in_bytes_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX {
|
||||
let err_msg = format!(
|
||||
"The memory arena in bytes per thread cannot exceed {MEMORY_ARENA_NUM_BYTES_MAX}"
|
||||
"The memory arena in bytes per thread cannot exceed {MEMORY_BUDGET_NUM_BYTES_MAX}"
|
||||
);
|
||||
return Err(TantivyError::InvalidArgument(err_msg));
|
||||
}
|
||||
@@ -295,7 +296,7 @@ impl IndexWriter {
        let mut index_writer = IndexWriter {
            _directory_lock: Some(directory_lock),

            memory_arena_in_bytes_per_thread,
            memory_budget_in_bytes_per_thread,
            index: index.clone(),
            index_writer_status: IndexWriterStatus::from(document_receiver),
            operation_sender: document_sender,
@@ -396,7 +397,7 @@ impl IndexWriter {

        let mut delete_cursor = self.delete_queue.cursor();

        let mem_budget = self.memory_arena_in_bytes_per_thread;
        let mem_budget = self.memory_budget_in_bytes_per_thread;
        let index = self.index.clone();
        let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new()
            .name(format!("thrd-tantivy-index{}", self.worker_id))
@@ -554,7 +555,7 @@ impl IndexWriter {
        let new_index_writer: IndexWriter = IndexWriter::new(
            &self.index,
            self.num_threads,
            self.memory_arena_in_bytes_per_thread,
            self.memory_budget_in_bytes_per_thread,
            directory_lock,
        )?;

@@ -619,7 +620,7 @@ impl IndexWriter {
        for worker_handle in former_workers_join_handle {
            let indexing_worker_result = worker_handle
                .join()
                .map_err(|e| TantivyError::ErrorInThread(format!("{e:?}")))?;
                .map_err(|e| TantivyError::ErrorInThread(e.to_string()))?;
            indexing_worker_result?;
            self.add_indexing_worker()?;
        }
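For context on the join handling above: when a worker thread panics, std's JoinHandle::join returns Err(Box<dyn Any + Send>), a type that implements Debug but not Display. A self-contained sketch of recovering a printable message from it, which is what the ErrorInThread variant carries:

use std::thread;

fn main() {
    let handle = thread::spawn(|| panic!("boom"));
    // join() surfaces the panic payload as Err(Box<dyn Any + Send>);
    // Debug-formatting turns it into a plain string.
    let msg = handle.join().map_err(|e| format!("{e:?}")).unwrap_err();
    println!("worker failed: {msg}");
}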
@@ -810,6 +811,7 @@ mod tests {
    use crate::collector::TopDocs;
    use crate::directory::error::LockError;
    use crate::error::*;
    use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
    use crate::indexer::NoMergePolicy;
    use crate::query::{BooleanQuery, Occur, Query, QueryParser, TermQuery};
    use crate::schema::{
@@ -941,7 +943,7 @@ mod tests {
    fn test_empty_operations_group() {
        let schema_builder = schema::Schema::builder();
        let index = Index::create_in_ram(schema_builder.build());
        let index_writer = index.writer(3_000_000).unwrap();
        let index_writer = index.writer_for_tests().unwrap();
        let operations1 = vec![];
        let batch_opstamp1 = index_writer.run(operations1).unwrap();
        assert_eq!(batch_opstamp1, 0u64);
@@ -954,8 +956,8 @@ mod tests {
    fn test_lockfile_stops_duplicates() {
        let schema_builder = schema::Schema::builder();
        let index = Index::create_in_ram(schema_builder.build());
        let _index_writer = index.writer(3_000_000).unwrap();
        match index.writer(3_000_000) {
        let _index_writer = index.writer_for_tests().unwrap();
        match index.writer_for_tests() {
            Err(TantivyError::LockFailure(LockError::LockBusy, _)) => {}
            _ => panic!("Expected a `LockFailure` error"),
        }
@@ -979,7 +981,7 @@ mod tests {
    fn test_set_merge_policy() {
        let schema_builder = schema::Schema::builder();
        let index = Index::create_in_ram(schema_builder.build());
        let index_writer = index.writer(3_000_000).unwrap();
        let index_writer = index.writer_for_tests().unwrap();
        assert_eq!(
            format!("{:?}", index_writer.get_merge_policy()),
            "LogMergePolicy { min_num_segments: 8, max_docs_before_merge: 10000000, \
@@ -998,11 +1000,11 @@ mod tests {
        let schema_builder = schema::Schema::builder();
        let index = Index::create_in_ram(schema_builder.build());
        {
            let _index_writer = index.writer(3_000_000).unwrap();
            let _index_writer = index.writer_for_tests().unwrap();
            // the lock should be released when the
            // index_writer leaves the scope.
        }
        let _index_writer_two = index.writer(3_000_000).unwrap();
        let _index_writer_two = index.writer_for_tests().unwrap();
    }

    #[test]
@@ -1022,7 +1024,7 @@ mod tests {

        {
            // writing the segment
            let mut index_writer = index.writer(3_000_000)?;
            let mut index_writer = index.writer_for_tests()?;
            index_writer.add_document(doc!(text_field=>"a"))?;
            index_writer.rollback()?;
            assert_eq!(index_writer.commit_opstamp(), 0u64);
@@ -1054,7 +1056,7 @@ mod tests {
            reader.searcher().doc_freq(&term_a).unwrap()
        };
        // writing the segment
        let mut index_writer = index.writer(12_000_000).unwrap();
        let mut index_writer = index.writer_for_tests().unwrap();
        index_writer.add_document(doc!(text_field=>"a"))?;
        index_writer.commit()?;
        // this should create 1 segment
@@ -1094,7 +1096,7 @@ mod tests {
            reader.searcher().doc_freq(&term_a).unwrap()
        };
        // writing the segment
        let mut index_writer = index.writer(12_000_000).unwrap();
        let mut index_writer = index.writer_for_tests().unwrap();
        index_writer.add_document(doc!(text_field=>"a"))?;
        index_writer.commit()?;
        index_writer.add_document(doc!(text_field=>"a"))?;
@@ -1140,7 +1142,7 @@ mod tests {
            reader.searcher().doc_freq(&term_a).unwrap()
        };
        // writing the segment
        let mut index_writer = index.writer(12_000_000).unwrap();
        let mut index_writer = index.writer(MEMORY_BUDGET_NUM_BYTES_MIN).unwrap();
        // create 8 segments with 100 tiny docs
        for _doc in 0..100 {
            index_writer.add_document(doc!(text_field=>"a"))?;
@@ -1196,7 +1198,8 @@ mod tests {

        {
            // writing the segment
            let mut index_writer = index.writer_with_num_threads(4, 12_000_000)?;
            let mut index_writer =
                index.writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)?;
            // create 8 segments with 100 tiny docs
            for _doc in 0..100 {
                index_writer.add_document(doc!(text_field => "a"))?;
@@ -1245,7 +1248,9 @@ mod tests {
            let term = Term::from_field_text(text_field, s);
            searcher.doc_freq(&term).unwrap()
        };
        let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
        let mut index_writer = index
            .writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
            .unwrap();

        let add_tstamp = index_writer.add_document(doc!(text_field => "a")).unwrap();
        let commit_tstamp = index_writer.commit().unwrap();
@@ -1262,7 +1267,9 @@ mod tests {
        let mut schema_builder = schema::Schema::builder();
        let text_field = schema_builder.add_text_field("text", TEXT);
        let index = Index::create_in_ram(schema_builder.build());
        let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
        let mut index_writer = index
            .writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
            .unwrap();

        let add_tstamp = index_writer.add_document(doc!(text_field => "a")).unwrap();

@@ -1311,7 +1318,9 @@ mod tests {
        let text_field = schema_builder.add_text_field("text", TEXT);
        let index = Index::create_in_ram(schema_builder.build());
        // writing the segment
        let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
        let mut index_writer = index
            .writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
            .unwrap();
        let res = index_writer.delete_all_documents();
        assert!(res.is_ok());

@@ -1338,7 +1347,9 @@ mod tests {
        let mut schema_builder = schema::Schema::builder();
        let text_field = schema_builder.add_text_field("text", TEXT);
        let index = Index::create_in_ram(schema_builder.build());
        let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
        let mut index_writer = index
            .writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
            .unwrap();

        // add one simple doc
        assert!(index_writer.add_document(doc!(text_field => "a")).is_ok());
@@ -1371,7 +1382,9 @@ mod tests {
    fn test_delete_all_documents_empty_index() {
        let schema_builder = schema::Schema::builder();
        let index = Index::create_in_ram(schema_builder.build());
        let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
        let mut index_writer = index
            .writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
            .unwrap();
        let clear = index_writer.delete_all_documents();
        let commit = index_writer.commit();
        assert!(clear.is_ok());
@@ -1382,7 +1395,9 @@ mod tests {
    fn test_delete_all_documents_index_twice() {
        let schema_builder = schema::Schema::builder();
        let index = Index::create_in_ram(schema_builder.build());
        let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
        let mut index_writer = index
            .writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
            .unwrap();
        let clear = index_writer.delete_all_documents();
        let commit = index_writer.commit();
        assert!(clear.is_ok());
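These test migrations replace ad-hoc 3 MB and 12 MB budgets with writer_for_tests(), which, judging from the surrounding hunks, is a test-only convenience that picks a valid minimal configuration. An equivalent explicit form under the new constants would be (a sketch, using only names from this diff):

let index_writer = index.writer_with_num_threads(1, MEMORY_BUDGET_NUM_BYTES_MIN)?;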
@@ -596,10 +596,15 @@ impl SegmentUpdater {
        );
        {
        if let Some(after_merge_segment_entry) = after_merge_segment_entry.as_mut() {
            // Deletes and commits could have happened as we were merging.
            // We need to make sure we are up to date with deletes before accepting the
            // segment.
            let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
            if let Some(delete_operation) = delete_cursor.get() {
                let committed_opstamp = segment_updater.load_meta().opstamp;
                if delete_operation.opstamp < committed_opstamp {
                    // We are not up to date! Let's create a new tombstone file for our
                    // freshly created split.
                    let index = &segment_updater.index;
                    let segment = index.segment(after_merge_segment_entry.meta().clone());
                    if let Err(advance_deletes_err) = advance_deletes(


@@ -26,6 +26,8 @@ use crate::{DocId, Document, Opstamp, SegmentComponent, TantivyError};
fn compute_initial_table_size(per_thread_memory_budget: usize) -> crate::Result<usize> {
    let table_memory_upper_bound = per_thread_memory_budget / 3;
    (10..20) // We cap it at 2^19 = 512K capacity.
        // TODO: There are cases where this limit causes a
        // reallocation in the hashmap. Check if this affects performance.
        .map(|power| 1 << power)
        .take_while(|capacity| compute_table_memory_size(*capacity) < table_memory_upper_bound)
        .last()
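To make the capacity search concrete: the hashmap may take at most a third of the per-thread budget, and the chosen capacity is the largest power of two in 2^10..=2^19 whose table still fits. A runnable sketch, where the 16-bytes-per-bucket compute_table_memory_size is an assumption for illustration:

fn compute_table_memory_size(capacity: usize) -> usize {
    capacity * 16 // assumption: 16 bytes per bucket
}

fn initial_table_capacity(per_thread_memory_budget: usize) -> Option<usize> {
    let table_memory_upper_bound = per_thread_memory_budget / 3;
    (10..20) // capacities 2^10 ..= 2^19, i.e. capped at 512K entries
        .map(|power| 1usize << power)
        .take_while(|&cap| compute_table_memory_size(cap) < table_memory_upper_bound)
        .last()
}

fn main() {
    // With a 15 MB budget the bound is 5 MB, so the largest fitting capacity
    // under this assumed bucket size is 2^18 = 262_144 entries.
    assert_eq!(initial_table_capacity(15_000_000), Some(1 << 18));
}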
@@ -225,7 +225,7 @@ pub mod tests {

        {
            let mut segment_writer =
                SegmentWriter::for_segment(3_000_000, segment.clone()).unwrap();
                SegmentWriter::for_segment(15_000_000, segment.clone()).unwrap();
            {
                // checking that position works if the field has two values
                let op = AddOperation {


@@ -4,6 +4,7 @@ use std::sync::Arc;
use common::BitSet;
use tantivy_fst::Automaton;

use super::phrase_prefix_query::prefix_end;
use crate::core::SegmentReader;
use crate::query::{BitSetDocSet, ConstScorer, Explanation, Scorer, Weight};
use crate::schema::{Field, IndexRecordOption};
@@ -14,6 +15,10 @@ use crate::{DocId, Score, TantivyError};
pub struct AutomatonWeight<A> {
    field: Field,
    automaton: Arc<A>,
    // For JSON fields, the term dictionary includes terms from all paths.
    // We apply additional filtering based on the given JSON path, when searching within the term
    // dictionary. This prevents terms from unrelated paths from matching the search criteria.
    json_path_bytes: Option<Box<[u8]>>,
}

impl<A> AutomatonWeight<A>
@@ -26,6 +31,20 @@ where
        AutomatonWeight {
            field,
            automaton: automaton.into(),
            json_path_bytes: None,
        }
    }

    /// Creates a new AutomatonWeight for a JSON path.
    pub fn new_for_json_path<IntoArcA: Into<Arc<A>>>(
        field: Field,
        automaton: IntoArcA,
        json_path_bytes: &[u8],
    ) -> AutomatonWeight<A> {
        AutomatonWeight {
            field,
            automaton: automaton.into(),
            json_path_bytes: Some(json_path_bytes.to_vec().into_boxed_slice()),
        }
    }

@@ -34,7 +53,15 @@ where
        term_dict: &'a TermDictionary,
    ) -> io::Result<TermStreamer<'a, &'a A>> {
        let automaton: &A = &self.automaton;
        let term_stream_builder = term_dict.search(automaton);
        let mut term_stream_builder = term_dict.search(automaton);

        if let Some(json_path_bytes) = &self.json_path_bytes {
            term_stream_builder = term_stream_builder.ge(json_path_bytes);
            if let Some(end) = prefix_end(json_path_bytes) {
                term_stream_builder = term_stream_builder.lt(&end);
            }
        }

        term_stream_builder.into_stream()
    }
}
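Why the ge/lt pair selects exactly one JSON path: terms sort lexicographically as raw bytes, with the path prefix first. A self-contained illustration; the byte layout "path bytes, JSON_END_OF_PATH = 0x00, then a type byte and the value" is an assumption inferred from this diff:

fn main() {
    let term_a: &[u8] = b"a\x00sjapan"; // path "a", string value "japan"
    let term_aa: &[u8] = b"aa\x00sjapan"; // path "aa", string value "japan"
    let lower: &[u8] = b"a\x00"; // ge bound: first possible term of path "a"
    let upper: &[u8] = b"a\x01"; // lt bound: prefix_end(b"a\x00")
    assert!(lower <= term_a && term_a < upper); // path "a" stays in the stream
    assert!(!(lower <= term_aa && term_aa < upper)); // path "aa" is filtered out
}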
@@ -32,7 +32,7 @@ use crate::schema::{IndexRecordOption, Term};
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
/// {
///     let mut index_writer = index.writer(3_000_000)?;
///     let mut index_writer = index.writer(15_000_000)?;
///     index_writer.add_document(doc!(
///         title => "The Name of the Wind",
///     ))?;


@@ -297,7 +297,7 @@ mod tests {
        let text = schema_builder.add_text_field("text", STRING);
        let schema = schema_builder.build();
        let index = Index::create_in_ram(schema);
        let mut index_writer = index.writer_with_num_threads(1, 5_000_000)?;
        let mut index_writer = index.writer_for_tests()?;
        index_writer.add_document(doc!(text=>"a"))?;
        index_writer.add_document(doc!(text=>"b"))?;
        index_writer.commit()?;


@@ -23,7 +23,7 @@ use crate::{Score, Term};
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
/// {
///     let mut index_writer = index.writer(3_000_000)?;
///     let mut index_writer = index.writer(15_000_000)?;
///     index_writer.add_document(doc!(
///         title => "The Name of Girl",
///     ))?;

345 src/query/exist_query.rs Normal file
@@ -0,0 +1,345 @@
use core::fmt::Debug;

use columnar::{ColumnIndex, DynamicColumn};

use super::{ConstScorer, EmptyScorer};
use crate::core::SegmentReader;
use crate::docset::{DocSet, TERMINATED};
use crate::query::explanation::does_not_match;
use crate::query::{EnableScoring, Explanation, Query, Scorer, Weight};
use crate::{DocId, Score, TantivyError};

/// Query that matches all documents with a non-null value in the specified field.
///
/// All of the matched documents get the score 1.0.
#[derive(Clone, Debug)]
pub struct ExistsQuery {
    field_name: String,
}

impl ExistsQuery {
    /// Creates a new `ExistsQuery` from the given field.
    ///
    /// This query matches all documents with at least one non-null value in the specified field.
    /// This constructor never fails, but executing the search with this query will return an
    /// error if the specified field doesn't exist or is not a fast field.
    pub fn new_exists_query(field: String) -> ExistsQuery {
        ExistsQuery { field_name: field }
    }
}

impl Query for ExistsQuery {
    fn weight(&self, enable_scoring: EnableScoring) -> crate::Result<Box<dyn Weight>> {
        let schema = enable_scoring.schema();
        let Some((field, _path)) = schema.find_field(&self.field_name) else {
            return Err(TantivyError::FieldNotFound(self.field_name.clone()));
        };
        let field_type = schema.get_field_entry(field).field_type();
        if !field_type.is_fast() {
            return Err(TantivyError::SchemaError(format!(
                "Field {} is not a fast field.",
                self.field_name
            )));
        }
        Ok(Box::new(ExistsWeight {
            field_name: self.field_name.clone(),
        }))
    }
}

/// Weight associated with the `ExistsQuery` query.
pub struct ExistsWeight {
    field_name: String,
}

impl Weight for ExistsWeight {
    fn scorer(&self, reader: &SegmentReader, boost: Score) -> crate::Result<Box<dyn Scorer>> {
        let fast_field_reader = reader.fast_fields();
        let dynamic_columns: crate::Result<Vec<DynamicColumn>> = fast_field_reader
            .dynamic_column_handles(&self.field_name)?
            .into_iter()
            .map(|handle| handle.open().map_err(|io_error| io_error.into()))
            .collect();
        let mut non_empty_columns = Vec::new();
        for column in dynamic_columns? {
            if !matches!(column.column_index(), ColumnIndex::Empty { .. }) {
                non_empty_columns.push(column)
            }
        }
        // TODO: we could optimize more here, since in most cases we will have only one index
        if !non_empty_columns.is_empty() {
            let docset = ExistsDocSet::new(non_empty_columns, reader.max_doc());
            Ok(Box::new(ConstScorer::new(docset, boost)))
        } else {
            Ok(Box::new(EmptyScorer))
        }
    }

    fn explain(&self, reader: &SegmentReader, doc: DocId) -> crate::Result<Explanation> {
        let mut scorer = self.scorer(reader, 1.0)?;
        if scorer.seek(doc) != doc {
            return Err(does_not_match(doc));
        }
        Ok(Explanation::new("ExistsQuery", 1.0))
    }
}

pub(crate) struct ExistsDocSet {
    columns: Vec<DynamicColumn>,
    doc: DocId,
    max_doc: DocId,
}

impl ExistsDocSet {
    pub(crate) fn new(columns: Vec<DynamicColumn>, max_doc: DocId) -> Self {
        let mut set = Self {
            columns,
            doc: 0u32,
            max_doc,
        };
        set.find_next();
        set
    }

    fn find_next(&mut self) -> DocId {
        while self.doc < self.max_doc {
            if self
                .columns
                .iter()
                .any(|col| col.column_index().has_value(self.doc))
            {
                return self.doc;
            }
            self.doc += 1;
        }
        self.doc = TERMINATED;
        TERMINATED
    }
}

impl DocSet for ExistsDocSet {
    fn advance(&mut self) -> DocId {
        self.seek(self.doc + 1)
    }

    fn size_hint(&self) -> u32 {
        0
    }

    fn doc(&self) -> DocId {
        self.doc
    }

    #[inline(always)]
    fn seek(&mut self, target: DocId) -> DocId {
        self.doc = target;
        self.find_next()
    }
}

#[cfg(test)]
mod tests {
    use std::net::Ipv6Addr;
    use std::ops::Bound;

    use common::DateTime;
    use time::OffsetDateTime;

    use crate::collector::Count;
    use crate::query::exist_query::ExistsQuery;
    use crate::query::{BooleanQuery, RangeQuery};
    use crate::schema::{Facet, FacetOptions, Schema, FAST, INDEXED, STRING, TEXT};
    use crate::{doc, Index, Searcher};

    #[test]
    fn test_exists_query_simple() -> crate::Result<()> {
        let mut schema_builder = Schema::builder();
        let all_field = schema_builder.add_u64_field("all", INDEXED | FAST);
        let even_field = schema_builder.add_u64_field("even", INDEXED | FAST);
        let odd_field = schema_builder.add_text_field("odd", STRING | FAST);
        let multi_field = schema_builder.add_text_field("multi", FAST);
        let _never_field = schema_builder.add_u64_field("never", INDEXED | FAST);
        let schema = schema_builder.build();

        let index = Index::create_in_ram(schema);
        {
            let mut index_writer = index.writer_for_tests()?;
            for i in 0u64..100u64 {
                if i % 2 == 0 {
                    if i % 10 == 0 {
                        index_writer.add_document(doc!(all_field => i, even_field => i, multi_field => i.to_string(), multi_field => (i + 1).to_string()))?;
                    } else {
                        index_writer.add_document(doc!(all_field => i, even_field => i))?;
                    }
                } else {
                    index_writer.add_document(doc!(all_field => i, odd_field => i.to_string()))?;
                }
            }
            index_writer.commit()?;
        }
        let reader = index.reader()?;
        let searcher = reader.searcher();

        assert_eq!(count_existing_fields(&searcher, "all")?, 100);
        assert_eq!(count_existing_fields(&searcher, "odd")?, 50);
        assert_eq!(count_existing_fields(&searcher, "even")?, 50);
        assert_eq!(count_existing_fields(&searcher, "multi")?, 10);
        assert_eq!(count_existing_fields(&searcher, "never")?, 0);

        // exercise seek
        let query = BooleanQuery::intersection(vec![
            Box::new(RangeQuery::new_u64_bounds(
                "all".to_string(),
                Bound::Included(50),
                Bound::Unbounded,
            )),
            Box::new(ExistsQuery::new_exists_query("even".to_string())),
        ]);
        assert_eq!(searcher.search(&query, &Count)?, 25);

        let query = BooleanQuery::intersection(vec![
            Box::new(RangeQuery::new_u64_bounds(
                "all".to_string(),
                Bound::Included(0),
                Bound::Excluded(50),
            )),
            Box::new(ExistsQuery::new_exists_query("odd".to_string())),
        ]);
        assert_eq!(searcher.search(&query, &Count)?, 25);

        Ok(())
    }

    #[test]
    fn test_exists_query_json() -> crate::Result<()> {
        let mut schema_builder = Schema::builder();
        let json = schema_builder.add_json_field("json", TEXT | FAST);
        let schema = schema_builder.build();

        let index = Index::create_in_ram(schema);
        {
            let mut index_writer = index.writer_for_tests()?;
            for i in 0u64..100u64 {
                if i % 2 == 0 {
                    index_writer.add_document(doc!(json => json!({"all": i, "even": true})))?;
                } else {
                    index_writer
                        .add_document(doc!(json => json!({"all": i.to_string(), "odd": true})))?;
                }
            }
            index_writer.commit()?;
        }
        let reader = index.reader()?;
        let searcher = reader.searcher();

        assert_eq!(count_existing_fields(&searcher, "json.all")?, 100);
        assert_eq!(count_existing_fields(&searcher, "json.even")?, 50);
        assert_eq!(count_existing_fields(&searcher, "json.odd")?, 50);

        // Handling of non-existing fields:
        assert_eq!(count_existing_fields(&searcher, "json.absent")?, 0);
        assert_eq!(
            searcher
                .search(
                    &ExistsQuery::new_exists_query("does_not_exists.absent".to_string()),
                    &Count
                )
                .unwrap_err()
                .to_string(),
            "The field does not exist: 'does_not_exists.absent'"
        );

        Ok(())
    }

    #[test]
    fn test_exists_query_misc_supported_types() -> crate::Result<()> {
        let mut schema_builder = Schema::builder();
        let bool = schema_builder.add_bool_field("bool", FAST);
        let bytes = schema_builder.add_bytes_field("bytes", FAST);
        let date = schema_builder.add_date_field("date", FAST);
        let f64 = schema_builder.add_f64_field("f64", FAST);
        let ip_addr = schema_builder.add_ip_addr_field("ip_addr", FAST);
        let facet = schema_builder.add_facet_field("facet", FacetOptions::default());
        let schema = schema_builder.build();

        let index = Index::create_in_ram(schema);
        {
            let mut index_writer = index.writer_for_tests()?;
            let now = OffsetDateTime::now_utc().unix_timestamp();
            for i in 0u8..100u8 {
                if i % 2 == 0 {
                    let date_val = DateTime::from_utc(OffsetDateTime::from_unix_timestamp(
                        now + i as i64 * 100,
                    )?);
                    index_writer.add_document(
                        doc!(bool => i % 3 == 0, bytes => vec![i, i + 1, i + 2], date => date_val),
                    )?;
                } else {
                    let ip_addr_v6 = Ipv6Addr::new(0, 0, 0, 0, 0, 0xffff, 0xc00a, i.into());
                    index_writer
                        .add_document(doc!(f64 => i as f64 * 0.5, ip_addr => ip_addr_v6, facet => Facet::from("/facet/foo"), facet => Facet::from("/facet/bar")))?;
                }
            }
            index_writer.commit()?;
        }
        let reader = index.reader()?;
        let searcher = reader.searcher();

        assert_eq!(count_existing_fields(&searcher, "bool")?, 50);
        assert_eq!(count_existing_fields(&searcher, "bytes")?, 50);
        assert_eq!(count_existing_fields(&searcher, "date")?, 50);
        assert_eq!(count_existing_fields(&searcher, "f64")?, 50);
        assert_eq!(count_existing_fields(&searcher, "ip_addr")?, 50);
        assert_eq!(count_existing_fields(&searcher, "facet")?, 50);

        Ok(())
    }

    #[test]
    fn test_exists_query_unsupported_types() -> crate::Result<()> {
        let mut schema_builder = Schema::builder();
        let not_fast = schema_builder.add_text_field("not_fast", TEXT);
        let schema = schema_builder.build();

        let index = Index::create_in_ram(schema);
        {
            let mut index_writer = index.writer_for_tests()?;
            index_writer.add_document(doc!(
                not_fast => "slow",
            ))?;
            index_writer.commit()?;
        }
        let reader = index.reader()?;
        let searcher = reader.searcher();

        assert_eq!(
            searcher
                .search(
                    &ExistsQuery::new_exists_query("not_fast".to_string()),
                    &Count
                )
                .unwrap_err()
                .to_string(),
            "Schema error: 'Field not_fast is not a fast field.'"
        );

        assert_eq!(
            searcher
                .search(
                    &ExistsQuery::new_exists_query("does_not_exists".to_string()),
                    &Count
                )
                .unwrap_err()
                .to_string(),
            "The field does not exist: 'does_not_exists'"
        );

        Ok(())
    }

    fn count_existing_fields(searcher: &Searcher, field: &str) -> crate::Result<usize> {
        let query = ExistsQuery::new_exists_query(field.to_string());
        searcher.search(&query, &Count)
    }
}
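Taken together with the tests above, end-to-end usage of the new query looks like this; a minimal sketch mirroring test_exists_query_simple, with illustrative field names:

use tantivy::collector::Count;
use tantivy::query::ExistsQuery;
use tantivy::schema::{Schema, FAST, INDEXED, STRING};
use tantivy::{doc, Index};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let population = schema_builder.add_u64_field("population", INDEXED | FAST);
    let name = schema_builder.add_text_field("name", STRING);
    let index = Index::create_in_ram(schema_builder.build());

    let mut writer = index.writer_with_num_threads(1, 15_000_000)?;
    writer.add_document(doc!(population => 9_000_000u64, name => "metropolis"))?;
    writer.add_document(doc!(name => "hamlet"))?; // no value in `population`
    writer.commit()?;

    let searcher = index.reader()?.searcher();
    // Matches only the document that has a value in the `population` fast field.
    let query = ExistsQuery::new_exists_query("population".to_string());
    assert_eq!(searcher.search(&query, &Count)?, 1);
    Ok(())
}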
@@ -3,7 +3,7 @@ use once_cell::sync::OnceCell;
use tantivy_fst::Automaton;

use crate::query::{AutomatonWeight, EnableScoring, Query, Weight};
use crate::schema::Term;
use crate::schema::{Term, Type};
use crate::TantivyError::InvalidArgument;

pub(crate) struct DfaWrapper(pub DFA);
@@ -46,7 +46,7 @@ impl Automaton for DfaWrapper {
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
/// {
///     let mut index_writer = index.writer(3_000_000)?;
///     let mut index_writer = index.writer(15_000_000)?;
///     index_writer.add_document(doc!(
///         title => "The Name of the Wind",
///     ))?;
@@ -132,18 +132,46 @@ impl FuzzyTermQuery {
        });

        let term_value = self.term.value();
        let term_text = term_value.as_str().ok_or_else(|| {
            InvalidArgument("The fuzzy term query requires a string term.".to_string())
        })?;

        let term_text = if term_value.typ() == Type::Json {
            if let Some(json_path_type) = term_value.json_path_type() {
                if json_path_type != Type::Str {
                    return Err(InvalidArgument(format!(
                        "The fuzzy term query requires a string path type for a json term. Found \
                         {:?}",
                        json_path_type
                    )));
                }
            }

            std::str::from_utf8(self.term.serialized_value_bytes()).map_err(|_| {
                InvalidArgument(
                    "Failed to convert json term value bytes to utf8 string.".to_string(),
                )
            })?
        } else {
            term_value.as_str().ok_or_else(|| {
                InvalidArgument("The fuzzy term query requires a string term.".to_string())
            })?
        };
        let automaton = if self.prefix {
            automaton_builder.build_prefix_dfa(term_text)
        } else {
            automaton_builder.build_dfa(term_text)
        };
        Ok(AutomatonWeight::new(
            self.term.field(),
            DfaWrapper(automaton),
        ))

        if let Some((json_path_bytes, _)) = term_value.as_json() {
            Ok(AutomatonWeight::new_for_json_path(
                self.term.field(),
                DfaWrapper(automaton),
                json_path_bytes,
            ))
        } else {
            Ok(AutomatonWeight::new(
                self.term.field(),
                DfaWrapper(automaton),
            ))
        }
    }
}
@@ -157,9 +185,89 @@ impl Query for FuzzyTermQuery {
mod test {
    use super::FuzzyTermQuery;
    use crate::collector::{Count, TopDocs};
    use crate::schema::{Schema, TEXT};
    use crate::indexer::NoMergePolicy;
    use crate::query::QueryParser;
    use crate::schema::{Schema, STORED, TEXT};
    use crate::{assert_nearly_equals, Index, Term};

    #[test]
    pub fn test_fuzzy_json_path() -> crate::Result<()> {
        // # Defining the schema
        let mut schema_builder = Schema::builder();
        let attributes = schema_builder.add_json_field("attributes", TEXT | STORED);
        let schema = schema_builder.build();

        // # Indexing documents
        let index = Index::create_in_ram(schema.clone());

        let mut index_writer = index.writer_for_tests()?;
        index_writer.set_merge_policy(Box::new(NoMergePolicy));
        let doc = schema.parse_document(
            r#"{
                "attributes": {
                    "a": "japan"
                }
            }"#,
        )?;
        index_writer.add_document(doc)?;
        let doc = schema.parse_document(
            r#"{
                "attributes": {
                    "aa": "japan"
                }
            }"#,
        )?;
        index_writer.add_document(doc)?;
        index_writer.commit()?;

        let reader = index.reader()?;
        let searcher = reader.searcher();

        // # Fuzzy search
        let query_parser = QueryParser::for_index(&index, vec![attributes]);

        let get_json_path_term = |query: &str| -> crate::Result<Term> {
            let query = query_parser.parse_query(query)?;
            let mut terms = Vec::new();
            query.query_terms(&mut |term, _| {
                terms.push(term.clone());
            });

            Ok(terms[0].clone())
        };

        // shall not match the first document due to json path mismatch
        {
            let term = get_json_path_term("attributes.aa:japan")?;
            let fuzzy_query = FuzzyTermQuery::new(term, 2, true);
            let top_docs = searcher.search(&fuzzy_query, &TopDocs::with_limit(2))?;
            assert_eq!(top_docs.len(), 1, "Expected only 1 document");
            assert_eq!(top_docs[0].1.doc_id, 1, "Expected the second document");
        }

        // shall match the first document because Levenshtein distance is 1 (substitute 'o' with
        // 'a')
        {
            let term = get_json_path_term("attributes.a:japon")?;

            let fuzzy_query = FuzzyTermQuery::new(term, 1, true);
            let top_docs = searcher.search(&fuzzy_query, &TopDocs::with_limit(2))?;
            assert_eq!(top_docs.len(), 1, "Expected only 1 document");
            assert_eq!(top_docs[0].1.doc_id, 0, "Expected the first document");
        }

        // shall not match because non-prefix Levenshtein distance is more than 1 (add 'a' and 'n')
        {
            let term = get_json_path_term("attributes.a:jap")?;

            let fuzzy_query = FuzzyTermQuery::new(term, 1, true);
            let top_docs = searcher.search(&fuzzy_query, &TopDocs::with_limit(2))?;
            assert_eq!(top_docs.len(), 0, "Expected no document");
        }

        Ok(())
    }

    #[test]
    pub fn test_fuzzy_term() -> crate::Result<()> {
        let mut schema_builder = Schema::builder();

@@ -8,6 +8,7 @@ mod const_score_query;
mod disjunction_max_query;
mod empty_query;
mod exclude;
mod exist_query;
mod explanation;
mod fuzzy_query;
mod intersection;
@@ -41,6 +42,7 @@ pub use self::const_score_query::{ConstScoreQuery, ConstScorer};
pub use self::disjunction_max_query::DisjunctionMaxQuery;
pub use self::empty_query::{EmptyQuery, EmptyScorer, EmptyWeight};
pub use self::exclude::Exclude;
pub use self::exist_query::ExistsQuery;
pub use self::explanation::Explanation;
#[cfg(test)]
pub(crate) use self::fuzzy_query::DfaWrapper;

@@ -6,7 +6,7 @@ pub use phrase_prefix_query::PhrasePrefixQuery;
pub use phrase_prefix_scorer::PhrasePrefixScorer;
pub use phrase_prefix_weight::PhrasePrefixWeight;

fn prefix_end(prefix_start: &[u8]) -> Option<Vec<u8>> {
pub(crate) fn prefix_end(prefix_start: &[u8]) -> Option<Vec<u8>> {
    let mut res = prefix_start.to_owned();
    while !res.is_empty() {
        let end = res.len() - 1;
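The body above is cut off by the hunk boundary. A runnable sketch of the complete routine, under the assumption that it computes the smallest byte string strictly greater than every string starting with the prefix:

fn prefix_end(prefix_start: &[u8]) -> Option<Vec<u8>> {
    let mut res = prefix_start.to_owned();
    while !res.is_empty() {
        let end = res.len() - 1;
        if res[end] == u8::MAX {
            res.pop(); // 0xff has no successor at this byte position
        } else {
            res[end] += 1; // bump the last byte to step past the prefix range
            return Some(res);
        }
    }
    None // the prefix was all 0xff bytes: no exclusive upper bound exists
}

fn main() {
    assert_eq!(prefix_end(b"ab"), Some(b"ac".to_vec()));
    assert_eq!(prefix_end(&[0x61, 0xff]), Some(vec![0x62]));
    assert_eq!(prefix_end(&[0xff]), None);
}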
@@ -847,6 +847,12 @@ impl QueryParser {
            }));
            (Some(logical_ast), errors)
        }
        UserInputLeaf::Exists { .. } => (
            None,
            vec![QueryParserError::UnsupportedQuery(
                "Exists query needs to target a specific field.".to_string(),
            )],
        ),
        }
    }
}

@@ -26,7 +26,7 @@ use crate::schema::Field;
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
/// {
///     let mut index_writer = index.writer(3_000_000)?;
///     let mut index_writer = index.writer(15_000_000)?;
///     index_writer.add_document(doc!(
///         title => "The Name of the Wind",
///     ))?;


@@ -27,7 +27,7 @@ use crate::Term;
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
/// {
///     let mut index_writer = index.writer(3_000_000)?;
///     let mut index_writer = index.writer(15_000_000)?;
///     index_writer.add_document(doc!(
///         title => "The Name of the Wind",
///     ))?;
@@ -151,7 +151,7 @@ mod tests {
        let ip_addr_2 = Ipv6Addr::from_u128(10);

        {
            let mut index_writer = index.writer(3_000_000).unwrap();
            let mut index_writer = index.writer_for_tests().unwrap();
            index_writer
                .add_document(doc!(
                    ip_field => ip_addr_1


@@ -179,6 +179,7 @@ mod tests {
    use super::Warmer;
    use crate::core::searcher::SearcherGeneration;
    use crate::directory::RamDirectory;
    use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
    use crate::schema::{Schema, INDEXED};
    use crate::{Index, IndexSettings, ReloadPolicy, Searcher, SegmentId};

@@ -255,7 +256,10 @@ mod tests {

        let num_writer_threads = 4;
        let mut writer = index
            .writer_with_num_threads(num_writer_threads, 25_000_000)
            .writer_with_num_threads(
                num_writer_threads,
                MEMORY_BUDGET_NUM_BYTES_MIN * num_writer_threads,
            )
            .unwrap();

        for i in 0u64..1000u64 {

@@ -397,20 +397,29 @@ where B: AsRef<[u8]>
        Some(Ipv6Addr::from_u128(ip_u128))
    }

    /// Returns the json path (without non-human friendly separators),
    /// Returns the json path type.
    ///
    /// Returns `None` if the value is not JSON.
    pub fn json_path_type(&self) -> Option<Type> {
        let json_value_bytes = self.as_json_value_bytes()?;

        Some(json_value_bytes.typ())
    }

    /// Returns the json path bytes (including the JSON_END_OF_PATH byte),
    /// and the encoded ValueBytes after the json path.
    ///
    /// Returns `None` if the value is not JSON.
    pub(crate) fn as_json(&self) -> Option<(&str, ValueBytes<&[u8]>)> {
    pub(crate) fn as_json(&self) -> Option<(&[u8], ValueBytes<&[u8]>)> {
        if self.typ() != Type::Json {
            return None;
        }
        let bytes = self.value_bytes();

        let pos = bytes.iter().cloned().position(|b| b == JSON_END_OF_PATH)?;
        let (json_path_bytes, term) = bytes.split_at(pos);
        let json_path = str::from_utf8(json_path_bytes).ok()?;
        Some((json_path, ValueBytes::wrap(&term[1..])))
        // split at pos + 1, so that json_path_bytes includes the JSON_END_OF_PATH byte.
        let (json_path_bytes, term) = bytes.split_at(pos + 1);
        Some((json_path_bytes, ValueBytes::wrap(&term)))
    }

    /// Returns the encoded ValueBytes after the json path.
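The behavioral change is the split point: the path side now retains the JSON_END_OF_PATH byte, so it can be used directly as the lower bound for the term stream in AutomatonWeight. A standalone sketch, assuming JSON_END_OF_PATH = 0x00 as elsewhere in this diff's encoding:

fn split_json_term(bytes: &[u8]) -> Option<(&[u8], &[u8])> {
    let pos = bytes.iter().position(|&b| b == 0x00)?; // JSON_END_OF_PATH
    // split at pos + 1 so json_path_bytes keeps the JSON_END_OF_PATH byte
    Some(bytes.split_at(pos + 1))
}

fn main() {
    let term = b"color\x00sred"; // path "color", string value "red"
    let (path, value) = split_json_term(term).unwrap();
    assert_eq!(path, &b"color\x00"[..]); // directly usable as a `ge` bound
    assert_eq!(value, &b"sred"[..]);
}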
@@ -469,7 +478,10 @@ where B: AsRef<[u8]>
                write_opt(f, self.as_bytes())?;
            }
            Type::Json => {
                if let Some((path, sub_value_bytes)) = self.as_json() {
                if let Some((path_bytes, sub_value_bytes)) = self.as_json() {
                    // Remove the JSON_END_OF_PATH byte & convert to utf8.
                    let path = str::from_utf8(&path_bytes[..path_bytes.len() - 1])
                        .map_err(|_| std::fmt::Error)?;
                    let path_pretty = path.replace(JSON_PATH_SEGMENT_SEP_STR, ".");
                    write!(f, "path={path_pretty}, ")?;
                    sub_value_bytes.debug_value_bytes(f)?;

@@ -45,7 +45,7 @@ fn test_write_commit_fails() -> tantivy::Result<()> {
    let text_field = schema_builder.add_text_field("text", TEXT);
    let index = Index::create_in_ram(schema_builder.build());

    let mut index_writer = index.writer_with_num_threads(1, 3_000_000)?;
    let mut index_writer = index.writer_with_num_threads(1, 15_000_000)?;
    for _ in 0..100 {
        index_writer.add_document(doc!(text_field => "a"))?;
    }
@@ -75,7 +75,7 @@ fn test_fail_on_flush_segment() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let text_field = schema_builder.add_text_field("text", TEXT);
    let index = Index::create_in_ram(schema_builder.build());
    let index_writer = index.writer_with_num_threads(1, 3_000_000)?;
    let index_writer = index.writer_with_num_threads(1, 15_000_000)?;
    fail::cfg("FieldSerializer::close_term", "return(simulatederror)").unwrap();
    for i in 0..100_000 {
        if index_writer
@@ -94,7 +94,7 @@ fn test_fail_on_flush_segment_but_one_worker_remains() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let text_field = schema_builder.add_text_field("text", TEXT);
    let index = Index::create_in_ram(schema_builder.build());
    let index_writer = index.writer_with_num_threads(2, 6_000_000)?;
    let index_writer = index.writer_with_num_threads(2, 30_000_000)?;
    fail::cfg("FieldSerializer::close_term", "1*return(simulatederror)").unwrap();
    for i in 0..100_000 {
        if index_writer
@@ -113,7 +113,7 @@ fn test_fail_on_commit_segment() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let text_field = schema_builder.add_text_field("text", TEXT);
    let index = Index::create_in_ram(schema_builder.build());
    let mut index_writer = index.writer_with_num_threads(1, 3_000_000)?;
    let mut index_writer = index.writer_with_num_threads(1, 15_000_000)?;
    fail::cfg("FieldSerializer::close_term", "return(simulatederror)").unwrap();
    for i in 0..10 {
        index_writer