Compare commits

...

10 Commits

Author SHA1 Message Date
Pascal Seitz
dc90e4f8e9 change Err from Debug to Display
as reported by a user on Discord, the output of the Debug on error is a useless: 'Any { .. }'
switch to `Display` via to_string
2023-09-27 11:02:53 +08:00
PSeitz
34920d31f5 Fix DateHistogram bucket gap (#2183)
* Fix DateHistogram bucket gap

Fixes a computation issue of the number of buckets needed in the
DateHistogram.

This is due to a missing normalization from request values (ms) to fast field
values (ns), when converting an intermediate result to the final result.
This results in a wrong computation by a factor 1_000_000.
The Histogram normalizes values to nanoseconds, to make the user input like
extended_bounds (ms precision) and the values from the fast field (ns precision for date type) compatible.
This normalization happens only for date type fields, as other field types don't have precision settings.
The normalization does not happen due a missing `column_type`, which is not
correctly passed after merging an empty aggregation (which does not have a `column_type` set), with a regular aggregation.

Another related issue is an empty aggregation, which will not have
`column_type` set, will not convert the result to human readable format.

This PR fixes the issue by:
- Limit the allowed field types of DateHistogram to DateType
- Instead of passing the column_type, which is only available on the segment level, we flag the aggregation as `is_date_agg`.
- Fix the merge logic

Add a flag to to normalization only once. This is not an issue
currently, but it could become easily one.

closes https://github.com/quickwit-oss/quickwit/issues/3837

* use older nightly for time crate (breaks build)
2023-09-21 10:41:35 +02:00
trinity-1686a
0241a05b90 add support for exists query syntax in query parser (#2170)
* add support for exists query syntax in query parser

* rustfmt

* make Exists require a field
2023-09-19 11:10:39 +02:00
PSeitz
e125f3b041 fix test (#2178) 2023-09-19 08:21:50 +02:00
PSeitz
c520ac46fc add support for date in term agg (#2172)
support DateTime in TermsAggregation
Format dates with Rfc3339
2023-09-14 09:22:18 +02:00
PSeitz
2d7390341c increase min memory to 15MB for indexing (#2176)
With tantivy 0.20 the minimum memory consumption per SegmentWriter increased to
12MB. 7MB are for the different fast field collectors types (they could be
lazily created). Increase the minimum memory from 3MB to 15MB.

Change memory variable naming from arena to budget.

closes #2156
2023-09-13 07:38:34 +02:00
dependabot[bot]
03fcdce016 Bump actions/checkout from 3 to 4 (#2171)
Bumps [actions/checkout](https://github.com/actions/checkout) from 3 to 4.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/checkout/compare/v3...v4)

---
updated-dependencies:
- dependency-name: actions/checkout
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-11 10:47:33 +02:00
Ping Xia
e4e416ac42 extend FuzzyTermQuery to support json field (#2173)
* extend fuzzy search for json field

* comments

* comments

* fmt fix

* comments
2023-09-11 05:59:40 +02:00
Igor Motov
19325132b7 Fast-field based implementation of ExistsQuery (#2160)
Adds an implementation of ExistsQuery that takes advantage of fast fields.

Fixes #2159
2023-09-07 11:51:49 +09:00
Paul Masurel
389d36f760 Added comments 2023-09-04 11:06:56 +09:00
38 changed files with 1094 additions and 204 deletions

View File

@@ -15,13 +15,13 @@ jobs:
coverage:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Install Rust
run: rustup toolchain install nightly --profile minimal --component llvm-tools-preview
run: rustup toolchain install nightly-2023-09-10 --profile minimal --component llvm-tools-preview
- uses: Swatinem/rust-cache@v2
- uses: taiki-e/install-action@cargo-llvm-cov
- name: Generate code coverage
run: cargo +nightly llvm-cov --all-features --workspace --doctests --lcov --output-path lcov.info
run: cargo +nightly-2023-09-10 llvm-cov --all-features --workspace --doctests --lcov --output-path lcov.info
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
continue-on-error: true

View File

@@ -19,7 +19,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Install stable
uses: actions-rs/toolchain@v1
with:

View File

@@ -20,7 +20,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Install nightly
uses: actions-rs/toolchain@v1
@@ -60,7 +60,7 @@ jobs:
name: test-${{ matrix.features.label}}
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Install stable
uses: actions-rs/toolchain@v1

View File

@@ -128,7 +128,7 @@ members = ["query-grammar", "bitpacker", "common", "ownedbytes", "stacker", "sst
[[test]]
name = "failpoints"
path = "tests/failpoints/mod.rs"
required-features = ["fail/failpoints"]
required-features = ["failpoints"]
[[bench]]
name = "analyzer"

View File

@@ -24,7 +24,7 @@ const SPECIAL_CHARS: &[char] = &[
/// consume a field name followed by colon. Return the field name with escape sequence
/// already interpreted
fn field_name(i: &str) -> IResult<&str, String> {
fn field_name(inp: &str) -> IResult<&str, String> {
let simple_char = none_of(SPECIAL_CHARS);
let first_char = verify(none_of(SPECIAL_CHARS), |c| *c != '-');
let escape_sequence = || preceded(char('\\'), one_of(SPECIAL_CHARS));
@@ -38,12 +38,12 @@ fn field_name(i: &str) -> IResult<&str, String> {
char(':'),
),
|(first_char, next)| once(first_char).chain(next).collect(),
)(i)
)(inp)
}
/// Consume a word outside of any context.
// TODO should support escape sequences
fn word(i: &str) -> IResult<&str, &str> {
fn word(inp: &str) -> IResult<&str, &str> {
map_res(
recognize(tuple((
satisfy(|c| {
@@ -55,14 +55,14 @@ fn word(i: &str) -> IResult<&str, &str> {
})),
))),
|s| match s {
"OR" | "AND" | "NOT" | "IN" => Err(Error::new(i, ErrorKind::Tag)),
"OR" | "AND" | "NOT" | "IN" => Err(Error::new(inp, ErrorKind::Tag)),
_ => Ok(s),
},
)(i)
)(inp)
}
fn word_infallible(delimiter: &str) -> impl Fn(&str) -> JResult<&str, Option<&str>> + '_ {
|i| {
|inp| {
opt_i_err(
preceded(
space0,
@@ -71,29 +71,29 @@ fn word_infallible(delimiter: &str) -> impl Fn(&str) -> JResult<&str, Option<&st
}))),
),
"expected word",
)(i)
)(inp)
}
}
/// Consume a word inside a Range context. More values are allowed as they are
/// not ambiguous in this context.
fn relaxed_word(i: &str) -> IResult<&str, &str> {
fn relaxed_word(inp: &str) -> IResult<&str, &str> {
recognize(tuple((
satisfy(|c| !c.is_whitespace() && !['`', '{', '}', '"', '[', ']', '(', ')'].contains(&c)),
many0(satisfy(|c: char| {
!c.is_whitespace() && !['{', '}', '"', '[', ']', '(', ')'].contains(&c)
})),
)))(i)
)))(inp)
}
fn negative_number(i: &str) -> IResult<&str, &str> {
fn negative_number(inp: &str) -> IResult<&str, &str> {
recognize(preceded(
char('-'),
tuple((digit1, opt(tuple((char('.'), digit1))))),
))(i)
))(inp)
}
fn simple_term(i: &str) -> IResult<&str, (Delimiter, String)> {
fn simple_term(inp: &str) -> IResult<&str, (Delimiter, String)> {
let escaped_string = |delimiter| {
// we need this because none_of can't accept an owned array of char.
let not_delimiter = verify(anychar, move |parsed| *parsed != delimiter);
@@ -123,13 +123,13 @@ fn simple_term(i: &str) -> IResult<&str, (Delimiter, String)> {
simple_quotes,
double_quotes,
text_no_delimiter,
))(i)
))(inp)
}
fn simple_term_infallible(
delimiter: &str,
) -> impl Fn(&str) -> JResult<&str, Option<(Delimiter, String)>> + '_ {
|i| {
|inp| {
let escaped_string = |delimiter| {
// we need this because none_of can't accept an owned array of char.
let not_delimiter = verify(anychar, move |parsed| *parsed != delimiter);
@@ -162,11 +162,11 @@ fn simple_term_infallible(
map(word_infallible(delimiter), |(text, errors)| {
(text.map(|text| (Delimiter::None, text.to_string())), errors)
}),
)(i)
)(inp)
}
}
fn term_or_phrase(i: &str) -> IResult<&str, UserInputLeaf> {
fn term_or_phrase(inp: &str) -> IResult<&str, UserInputLeaf> {
map(
tuple((simple_term, fallible(slop_or_prefix_val))),
|((delimiter, phrase), (slop, prefix))| {
@@ -179,10 +179,10 @@ fn term_or_phrase(i: &str) -> IResult<&str, UserInputLeaf> {
}
.into()
},
)(i)
)(inp)
}
fn term_or_phrase_infallible(i: &str) -> JResult<&str, Option<UserInputLeaf>> {
fn term_or_phrase_infallible(inp: &str) -> JResult<&str, Option<UserInputLeaf>> {
map(
// ~* for slop/prefix, ) inside group or ast tree, ^ if boost
tuple_infallible((simple_term_infallible("*)^"), slop_or_prefix_val)),
@@ -214,10 +214,10 @@ fn term_or_phrase_infallible(i: &str) -> JResult<&str, Option<UserInputLeaf>> {
};
(leaf, errors)
},
)(i)
)(inp)
}
fn term_group(i: &str) -> IResult<&str, UserInputAst> {
fn term_group(inp: &str) -> IResult<&str, UserInputAst> {
let occur_symbol = alt((
value(Occur::MustNot, char('-')),
value(Occur::Must, char('+')),
@@ -240,12 +240,12 @@ fn term_group(i: &str) -> IResult<&str, UserInputAst> {
.collect(),
)
},
)(i)
)(inp)
}
// this is a precondition for term_group_infallible. Without it, term_group_infallible can fail
// with a panic. It does not consume its input.
fn term_group_precond(i: &str) -> IResult<&str, (), ()> {
fn term_group_precond(inp: &str) -> IResult<&str, (), ()> {
value(
(),
peek(tuple((
@@ -253,13 +253,13 @@ fn term_group_precond(i: &str) -> IResult<&str, (), ()> {
space0,
char('('), // when we are here, we know it can't be anything but a term group
))),
)(i)
)(inp)
.map_err(|e| e.map(|_| ()))
}
fn term_group_infallible(i: &str) -> JResult<&str, UserInputAst> {
let (mut i, (field_name, _, _, _)) =
tuple((field_name, space0, char('('), space0))(i).expect("precondition failed");
fn term_group_infallible(inp: &str) -> JResult<&str, UserInputAst> {
let (mut inp, (field_name, _, _, _)) =
tuple((field_name, space0, char('('), space0))(inp).expect("precondition failed");
let mut terms = Vec::new();
let mut errs = Vec::new();
@@ -270,19 +270,19 @@ fn term_group_infallible(i: &str) -> JResult<&str, UserInputAst> {
first_round = false;
Vec::new()
} else {
let (rest, (_, err)) = space1_infallible(i)?;
i = rest;
let (rest, (_, err)) = space1_infallible(inp)?;
inp = rest;
err
};
if i.is_empty() {
if inp.is_empty() {
errs.push(LenientErrorInternal {
pos: i.len(),
pos: inp.len(),
message: "missing )".to_string(),
});
break Ok((i, (UserInputAst::Clause(terms), errs)));
break Ok((inp, (UserInputAst::Clause(terms), errs)));
}
if let Some(i) = i.strip_prefix(')') {
break Ok((i, (UserInputAst::Clause(terms), errs)));
if let Some(inp) = inp.strip_prefix(')') {
break Ok((inp, (UserInputAst::Clause(terms), errs)));
}
// only append missing space error if we did not reach the end of group
errs.append(&mut space_error);
@@ -291,26 +291,57 @@ fn term_group_infallible(i: &str) -> JResult<&str, UserInputAst> {
// first byte is not `)` or ' '. If it did not, we would end up looping.
let (rest, ((occur, leaf), mut err)) =
tuple_infallible((occur_symbol, term_or_phrase_infallible))(i)?;
tuple_infallible((occur_symbol, term_or_phrase_infallible))(inp)?;
errs.append(&mut err);
if let Some(leaf) = leaf {
terms.push((occur, leaf.set_field(Some(field_name.clone())).into()));
}
i = rest;
inp = rest;
}
}
fn literal(i: &str) -> IResult<&str, UserInputAst> {
fn exists(inp: &str) -> IResult<&str, UserInputLeaf> {
value(
UserInputLeaf::Exists {
field: String::new(),
},
tuple((space0, char('*'))),
)(inp)
}
fn exists_precond(inp: &str) -> IResult<&str, (), ()> {
value(
(),
peek(tuple((
field_name,
space0,
char('*'), // when we are here, we know it can't be anything but a exists
))),
)(inp)
.map_err(|e| e.map(|_| ()))
}
fn exists_infallible(inp: &str) -> JResult<&str, UserInputAst> {
let (inp, (field_name, _, _)) =
tuple((field_name, space0, char('*')))(inp).expect("precondition failed");
let exists = UserInputLeaf::Exists { field: field_name }.into();
Ok((inp, (exists, Vec::new())))
}
fn literal(inp: &str) -> IResult<&str, UserInputAst> {
// * alone is already parsed by our caller, so if `exists` succeed, we can be confident
// something (a field name) got parsed before
alt((
map(
tuple((opt(field_name), alt((range, set, term_or_phrase)))),
tuple((opt(field_name), alt((range, set, exists, term_or_phrase)))),
|(field_name, leaf): (Option<String>, UserInputLeaf)| leaf.set_field(field_name).into(),
),
term_group,
))(i)
))(inp)
}
fn literal_no_group_infallible(i: &str) -> JResult<&str, Option<UserInputAst>> {
fn literal_no_group_infallible(inp: &str) -> JResult<&str, Option<UserInputAst>> {
map(
tuple_infallible((
opt_i(field_name),
@@ -337,7 +368,7 @@ fn literal_no_group_infallible(i: &str) -> JResult<&str, Option<UserInputAst>> {
&& field_name.is_none()
{
errors.push(LenientErrorInternal {
pos: i.len(),
pos: inp.len(),
message: "parsed possible invalid field as term".to_string(),
});
}
@@ -346,7 +377,7 @@ fn literal_no_group_infallible(i: &str) -> JResult<&str, Option<UserInputAst>> {
&& field_name.is_none()
{
errors.push(LenientErrorInternal {
pos: i.len(),
pos: inp.len(),
message: "parsed keyword NOT as term. It should be quoted".to_string(),
});
}
@@ -355,34 +386,40 @@ fn literal_no_group_infallible(i: &str) -> JResult<&str, Option<UserInputAst>> {
errors,
)
},
)(i)
)(inp)
}
fn literal_infallible(i: &str) -> JResult<&str, Option<UserInputAst>> {
fn literal_infallible(inp: &str) -> JResult<&str, Option<UserInputAst>> {
alt_infallible(
((
term_group_precond,
map(term_group_infallible, |(group, errs)| (Some(group), errs)),
),),
(
(
term_group_precond,
map(term_group_infallible, |(group, errs)| (Some(group), errs)),
),
(
exists_precond,
map(exists_infallible, |(exists, errs)| (Some(exists), errs)),
),
),
literal_no_group_infallible,
)(i)
)(inp)
}
fn slop_or_prefix_val(i: &str) -> JResult<&str, (u32, bool)> {
fn slop_or_prefix_val(inp: &str) -> JResult<&str, (u32, bool)> {
map(
opt_i(alt((
value((0, true), char('*')),
map(preceded(char('~'), u32), |slop| (slop, false)),
))),
|(slop_or_prefix_opt, err)| (slop_or_prefix_opt.unwrap_or_default(), err),
)(i)
)(inp)
}
/// Function that parses a range out of a Stream
/// Supports ranges like:
/// [5 TO 10], {5 TO 10}, [* TO 10], [10 TO *], {10 TO *], >5, <=10
/// [a TO *], [a TO c], [abc TO bcd}
fn range(i: &str) -> IResult<&str, UserInputLeaf> {
fn range(inp: &str) -> IResult<&str, UserInputLeaf> {
let range_term_val = || {
map(
alt((negative_number, relaxed_word, tag("*"))),
@@ -442,10 +479,10 @@ fn range(i: &str) -> IResult<&str, UserInputLeaf> {
lower,
upper,
},
)(i)
)(inp)
}
fn range_infallible(i: &str) -> JResult<&str, UserInputLeaf> {
fn range_infallible(inp: &str) -> JResult<&str, UserInputLeaf> {
let lower_to_upper = map(
tuple_infallible((
opt_i(anychar),
@@ -553,10 +590,10 @@ fn range_infallible(i: &str) -> JResult<&str, UserInputLeaf> {
errors,
)
},
)(i)
)(inp)
}
fn set(i: &str) -> IResult<&str, UserInputLeaf> {
fn set(inp: &str) -> IResult<&str, UserInputLeaf> {
map(
preceded(
tuple((space0, tag("IN"), space1)),
@@ -570,10 +607,10 @@ fn set(i: &str) -> IResult<&str, UserInputLeaf> {
field: None,
elements,
},
)(i)
)(inp)
}
fn set_infallible(mut i: &str) -> JResult<&str, UserInputLeaf> {
fn set_infallible(mut inp: &str) -> JResult<&str, UserInputLeaf> {
// `IN [` has already been parsed when we enter, we only need to parse simple terms until we
// find a `]`
let mut elements = Vec::new();
@@ -584,41 +621,41 @@ fn set_infallible(mut i: &str) -> JResult<&str, UserInputLeaf> {
first_round = false;
Vec::new()
} else {
let (rest, (_, err)) = space1_infallible(i)?;
i = rest;
let (rest, (_, err)) = space1_infallible(inp)?;
inp = rest;
err
};
if i.is_empty() {
if inp.is_empty() {
// TODO push error about missing ]
//
errs.push(LenientErrorInternal {
pos: i.len(),
pos: inp.len(),
message: "missing ]".to_string(),
});
let res = UserInputLeaf::Set {
field: None,
elements,
};
return Ok((i, (res, errs)));
return Ok((inp, (res, errs)));
}
if let Some(i) = i.strip_prefix(']') {
if let Some(inp) = inp.strip_prefix(']') {
let res = UserInputLeaf::Set {
field: None,
elements,
};
return Ok((i, (res, errs)));
return Ok((inp, (res, errs)));
}
errs.append(&mut space_error);
// TODO
// here we do the assumption term_or_phrase_infallible always consume something if the
// first byte is not `)` or ' '. If it did not, we would end up looping.
let (rest, (delim_term, mut err)) = simple_term_infallible("]")(i)?;
let (rest, (delim_term, mut err)) = simple_term_infallible("]")(inp)?;
errs.append(&mut err);
if let Some((_, term)) = delim_term {
elements.push(term);
}
i = rest;
inp = rest;
}
}
@@ -626,16 +663,16 @@ fn negate(expr: UserInputAst) -> UserInputAst {
expr.unary(Occur::MustNot)
}
fn leaf(i: &str) -> IResult<&str, UserInputAst> {
fn leaf(inp: &str) -> IResult<&str, UserInputAst> {
alt((
delimited(char('('), ast, char(')')),
map(char('*'), |_| UserInputAst::from(UserInputLeaf::All)),
map(preceded(tuple((tag("NOT"), space1)), leaf), negate),
literal,
))(i)
))(inp)
}
fn leaf_infallible(i: &str) -> JResult<&str, Option<UserInputAst>> {
fn leaf_infallible(inp: &str) -> JResult<&str, Option<UserInputAst>> {
alt_infallible(
(
(
@@ -665,23 +702,23 @@ fn leaf_infallible(i: &str) -> JResult<&str, Option<UserInputAst>> {
),
),
literal_infallible,
)(i)
)(inp)
}
fn positive_float_number(i: &str) -> IResult<&str, f64> {
fn positive_float_number(inp: &str) -> IResult<&str, f64> {
map(
recognize(tuple((digit1, opt(tuple((char('.'), digit1)))))),
// TODO this is actually dangerous if the number is actually not representable as a f64
// (too big for instance)
|float_str: &str| float_str.parse::<f64>().unwrap(),
)(i)
)(inp)
}
fn boost(i: &str) -> JResult<&str, Option<f64>> {
opt_i(preceded(char('^'), positive_float_number))(i)
fn boost(inp: &str) -> JResult<&str, Option<f64>> {
opt_i(preceded(char('^'), positive_float_number))(inp)
}
fn boosted_leaf(i: &str) -> IResult<&str, UserInputAst> {
fn boosted_leaf(inp: &str) -> IResult<&str, UserInputAst> {
map(
tuple((leaf, fallible(boost))),
|(leaf, boost_opt)| match boost_opt {
@@ -690,10 +727,10 @@ fn boosted_leaf(i: &str) -> IResult<&str, UserInputAst> {
}
_ => leaf,
},
)(i)
)(inp)
}
fn boosted_leaf_infallible(i: &str) -> JResult<&str, Option<UserInputAst>> {
fn boosted_leaf_infallible(inp: &str) -> JResult<&str, Option<UserInputAst>> {
map(
tuple_infallible((leaf_infallible, boost)),
|((leaf, boost_opt), error)| match boost_opt {
@@ -703,30 +740,30 @@ fn boosted_leaf_infallible(i: &str) -> JResult<&str, Option<UserInputAst>> {
),
_ => (leaf, error),
},
)(i)
)(inp)
}
fn occur_symbol(i: &str) -> JResult<&str, Option<Occur>> {
fn occur_symbol(inp: &str) -> JResult<&str, Option<Occur>> {
opt_i(alt((
value(Occur::MustNot, char('-')),
value(Occur::Must, char('+')),
)))(i)
)))(inp)
}
fn occur_leaf(i: &str) -> IResult<&str, (Option<Occur>, UserInputAst)> {
tuple((fallible(occur_symbol), boosted_leaf))(i)
fn occur_leaf(inp: &str) -> IResult<&str, (Option<Occur>, UserInputAst)> {
tuple((fallible(occur_symbol), boosted_leaf))(inp)
}
#[allow(clippy::type_complexity)]
fn operand_occur_leaf_infallible(
i: &str,
inp: &str,
) -> JResult<&str, (Option<BinaryOperand>, Option<Occur>, Option<UserInputAst>)> {
// TODO maybe this should support multiple chained AND/OR, and "fuse" them?
tuple_infallible((
delimited_infallible(nothing, opt_i(binary_operand), space0_infallible),
occur_symbol,
boosted_leaf_infallible,
))(i)
))(inp)
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
@@ -735,11 +772,11 @@ enum BinaryOperand {
And,
}
fn binary_operand(i: &str) -> IResult<&str, BinaryOperand> {
fn binary_operand(inp: &str) -> IResult<&str, BinaryOperand> {
alt((
value(BinaryOperand::And, tag("AND ")),
value(BinaryOperand::Or, tag("OR ")),
))(i)
))(inp)
}
fn aggregate_binary_expressions(
@@ -880,14 +917,14 @@ fn aggregate_infallible_expressions(
}
}
fn operand_leaf(i: &str) -> IResult<&str, (BinaryOperand, UserInputAst)> {
fn operand_leaf(inp: &str) -> IResult<&str, (BinaryOperand, UserInputAst)> {
tuple((
terminated(binary_operand, space0),
terminated(boosted_leaf, space0),
))(i)
))(inp)
}
fn ast(i: &str) -> IResult<&str, UserInputAst> {
fn ast(inp: &str) -> IResult<&str, UserInputAst> {
let boolean_expr = map(
separated_pair(boosted_leaf, space1, many1(operand_leaf)),
|(left, right)| aggregate_binary_expressions(left, right),
@@ -908,10 +945,10 @@ fn ast(i: &str) -> IResult<&str, UserInputAst> {
space0,
alt((boolean_expr, whitespace_separated_leaves)),
space0,
)(i)
)(inp)
}
fn ast_infallible(i: &str) -> JResult<&str, UserInputAst> {
fn ast_infallible(inp: &str) -> JResult<&str, UserInputAst> {
// ast() parse either `term AND term OR term` or `+term term -term`
// both are locally ambiguous, and as we allow error, it's hard to permit backtracking.
// Instead, we allow a mix of both syntaxes, trying to make sense of what a user meant.
@@ -928,13 +965,13 @@ fn ast_infallible(i: &str) -> JResult<&str, UserInputAst> {
},
);
delimited_infallible(space0_infallible, expression, space0_infallible)(i)
delimited_infallible(space0_infallible, expression, space0_infallible)(inp)
}
pub fn parse_to_ast(i: &str) -> IResult<&str, UserInputAst> {
pub fn parse_to_ast(inp: &str) -> IResult<&str, UserInputAst> {
map(delimited(space0, opt(ast), eof), |opt_ast| {
rewrite_ast(opt_ast.unwrap_or_else(UserInputAst::empty_query))
})(i)
})(inp)
}
pub fn parse_to_ast_lenient(query_str: &str) -> (UserInputAst, Vec<LenientError>) {
@@ -1538,6 +1575,17 @@ mod test {
test_parse_query_to_ast_helper("foo:\"\"*", "\"foo\":\"\"*");
}
#[test]
fn test_exist_query() {
test_parse_query_to_ast_helper("a:*", "\"a\":*");
test_parse_query_to_ast_helper("a: *", "\"a\":*");
// an exist followed by default term being b
test_is_parse_err("a:*b", "(*\"a\":* *b)");
// this is a term query (not a phrase prefix)
test_parse_query_to_ast_helper("a:b*", "\"a\":b*");
}
#[test]
fn test_not_queries_are_consistent() {
test_parse_query_to_ast_helper("tata -toto", "(*tata -toto)");

View File

@@ -16,6 +16,9 @@ pub enum UserInputLeaf {
field: Option<String>,
elements: Vec<String>,
},
Exists {
field: String,
},
}
impl UserInputLeaf {
@@ -36,6 +39,9 @@ impl UserInputLeaf {
upper,
},
UserInputLeaf::Set { field: _, elements } => UserInputLeaf::Set { field, elements },
UserInputLeaf::Exists { field: _ } => UserInputLeaf::Exists {
field: field.expect("Exist query without a field isn't allowed"),
},
}
}
}
@@ -74,6 +80,9 @@ impl Debug for UserInputLeaf {
write!(formatter, "]")
}
UserInputLeaf::All => write!(formatter, "*"),
UserInputLeaf::Exists { field } => {
write!(formatter, "\"{field}\":*")
}
}
}
}

View File

@@ -134,3 +134,142 @@ impl Drop for ResourceLimitGuard {
.fetch_sub(self.allocated_with_the_guard, Ordering::Relaxed);
}
}
#[cfg(test)]
mod tests {
use crate::aggregation::tests::exec_request_with_query;
// https://github.com/quickwit-oss/quickwit/issues/3837
#[test]
fn test_agg_limits_with_empty_merge() {
use crate::aggregation::agg_req::Aggregations;
use crate::aggregation::bucket::tests::get_test_index_from_docs;
let docs = vec![
vec![r#"{ "date": "2015-01-02T00:00:00Z", "text": "bbb", "text2": "bbb" }"#],
vec![r#"{ "text": "aaa", "text2": "bbb" }"#],
];
let index = get_test_index_from_docs(false, &docs).unwrap();
{
let elasticsearch_compatible_json = json!(
{
"1": {
"terms": {"field": "text2", "min_doc_count": 0},
"aggs": {
"2":{
"date_histogram": {
"field": "date",
"fixed_interval": "1d",
"extended_bounds": {
"min": "2015-01-01T00:00:00Z",
"max": "2015-01-10T00:00:00Z"
}
}
}
}
}
}
);
let agg_req: Aggregations = serde_json::from_str(
&serde_json::to_string(&elasticsearch_compatible_json).unwrap(),
)
.unwrap();
let res = exec_request_with_query(agg_req, &index, Some(("text", "bbb"))).unwrap();
let expected_res = json!({
"1": {
"buckets": [
{
"2": {
"buckets": [
{ "doc_count": 0, "key": 1420070400000.0, "key_as_string": "2015-01-01T00:00:00Z" },
{ "doc_count": 1, "key": 1420156800000.0, "key_as_string": "2015-01-02T00:00:00Z" },
{ "doc_count": 0, "key": 1420243200000.0, "key_as_string": "2015-01-03T00:00:00Z" },
{ "doc_count": 0, "key": 1420329600000.0, "key_as_string": "2015-01-04T00:00:00Z" },
{ "doc_count": 0, "key": 1420416000000.0, "key_as_string": "2015-01-05T00:00:00Z" },
{ "doc_count": 0, "key": 1420502400000.0, "key_as_string": "2015-01-06T00:00:00Z" },
{ "doc_count": 0, "key": 1420588800000.0, "key_as_string": "2015-01-07T00:00:00Z" },
{ "doc_count": 0, "key": 1420675200000.0, "key_as_string": "2015-01-08T00:00:00Z" },
{ "doc_count": 0, "key": 1420761600000.0, "key_as_string": "2015-01-09T00:00:00Z" },
{ "doc_count": 0, "key": 1420848000000.0, "key_as_string": "2015-01-10T00:00:00Z" }
]
},
"doc_count": 1,
"key": "bbb"
}
],
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0
}
});
assert_eq!(res, expected_res);
}
}
// https://github.com/quickwit-oss/quickwit/issues/3837
#[test]
fn test_agg_limits_with_empty_data() {
use crate::aggregation::agg_req::Aggregations;
use crate::aggregation::bucket::tests::get_test_index_from_docs;
let docs = vec![vec![r#"{ "text": "aaa", "text2": "bbb" }"#]];
let index = get_test_index_from_docs(false, &docs).unwrap();
{
// Empty result since there is no doc with dates
let elasticsearch_compatible_json = json!(
{
"1": {
"terms": {"field": "text2", "min_doc_count": 0},
"aggs": {
"2":{
"date_histogram": {
"field": "date",
"fixed_interval": "1d",
"extended_bounds": {
"min": "2015-01-01T00:00:00Z",
"max": "2015-01-10T00:00:00Z"
}
}
}
}
}
}
);
let agg_req: Aggregations = serde_json::from_str(
&serde_json::to_string(&elasticsearch_compatible_json).unwrap(),
)
.unwrap();
let res = exec_request_with_query(agg_req, &index, Some(("text", "bbb"))).unwrap();
let expected_res = json!({
"1": {
"buckets": [
{
"2": {
"buckets": [
{ "doc_count": 0, "key": 1420070400000.0, "key_as_string": "2015-01-01T00:00:00Z" },
{ "doc_count": 0, "key": 1420156800000.0, "key_as_string": "2015-01-02T00:00:00Z" },
{ "doc_count": 0, "key": 1420243200000.0, "key_as_string": "2015-01-03T00:00:00Z" },
{ "doc_count": 0, "key": 1420329600000.0, "key_as_string": "2015-01-04T00:00:00Z" },
{ "doc_count": 0, "key": 1420416000000.0, "key_as_string": "2015-01-05T00:00:00Z" },
{ "doc_count": 0, "key": 1420502400000.0, "key_as_string": "2015-01-06T00:00:00Z" },
{ "doc_count": 0, "key": 1420588800000.0, "key_as_string": "2015-01-07T00:00:00Z" },
{ "doc_count": 0, "key": 1420675200000.0, "key_as_string": "2015-01-08T00:00:00Z" },
{ "doc_count": 0, "key": 1420761600000.0, "key_as_string": "2015-01-09T00:00:00Z" },
{ "doc_count": 0, "key": 1420848000000.0, "key_as_string": "2015-01-10T00:00:00Z" }
]
},
"doc_count": 0,
"key": "bbb"
}
],
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0
}
});
assert_eq!(res, expected_res);
}
}
}

View File

@@ -103,7 +103,8 @@ impl AggregationWithAccessor {
field: field_name, ..
}) => {
let (accessor, column_type) =
get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;
// Only DateTime is supported for DateHistogram
get_ff_reader(reader, field_name, Some(&[ColumnType::DateTime]))?;
add_agg_with_accessor(accessor, column_type, &mut res)?;
}
Terms(TermsAggregation {
@@ -117,10 +118,10 @@ impl AggregationWithAccessor {
ColumnType::U64,
ColumnType::F64,
ColumnType::Str,
ColumnType::DateTime,
// ColumnType::Bytes Unsupported
// ColumnType::Bool Unsupported
// ColumnType::IpAddr Unsupported
// ColumnType::DateTime Unsupported
];
// In case the column is empty we want the shim column to match the missing type
@@ -145,7 +146,18 @@ impl AggregationWithAccessor {
.map(|m| matches!(m, Key::Str(_)))
.unwrap_or(false);
let use_special_missing_agg = missing_and_more_than_one_col || text_on_non_text_col;
// Actually we could convert the text to a number and have the fast path, if it is
// provided in Rfc3339 format. But this use case is probably common
// enough to justify the effort.
let text_on_date_col = column_and_types.len() == 1
&& column_and_types[0].1 == ColumnType::DateTime
&& missing
.as_ref()
.map(|m| matches!(m, Key::Str(_)))
.unwrap_or(false);
let use_special_missing_agg =
missing_and_more_than_one_col || text_on_non_text_col || text_on_date_col;
if use_special_missing_agg {
let column_and_types =
get_all_ff_reader_or_empty(reader, field_name, None, fallback_type)?;

View File

@@ -132,6 +132,7 @@ impl DateHistogramAggregationReq {
hard_bounds: self.hard_bounds,
extended_bounds: self.extended_bounds,
keyed: self.keyed,
is_normalized_to_ns: false,
})
}
@@ -243,14 +244,14 @@ fn parse_into_milliseconds(input: &str) -> Result<i64, AggregationError> {
}
#[cfg(test)]
mod tests {
pub mod tests {
use pretty_assertions::assert_eq;
use super::*;
use crate::aggregation::agg_req::Aggregations;
use crate::aggregation::tests::exec_request;
use crate::indexer::NoMergePolicy;
use crate::schema::{Schema, FAST};
use crate::schema::{Schema, FAST, STRING};
use crate::Index;
#[test]
@@ -306,7 +307,8 @@ mod tests {
) -> crate::Result<Index> {
let mut schema_builder = Schema::builder();
schema_builder.add_date_field("date", FAST);
schema_builder.add_text_field("text", FAST);
schema_builder.add_text_field("text", FAST | STRING);
schema_builder.add_text_field("text2", FAST | STRING);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema.clone());
{

View File

@@ -122,11 +122,14 @@ pub struct HistogramAggregation {
/// Whether to return the buckets as a hash map
#[serde(default)]
pub keyed: bool,
/// Whether the values are normalized to ns for date time values. Defaults to false.
#[serde(default)]
pub is_normalized_to_ns: bool,
}
impl HistogramAggregation {
pub(crate) fn normalize(&mut self, column_type: ColumnType) {
if column_type.is_date_time() {
pub(crate) fn normalize_date_time(&mut self) {
if !self.is_normalized_to_ns {
// values are provided in ms, but the fastfield is in nano seconds
self.interval *= 1_000_000.0;
self.offset = self.offset.map(|off| off * 1_000_000.0);
@@ -138,6 +141,7 @@ impl HistogramAggregation {
min: bounds.min * 1_000_000.0,
max: bounds.max * 1_000_000.0,
});
self.is_normalized_to_ns = true;
}
}
@@ -370,7 +374,7 @@ impl SegmentHistogramCollector {
Ok(IntermediateBucketResult::Histogram {
buckets,
column_type: Some(self.column_type),
is_date_agg: self.column_type == ColumnType::DateTime,
})
}
@@ -381,7 +385,9 @@ impl SegmentHistogramCollector {
accessor_idx: usize,
) -> crate::Result<Self> {
req.validate()?;
req.normalize(field_type);
if field_type == ColumnType::DateTime {
req.normalize_date_time();
}
let sub_aggregation_blueprint = if sub_aggregation.is_empty() {
None
@@ -439,6 +445,7 @@ fn intermediate_buckets_to_final_buckets_fill_gaps(
// memory check upfront
let (_, first_bucket_num, last_bucket_num) =
generate_bucket_pos_with_opt_minmax(histogram_req, min_max);
// It's based on user input, so we need to account for overflows
let added_buckets = ((last_bucket_num.saturating_sub(first_bucket_num)).max(0) as u64)
.saturating_sub(buckets.len() as u64);
@@ -482,7 +489,7 @@ fn intermediate_buckets_to_final_buckets_fill_gaps(
// Convert to BucketEntry
pub(crate) fn intermediate_histogram_buckets_to_final_buckets(
buckets: Vec<IntermediateHistogramBucketEntry>,
column_type: Option<ColumnType>,
is_date_agg: bool,
histogram_req: &HistogramAggregation,
sub_aggregation: &Aggregations,
limits: &AggregationLimits,
@@ -491,8 +498,8 @@ pub(crate) fn intermediate_histogram_buckets_to_final_buckets(
// The request used in the the call to final is not yet be normalized.
// Normalization is changing the precision from milliseconds to nanoseconds.
let mut histogram_req = histogram_req.clone();
if let Some(column_type) = column_type {
histogram_req.normalize(column_type);
if is_date_agg {
histogram_req.normalize_date_time();
}
let mut buckets = if histogram_req.min_doc_count() == 0 {
// With min_doc_count != 0, we may need to add buckets, so that there are no
@@ -516,7 +523,7 @@ pub(crate) fn intermediate_histogram_buckets_to_final_buckets(
// If we have a date type on the histogram buckets, we add the `key_as_string` field as rfc339
// and normalize from nanoseconds to milliseconds
if column_type == Some(ColumnType::DateTime) {
if is_date_agg {
for bucket in buckets.iter_mut() {
if let crate::aggregation::Key::F64(ref mut val) = bucket.key {
let key_as_string = format_date(*val as i64)?;

View File

@@ -1,6 +1,6 @@
use std::fmt::Debug;
use columnar::{BytesColumn, ColumnType, StrColumn};
use columnar::{BytesColumn, ColumnType, MonotonicallyMappableToU64, StrColumn};
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
@@ -16,7 +16,7 @@ use crate::aggregation::intermediate_agg_result::{
use crate::aggregation::segment_agg_result::{
build_segment_agg_collector, SegmentAggregationCollector,
};
use crate::aggregation::{f64_from_fastfield_u64, Key};
use crate::aggregation::{f64_from_fastfield_u64, format_date, Key};
use crate::error::DataCorruption;
use crate::TantivyError;
@@ -531,6 +531,13 @@ impl SegmentTermCollector {
});
}
}
} else if self.field_type == ColumnType::DateTime {
for (val, doc_count) in entries {
let intermediate_entry = into_intermediate_bucket_entry(val, doc_count)?;
let val = i64::from_u64(val);
let date = format_date(val)?;
dict.insert(IntermediateKey::Str(date), intermediate_entry);
}
} else {
for (val, doc_count) in entries {
let intermediate_entry = into_intermediate_bucket_entry(val, doc_count)?;
@@ -583,6 +590,9 @@ pub(crate) fn cut_off_buckets<T: GetDocCount + Debug>(
#[cfg(test)]
mod tests {
use common::DateTime;
use time::{Date, Month};
use crate::aggregation::agg_req::Aggregations;
use crate::aggregation::tests::{
exec_request, exec_request_with_query, exec_request_with_query_and_memory_limit,
@@ -1813,4 +1823,75 @@ mod tests {
Ok(())
}
#[test]
fn terms_aggregation_date() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let date_field = schema_builder.add_date_field("date_field", FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
{
let mut writer = index.writer_with_num_threads(1, 15_000_000)?;
writer.add_document(doc!(date_field=>DateTime::from_primitive(Date::from_calendar_date(1982, Month::September, 17)?.with_hms(0, 0, 0)?)))?;
writer.add_document(doc!(date_field=>DateTime::from_primitive(Date::from_calendar_date(1982, Month::September, 17)?.with_hms(0, 0, 0)?)))?;
writer.add_document(doc!(date_field=>DateTime::from_primitive(Date::from_calendar_date(1983, Month::September, 27)?.with_hms(0, 0, 0)?)))?;
writer.commit()?;
}
let agg_req: Aggregations = serde_json::from_value(json!({
"my_date": {
"terms": {
"field": "date_field"
},
}
}))
.unwrap();
let res = exec_request_with_query(agg_req, &index, None)?;
// date_field field
assert_eq!(res["my_date"]["buckets"][0]["key"], "1982-09-17T00:00:00Z");
assert_eq!(res["my_date"]["buckets"][0]["doc_count"], 2);
assert_eq!(res["my_date"]["buckets"][1]["key"], "1983-09-27T00:00:00Z");
assert_eq!(res["my_date"]["buckets"][1]["doc_count"], 1);
assert_eq!(res["my_date"]["buckets"][2]["key"], serde_json::Value::Null);
Ok(())
}
#[test]
fn terms_aggregation_date_missing() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let date_field = schema_builder.add_date_field("date_field", FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
{
let mut writer = index.writer_with_num_threads(1, 15_000_000)?;
writer.add_document(doc!(date_field=>DateTime::from_primitive(Date::from_calendar_date(1982, Month::September, 17)?.with_hms(0, 0, 0)?)))?;
writer.add_document(doc!(date_field=>DateTime::from_primitive(Date::from_calendar_date(1982, Month::September, 17)?.with_hms(0, 0, 0)?)))?;
writer.add_document(doc!(date_field=>DateTime::from_primitive(Date::from_calendar_date(1983, Month::September, 27)?.with_hms(0, 0, 0)?)))?;
writer.add_document(doc!())?;
writer.commit()?;
}
let agg_req: Aggregations = serde_json::from_value(json!({
"my_date": {
"terms": {
"field": "date_field",
"missing": "1982-09-17T00:00:00Z"
},
}
}))
.unwrap();
let res = exec_request_with_query(agg_req, &index, None)?;
// date_field field
assert_eq!(res["my_date"]["buckets"][0]["key"], "1982-09-17T00:00:00Z");
assert_eq!(res["my_date"]["buckets"][0]["doc_count"], 3);
assert_eq!(res["my_date"]["buckets"][1]["key"], "1983-09-27T00:00:00Z");
assert_eq!(res["my_date"]["buckets"][1]["doc_count"], 1);
assert_eq!(res["my_date"]["buckets"][2]["key"], serde_json::Value::Null);
Ok(())
}
}

View File

@@ -172,10 +172,16 @@ pub(crate) fn empty_from_req(req: &Aggregation) -> IntermediateAggregationResult
Range(_) => IntermediateAggregationResult::Bucket(IntermediateBucketResult::Range(
Default::default(),
)),
Histogram(_) | DateHistogram(_) => {
Histogram(_) => {
IntermediateAggregationResult::Bucket(IntermediateBucketResult::Histogram {
buckets: Vec::new(),
column_type: None,
is_date_agg: false,
})
}
DateHistogram(_) => {
IntermediateAggregationResult::Bucket(IntermediateBucketResult::Histogram {
buckets: Vec::new(),
is_date_agg: true,
})
}
Average(_) => IntermediateAggregationResult::Metric(IntermediateMetricResult::Average(
@@ -343,8 +349,8 @@ pub enum IntermediateBucketResult {
/// This is the histogram entry for a bucket, which contains a key, count, and optionally
/// sub_aggregations.
Histogram {
/// The column_type of the underlying `Column`
column_type: Option<ColumnType>,
/// The column_type of the underlying `Column` is DateTime
is_date_agg: bool,
/// The buckets
buckets: Vec<IntermediateHistogramBucketEntry>,
},
@@ -399,7 +405,7 @@ impl IntermediateBucketResult {
Ok(BucketResult::Range { buckets })
}
IntermediateBucketResult::Histogram {
column_type,
is_date_agg,
buckets,
} => {
let histogram_req = &req
@@ -408,7 +414,7 @@ impl IntermediateBucketResult {
.expect("unexpected aggregation, expected histogram aggregation");
let buckets = intermediate_histogram_buckets_to_final_buckets(
buckets,
column_type,
is_date_agg,
histogram_req,
req.sub_aggregation(),
limits,
@@ -457,11 +463,11 @@ impl IntermediateBucketResult {
(
IntermediateBucketResult::Histogram {
buckets: buckets_left,
..
is_date_agg: _,
},
IntermediateBucketResult::Histogram {
buckets: buckets_right,
..
is_date_agg: _,
},
) => {
let buckets: Result<Vec<IntermediateHistogramBucketEntry>, TantivyError> =

View File

@@ -16,7 +16,7 @@ use crate::{DocId, Score, SegmentOrdinal, SegmentReader};
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
///
/// let mut index_writer = index.writer(3_000_000).unwrap();
/// let mut index_writer = index.writer(15_000_000).unwrap();
/// index_writer.add_document(doc!(title => "The Name of the Wind")).unwrap();
/// index_writer.add_document(doc!(title => "The Diary of Muadib")).unwrap();
/// index_writer.add_document(doc!(title => "A Dairy Cow")).unwrap();

View File

@@ -89,7 +89,7 @@ fn facet_depth(facet_bytes: &[u8]) -> usize {
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
/// {
/// let mut index_writer = index.writer(3_000_000)?;
/// let mut index_writer = index.writer(15_000_000)?;
/// // a document can be associated with any number of facets
/// index_writer.add_document(doc!(
/// title => "The Name of the Wind",

View File

@@ -233,7 +233,7 @@ mod tests {
let val_field = schema_builder.add_i64_field("val_field", FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut writer = index.writer_with_num_threads(1, 4_000_000)?;
let mut writer = index.writer_for_tests()?;
writer.add_document(doc!(val_field=>12i64))?;
writer.add_document(doc!(val_field=>-30i64))?;
writer.add_document(doc!(val_field=>-12i64))?;
@@ -255,7 +255,7 @@ mod tests {
let val_field = schema_builder.add_i64_field("val_field", FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut writer = index.writer_with_num_threads(1, 4_000_000)?;
let mut writer = index.writer_for_tests()?;
writer.add_document(doc!(val_field=>12i64))?;
writer.commit()?;
writer.add_document(doc!(val_field=>-30i64))?;
@@ -280,7 +280,7 @@ mod tests {
let date_field = schema_builder.add_date_field("date_field", FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut writer = index.writer_with_num_threads(1, 4_000_000)?;
let mut writer = index.writer_for_tests()?;
writer.add_document(doc!(date_field=>DateTime::from_primitive(Date::from_calendar_date(1982, Month::September, 17)?.with_hms(0, 0, 0)?)))?;
writer.add_document(
doc!(date_field=>DateTime::from_primitive(Date::from_calendar_date(1986, Month::March, 9)?.with_hms(0, 0, 0)?)),

View File

@@ -44,7 +44,7 @@
//! # let title = schema_builder.add_text_field("title", TEXT);
//! # let schema = schema_builder.build();
//! # let index = Index::create_in_ram(schema);
//! # let mut index_writer = index.writer(3_000_000)?;
//! # let mut index_writer = index.writer(15_000_000)?;
//! # index_writer.add_document(doc!(
//! # title => "The Name of the Wind",
//! # ))?;

View File

@@ -120,7 +120,7 @@ impl<TFruit: Fruit> FruitHandle<TFruit> {
/// let title = schema_builder.add_text_field("title", TEXT);
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
/// let mut index_writer = index.writer(3_000_000)?;
/// let mut index_writer = index.writer(15_000_000)?;
/// index_writer.add_document(doc!(title => "The Name of the Wind"))?;
/// index_writer.add_document(doc!(title => "The Diary of Muadib"))?;
/// index_writer.add_document(doc!(title => "A Dairy Cow"))?;

View File

@@ -16,7 +16,7 @@ use crate::directory::error::OpenReadError;
use crate::directory::MmapDirectory;
use crate::directory::{Directory, ManagedDirectory, RamDirectory, INDEX_WRITER_LOCK};
use crate::error::{DataCorruption, TantivyError};
use crate::indexer::index_writer::{MAX_NUM_THREAD, MEMORY_ARENA_NUM_BYTES_MIN};
use crate::indexer::index_writer::{MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN};
use crate::indexer::segment_updater::save_metas;
use crate::reader::{IndexReader, IndexReaderBuilder};
use crate::schema::{Field, FieldType, Schema};
@@ -523,9 +523,9 @@ impl Index {
/// - `num_threads` defines the number of indexing workers that
/// should work at the same time.
///
/// - `overall_memory_arena_in_bytes` sets the amount of memory
/// - `overall_memory_budget_in_bytes` sets the amount of memory
/// allocated for all indexing thread.
/// Each thread will receive a budget of `overall_memory_arena_in_bytes / num_threads`.
/// Each thread will receive a budget of `overall_memory_budget_in_bytes / num_threads`.
///
/// # Errors
/// If the lockfile already exists, returns `Error::DirectoryLockBusy` or an `Error::IoError`.
@@ -534,7 +534,7 @@ impl Index {
pub fn writer_with_num_threads(
&self,
num_threads: usize,
overall_memory_arena_in_bytes: usize,
overall_memory_budget_in_bytes: usize,
) -> crate::Result<IndexWriter> {
let directory_lock = self
.directory
@@ -550,7 +550,7 @@ impl Index {
),
)
})?;
let memory_arena_in_bytes_per_thread = overall_memory_arena_in_bytes / num_threads;
let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads;
IndexWriter::new(
self,
num_threads,
@@ -561,7 +561,7 @@ impl Index {
/// Helper to create an index writer for tests.
///
/// That index writer only simply has a single thread and a memory arena of 10 MB.
/// That index writer only simply has a single thread and a memory budget of 15 MB.
/// Using a single thread gives us a deterministic allocation of DocId.
#[cfg(test)]
pub fn writer_for_tests(&self) -> crate::Result<IndexWriter> {
@@ -579,13 +579,13 @@ impl Index {
/// If the lockfile already exists, returns `Error::FileAlreadyExists`.
/// If the memory arena per thread is too small or too big, returns
/// `TantivyError::InvalidArgument`
pub fn writer(&self, memory_arena_num_bytes: usize) -> crate::Result<IndexWriter> {
pub fn writer(&self, memory_budget_in_bytes: usize) -> crate::Result<IndexWriter> {
let mut num_threads = std::cmp::min(num_cpus::get(), MAX_NUM_THREAD);
let memory_arena_num_bytes_per_thread = memory_arena_num_bytes / num_threads;
if memory_arena_num_bytes_per_thread < MEMORY_ARENA_NUM_BYTES_MIN {
num_threads = (memory_arena_num_bytes / MEMORY_ARENA_NUM_BYTES_MIN).max(1);
let memory_budget_num_bytes_per_thread = memory_budget_in_bytes / num_threads;
if memory_budget_num_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
num_threads = (memory_budget_in_bytes / MEMORY_BUDGET_NUM_BYTES_MIN).max(1);
}
self.writer_with_num_threads(num_threads, memory_arena_num_bytes)
self.writer_with_num_threads(num_threads, memory_budget_in_bytes)
}
/// Accessor to the index settings

View File

@@ -234,6 +234,22 @@ impl FastFieldReaders {
Ok(dynamic_column_handle_opt)
}
/// Returning all `dynamic_column_handle`.
pub fn dynamic_column_handles(
&self,
field_name: &str,
) -> crate::Result<Vec<DynamicColumnHandle>> {
let Some(resolved_field_name) = self.resolve_field(field_name)? else {
return Ok(Vec::new());
};
let dynamic_column_handles = self
.columnar
.read_columns(&resolved_field_name)?
.into_iter()
.collect();
Ok(dynamic_column_handles)
}
#[doc(hidden)]
pub async fn list_dynamic_column_handles(
&self,
@@ -338,6 +354,8 @@ impl FastFieldReaders {
#[cfg(test)]
mod tests {
use columnar::ColumnType;
use crate::schema::{JsonObjectOptions, Schema, FAST};
use crate::{Document, Index};
@@ -417,4 +435,45 @@ mod tests {
Some("_dyna\u{1}notinschema\u{1}attr\u{1}color".to_string())
);
}
#[test]
fn test_fast_field_reader_dynamic_column_handles() {
let mut schema_builder = Schema::builder();
let id = schema_builder.add_u64_field("id", FAST);
let json = schema_builder.add_json_field("json", FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_for_tests().unwrap();
index_writer
.add_document(doc!(id=> 1u64, json => json!({"foo": 42})))
.unwrap();
index_writer
.add_document(doc!(id=> 2u64, json => json!({"foo": true})))
.unwrap();
index_writer
.add_document(doc!(id=> 3u64, json => json!({"foo": "bar"})))
.unwrap();
index_writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let reader = searcher.segment_reader(0u32);
let fast_fields = reader.fast_fields();
let id_columns = fast_fields.dynamic_column_handles("id").unwrap();
assert_eq!(id_columns.len(), 1);
assert_eq!(id_columns.first().unwrap().column_type(), ColumnType::U64);
let foo_columns = fast_fields.dynamic_column_handles("json.foo").unwrap();
assert_eq!(foo_columns.len(), 3);
assert!(foo_columns
.iter()
.any(|column| column.column_type() == ColumnType::I64));
assert!(foo_columns
.iter()
.any(|column| column.column_type() == ColumnType::Bool));
assert!(foo_columns
.iter()
.any(|column| column.column_type() == ColumnType::Str));
println!("*** {:?}", fast_fields.columnar().list_columns());
}
}

View File

@@ -2,6 +2,7 @@ use std::collections::HashSet;
use rand::{thread_rng, Rng};
use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
use crate::schema::*;
use crate::{doc, schema, Index, IndexSettings, IndexSortByField, Order, Searcher};
@@ -30,7 +31,7 @@ fn test_functional_store() -> crate::Result<()> {
let mut rng = thread_rng();
let mut index_writer = index.writer_with_num_threads(3, 12_000_000)?;
let mut index_writer = index.writer_with_num_threads(3, MEMORY_BUDGET_NUM_BYTES_MIN)?;
let mut doc_set: Vec<u64> = Vec::new();

View File

@@ -27,9 +27,9 @@ use crate::{FutureResult, Opstamp};
// in the `memory_arena` goes below MARGIN_IN_BYTES.
pub const MARGIN_IN_BYTES: usize = 1_000_000;
// We impose the memory per thread to be at least 3 MB.
pub const MEMORY_ARENA_NUM_BYTES_MIN: usize = ((MARGIN_IN_BYTES as u32) * 3u32) as usize;
pub const MEMORY_ARENA_NUM_BYTES_MAX: usize = u32::MAX as usize - MARGIN_IN_BYTES;
// We impose the memory per thread to be at least 15 MB, as the baseline consumption is 12MB.
pub const MEMORY_BUDGET_NUM_BYTES_MIN: usize = ((MARGIN_IN_BYTES as u32) * 15u32) as usize;
pub const MEMORY_BUDGET_NUM_BYTES_MAX: usize = u32::MAX as usize - MARGIN_IN_BYTES;
// We impose the number of index writer threads to be at most this.
pub const MAX_NUM_THREAD: usize = 8;
@@ -57,7 +57,8 @@ pub struct IndexWriter {
index: Index,
memory_arena_in_bytes_per_thread: usize,
// The memory budget per thread, after which a commit is triggered.
memory_budget_in_bytes_per_thread: usize,
workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,
@@ -264,19 +265,19 @@ impl IndexWriter {
pub(crate) fn new(
index: &Index,
num_threads: usize,
memory_arena_in_bytes_per_thread: usize,
memory_budget_in_bytes_per_thread: usize,
directory_lock: DirectoryLock,
) -> crate::Result<IndexWriter> {
if memory_arena_in_bytes_per_thread < MEMORY_ARENA_NUM_BYTES_MIN {
if memory_budget_in_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
let err_msg = format!(
"The memory arena in bytes per thread needs to be at least \
{MEMORY_ARENA_NUM_BYTES_MIN}."
{MEMORY_BUDGET_NUM_BYTES_MIN}."
);
return Err(TantivyError::InvalidArgument(err_msg));
}
if memory_arena_in_bytes_per_thread >= MEMORY_ARENA_NUM_BYTES_MAX {
if memory_budget_in_bytes_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX {
let err_msg = format!(
"The memory arena in bytes per thread cannot exceed {MEMORY_ARENA_NUM_BYTES_MAX}"
"The memory arena in bytes per thread cannot exceed {MEMORY_BUDGET_NUM_BYTES_MAX}"
);
return Err(TantivyError::InvalidArgument(err_msg));
}
@@ -295,7 +296,7 @@ impl IndexWriter {
let mut index_writer = IndexWriter {
_directory_lock: Some(directory_lock),
memory_arena_in_bytes_per_thread,
memory_budget_in_bytes_per_thread,
index: index.clone(),
index_writer_status: IndexWriterStatus::from(document_receiver),
operation_sender: document_sender,
@@ -396,7 +397,7 @@ impl IndexWriter {
let mut delete_cursor = self.delete_queue.cursor();
let mem_budget = self.memory_arena_in_bytes_per_thread;
let mem_budget = self.memory_budget_in_bytes_per_thread;
let index = self.index.clone();
let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new()
.name(format!("thrd-tantivy-index{}", self.worker_id))
@@ -554,7 +555,7 @@ impl IndexWriter {
let new_index_writer: IndexWriter = IndexWriter::new(
&self.index,
self.num_threads,
self.memory_arena_in_bytes_per_thread,
self.memory_budget_in_bytes_per_thread,
directory_lock,
)?;
@@ -619,7 +620,7 @@ impl IndexWriter {
for worker_handle in former_workers_join_handle {
let indexing_worker_result = worker_handle
.join()
.map_err(|e| TantivyError::ErrorInThread(format!("{e:?}")))?;
.map_err(|e| TantivyError::ErrorInThread(e.to_string()))?;
indexing_worker_result?;
self.add_indexing_worker()?;
}
@@ -810,6 +811,7 @@ mod tests {
use crate::collector::TopDocs;
use crate::directory::error::LockError;
use crate::error::*;
use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
use crate::indexer::NoMergePolicy;
use crate::query::{BooleanQuery, Occur, Query, QueryParser, TermQuery};
use crate::schema::{
@@ -941,7 +943,7 @@ mod tests {
fn test_empty_operations_group() {
let schema_builder = schema::Schema::builder();
let index = Index::create_in_ram(schema_builder.build());
let index_writer = index.writer(3_000_000).unwrap();
let index_writer = index.writer_for_tests().unwrap();
let operations1 = vec![];
let batch_opstamp1 = index_writer.run(operations1).unwrap();
assert_eq!(batch_opstamp1, 0u64);
@@ -954,8 +956,8 @@ mod tests {
fn test_lockfile_stops_duplicates() {
let schema_builder = schema::Schema::builder();
let index = Index::create_in_ram(schema_builder.build());
let _index_writer = index.writer(3_000_000).unwrap();
match index.writer(3_000_000) {
let _index_writer = index.writer_for_tests().unwrap();
match index.writer_for_tests() {
Err(TantivyError::LockFailure(LockError::LockBusy, _)) => {}
_ => panic!("Expected a `LockFailure` error"),
}
@@ -979,7 +981,7 @@ mod tests {
fn test_set_merge_policy() {
let schema_builder = schema::Schema::builder();
let index = Index::create_in_ram(schema_builder.build());
let index_writer = index.writer(3_000_000).unwrap();
let index_writer = index.writer_for_tests().unwrap();
assert_eq!(
format!("{:?}", index_writer.get_merge_policy()),
"LogMergePolicy { min_num_segments: 8, max_docs_before_merge: 10000000, \
@@ -998,11 +1000,11 @@ mod tests {
let schema_builder = schema::Schema::builder();
let index = Index::create_in_ram(schema_builder.build());
{
let _index_writer = index.writer(3_000_000).unwrap();
let _index_writer = index.writer_for_tests().unwrap();
// the lock should be released when the
// index_writer leaves the scope.
}
let _index_writer_two = index.writer(3_000_000).unwrap();
let _index_writer_two = index.writer_for_tests().unwrap();
}
#[test]
@@ -1022,7 +1024,7 @@ mod tests {
{
// writing the segment
let mut index_writer = index.writer(3_000_000)?;
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(doc!(text_field=>"a"))?;
index_writer.rollback()?;
assert_eq!(index_writer.commit_opstamp(), 0u64);
@@ -1054,7 +1056,7 @@ mod tests {
reader.searcher().doc_freq(&term_a).unwrap()
};
// writing the segment
let mut index_writer = index.writer(12_000_000).unwrap();
let mut index_writer = index.writer_for_tests().unwrap();
index_writer.add_document(doc!(text_field=>"a"))?;
index_writer.commit()?;
// this should create 1 segment
@@ -1094,7 +1096,7 @@ mod tests {
reader.searcher().doc_freq(&term_a).unwrap()
};
// writing the segment
let mut index_writer = index.writer(12_000_000).unwrap();
let mut index_writer = index.writer_for_tests().unwrap();
index_writer.add_document(doc!(text_field=>"a"))?;
index_writer.commit()?;
index_writer.add_document(doc!(text_field=>"a"))?;
@@ -1140,7 +1142,7 @@ mod tests {
reader.searcher().doc_freq(&term_a).unwrap()
};
// writing the segment
let mut index_writer = index.writer(12_000_000).unwrap();
let mut index_writer = index.writer(MEMORY_BUDGET_NUM_BYTES_MIN).unwrap();
// create 8 segments with 100 tiny docs
for _doc in 0..100 {
index_writer.add_document(doc!(text_field=>"a"))?;
@@ -1196,7 +1198,8 @@ mod tests {
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(4, 12_000_000)?;
let mut index_writer =
index.writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)?;
// create 8 segments with 100 tiny docs
for _doc in 0..100 {
index_writer.add_document(doc!(text_field => "a"))?;
@@ -1245,7 +1248,9 @@ mod tests {
let term = Term::from_field_text(text_field, s);
searcher.doc_freq(&term).unwrap()
};
let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
let mut index_writer = index
.writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
.unwrap();
let add_tstamp = index_writer.add_document(doc!(text_field => "a")).unwrap();
let commit_tstamp = index_writer.commit().unwrap();
@@ -1262,7 +1267,9 @@ mod tests {
let mut schema_builder = schema::Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
let mut index_writer = index
.writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
.unwrap();
let add_tstamp = index_writer.add_document(doc!(text_field => "a")).unwrap();
@@ -1311,7 +1318,9 @@ mod tests {
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
// writing the segment
let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
let mut index_writer = index
.writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
.unwrap();
let res = index_writer.delete_all_documents();
assert!(res.is_ok());
@@ -1338,7 +1347,9 @@ mod tests {
let mut schema_builder = schema::Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
let mut index_writer = index
.writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
.unwrap();
// add one simple doc
assert!(index_writer.add_document(doc!(text_field => "a")).is_ok());
@@ -1371,7 +1382,9 @@ mod tests {
fn test_delete_all_documents_empty_index() {
let schema_builder = schema::Schema::builder();
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
let mut index_writer = index
.writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
.unwrap();
let clear = index_writer.delete_all_documents();
let commit = index_writer.commit();
assert!(clear.is_ok());
@@ -1382,7 +1395,9 @@ mod tests {
fn test_delete_all_documents_index_twice() {
let schema_builder = schema::Schema::builder();
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
let mut index_writer = index
.writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
.unwrap();
let clear = index_writer.delete_all_documents();
let commit = index_writer.commit();
assert!(clear.is_ok());

View File

@@ -596,10 +596,15 @@ impl SegmentUpdater {
);
{
if let Some(after_merge_segment_entry) = after_merge_segment_entry.as_mut() {
// Deletes and commits could have happened as we were merging.
// We need to make sure we are up to date with deletes before accepting the
// segment.
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
if let Some(delete_operation) = delete_cursor.get() {
let committed_opstamp = segment_updater.load_meta().opstamp;
if delete_operation.opstamp < committed_opstamp {
// We are not up to date! Let's create a new tombstone file for our
// freshly create split.
let index = &segment_updater.index;
let segment = index.segment(after_merge_segment_entry.meta().clone());
if let Err(advance_deletes_err) = advance_deletes(

View File

@@ -26,6 +26,8 @@ use crate::{DocId, Document, Opstamp, SegmentComponent, TantivyError};
fn compute_initial_table_size(per_thread_memory_budget: usize) -> crate::Result<usize> {
let table_memory_upper_bound = per_thread_memory_budget / 3;
(10..20) // We cap it at 2^19 = 512K capacity.
// TODO: There are cases where this limit causes a
// reallocation in the hashmap. Check if this affects performance.
.map(|power| 1 << power)
.take_while(|capacity| compute_table_memory_size(*capacity) < table_memory_upper_bound)
.last()

View File

@@ -225,7 +225,7 @@ pub mod tests {
{
let mut segment_writer =
SegmentWriter::for_segment(3_000_000, segment.clone()).unwrap();
SegmentWriter::for_segment(15_000_000, segment.clone()).unwrap();
{
// checking that position works if the field has two values
let op = AddOperation {

View File

@@ -4,6 +4,7 @@ use std::sync::Arc;
use common::BitSet;
use tantivy_fst::Automaton;
use super::phrase_prefix_query::prefix_end;
use crate::core::SegmentReader;
use crate::query::{BitSetDocSet, ConstScorer, Explanation, Scorer, Weight};
use crate::schema::{Field, IndexRecordOption};
@@ -14,6 +15,10 @@ use crate::{DocId, Score, TantivyError};
pub struct AutomatonWeight<A> {
field: Field,
automaton: Arc<A>,
// For JSON fields, the term dictionary include terms from all paths.
// We apply additional filtering based on the given JSON path, when searching within the term
// dictionary. This prevents terms from unrelated paths from matching the search criteria.
json_path_bytes: Option<Box<[u8]>>,
}
impl<A> AutomatonWeight<A>
@@ -26,6 +31,20 @@ where
AutomatonWeight {
field,
automaton: automaton.into(),
json_path_bytes: None,
}
}
/// Create a new AutomationWeight for a json path
pub fn new_for_json_path<IntoArcA: Into<Arc<A>>>(
field: Field,
automaton: IntoArcA,
json_path_bytes: &[u8],
) -> AutomatonWeight<A> {
AutomatonWeight {
field,
automaton: automaton.into(),
json_path_bytes: Some(json_path_bytes.to_vec().into_boxed_slice()),
}
}
@@ -34,7 +53,15 @@ where
term_dict: &'a TermDictionary,
) -> io::Result<TermStreamer<'a, &'a A>> {
let automaton: &A = &self.automaton;
let term_stream_builder = term_dict.search(automaton);
let mut term_stream_builder = term_dict.search(automaton);
if let Some(json_path_bytes) = &self.json_path_bytes {
term_stream_builder = term_stream_builder.ge(json_path_bytes);
if let Some(end) = prefix_end(json_path_bytes) {
term_stream_builder = term_stream_builder.lt(&end);
}
}
term_stream_builder.into_stream()
}
}

View File

@@ -32,7 +32,7 @@ use crate::schema::{IndexRecordOption, Term};
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
/// {
/// let mut index_writer = index.writer(3_000_000)?;
/// let mut index_writer = index.writer(15_000_000)?;
/// index_writer.add_document(doc!(
/// title => "The Name of the Wind",
/// ))?;

View File

@@ -297,7 +297,7 @@ mod tests {
let text = schema_builder.add_text_field("text", STRING);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 5_000_000)?;
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(doc!(text=>"a"))?;
index_writer.add_document(doc!(text=>"b"))?;
index_writer.commit()?;

View File

@@ -23,7 +23,7 @@ use crate::{Score, Term};
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
/// {
/// let mut index_writer = index.writer(3_000_000)?;
/// let mut index_writer = index.writer(15_000_000)?;
/// index_writer.add_document(doc!(
/// title => "The Name of Girl",
/// ))?;

345
src/query/exist_query.rs Normal file
View File

@@ -0,0 +1,345 @@
use core::fmt::Debug;
use columnar::{ColumnIndex, DynamicColumn};
use super::{ConstScorer, EmptyScorer};
use crate::core::SegmentReader;
use crate::docset::{DocSet, TERMINATED};
use crate::query::explanation::does_not_match;
use crate::query::{EnableScoring, Explanation, Query, Scorer, Weight};
use crate::{DocId, Score, TantivyError};
/// Query that matches all documents with a non-null value in the specified field.
///
/// All of the matched documents get the score 1.0.
#[derive(Clone, Debug)]
pub struct ExistsQuery {
field_name: String,
}
impl ExistsQuery {
/// Creates a new `ExistQuery` from the given field.
///
/// This query matches all documents with at least one non-null value in the specified field.
/// This constructor never fails, but executing the search with this query will return an
/// error if the specified field doesn't exists or is not a fast field.
pub fn new_exists_query(field: String) -> ExistsQuery {
ExistsQuery { field_name: field }
}
}
impl Query for ExistsQuery {
fn weight(&self, enable_scoring: EnableScoring) -> crate::Result<Box<dyn Weight>> {
let schema = enable_scoring.schema();
let Some((field, _path)) = schema.find_field(&self.field_name) else {
return Err(TantivyError::FieldNotFound(self.field_name.clone()));
};
let field_type = schema.get_field_entry(field).field_type();
if !field_type.is_fast() {
return Err(TantivyError::SchemaError(format!(
"Field {} is not a fast field.",
self.field_name
)));
}
Ok(Box::new(ExistsWeight {
field_name: self.field_name.clone(),
}))
}
}
/// Weight associated with the `ExistsQuery` query.
pub struct ExistsWeight {
field_name: String,
}
impl Weight for ExistsWeight {
fn scorer(&self, reader: &SegmentReader, boost: Score) -> crate::Result<Box<dyn Scorer>> {
let fast_field_reader = reader.fast_fields();
let dynamic_columns: crate::Result<Vec<DynamicColumn>> = fast_field_reader
.dynamic_column_handles(&self.field_name)?
.into_iter()
.map(|handle| handle.open().map_err(|io_error| io_error.into()))
.collect();
let mut non_empty_columns = Vec::new();
for column in dynamic_columns? {
if !matches!(column.column_index(), ColumnIndex::Empty { .. }) {
non_empty_columns.push(column)
}
}
// TODO: we can optimizer more here since in most cases we will have only one index
if !non_empty_columns.is_empty() {
let docset = ExistsDocSet::new(non_empty_columns, reader.max_doc());
Ok(Box::new(ConstScorer::new(docset, boost)))
} else {
Ok(Box::new(EmptyScorer))
}
}
fn explain(&self, reader: &SegmentReader, doc: DocId) -> crate::Result<Explanation> {
let mut scorer = self.scorer(reader, 1.0)?;
if scorer.seek(doc) != doc {
return Err(does_not_match(doc));
}
Ok(Explanation::new("ExistsQuery", 1.0))
}
}
pub(crate) struct ExistsDocSet {
columns: Vec<DynamicColumn>,
doc: DocId,
max_doc: DocId,
}
impl ExistsDocSet {
pub(crate) fn new(columns: Vec<DynamicColumn>, max_doc: DocId) -> Self {
let mut set = Self {
columns,
doc: 0u32,
max_doc,
};
set.find_next();
set
}
fn find_next(&mut self) -> DocId {
while self.doc < self.max_doc {
if self
.columns
.iter()
.any(|col| col.column_index().has_value(self.doc))
{
return self.doc;
}
self.doc += 1;
}
self.doc = TERMINATED;
TERMINATED
}
}
impl DocSet for ExistsDocSet {
fn advance(&mut self) -> DocId {
self.seek(self.doc + 1)
}
fn size_hint(&self) -> u32 {
0
}
fn doc(&self) -> DocId {
self.doc
}
#[inline(always)]
fn seek(&mut self, target: DocId) -> DocId {
self.doc = target;
self.find_next()
}
}
#[cfg(test)]
mod tests {
use std::net::Ipv6Addr;
use std::ops::Bound;
use common::DateTime;
use time::OffsetDateTime;
use crate::collector::Count;
use crate::query::exist_query::ExistsQuery;
use crate::query::{BooleanQuery, RangeQuery};
use crate::schema::{Facet, FacetOptions, Schema, FAST, INDEXED, STRING, TEXT};
use crate::{doc, Index, Searcher};
#[test]
fn test_exists_query_simple() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let all_field = schema_builder.add_u64_field("all", INDEXED | FAST);
let even_field = schema_builder.add_u64_field("even", INDEXED | FAST);
let odd_field = schema_builder.add_text_field("odd", STRING | FAST);
let multi_field = schema_builder.add_text_field("multi", FAST);
let _never_field = schema_builder.add_u64_field("never", INDEXED | FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
{
let mut index_writer = index.writer_for_tests()?;
for i in 0u64..100u64 {
if i % 2 == 0 {
if i % 10 == 0 {
index_writer.add_document(doc!(all_field => i, even_field => i, multi_field => i.to_string(), multi_field => (i + 1).to_string()))?;
} else {
index_writer.add_document(doc!(all_field => i, even_field => i))?;
}
} else {
index_writer.add_document(doc!(all_field => i, odd_field => i.to_string()))?;
}
}
index_writer.commit()?;
}
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(count_existing_fields(&searcher, "all")?, 100);
assert_eq!(count_existing_fields(&searcher, "odd")?, 50);
assert_eq!(count_existing_fields(&searcher, "even")?, 50);
assert_eq!(count_existing_fields(&searcher, "multi")?, 10);
assert_eq!(count_existing_fields(&searcher, "never")?, 0);
// exercise seek
let query = BooleanQuery::intersection(vec![
Box::new(RangeQuery::new_u64_bounds(
"all".to_string(),
Bound::Included(50),
Bound::Unbounded,
)),
Box::new(ExistsQuery::new_exists_query("even".to_string())),
]);
assert_eq!(searcher.search(&query, &Count)?, 25);
let query = BooleanQuery::intersection(vec![
Box::new(RangeQuery::new_u64_bounds(
"all".to_string(),
Bound::Included(0),
Bound::Excluded(50),
)),
Box::new(ExistsQuery::new_exists_query("odd".to_string())),
]);
assert_eq!(searcher.search(&query, &Count)?, 25);
Ok(())
}
#[test]
fn test_exists_query_json() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let json = schema_builder.add_json_field("json", TEXT | FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
{
let mut index_writer = index.writer_for_tests()?;
for i in 0u64..100u64 {
if i % 2 == 0 {
index_writer.add_document(doc!(json => json!({"all": i, "even": true})))?;
} else {
index_writer
.add_document(doc!(json => json!({"all": i.to_string(), "odd": true})))?;
}
}
index_writer.commit()?;
}
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(count_existing_fields(&searcher, "json.all")?, 100);
assert_eq!(count_existing_fields(&searcher, "json.even")?, 50);
assert_eq!(count_existing_fields(&searcher, "json.odd")?, 50);
// Handling of non-existing fields:
assert_eq!(count_existing_fields(&searcher, "json.absent")?, 0);
assert_eq!(
searcher
.search(
&ExistsQuery::new_exists_query("does_not_exists.absent".to_string()),
&Count
)
.unwrap_err()
.to_string(),
"The field does not exist: 'does_not_exists.absent'"
);
Ok(())
}
#[test]
fn test_exists_query_misc_supported_types() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let bool = schema_builder.add_bool_field("bool", FAST);
let bytes = schema_builder.add_bytes_field("bytes", FAST);
let date = schema_builder.add_date_field("date", FAST);
let f64 = schema_builder.add_f64_field("f64", FAST);
let ip_addr = schema_builder.add_ip_addr_field("ip_addr", FAST);
let facet = schema_builder.add_facet_field("facet", FacetOptions::default());
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
{
let mut index_writer = index.writer_for_tests()?;
let now = OffsetDateTime::now_utc().unix_timestamp();
for i in 0u8..100u8 {
if i % 2 == 0 {
let date_val = DateTime::from_utc(OffsetDateTime::from_unix_timestamp(
now + i as i64 * 100,
)?);
index_writer.add_document(
doc!(bool => i % 3 == 0, bytes => vec![i, i + 1, i + 2], date => date_val),
)?;
} else {
let ip_addr_v6 = Ipv6Addr::new(0, 0, 0, 0, 0, 0xffff, 0xc00a, i.into());
index_writer
.add_document(doc!(f64 => i as f64 * 0.5, ip_addr => ip_addr_v6, facet => Facet::from("/facet/foo"), facet => Facet::from("/facet/bar")))?;
}
}
index_writer.commit()?;
}
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(count_existing_fields(&searcher, "bool")?, 50);
assert_eq!(count_existing_fields(&searcher, "bytes")?, 50);
assert_eq!(count_existing_fields(&searcher, "date")?, 50);
assert_eq!(count_existing_fields(&searcher, "f64")?, 50);
assert_eq!(count_existing_fields(&searcher, "ip_addr")?, 50);
assert_eq!(count_existing_fields(&searcher, "facet")?, 50);
Ok(())
}
#[test]
fn test_exists_query_unsupported_types() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let not_fast = schema_builder.add_text_field("not_fast", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
{
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(doc!(
not_fast => "slow",
))?;
index_writer.commit()?;
}
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(
searcher
.search(
&ExistsQuery::new_exists_query("not_fast".to_string()),
&Count
)
.unwrap_err()
.to_string(),
"Schema error: 'Field not_fast is not a fast field.'"
);
assert_eq!(
searcher
.search(
&ExistsQuery::new_exists_query("does_not_exists".to_string()),
&Count
)
.unwrap_err()
.to_string(),
"The field does not exist: 'does_not_exists'"
);
Ok(())
}
fn count_existing_fields(searcher: &Searcher, field: &str) -> crate::Result<usize> {
let query = ExistsQuery::new_exists_query(field.to_string());
searcher.search(&query, &Count)
}
}

View File

@@ -3,7 +3,7 @@ use once_cell::sync::OnceCell;
use tantivy_fst::Automaton;
use crate::query::{AutomatonWeight, EnableScoring, Query, Weight};
use crate::schema::Term;
use crate::schema::{Term, Type};
use crate::TantivyError::InvalidArgument;
pub(crate) struct DfaWrapper(pub DFA);
@@ -46,7 +46,7 @@ impl Automaton for DfaWrapper {
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
/// {
/// let mut index_writer = index.writer(3_000_000)?;
/// let mut index_writer = index.writer(15_000_000)?;
/// index_writer.add_document(doc!(
/// title => "The Name of the Wind",
/// ))?;
@@ -132,18 +132,46 @@ impl FuzzyTermQuery {
});
let term_value = self.term.value();
let term_text = term_value.as_str().ok_or_else(|| {
InvalidArgument("The fuzzy term query requires a string term.".to_string())
})?;
let term_text = if term_value.typ() == Type::Json {
if let Some(json_path_type) = term_value.json_path_type() {
if json_path_type != Type::Str {
return Err(InvalidArgument(format!(
"The fuzzy term query requires a string path type for a json term. Found \
{:?}",
json_path_type
)));
}
}
std::str::from_utf8(self.term.serialized_value_bytes()).map_err(|_| {
InvalidArgument(
"Failed to convert json term value bytes to utf8 string.".to_string(),
)
})?
} else {
term_value.as_str().ok_or_else(|| {
InvalidArgument("The fuzzy term query requires a string term.".to_string())
})?
};
let automaton = if self.prefix {
automaton_builder.build_prefix_dfa(term_text)
} else {
automaton_builder.build_dfa(term_text)
};
Ok(AutomatonWeight::new(
self.term.field(),
DfaWrapper(automaton),
))
if let Some((json_path_bytes, _)) = term_value.as_json() {
Ok(AutomatonWeight::new_for_json_path(
self.term.field(),
DfaWrapper(automaton),
json_path_bytes,
))
} else {
Ok(AutomatonWeight::new(
self.term.field(),
DfaWrapper(automaton),
))
}
}
}
@@ -157,9 +185,89 @@ impl Query for FuzzyTermQuery {
mod test {
use super::FuzzyTermQuery;
use crate::collector::{Count, TopDocs};
use crate::schema::{Schema, TEXT};
use crate::indexer::NoMergePolicy;
use crate::query::QueryParser;
use crate::schema::{Schema, STORED, TEXT};
use crate::{assert_nearly_equals, Index, Term};
#[test]
pub fn test_fuzzy_json_path() -> crate::Result<()> {
// # Defining the schema
let mut schema_builder = Schema::builder();
let attributes = schema_builder.add_json_field("attributes", TEXT | STORED);
let schema = schema_builder.build();
// # Indexing documents
let index = Index::create_in_ram(schema.clone());
let mut index_writer = index.writer_for_tests()?;
index_writer.set_merge_policy(Box::new(NoMergePolicy));
let doc = schema.parse_document(
r#"{
"attributes": {
"a": "japan"
}
}"#,
)?;
index_writer.add_document(doc)?;
let doc = schema.parse_document(
r#"{
"attributes": {
"aa": "japan"
}
}"#,
)?;
index_writer.add_document(doc)?;
index_writer.commit()?;
let reader = index.reader()?;
let searcher = reader.searcher();
// # Fuzzy search
let query_parser = QueryParser::for_index(&index, vec![attributes]);
let get_json_path_term = |query: &str| -> crate::Result<Term> {
let query = query_parser.parse_query(query)?;
let mut terms = Vec::new();
query.query_terms(&mut |term, _| {
terms.push(term.clone());
});
Ok(terms[0].clone())
};
// shall not match the first document due to json path mismatch
{
let term = get_json_path_term("attributes.aa:japan")?;
let fuzzy_query = FuzzyTermQuery::new(term, 2, true);
let top_docs = searcher.search(&fuzzy_query, &TopDocs::with_limit(2))?;
assert_eq!(top_docs.len(), 1, "Expected only 1 document");
assert_eq!(top_docs[0].1.doc_id, 1, "Expected the second document");
}
// shall match the first document because Levenshtein distance is 1 (substitute 'o' with
// 'a')
{
let term = get_json_path_term("attributes.a:japon")?;
let fuzzy_query = FuzzyTermQuery::new(term, 1, true);
let top_docs = searcher.search(&fuzzy_query, &TopDocs::with_limit(2))?;
assert_eq!(top_docs.len(), 1, "Expected only 1 document");
assert_eq!(top_docs[0].1.doc_id, 0, "Expected the first document");
}
// shall not match because non-prefix Levenshtein distance is more than 1 (add 'a' and 'n')
{
let term = get_json_path_term("attributes.a:jap")?;
let fuzzy_query = FuzzyTermQuery::new(term, 1, true);
let top_docs = searcher.search(&fuzzy_query, &TopDocs::with_limit(2))?;
assert_eq!(top_docs.len(), 0, "Expected no document");
}
Ok(())
}
#[test]
pub fn test_fuzzy_term() -> crate::Result<()> {
let mut schema_builder = Schema::builder();

View File

@@ -8,6 +8,7 @@ mod const_score_query;
mod disjunction_max_query;
mod empty_query;
mod exclude;
mod exist_query;
mod explanation;
mod fuzzy_query;
mod intersection;
@@ -41,6 +42,7 @@ pub use self::const_score_query::{ConstScoreQuery, ConstScorer};
pub use self::disjunction_max_query::DisjunctionMaxQuery;
pub use self::empty_query::{EmptyQuery, EmptyScorer, EmptyWeight};
pub use self::exclude::Exclude;
pub use self::exist_query::ExistsQuery;
pub use self::explanation::Explanation;
#[cfg(test)]
pub(crate) use self::fuzzy_query::DfaWrapper;

View File

@@ -6,7 +6,7 @@ pub use phrase_prefix_query::PhrasePrefixQuery;
pub use phrase_prefix_scorer::PhrasePrefixScorer;
pub use phrase_prefix_weight::PhrasePrefixWeight;
fn prefix_end(prefix_start: &[u8]) -> Option<Vec<u8>> {
pub(crate) fn prefix_end(prefix_start: &[u8]) -> Option<Vec<u8>> {
let mut res = prefix_start.to_owned();
while !res.is_empty() {
let end = res.len() - 1;

View File

@@ -847,6 +847,12 @@ impl QueryParser {
}));
(Some(logical_ast), errors)
}
UserInputLeaf::Exists { .. } => (
None,
vec![QueryParserError::UnsupportedQuery(
"Range query need to target a specific field.".to_string(),
)],
),
}
}
}

View File

@@ -26,7 +26,7 @@ use crate::schema::Field;
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
/// {
/// let mut index_writer = index.writer(3_000_000)?;
/// let mut index_writer = index.writer(15_000_000)?;
/// index_writer.add_document(doc!(
/// title => "The Name of the Wind",
/// ))?;

View File

@@ -27,7 +27,7 @@ use crate::Term;
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
/// {
/// let mut index_writer = index.writer(3_000_000)?;
/// let mut index_writer = index.writer(15_000_000)?;
/// index_writer.add_document(doc!(
/// title => "The Name of the Wind",
/// ))?;
@@ -151,7 +151,7 @@ mod tests {
let ip_addr_2 = Ipv6Addr::from_u128(10);
{
let mut index_writer = index.writer(3_000_000).unwrap();
let mut index_writer = index.writer_for_tests().unwrap();
index_writer
.add_document(doc!(
ip_field => ip_addr_1

View File

@@ -179,6 +179,7 @@ mod tests {
use super::Warmer;
use crate::core::searcher::SearcherGeneration;
use crate::directory::RamDirectory;
use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
use crate::schema::{Schema, INDEXED};
use crate::{Index, IndexSettings, ReloadPolicy, Searcher, SegmentId};
@@ -255,7 +256,10 @@ mod tests {
let num_writer_threads = 4;
let mut writer = index
.writer_with_num_threads(num_writer_threads, 25_000_000)
.writer_with_num_threads(
num_writer_threads,
MEMORY_BUDGET_NUM_BYTES_MIN * num_writer_threads,
)
.unwrap();
for i in 0u64..1000u64 {

View File

@@ -397,20 +397,29 @@ where B: AsRef<[u8]>
Some(Ipv6Addr::from_u128(ip_u128))
}
/// Returns the json path (without non-human friendly separators),
/// Returns the json path type.
///
/// Returns `None` if the value is not JSON.
pub fn json_path_type(&self) -> Option<Type> {
let json_value_bytes = self.as_json_value_bytes()?;
Some(json_value_bytes.typ())
}
/// Returns the json path bytes (including the JSON_END_OF_PATH byte),
/// and the encoded ValueBytes after the json path.
///
/// Returns `None` if the value is not JSON.
pub(crate) fn as_json(&self) -> Option<(&str, ValueBytes<&[u8]>)> {
pub(crate) fn as_json(&self) -> Option<(&[u8], ValueBytes<&[u8]>)> {
if self.typ() != Type::Json {
return None;
}
let bytes = self.value_bytes();
let pos = bytes.iter().cloned().position(|b| b == JSON_END_OF_PATH)?;
let (json_path_bytes, term) = bytes.split_at(pos);
let json_path = str::from_utf8(json_path_bytes).ok()?;
Some((json_path, ValueBytes::wrap(&term[1..])))
// split at pos + 1, so that json_path_bytes includes the JSON_END_OF_PATH byte.
let (json_path_bytes, term) = bytes.split_at(pos + 1);
Some((json_path_bytes, ValueBytes::wrap(&term)))
}
/// Returns the encoded ValueBytes after the json path.
@@ -469,7 +478,10 @@ where B: AsRef<[u8]>
write_opt(f, self.as_bytes())?;
}
Type::Json => {
if let Some((path, sub_value_bytes)) = self.as_json() {
if let Some((path_bytes, sub_value_bytes)) = self.as_json() {
// Remove the JSON_END_OF_PATH byte & convert to utf8.
let path = str::from_utf8(&path_bytes[..path_bytes.len() - 1])
.map_err(|_| std::fmt::Error)?;
let path_pretty = path.replace(JSON_PATH_SEGMENT_SEP_STR, ".");
write!(f, "path={path_pretty}, ")?;
sub_value_bytes.debug_value_bytes(f)?;

View File

@@ -45,7 +45,7 @@ fn test_write_commit_fails() -> tantivy::Result<()> {
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(1, 3_000_000)?;
let mut index_writer = index.writer_with_num_threads(1, 15_000_000)?;
for _ in 0..100 {
index_writer.add_document(doc!(text_field => "a"))?;
}
@@ -75,7 +75,7 @@ fn test_fail_on_flush_segment() -> tantivy::Result<()> {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
let index_writer = index.writer_with_num_threads(1, 3_000_000)?;
let index_writer = index.writer_with_num_threads(1, 15_000_000)?;
fail::cfg("FieldSerializer::close_term", "return(simulatederror)").unwrap();
for i in 0..100_000 {
if index_writer
@@ -94,7 +94,7 @@ fn test_fail_on_flush_segment_but_one_worker_remains() -> tantivy::Result<()> {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
let index_writer = index.writer_with_num_threads(2, 6_000_000)?;
let index_writer = index.writer_with_num_threads(2, 30_000_000)?;
fail::cfg("FieldSerializer::close_term", "1*return(simulatederror)").unwrap();
for i in 0..100_000 {
if index_writer
@@ -113,7 +113,7 @@ fn test_fail_on_commit_segment() -> tantivy::Result<()> {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(1, 3_000_000)?;
let mut index_writer = index.writer_with_num_threads(1, 15_000_000)?;
fail::cfg("FieldSerializer::close_term", "return(simulatederror)").unwrap();
for i in 0..10 {
index_writer