mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-06-05 01:50:42 +00:00
Fix DateHistogram bucket gap (#2183)
* Fix DateHistogram bucket gap Fixes a computation issue of the number of buckets needed in the DateHistogram. This is due to a missing normalization from request values (ms) to fast field values (ns), when converting an intermediate result to the final result. This results in a wrong computation by a factor 1_000_000. The Histogram normalizes values to nanoseconds, to make the user input like extended_bounds (ms precision) and the values from the fast field (ns precision for date type) compatible. This normalization happens only for date type fields, as other field types don't have precision settings. The normalization does not happen due a missing `column_type`, which is not correctly passed after merging an empty aggregation (which does not have a `column_type` set), with a regular aggregation. Another related issue is an empty aggregation, which will not have `column_type` set, will not convert the result to human readable format. This PR fixes the issue by: - Limit the allowed field types of DateHistogram to DateType - Instead of passing the column_type, which is only available on the segment level, we flag the aggregation as `is_date_agg`. - Fix the merge logic Add a flag to to normalization only once. This is not an issue currently, but it could become easily one. closes https://github.com/quickwit-oss/quickwit/issues/3837 * use older nightly for time crate (breaks build)
This commit is contained in:
4
.github/workflows/coverage.yml
vendored
4
.github/workflows/coverage.yml
vendored
@@ -17,11 +17,11 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Install Rust
|
||||
run: rustup toolchain install nightly --profile minimal --component llvm-tools-preview
|
||||
run: rustup toolchain install nightly-2023-09-10 --profile minimal --component llvm-tools-preview
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
- uses: taiki-e/install-action@cargo-llvm-cov
|
||||
- name: Generate code coverage
|
||||
run: cargo +nightly llvm-cov --all-features --workspace --doctests --lcov --output-path lcov.info
|
||||
run: cargo +nightly-2023-09-10 llvm-cov --all-features --workspace --doctests --lcov --output-path lcov.info
|
||||
- name: Upload coverage to Codecov
|
||||
uses: codecov/codecov-action@v3
|
||||
continue-on-error: true
|
||||
|
||||
@@ -134,3 +134,142 @@ impl Drop for ResourceLimitGuard {
|
||||
.fetch_sub(self.allocated_with_the_guard, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::aggregation::tests::exec_request_with_query;
|
||||
|
||||
// https://github.com/quickwit-oss/quickwit/issues/3837
|
||||
#[test]
|
||||
fn test_agg_limits_with_empty_merge() {
|
||||
use crate::aggregation::agg_req::Aggregations;
|
||||
use crate::aggregation::bucket::tests::get_test_index_from_docs;
|
||||
|
||||
let docs = vec![
|
||||
vec![r#"{ "date": "2015-01-02T00:00:00Z", "text": "bbb", "text2": "bbb" }"#],
|
||||
vec![r#"{ "text": "aaa", "text2": "bbb" }"#],
|
||||
];
|
||||
let index = get_test_index_from_docs(false, &docs).unwrap();
|
||||
|
||||
{
|
||||
let elasticsearch_compatible_json = json!(
|
||||
{
|
||||
"1": {
|
||||
"terms": {"field": "text2", "min_doc_count": 0},
|
||||
"aggs": {
|
||||
"2":{
|
||||
"date_histogram": {
|
||||
"field": "date",
|
||||
"fixed_interval": "1d",
|
||||
"extended_bounds": {
|
||||
"min": "2015-01-01T00:00:00Z",
|
||||
"max": "2015-01-10T00:00:00Z"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
let agg_req: Aggregations = serde_json::from_str(
|
||||
&serde_json::to_string(&elasticsearch_compatible_json).unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
let res = exec_request_with_query(agg_req, &index, Some(("text", "bbb"))).unwrap();
|
||||
let expected_res = json!({
|
||||
"1": {
|
||||
"buckets": [
|
||||
{
|
||||
"2": {
|
||||
"buckets": [
|
||||
{ "doc_count": 0, "key": 1420070400000.0, "key_as_string": "2015-01-01T00:00:00Z" },
|
||||
{ "doc_count": 1, "key": 1420156800000.0, "key_as_string": "2015-01-02T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420243200000.0, "key_as_string": "2015-01-03T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420329600000.0, "key_as_string": "2015-01-04T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420416000000.0, "key_as_string": "2015-01-05T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420502400000.0, "key_as_string": "2015-01-06T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420588800000.0, "key_as_string": "2015-01-07T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420675200000.0, "key_as_string": "2015-01-08T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420761600000.0, "key_as_string": "2015-01-09T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420848000000.0, "key_as_string": "2015-01-10T00:00:00Z" }
|
||||
]
|
||||
},
|
||||
"doc_count": 1,
|
||||
"key": "bbb"
|
||||
}
|
||||
],
|
||||
"doc_count_error_upper_bound": 0,
|
||||
"sum_other_doc_count": 0
|
||||
}
|
||||
});
|
||||
assert_eq!(res, expected_res);
|
||||
}
|
||||
}
|
||||
|
||||
// https://github.com/quickwit-oss/quickwit/issues/3837
|
||||
#[test]
|
||||
fn test_agg_limits_with_empty_data() {
|
||||
use crate::aggregation::agg_req::Aggregations;
|
||||
use crate::aggregation::bucket::tests::get_test_index_from_docs;
|
||||
|
||||
let docs = vec![vec![r#"{ "text": "aaa", "text2": "bbb" }"#]];
|
||||
let index = get_test_index_from_docs(false, &docs).unwrap();
|
||||
|
||||
{
|
||||
// Empty result since there is no doc with dates
|
||||
let elasticsearch_compatible_json = json!(
|
||||
{
|
||||
"1": {
|
||||
"terms": {"field": "text2", "min_doc_count": 0},
|
||||
"aggs": {
|
||||
"2":{
|
||||
"date_histogram": {
|
||||
"field": "date",
|
||||
"fixed_interval": "1d",
|
||||
"extended_bounds": {
|
||||
"min": "2015-01-01T00:00:00Z",
|
||||
"max": "2015-01-10T00:00:00Z"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
let agg_req: Aggregations = serde_json::from_str(
|
||||
&serde_json::to_string(&elasticsearch_compatible_json).unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
let res = exec_request_with_query(agg_req, &index, Some(("text", "bbb"))).unwrap();
|
||||
let expected_res = json!({
|
||||
"1": {
|
||||
"buckets": [
|
||||
{
|
||||
"2": {
|
||||
"buckets": [
|
||||
{ "doc_count": 0, "key": 1420070400000.0, "key_as_string": "2015-01-01T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420156800000.0, "key_as_string": "2015-01-02T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420243200000.0, "key_as_string": "2015-01-03T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420329600000.0, "key_as_string": "2015-01-04T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420416000000.0, "key_as_string": "2015-01-05T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420502400000.0, "key_as_string": "2015-01-06T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420588800000.0, "key_as_string": "2015-01-07T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420675200000.0, "key_as_string": "2015-01-08T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420761600000.0, "key_as_string": "2015-01-09T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420848000000.0, "key_as_string": "2015-01-10T00:00:00Z" }
|
||||
]
|
||||
},
|
||||
"doc_count": 0,
|
||||
"key": "bbb"
|
||||
}
|
||||
],
|
||||
"doc_count_error_upper_bound": 0,
|
||||
"sum_other_doc_count": 0
|
||||
}
|
||||
});
|
||||
assert_eq!(res, expected_res);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -103,7 +103,8 @@ impl AggregationWithAccessor {
|
||||
field: field_name, ..
|
||||
}) => {
|
||||
let (accessor, column_type) =
|
||||
get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;
|
||||
// Only DateTime is supported for DateHistogram
|
||||
get_ff_reader(reader, field_name, Some(&[ColumnType::DateTime]))?;
|
||||
add_agg_with_accessor(accessor, column_type, &mut res)?;
|
||||
}
|
||||
Terms(TermsAggregation {
|
||||
|
||||
@@ -132,6 +132,7 @@ impl DateHistogramAggregationReq {
|
||||
hard_bounds: self.hard_bounds,
|
||||
extended_bounds: self.extended_bounds,
|
||||
keyed: self.keyed,
|
||||
is_normalized_to_ns: false,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -243,14 +244,14 @@ fn parse_into_milliseconds(input: &str) -> Result<i64, AggregationError> {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
pub mod tests {
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use super::*;
|
||||
use crate::aggregation::agg_req::Aggregations;
|
||||
use crate::aggregation::tests::exec_request;
|
||||
use crate::indexer::NoMergePolicy;
|
||||
use crate::schema::{Schema, FAST};
|
||||
use crate::schema::{Schema, FAST, STRING};
|
||||
use crate::Index;
|
||||
|
||||
#[test]
|
||||
@@ -306,7 +307,8 @@ mod tests {
|
||||
) -> crate::Result<Index> {
|
||||
let mut schema_builder = Schema::builder();
|
||||
schema_builder.add_date_field("date", FAST);
|
||||
schema_builder.add_text_field("text", FAST);
|
||||
schema_builder.add_text_field("text", FAST | STRING);
|
||||
schema_builder.add_text_field("text2", FAST | STRING);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema.clone());
|
||||
{
|
||||
|
||||
@@ -122,11 +122,14 @@ pub struct HistogramAggregation {
|
||||
/// Whether to return the buckets as a hash map
|
||||
#[serde(default)]
|
||||
pub keyed: bool,
|
||||
/// Whether the values are normalized to ns for date time values. Defaults to false.
|
||||
#[serde(default)]
|
||||
pub is_normalized_to_ns: bool,
|
||||
}
|
||||
|
||||
impl HistogramAggregation {
|
||||
pub(crate) fn normalize(&mut self, column_type: ColumnType) {
|
||||
if column_type.is_date_time() {
|
||||
pub(crate) fn normalize_date_time(&mut self) {
|
||||
if !self.is_normalized_to_ns {
|
||||
// values are provided in ms, but the fastfield is in nano seconds
|
||||
self.interval *= 1_000_000.0;
|
||||
self.offset = self.offset.map(|off| off * 1_000_000.0);
|
||||
@@ -138,6 +141,7 @@ impl HistogramAggregation {
|
||||
min: bounds.min * 1_000_000.0,
|
||||
max: bounds.max * 1_000_000.0,
|
||||
});
|
||||
self.is_normalized_to_ns = true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -370,7 +374,7 @@ impl SegmentHistogramCollector {
|
||||
|
||||
Ok(IntermediateBucketResult::Histogram {
|
||||
buckets,
|
||||
column_type: Some(self.column_type),
|
||||
is_date_agg: self.column_type == ColumnType::DateTime,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -381,7 +385,9 @@ impl SegmentHistogramCollector {
|
||||
accessor_idx: usize,
|
||||
) -> crate::Result<Self> {
|
||||
req.validate()?;
|
||||
req.normalize(field_type);
|
||||
if field_type == ColumnType::DateTime {
|
||||
req.normalize_date_time();
|
||||
}
|
||||
|
||||
let sub_aggregation_blueprint = if sub_aggregation.is_empty() {
|
||||
None
|
||||
@@ -439,6 +445,7 @@ fn intermediate_buckets_to_final_buckets_fill_gaps(
|
||||
// memory check upfront
|
||||
let (_, first_bucket_num, last_bucket_num) =
|
||||
generate_bucket_pos_with_opt_minmax(histogram_req, min_max);
|
||||
|
||||
// It's based on user input, so we need to account for overflows
|
||||
let added_buckets = ((last_bucket_num.saturating_sub(first_bucket_num)).max(0) as u64)
|
||||
.saturating_sub(buckets.len() as u64);
|
||||
@@ -482,7 +489,7 @@ fn intermediate_buckets_to_final_buckets_fill_gaps(
|
||||
// Convert to BucketEntry
|
||||
pub(crate) fn intermediate_histogram_buckets_to_final_buckets(
|
||||
buckets: Vec<IntermediateHistogramBucketEntry>,
|
||||
column_type: Option<ColumnType>,
|
||||
is_date_agg: bool,
|
||||
histogram_req: &HistogramAggregation,
|
||||
sub_aggregation: &Aggregations,
|
||||
limits: &AggregationLimits,
|
||||
@@ -491,8 +498,8 @@ pub(crate) fn intermediate_histogram_buckets_to_final_buckets(
|
||||
// The request used in the the call to final is not yet be normalized.
|
||||
// Normalization is changing the precision from milliseconds to nanoseconds.
|
||||
let mut histogram_req = histogram_req.clone();
|
||||
if let Some(column_type) = column_type {
|
||||
histogram_req.normalize(column_type);
|
||||
if is_date_agg {
|
||||
histogram_req.normalize_date_time();
|
||||
}
|
||||
let mut buckets = if histogram_req.min_doc_count() == 0 {
|
||||
// With min_doc_count != 0, we may need to add buckets, so that there are no
|
||||
@@ -516,7 +523,7 @@ pub(crate) fn intermediate_histogram_buckets_to_final_buckets(
|
||||
|
||||
// If we have a date type on the histogram buckets, we add the `key_as_string` field as rfc339
|
||||
// and normalize from nanoseconds to milliseconds
|
||||
if column_type == Some(ColumnType::DateTime) {
|
||||
if is_date_agg {
|
||||
for bucket in buckets.iter_mut() {
|
||||
if let crate::aggregation::Key::F64(ref mut val) = bucket.key {
|
||||
let key_as_string = format_date(*val as i64)?;
|
||||
|
||||
@@ -172,10 +172,16 @@ pub(crate) fn empty_from_req(req: &Aggregation) -> IntermediateAggregationResult
|
||||
Range(_) => IntermediateAggregationResult::Bucket(IntermediateBucketResult::Range(
|
||||
Default::default(),
|
||||
)),
|
||||
Histogram(_) | DateHistogram(_) => {
|
||||
Histogram(_) => {
|
||||
IntermediateAggregationResult::Bucket(IntermediateBucketResult::Histogram {
|
||||
buckets: Vec::new(),
|
||||
column_type: None,
|
||||
is_date_agg: false,
|
||||
})
|
||||
}
|
||||
DateHistogram(_) => {
|
||||
IntermediateAggregationResult::Bucket(IntermediateBucketResult::Histogram {
|
||||
buckets: Vec::new(),
|
||||
is_date_agg: true,
|
||||
})
|
||||
}
|
||||
Average(_) => IntermediateAggregationResult::Metric(IntermediateMetricResult::Average(
|
||||
@@ -343,8 +349,8 @@ pub enum IntermediateBucketResult {
|
||||
/// This is the histogram entry for a bucket, which contains a key, count, and optionally
|
||||
/// sub_aggregations.
|
||||
Histogram {
|
||||
/// The column_type of the underlying `Column`
|
||||
column_type: Option<ColumnType>,
|
||||
/// The column_type of the underlying `Column` is DateTime
|
||||
is_date_agg: bool,
|
||||
/// The buckets
|
||||
buckets: Vec<IntermediateHistogramBucketEntry>,
|
||||
},
|
||||
@@ -399,7 +405,7 @@ impl IntermediateBucketResult {
|
||||
Ok(BucketResult::Range { buckets })
|
||||
}
|
||||
IntermediateBucketResult::Histogram {
|
||||
column_type,
|
||||
is_date_agg,
|
||||
buckets,
|
||||
} => {
|
||||
let histogram_req = &req
|
||||
@@ -408,7 +414,7 @@ impl IntermediateBucketResult {
|
||||
.expect("unexpected aggregation, expected histogram aggregation");
|
||||
let buckets = intermediate_histogram_buckets_to_final_buckets(
|
||||
buckets,
|
||||
column_type,
|
||||
is_date_agg,
|
||||
histogram_req,
|
||||
req.sub_aggregation(),
|
||||
limits,
|
||||
@@ -457,11 +463,11 @@ impl IntermediateBucketResult {
|
||||
(
|
||||
IntermediateBucketResult::Histogram {
|
||||
buckets: buckets_left,
|
||||
..
|
||||
is_date_agg: _,
|
||||
},
|
||||
IntermediateBucketResult::Histogram {
|
||||
buckets: buckets_right,
|
||||
..
|
||||
is_date_agg: _,
|
||||
},
|
||||
) => {
|
||||
let buckets: Result<Vec<IntermediateHistogramBucketEntry>, TantivyError> =
|
||||
|
||||
Reference in New Issue
Block a user