mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2025-12-23 02:29:57 +00:00
remove schema in aggs (#1888)
* switch to ColumnType, move tests * remove Schema dependency in agg
This commit is contained in:
@@ -17,6 +17,7 @@ stacker = { path = "../stacker", package="tantivy-stacker"}
|
||||
sstable = { path = "../sstable", package = "tantivy-sstable" }
|
||||
common = { path = "../common", package = "tantivy-common" }
|
||||
tantivy-bitpacker = { version= "0.3", path = "../bitpacker/" }
|
||||
serde = "1.0.152"
|
||||
|
||||
[dev-dependencies]
|
||||
proptest = "1"
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
use std::fmt::Debug;
|
||||
use std::net::Ipv6Addr;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::value::NumericalType;
|
||||
use crate::InvalidData;
|
||||
|
||||
/// The column type represents the column type.
|
||||
/// Any changes need to be propagated to `COLUMN_TYPES`.
|
||||
#[derive(Hash, Eq, PartialEq, Debug, Clone, Copy, Ord, PartialOrd)]
|
||||
#[derive(Hash, Eq, PartialEq, Debug, Clone, Copy, Ord, PartialOrd, Serialize, Deserialize)]
|
||||
#[repr(u8)]
|
||||
pub enum ColumnType {
|
||||
I64 = 0u8,
|
||||
|
||||
@@ -192,7 +192,7 @@ fn main() -> tantivy::Result<()> {
|
||||
//
|
||||
|
||||
let agg_req: Aggregations = serde_json::from_str(agg_req_str)?;
|
||||
let collector = AggregationCollector::from_aggs(agg_req, None, index.schema());
|
||||
let collector = AggregationCollector::from_aggs(agg_req, None);
|
||||
|
||||
let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
|
||||
let res2: Value = serde_json::to_value(agg_res)?;
|
||||
@@ -239,7 +239,7 @@ fn main() -> tantivy::Result<()> {
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
let collector = AggregationCollector::from_aggs(agg_req, None, index.schema());
|
||||
let collector = AggregationCollector::from_aggs(agg_req, None);
|
||||
// We use the `AllQuery` which will pass all documents to the AggregationCollector.
|
||||
let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
|
||||
|
||||
@@ -287,7 +287,7 @@ fn main() -> tantivy::Result<()> {
|
||||
|
||||
let agg_req: Aggregations = serde_json::from_str(agg_req_str)?;
|
||||
|
||||
let collector = AggregationCollector::from_aggs(agg_req, None, index.schema());
|
||||
let collector = AggregationCollector::from_aggs(agg_req, None);
|
||||
|
||||
let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
|
||||
let res: Value = serde_json::to_value(agg_res)?;
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
use std::rc::Rc;
|
||||
use std::sync::atomic::AtomicU32;
|
||||
|
||||
use columnar::{Column, StrColumn};
|
||||
use columnar::{Column, ColumnType, StrColumn};
|
||||
|
||||
use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation};
|
||||
use super::bucket::{HistogramAggregation, RangeAggregation, TermsAggregation};
|
||||
@@ -13,7 +13,6 @@ use super::metric::{
|
||||
};
|
||||
use super::segment_agg_result::BucketCount;
|
||||
use super::VecWithNames;
|
||||
use crate::schema::Type;
|
||||
use crate::{SegmentReader, TantivyError};
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
@@ -41,7 +40,7 @@ pub struct BucketAggregationWithAccessor {
|
||||
/// based on search terms. So eventually this needs to be Option or moved.
|
||||
pub(crate) accessor: Column<u64>,
|
||||
pub(crate) str_dict_column: Option<StrColumn>,
|
||||
pub(crate) field_type: Type,
|
||||
pub(crate) field_type: ColumnType,
|
||||
pub(crate) bucket_agg: BucketAggregationType,
|
||||
pub(crate) sub_aggregation: AggregationsWithAccessor,
|
||||
pub(crate) bucket_count: BucketCount,
|
||||
@@ -94,7 +93,7 @@ impl BucketAggregationWithAccessor {
|
||||
#[derive(Clone)]
|
||||
pub struct MetricAggregationWithAccessor {
|
||||
pub metric: MetricAggregation,
|
||||
pub field_type: Type,
|
||||
pub field_type: ColumnType,
|
||||
pub accessor: Column<u64>,
|
||||
}
|
||||
|
||||
@@ -158,22 +157,12 @@ pub(crate) fn get_aggs_with_accessor_and_validate(
|
||||
fn get_ff_reader_and_validate(
|
||||
reader: &SegmentReader,
|
||||
field_name: &str,
|
||||
) -> crate::Result<(columnar::Column<u64>, Type)> {
|
||||
let field = reader.schema().get_field(field_name)?;
|
||||
// TODO we should get type metadata from columnar
|
||||
let field_type = reader
|
||||
.schema()
|
||||
.get_field_entry(field)
|
||||
.field_type()
|
||||
.value_type();
|
||||
// TODO Do validation
|
||||
|
||||
) -> crate::Result<(columnar::Column<u64>, ColumnType)> {
|
||||
let ff_fields = reader.fast_fields();
|
||||
let ff_field = ff_fields.u64_lenient(field_name)?.ok_or_else(|| {
|
||||
TantivyError::InvalidArgument(format!(
|
||||
"No numerical fast field found for field: {}",
|
||||
field_name
|
||||
))
|
||||
})?;
|
||||
Ok((ff_field, field_type))
|
||||
let ff_field_with_type = ff_fields
|
||||
.u64_lenient_with_type(field_name)?
|
||||
.ok_or_else(|| {
|
||||
TantivyError::InvalidArgument(format!("No fast field found for field: {}", field_name))
|
||||
})?;
|
||||
Ok(ff_field_with_type)
|
||||
}
|
||||
|
||||
@@ -12,7 +12,6 @@ use super::bucket::GetDocCount;
|
||||
use super::intermediate_agg_result::{IntermediateBucketResult, IntermediateMetricResult};
|
||||
use super::metric::{SingleMetricResult, Stats};
|
||||
use super::Key;
|
||||
use crate::schema::Schema;
|
||||
use crate::TantivyError;
|
||||
|
||||
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
|
||||
@@ -154,12 +153,9 @@ pub enum BucketResult {
|
||||
}
|
||||
|
||||
impl BucketResult {
|
||||
pub(crate) fn empty_from_req(
|
||||
req: &BucketAggregationInternal,
|
||||
schema: &Schema,
|
||||
) -> crate::Result<Self> {
|
||||
pub(crate) fn empty_from_req(req: &BucketAggregationInternal) -> crate::Result<Self> {
|
||||
let empty_bucket = IntermediateBucketResult::empty_from_req(&req.bucket_agg);
|
||||
empty_bucket.into_final_bucket_result(req, schema)
|
||||
empty_bucket.into_final_bucket_result(req)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
1128
src/aggregation/agg_tests.rs
Normal file
1128
src/aggregation/agg_tests.rs
Normal file
File diff suppressed because it is too large
Load Diff
@@ -1,7 +1,7 @@
|
||||
use std::cmp::Ordering;
|
||||
use std::fmt::Display;
|
||||
|
||||
use columnar::Column;
|
||||
use columnar::{Column, ColumnType};
|
||||
use itertools::Itertools;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@@ -17,7 +17,6 @@ use crate::aggregation::segment_agg_result::{
|
||||
build_segment_agg_collector, SegmentAggregationCollector,
|
||||
};
|
||||
use crate::aggregation::{f64_from_fastfield_u64, format_date, VecWithNames};
|
||||
use crate::schema::{Schema, Type};
|
||||
use crate::{DocId, TantivyError};
|
||||
|
||||
/// Histogram is a bucket aggregation, where buckets are created dynamically for given `interval`.
|
||||
@@ -204,7 +203,7 @@ pub struct SegmentHistogramCollector {
|
||||
/// The buckets containing the aggregation data.
|
||||
buckets: Vec<SegmentHistogramBucketEntry>,
|
||||
sub_aggregations: Option<Vec<Box<dyn SegmentAggregationCollector>>>,
|
||||
field_type: Type,
|
||||
column_type: ColumnType,
|
||||
interval: f64,
|
||||
offset: f64,
|
||||
min_doc_count: u64,
|
||||
@@ -350,13 +349,16 @@ impl SegmentHistogramCollector {
|
||||
);
|
||||
};
|
||||
|
||||
Ok(IntermediateBucketResult::Histogram { buckets })
|
||||
Ok(IntermediateBucketResult::Histogram {
|
||||
buckets,
|
||||
column_type: Some(self.column_type),
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn from_req_and_validate(
|
||||
req: &HistogramAggregation,
|
||||
sub_aggregation: &AggregationsWithAccessor,
|
||||
field_type: Type,
|
||||
field_type: ColumnType,
|
||||
accessor: &Column<u64>,
|
||||
accessor_idx: usize,
|
||||
) -> crate::Result<Self> {
|
||||
@@ -396,7 +398,7 @@ impl SegmentHistogramCollector {
|
||||
|
||||
Ok(Self {
|
||||
buckets,
|
||||
field_type,
|
||||
column_type: field_type,
|
||||
interval: req.interval,
|
||||
offset: req.offset.unwrap_or(0.0),
|
||||
first_bucket_num,
|
||||
@@ -443,7 +445,7 @@ impl SegmentHistogramCollector {
|
||||
}
|
||||
|
||||
fn f64_from_fastfield_u64(&self, val: u64) -> f64 {
|
||||
f64_from_fastfield_u64(val, &self.field_type)
|
||||
f64_from_fastfield_u64(val, &self.column_type)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -463,7 +465,6 @@ fn intermediate_buckets_to_final_buckets_fill_gaps(
|
||||
buckets: Vec<IntermediateHistogramBucketEntry>,
|
||||
histogram_req: &HistogramAggregation,
|
||||
sub_aggregation: &AggregationsInternal,
|
||||
schema: &Schema,
|
||||
) -> crate::Result<Vec<BucketEntry>> {
|
||||
// Generate the full list of buckets without gaps.
|
||||
//
|
||||
@@ -504,43 +505,33 @@ fn intermediate_buckets_to_final_buckets_fill_gaps(
|
||||
sub_aggregation: empty_sub_aggregation.clone(),
|
||||
},
|
||||
})
|
||||
.map(|intermediate_bucket| {
|
||||
intermediate_bucket.into_final_bucket_entry(sub_aggregation, schema)
|
||||
})
|
||||
.map(|intermediate_bucket| intermediate_bucket.into_final_bucket_entry(sub_aggregation))
|
||||
.collect::<crate::Result<Vec<_>>>()
|
||||
}
|
||||
|
||||
// Convert to BucketEntry
|
||||
pub(crate) fn intermediate_histogram_buckets_to_final_buckets(
|
||||
buckets: Vec<IntermediateHistogramBucketEntry>,
|
||||
column_type: Option<ColumnType>,
|
||||
histogram_req: &HistogramAggregation,
|
||||
sub_aggregation: &AggregationsInternal,
|
||||
schema: &Schema,
|
||||
) -> crate::Result<Vec<BucketEntry>> {
|
||||
let mut buckets = if histogram_req.min_doc_count() == 0 {
|
||||
// With min_doc_count != 0, we may need to add buckets, so that there are no
|
||||
// gaps, since intermediate result does not contain empty buckets (filtered to
|
||||
// reduce serialization size).
|
||||
|
||||
intermediate_buckets_to_final_buckets_fill_gaps(
|
||||
buckets,
|
||||
histogram_req,
|
||||
sub_aggregation,
|
||||
schema,
|
||||
)?
|
||||
intermediate_buckets_to_final_buckets_fill_gaps(buckets, histogram_req, sub_aggregation)?
|
||||
} else {
|
||||
buckets
|
||||
.into_iter()
|
||||
.filter(|histogram_bucket| histogram_bucket.doc_count >= histogram_req.min_doc_count())
|
||||
.map(|histogram_bucket| {
|
||||
histogram_bucket.into_final_bucket_entry(sub_aggregation, schema)
|
||||
})
|
||||
.map(|histogram_bucket| histogram_bucket.into_final_bucket_entry(sub_aggregation))
|
||||
.collect::<crate::Result<Vec<_>>>()?
|
||||
};
|
||||
|
||||
// If we have a date type on the histogram buckets, we add the `key_as_string` field as rfc339
|
||||
let field = schema.get_field(&histogram_req.field)?;
|
||||
if schema.get_field_entry(field).field_type().is_date() {
|
||||
if column_type == Some(ColumnType::DateTime) {
|
||||
for bucket in buckets.iter_mut() {
|
||||
if let crate::aggregation::Key::F64(val) = bucket.key {
|
||||
let key_as_string = format_date(val as i64)?;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::fmt::Debug;
|
||||
use std::ops::Range;
|
||||
|
||||
use columnar::MonotonicallyMappableToU64;
|
||||
use columnar::{ColumnType, MonotonicallyMappableToU64};
|
||||
use rustc_hash::FxHashMap;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@@ -16,7 +16,6 @@ use crate::aggregation::segment_agg_result::{
|
||||
use crate::aggregation::{
|
||||
f64_from_fastfield_u64, f64_to_fastfield_u64, format_date, Key, SerializedKey, VecWithNames,
|
||||
};
|
||||
use crate::schema::Type;
|
||||
use crate::TantivyError;
|
||||
|
||||
/// Provide user-defined buckets to aggregate on.
|
||||
@@ -127,7 +126,7 @@ pub(crate) struct SegmentRangeAndBucketEntry {
|
||||
pub struct SegmentRangeCollector {
|
||||
/// The buckets containing the aggregation data.
|
||||
buckets: Vec<SegmentRangeAndBucketEntry>,
|
||||
field_type: Type,
|
||||
column_type: ColumnType,
|
||||
pub(crate) accessor_idx: usize,
|
||||
}
|
||||
|
||||
@@ -179,7 +178,7 @@ impl SegmentAggregationCollector for SegmentRangeCollector {
|
||||
self: Box<Self>,
|
||||
agg_with_accessor: &AggregationsWithAccessor,
|
||||
) -> crate::Result<IntermediateAggregationResults> {
|
||||
let field_type = self.field_type;
|
||||
let field_type = self.column_type;
|
||||
let name = agg_with_accessor.buckets.keys[self.accessor_idx].to_string();
|
||||
let sub_agg = &agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
|
||||
|
||||
@@ -196,7 +195,10 @@ impl SegmentAggregationCollector for SegmentRangeCollector {
|
||||
})
|
||||
.collect::<crate::Result<_>>()?;
|
||||
|
||||
let bucket = IntermediateBucketResult::Range(IntermediateRangeBucketResult { buckets });
|
||||
let bucket = IntermediateBucketResult::Range(IntermediateRangeBucketResult {
|
||||
buckets,
|
||||
column_type: Some(self.column_type),
|
||||
});
|
||||
|
||||
let buckets = Some(VecWithNames::from_entries(vec![(name, bucket)]));
|
||||
|
||||
@@ -257,7 +259,7 @@ impl SegmentRangeCollector {
|
||||
req: &RangeAggregation,
|
||||
sub_aggregation: &AggregationsWithAccessor,
|
||||
bucket_count: &BucketCount,
|
||||
field_type: Type,
|
||||
field_type: ColumnType,
|
||||
accessor_idx: usize,
|
||||
) -> crate::Result<Self> {
|
||||
// The range input on the request is f64.
|
||||
@@ -305,7 +307,7 @@ impl SegmentRangeCollector {
|
||||
|
||||
Ok(SegmentRangeCollector {
|
||||
buckets,
|
||||
field_type,
|
||||
column_type: field_type,
|
||||
accessor_idx,
|
||||
})
|
||||
}
|
||||
@@ -335,7 +337,7 @@ impl SegmentRangeCollector {
|
||||
/// more computational expensive when many documents are hit.
|
||||
fn to_u64_range(
|
||||
range: &RangeAggregationRange,
|
||||
field_type: &Type,
|
||||
field_type: &ColumnType,
|
||||
) -> crate::Result<InternalRangeAggregationRange> {
|
||||
let start = if let Some(from) = range.from {
|
||||
f64_to_fastfield_u64(from, field_type)
|
||||
@@ -361,7 +363,7 @@ fn to_u64_range(
|
||||
/// beginning and end and filling gaps.
|
||||
fn extend_validate_ranges(
|
||||
buckets: &[RangeAggregationRange],
|
||||
field_type: &Type,
|
||||
field_type: &ColumnType,
|
||||
) -> crate::Result<Vec<InternalRangeAggregationRange>> {
|
||||
let mut converted_buckets = buckets
|
||||
.iter()
|
||||
@@ -403,13 +405,16 @@ fn extend_validate_ranges(
|
||||
Ok(converted_buckets)
|
||||
}
|
||||
|
||||
pub(crate) fn range_to_string(range: &Range<u64>, field_type: &Type) -> crate::Result<String> {
|
||||
pub(crate) fn range_to_string(
|
||||
range: &Range<u64>,
|
||||
field_type: &ColumnType,
|
||||
) -> crate::Result<String> {
|
||||
// is_start is there for malformed requests, e.g. ig the user passes the range u64::MIN..0.0,
|
||||
// it should be rendered as "*-0" and not "*-*"
|
||||
let to_str = |val: u64, is_start: bool| {
|
||||
if (is_start && val == u64::MIN) || (!is_start && val == u64::MAX) {
|
||||
Ok("*".to_string())
|
||||
} else if *field_type == Type::Date {
|
||||
} else if *field_type == ColumnType::DateTime {
|
||||
let val = i64::from_u64(val);
|
||||
format_date(val)
|
||||
} else {
|
||||
@@ -424,7 +429,7 @@ pub(crate) fn range_to_string(range: &Range<u64>, field_type: &Type) -> crate::R
|
||||
))
|
||||
}
|
||||
|
||||
pub(crate) fn range_to_key(range: &Range<u64>, field_type: &Type) -> crate::Result<Key> {
|
||||
pub(crate) fn range_to_key(range: &Range<u64>, field_type: &ColumnType) -> crate::Result<Key> {
|
||||
Ok(Key::Str(range_to_string(range, field_type)?))
|
||||
}
|
||||
|
||||
@@ -446,7 +451,7 @@ mod tests {
|
||||
|
||||
pub fn get_collector_from_ranges(
|
||||
ranges: Vec<RangeAggregationRange>,
|
||||
field_type: Type,
|
||||
field_type: ColumnType,
|
||||
) -> SegmentRangeCollector {
|
||||
let req = RangeAggregation {
|
||||
field: "dummy".to_string(),
|
||||
@@ -736,7 +741,7 @@ mod tests {
|
||||
#[test]
|
||||
fn bucket_test_extend_range_hole() {
|
||||
let buckets = vec![(10f64..20f64).into(), (30f64..40f64).into()];
|
||||
let collector = get_collector_from_ranges(buckets, Type::F64);
|
||||
let collector = get_collector_from_ranges(buckets, ColumnType::F64);
|
||||
|
||||
let buckets = collector.buckets;
|
||||
assert_eq!(buckets[0].range.start, u64::MIN);
|
||||
@@ -759,7 +764,7 @@ mod tests {
|
||||
(10f64..20f64).into(),
|
||||
(20f64..f64::MAX).into(),
|
||||
];
|
||||
let collector = get_collector_from_ranges(buckets, Type::F64);
|
||||
let collector = get_collector_from_ranges(buckets, ColumnType::F64);
|
||||
|
||||
let buckets = collector.buckets;
|
||||
assert_eq!(buckets[0].range.start, u64::MIN);
|
||||
@@ -774,7 +779,7 @@ mod tests {
|
||||
#[test]
|
||||
fn bucket_range_test_negative_vals() {
|
||||
let buckets = vec![(-10f64..-1f64).into()];
|
||||
let collector = get_collector_from_ranges(buckets, Type::F64);
|
||||
let collector = get_collector_from_ranges(buckets, ColumnType::F64);
|
||||
|
||||
let buckets = collector.buckets;
|
||||
assert_eq!(&buckets[0].bucket.key.to_string(), "*--10");
|
||||
@@ -783,7 +788,7 @@ mod tests {
|
||||
#[test]
|
||||
fn bucket_range_test_positive_vals() {
|
||||
let buckets = vec![(0f64..10f64).into()];
|
||||
let collector = get_collector_from_ranges(buckets, Type::F64);
|
||||
let collector = get_collector_from_ranges(buckets, ColumnType::F64);
|
||||
|
||||
let buckets = collector.buckets;
|
||||
assert_eq!(&buckets[0].bucket.key.to_string(), "*-0");
|
||||
@@ -793,7 +798,7 @@ mod tests {
|
||||
#[test]
|
||||
fn range_binary_search_test_u64() {
|
||||
let check_ranges = |ranges: Vec<RangeAggregationRange>| {
|
||||
let collector = get_collector_from_ranges(ranges, Type::U64);
|
||||
let collector = get_collector_from_ranges(ranges, ColumnType::U64);
|
||||
let search = |val: u64| collector.get_bucket_pos(val);
|
||||
|
||||
assert_eq!(search(u64::MIN), 0);
|
||||
@@ -839,7 +844,7 @@ mod tests {
|
||||
fn range_binary_search_test_f64() {
|
||||
let ranges = vec![(10.0..100.0).into()];
|
||||
|
||||
let collector = get_collector_from_ranges(ranges, Type::F64);
|
||||
let collector = get_collector_from_ranges(ranges, ColumnType::F64);
|
||||
let search = |val: u64| collector.get_bucket_pos(val);
|
||||
|
||||
assert_eq!(search(u64::MIN), 0);
|
||||
@@ -874,7 +879,7 @@ mod bench {
|
||||
buckets.push((bucket_start..bucket_start + bucket_size as f64).into())
|
||||
}
|
||||
|
||||
get_collector_from_ranges(buckets, Type::U64)
|
||||
get_collector_from_ranges(buckets, ColumnType::U64)
|
||||
}
|
||||
|
||||
fn get_rand_docs(total_docs: u64, num_docs_returned: u64) -> Vec<u64> {
|
||||
|
||||
@@ -7,7 +7,6 @@ use super::intermediate_agg_result::IntermediateAggregationResults;
|
||||
use super::segment_agg_result::{build_segment_agg_collector, SegmentAggregationCollector};
|
||||
use crate::aggregation::agg_req_with_accessor::get_aggs_with_accessor_and_validate;
|
||||
use crate::collector::{Collector, SegmentCollector};
|
||||
use crate::schema::Schema;
|
||||
use crate::{SegmentReader, TantivyError};
|
||||
|
||||
/// The default max bucket count, before the aggregation fails.
|
||||
@@ -17,7 +16,6 @@ pub const MAX_BUCKET_COUNT: u32 = 65000;
|
||||
///
|
||||
/// The collector collects all aggregations by the underlying aggregation request.
|
||||
pub struct AggregationCollector {
|
||||
schema: Schema,
|
||||
agg: Aggregations,
|
||||
max_bucket_count: u32,
|
||||
}
|
||||
@@ -27,9 +25,8 @@ impl AggregationCollector {
|
||||
///
|
||||
/// Aggregation fails when the total bucket count is higher than max_bucket_count.
|
||||
/// max_bucket_count will default to `MAX_BUCKET_COUNT` (65000) when unset
|
||||
pub fn from_aggs(agg: Aggregations, max_bucket_count: Option<u32>, schema: Schema) -> Self {
|
||||
pub fn from_aggs(agg: Aggregations, max_bucket_count: Option<u32>) -> Self {
|
||||
Self {
|
||||
schema,
|
||||
agg,
|
||||
max_bucket_count: max_bucket_count.unwrap_or(MAX_BUCKET_COUNT),
|
||||
}
|
||||
@@ -116,7 +113,7 @@ impl Collector for AggregationCollector {
|
||||
segment_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>,
|
||||
) -> crate::Result<Self::Fruit> {
|
||||
let res = merge_fruits(segment_fruits)?;
|
||||
res.into_final_bucket_result(self.agg.clone(), &self.schema)
|
||||
res.into_final_bucket_result(self.agg.clone())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
|
||||
use std::cmp::Ordering;
|
||||
|
||||
use columnar::ColumnType;
|
||||
use itertools::Itertools;
|
||||
use rustc_hash::FxHashMap;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -24,7 +25,6 @@ use super::metric::{
|
||||
use super::{format_date, Key, SerializedKey, VecWithNames};
|
||||
use crate::aggregation::agg_result::{AggregationResults, BucketEntries, BucketEntry};
|
||||
use crate::aggregation::bucket::TermsAggregationInternal;
|
||||
use crate::schema::Schema;
|
||||
|
||||
/// Contains the intermediate aggregation result, which is optimized to be merged with other
|
||||
/// intermediate results.
|
||||
@@ -38,12 +38,8 @@ pub struct IntermediateAggregationResults {
|
||||
|
||||
impl IntermediateAggregationResults {
|
||||
/// Convert intermediate result and its aggregation request to the final result.
|
||||
pub fn into_final_bucket_result(
|
||||
self,
|
||||
req: Aggregations,
|
||||
schema: &Schema,
|
||||
) -> crate::Result<AggregationResults> {
|
||||
self.into_final_bucket_result_internal(&(req.into()), schema)
|
||||
pub fn into_final_bucket_result(self, req: Aggregations) -> crate::Result<AggregationResults> {
|
||||
self.into_final_bucket_result_internal(&(req.into()))
|
||||
}
|
||||
|
||||
/// Convert intermediate result and its aggregation request to the final result.
|
||||
@@ -53,7 +49,6 @@ impl IntermediateAggregationResults {
|
||||
pub(crate) fn into_final_bucket_result_internal(
|
||||
self,
|
||||
req: &AggregationsInternal,
|
||||
schema: &Schema,
|
||||
) -> crate::Result<AggregationResults> {
|
||||
// Important assumption:
|
||||
// When the tree contains buckets/metric, we expect it to have all buckets/metrics from the
|
||||
@@ -61,11 +56,11 @@ impl IntermediateAggregationResults {
|
||||
let mut results: FxHashMap<String, AggregationResult> = FxHashMap::default();
|
||||
|
||||
if let Some(buckets) = self.buckets {
|
||||
convert_and_add_final_buckets_to_result(&mut results, buckets, &req.buckets, schema)?
|
||||
convert_and_add_final_buckets_to_result(&mut results, buckets, &req.buckets)?
|
||||
} else {
|
||||
// When there are no buckets, we create empty buckets, so that the serialized json
|
||||
// format is constant
|
||||
add_empty_final_buckets_to_result(&mut results, &req.buckets, schema)?
|
||||
add_empty_final_buckets_to_result(&mut results, &req.buckets)?
|
||||
};
|
||||
|
||||
if let Some(metrics) = self.metrics {
|
||||
@@ -166,12 +161,10 @@ fn add_empty_final_metrics_to_result(
|
||||
fn add_empty_final_buckets_to_result(
|
||||
results: &mut FxHashMap<String, AggregationResult>,
|
||||
req_buckets: &VecWithNames<BucketAggregationInternal>,
|
||||
schema: &Schema,
|
||||
) -> crate::Result<()> {
|
||||
let requested_buckets = req_buckets.iter();
|
||||
for (key, req) in requested_buckets {
|
||||
let empty_bucket =
|
||||
AggregationResult::BucketResult(BucketResult::empty_from_req(req, schema)?);
|
||||
let empty_bucket = AggregationResult::BucketResult(BucketResult::empty_from_req(req)?);
|
||||
results.insert(key.to_string(), empty_bucket);
|
||||
}
|
||||
Ok(())
|
||||
@@ -181,13 +174,12 @@ fn convert_and_add_final_buckets_to_result(
|
||||
results: &mut FxHashMap<String, AggregationResult>,
|
||||
buckets: VecWithNames<IntermediateBucketResult>,
|
||||
req_buckets: &VecWithNames<BucketAggregationInternal>,
|
||||
schema: &Schema,
|
||||
) -> crate::Result<()> {
|
||||
assert_eq!(buckets.len(), req_buckets.len());
|
||||
|
||||
let buckets_with_request = buckets.into_iter().zip(req_buckets.values());
|
||||
for ((key, bucket), req) in buckets_with_request {
|
||||
let result = AggregationResult::BucketResult(bucket.into_final_bucket_result(req, schema)?);
|
||||
let result = AggregationResult::BucketResult(bucket.into_final_bucket_result(req)?);
|
||||
results.insert(key, result);
|
||||
}
|
||||
Ok(())
|
||||
@@ -282,6 +274,8 @@ pub enum IntermediateBucketResult {
|
||||
/// This is the histogram entry for a bucket, which contains a key, count, and optionally
|
||||
/// sub_aggregations.
|
||||
Histogram {
|
||||
/// The column_type of the underlying `Column`
|
||||
column_type: Option<ColumnType>,
|
||||
/// The buckets
|
||||
buckets: Vec<IntermediateHistogramBucketEntry>,
|
||||
},
|
||||
@@ -293,7 +287,6 @@ impl IntermediateBucketResult {
|
||||
pub(crate) fn into_final_bucket_result(
|
||||
self,
|
||||
req: &BucketAggregationInternal,
|
||||
schema: &Schema,
|
||||
) -> crate::Result<BucketResult> {
|
||||
match self {
|
||||
IntermediateBucketResult::Range(range_res) => {
|
||||
@@ -303,9 +296,9 @@ impl IntermediateBucketResult {
|
||||
.map(|bucket| {
|
||||
bucket.into_final_bucket_entry(
|
||||
&req.sub_aggregation,
|
||||
schema,
|
||||
req.as_range()
|
||||
.expect("unexpected aggregation, expected histogram aggregation"),
|
||||
range_res.column_type,
|
||||
)
|
||||
})
|
||||
.collect::<crate::Result<Vec<_>>>()?;
|
||||
@@ -332,13 +325,16 @@ impl IntermediateBucketResult {
|
||||
};
|
||||
Ok(BucketResult::Range { buckets })
|
||||
}
|
||||
IntermediateBucketResult::Histogram { buckets } => {
|
||||
IntermediateBucketResult::Histogram {
|
||||
column_type,
|
||||
buckets,
|
||||
} => {
|
||||
let buckets = intermediate_histogram_buckets_to_final_buckets(
|
||||
buckets,
|
||||
column_type,
|
||||
req.as_histogram()
|
||||
.expect("unexpected aggregation, expected histogram aggregation"),
|
||||
&req.sub_aggregation,
|
||||
schema,
|
||||
)?;
|
||||
|
||||
let buckets = if req.as_histogram().unwrap().keyed {
|
||||
@@ -357,7 +353,6 @@ impl IntermediateBucketResult {
|
||||
req.as_term()
|
||||
.expect("unexpected aggregation, expected term aggregation"),
|
||||
&req.sub_aggregation,
|
||||
schema,
|
||||
),
|
||||
}
|
||||
}
|
||||
@@ -366,9 +361,10 @@ impl IntermediateBucketResult {
|
||||
match req {
|
||||
BucketAggregationType::Terms(_) => IntermediateBucketResult::Terms(Default::default()),
|
||||
BucketAggregationType::Range(_) => IntermediateBucketResult::Range(Default::default()),
|
||||
BucketAggregationType::Histogram(_) => {
|
||||
IntermediateBucketResult::Histogram { buckets: vec![] }
|
||||
}
|
||||
BucketAggregationType::Histogram(_) => IntermediateBucketResult::Histogram {
|
||||
buckets: vec![],
|
||||
column_type: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
fn merge_fruits(&mut self, other: IntermediateBucketResult) {
|
||||
@@ -433,6 +429,7 @@ impl IntermediateBucketResult {
|
||||
/// Range aggregation including error counts
|
||||
pub struct IntermediateRangeBucketResult {
|
||||
pub(crate) buckets: FxHashMap<SerializedKey, IntermediateRangeBucketEntry>,
|
||||
pub(crate) column_type: Option<ColumnType>,
|
||||
}
|
||||
|
||||
#[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
@@ -448,7 +445,6 @@ impl IntermediateTermBucketResult {
|
||||
self,
|
||||
req: &TermsAggregation,
|
||||
sub_aggregation_req: &AggregationsInternal,
|
||||
schema: &Schema,
|
||||
) -> crate::Result<BucketResult> {
|
||||
let req = TermsAggregationInternal::from_req(req);
|
||||
let mut buckets: Vec<BucketEntry> = self
|
||||
@@ -462,7 +458,7 @@ impl IntermediateTermBucketResult {
|
||||
doc_count: entry.doc_count,
|
||||
sub_aggregation: entry
|
||||
.sub_aggregation
|
||||
.into_final_bucket_result_internal(sub_aggregation_req, schema)?,
|
||||
.into_final_bucket_result_internal(sub_aggregation_req)?,
|
||||
})
|
||||
})
|
||||
.collect::<crate::Result<_>>()?;
|
||||
@@ -567,7 +563,6 @@ impl IntermediateHistogramBucketEntry {
|
||||
pub(crate) fn into_final_bucket_entry(
|
||||
self,
|
||||
req: &AggregationsInternal,
|
||||
schema: &Schema,
|
||||
) -> crate::Result<BucketEntry> {
|
||||
Ok(BucketEntry {
|
||||
key_as_string: None,
|
||||
@@ -575,7 +570,7 @@ impl IntermediateHistogramBucketEntry {
|
||||
doc_count: self.doc_count,
|
||||
sub_aggregation: self
|
||||
.sub_aggregation
|
||||
.into_final_bucket_result_internal(req, schema)?,
|
||||
.into_final_bucket_result_internal(req)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -612,15 +607,15 @@ impl IntermediateRangeBucketEntry {
|
||||
pub(crate) fn into_final_bucket_entry(
|
||||
self,
|
||||
req: &AggregationsInternal,
|
||||
schema: &Schema,
|
||||
range_req: &RangeAggregation,
|
||||
_range_req: &RangeAggregation,
|
||||
column_type: Option<ColumnType>,
|
||||
) -> crate::Result<RangeBucketEntry> {
|
||||
let mut range_bucket_entry = RangeBucketEntry {
|
||||
key: self.key,
|
||||
doc_count: self.doc_count,
|
||||
sub_aggregation: self
|
||||
.sub_aggregation
|
||||
.into_final_bucket_result_internal(req, schema)?,
|
||||
.into_final_bucket_result_internal(req)?,
|
||||
to: self.to,
|
||||
from: self.from,
|
||||
to_as_string: None,
|
||||
@@ -629,8 +624,7 @@ impl IntermediateRangeBucketEntry {
|
||||
|
||||
// If we have a date type on the histogram buckets, we add the `key_as_string` field as
|
||||
// rfc339
|
||||
let field = schema.get_field(&range_req.field)?;
|
||||
if schema.get_field_entry(field).field_type().is_date() {
|
||||
if column_type == Some(ColumnType::DateTime) {
|
||||
if let Some(val) = range_bucket_entry.to {
|
||||
let key_as_string = format_date(val as i64)?;
|
||||
range_bucket_entry.to_as_string = Some(key_as_string);
|
||||
@@ -701,7 +695,10 @@ mod tests {
|
||||
}
|
||||
map.insert(
|
||||
"my_agg_level2".to_string(),
|
||||
IntermediateBucketResult::Range(IntermediateRangeBucketResult { buckets }),
|
||||
IntermediateBucketResult::Range(IntermediateRangeBucketResult {
|
||||
buckets,
|
||||
column_type: None,
|
||||
}),
|
||||
);
|
||||
IntermediateAggregationResults {
|
||||
buckets: Some(VecWithNames::from_entries(map.into_iter().collect())),
|
||||
@@ -731,7 +728,10 @@ mod tests {
|
||||
}
|
||||
map.insert(
|
||||
"my_agg_level1".to_string(),
|
||||
IntermediateBucketResult::Range(IntermediateRangeBucketResult { buckets }),
|
||||
IntermediateBucketResult::Range(IntermediateRangeBucketResult {
|
||||
buckets,
|
||||
column_type: None,
|
||||
}),
|
||||
);
|
||||
IntermediateAggregationResults {
|
||||
buckets: Some(VecWithNames::from_entries(map.into_iter().collect())),
|
||||
|
||||
@@ -81,7 +81,7 @@ mod tests {
|
||||
"price_sum": { "sum": { "field": "price" } }
|
||||
}"#;
|
||||
let aggregations: Aggregations = serde_json::from_str(aggregations_json).unwrap();
|
||||
let collector = AggregationCollector::from_aggs(aggregations, None, index.schema());
|
||||
let collector = AggregationCollector::from_aggs(aggregations, None);
|
||||
let reader = index.reader().unwrap();
|
||||
let searcher = reader.searcher();
|
||||
let aggregations_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use columnar::{Cardinality, Column};
|
||||
use columnar::{Cardinality, Column, ColumnType};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::*;
|
||||
@@ -8,7 +8,6 @@ use crate::aggregation::intermediate_agg_result::{
|
||||
};
|
||||
use crate::aggregation::segment_agg_result::SegmentAggregationCollector;
|
||||
use crate::aggregation::{f64_from_fastfield_u64, VecWithNames};
|
||||
use crate::schema::Type;
|
||||
use crate::{DocId, TantivyError};
|
||||
|
||||
/// A multi-value metric aggregation that computes a collection of statistics on numeric values that
|
||||
@@ -153,7 +152,7 @@ pub(crate) enum SegmentStatsType {
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub(crate) struct SegmentStatsCollector {
|
||||
field_type: Type,
|
||||
field_type: ColumnType,
|
||||
pub(crate) collecting_for: SegmentStatsType,
|
||||
pub(crate) stats: IntermediateStats,
|
||||
pub(crate) accessor_idx: usize,
|
||||
@@ -161,7 +160,7 @@ pub(crate) struct SegmentStatsCollector {
|
||||
|
||||
impl SegmentStatsCollector {
|
||||
pub fn from_req(
|
||||
field_type: Type,
|
||||
field_type: ColumnType,
|
||||
collecting_for: SegmentStatsType,
|
||||
accessor_idx: usize,
|
||||
) -> Self {
|
||||
@@ -290,7 +289,7 @@ mod tests {
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema());
|
||||
let collector = AggregationCollector::from_aggs(agg_req_1, None);
|
||||
|
||||
let reader = index.reader()?;
|
||||
let searcher = reader.searcher();
|
||||
@@ -327,7 +326,7 @@ mod tests {
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema());
|
||||
let collector = AggregationCollector::from_aggs(agg_req_1, None);
|
||||
|
||||
let reader = index.reader()?;
|
||||
let searcher = reader.searcher();
|
||||
@@ -404,7 +403,7 @@ mod tests {
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema());
|
||||
let collector = AggregationCollector::from_aggs(agg_req_1, None);
|
||||
|
||||
let searcher = reader.searcher();
|
||||
let agg_res: AggregationResults = searcher.search(&term_query, &collector).unwrap();
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -171,6 +171,20 @@ impl FastFieldReaders {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Returns the `u64` column used to represent any `u64`-mapped typed (i64, u64, f64, DateTime).
|
||||
#[doc(hidden)]
|
||||
pub fn u64_lenient_with_type(
|
||||
&self,
|
||||
field_name: &str,
|
||||
) -> crate::Result<Option<(Column<u64>, ColumnType)>> {
|
||||
for col in self.columnar.read_columns(field_name)? {
|
||||
if let Some(col_u64) = col.open_u64_lenient()? {
|
||||
return Ok(Some((col_u64, col.column_type())));
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Returns the `i64` fast field reader reader associated with `field`.
|
||||
///
|
||||
/// If `field` is not a i64 fast field, this method returns an Error.
|
||||
|
||||
Reference in New Issue
Block a user