diff --git a/src/aggregation/agg_bench.rs b/src/aggregation/agg_bench.rs
index 12a898968..a78e748bd 100644
--- a/src/aggregation/agg_bench.rs
+++ b/src/aggregation/agg_bench.rs
@@ -40,6 +40,7 @@ mod bench {
             )
             .set_stored();
         let text_field = schema_builder.add_text_field("text", text_fieldtype);
+        let json_field = schema_builder.add_json_field("json", FAST);
         let text_field_many_terms = schema_builder.add_text_field("text_many_terms", STRING | FAST);
         let text_field_few_terms = schema_builder.add_text_field("text_few_terms", STRING | FAST);
         let score_fieldtype = crate::schema::NumericOptions::default().set_fast();
@@ -56,7 +57,7 @@ mod bench {
            .collect::<Vec<_>>();
        {
            let mut rng = StdRng::from_seed([1u8; 32]);
-            let mut index_writer = index.writer_with_num_threads(1, 100_000_000)?;
+            let mut index_writer = index.writer_with_num_threads(1, 200_000_000)?;
            // To make the different test cases comparable we just change one doc to force the
            // cardinality
            if cardinality == Cardinality::Optional {
@@ -64,6 +65,8 @@
            }
            if cardinality == Cardinality::Multivalued {
                index_writer.add_document(doc!(
+                    json_field => json!({"mixed_type": 10.0}),
+                    json_field => json!({"mixed_type": 10.0}),
                    text_field => "cool",
                    text_field => "cool",
                    text_field_many_terms => "cool",
@@ -82,10 +85,18 @@
            if cardinality == Cardinality::Sparse {
                doc_with_value /= 20;
            }
+            let val_max = 1_000_000.0;
            for _ in 0..doc_with_value {
-                let val: f64 = rng.gen_range(0.0..1_000_000.0);
+                let val: f64 = rng.gen_range(0.0..val_max);
+                let json = if rng.gen_bool(0.1) {
+                    // 10% are numeric values
+                    json!({ "mixed_type": val })
+                } else {
+                    json!({"mixed_type": many_terms_data.choose(&mut rng).unwrap().to_string()})
+                };
                index_writer.add_document(doc!(
                    text_field => "cool",
+                    json_field => json,
                    text_field_many_terms => many_terms_data.choose(&mut rng).unwrap().to_string(),
                    text_field_few_terms => few_terms_data.choose(&mut rng).unwrap().to_string(),
                    score_field => val as u64,
@@ -303,6 +314,33 @@ mod bench {
        });
    }
 
+    bench_all_cardinalities!(bench_aggregation_terms_many_json_mixed_type_with_sub_agg);
+
+    fn bench_aggregation_terms_many_json_mixed_type_with_sub_agg_card(
+        b: &mut Bencher,
+        cardinality: Cardinality,
+    ) {
+        let index = get_test_index_bench(cardinality).unwrap();
+        let reader = index.reader().unwrap();
+
+        b.iter(|| {
+            let agg_req: Aggregations = serde_json::from_value(json!({
+                "my_texts": {
+                    "terms": { "field": "json.mixed_type" },
+                    "aggs": {
+                        "average_f64": { "avg": { "field": "score_f64" } }
+                    }
+                },
+            }))
+            .unwrap();
+
+            let collector = get_collector(agg_req);
+
+            let searcher = reader.searcher();
+            searcher.search(&AllQuery, &collector).unwrap()
+        });
+    }
+
    bench_all_cardinalities!(bench_aggregation_terms_many2);
 
    fn bench_aggregation_terms_many2_card(b: &mut Bencher, cardinality: Cardinality) {
diff --git a/src/aggregation/agg_req_with_accessor.rs b/src/aggregation/agg_req_with_accessor.rs
index 46116e8cb..329b59078 100644
--- a/src/aggregation/agg_req_with_accessor.rs
+++ b/src/aggregation/agg_req_with_accessor.rs
@@ -36,6 +36,9 @@ pub struct AggregationWithAccessor {
    pub(crate) accessor: Column<u64>,
    pub(crate) str_dict_column: Option<StrColumn>,
    pub(crate) field_type: ColumnType,
+    /// In case there are multiple types of fast fields, e.g. string and numeric.
+    /// Only used for term aggregations.
+    pub(crate) accessor2: Option<(Column<u64>, ColumnType)>,
    pub(crate) sub_aggregation: AggregationsWithAccessor,
    pub(crate) limits: ResourceLimitGuard,
    pub(crate) column_block_accessor: ColumnBlockAccessor<u64>,
@@ -50,34 +53,37 @@ impl AggregationWithAccessor {
        limits: AggregationLimits,
    ) -> crate::Result<AggregationWithAccessor> {
        let mut str_dict_column = None;
+        let mut accessor2 = None;
        use AggregationVariants::*;
        let (accessor, field_type) = match &agg.agg {
            Range(RangeAggregation {
                field: field_name, ..
-            }) => get_ff_reader_and_validate(
-                reader,
-                field_name,
-                Some(get_numeric_or_date_column_types()),
-            )?,
+            }) => get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?,
            Histogram(HistogramAggregation {
                field: field_name, ..
-            }) => get_ff_reader_and_validate(
-                reader,
-                field_name,
-                Some(get_numeric_or_date_column_types()),
-            )?,
+            }) => get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?,
            DateHistogram(DateHistogramAggregationReq {
                field: field_name, ..
-            }) => get_ff_reader_and_validate(
-                reader,
-                field_name,
-                Some(get_numeric_or_date_column_types()),
-            )?,
+            }) => get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?,
            Terms(TermsAggregation {
                field: field_name, ..
            }) => {
                str_dict_column = reader.fast_fields().str(field_name)?;
-                get_ff_reader_and_validate(reader, field_name, None)?
+                let allowed_column_types = [
+                    ColumnType::I64,
+                    ColumnType::U64,
+                    ColumnType::F64,
+                    ColumnType::Bytes,
+                    ColumnType::Str,
+                    // ColumnType::Bool Unsupported
+                    // ColumnType::IpAddr Unsupported
+                    // ColumnType::DateTime Unsupported
+                ];
+                let mut columns =
+                    get_all_ff_reader(reader, field_name, Some(&allowed_column_types))?;
+                let first = columns.pop().unwrap();
+                accessor2 = columns.pop();
+                first
            }
            Average(AverageAggregation { field: field_name })
            | Count(CountAggregation { field: field_name })
@@ -85,16 +91,13 @@ impl AggregationWithAccessor {
            | Min(MinAggregation { field: field_name })
            | Stats(StatsAggregation { field: field_name })
            | Sum(SumAggregation { field: field_name }) => {
-                let (accessor, field_type) = get_ff_reader_and_validate(
-                    reader,
-                    field_name,
-                    Some(get_numeric_or_date_column_types()),
-                )?;
+                let (accessor, field_type) =
+                    get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;
 
                (accessor, field_type)
            }
            Percentiles(percentiles) => {
-                let (accessor, field_type) = get_ff_reader_and_validate(
+                let (accessor, field_type) = get_ff_reader(
                    reader,
                    percentiles.field_name(),
                    Some(get_numeric_or_date_column_types()),
@@ -105,6 +108,7 @@
        let sub_aggregation = sub_aggregation.clone();
        Ok(AggregationWithAccessor {
            accessor,
+            accessor2,
            field_type,
            sub_aggregation: get_aggs_with_segment_accessor_and_validate(
                &sub_aggregation,
@@ -150,8 +154,8 @@
    ))
 }
 
-/// Get fast field reader with given cardinatility.
-fn get_ff_reader_and_validate(
+/// Get the fast field reader, or an empty column as default.
+fn get_ff_reader(
    reader: &SegmentReader,
    field_name: &str,
    allowed_column_types: Option<&[ColumnType]>,
@@ -167,3 +171,23 @@
        });
    Ok(ff_field_with_type)
 }
+
+/// Get all fast field readers, or an empty column as default.
+///
+/// Is guaranteed to return at least one column.
+fn get_all_ff_reader(
+    reader: &SegmentReader,
+    field_name: &str,
+    allowed_column_types: Option<&[ColumnType]>,
+) -> crate::Result<Vec<(Column<u64>, ColumnType)>> {
+    let ff_fields = reader.fast_fields();
+    let mut ff_field_with_type =
+        ff_fields.u64_lenient_for_type_all(allowed_column_types, field_name)?;
+    if ff_field_with_type.is_empty() {
+        ff_field_with_type.push((
+            Column::build_empty_column(reader.num_docs()),
+            ColumnType::U64,
+        ));
+    }
+    Ok(ff_field_with_type)
+}
diff --git a/src/aggregation/agg_tests.rs b/src/aggregation/agg_tests.rs
index 4ce745849..66aff3a4c 100644
--- a/src/aggregation/agg_tests.rs
+++ b/src/aggregation/agg_tests.rs
@@ -826,10 +826,9 @@ fn test_aggregation_on_json_object_mixed_types() {
            "buckets": [
                { "doc_count": 1, "key": 10.0, "min_price": { "value": 10.0 } },
                { "doc_count": 1, "key": -20.5, "min_price": { "value": -20.5 } },
-                // TODO red is missing since there is no multi aggregation within one
-                // segment for multiple types
                // TODO bool is also not yet handled in aggregation
-                { "doc_count": 1, "key": "blue", "min_price": { "value": null } }
+                { "doc_count": 1, "key": "blue", "min_price": { "value": null } },
+                { "doc_count": 1, "key": "red", "min_price": { "value": null } },
            ],
            "sum_other_doc_count": 0
        }
diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs
index 0a419414c..7b3352950 100644
--- a/src/aggregation/bucket/term_agg.rs
+++ b/src/aggregation/bucket/term_agg.rs
@@ -224,6 +224,110 @@ impl TermBuckets {
    }
 }
 
+/// The composite collector is used when there are different column types under one field, to
+/// support a term aggregation on both.
+#[derive(Clone, Debug)]
+pub struct SegmentTermCollectorComposite {
+    term_agg1: SegmentTermCollector, // field type 1, e.g. strings
+    term_agg2: SegmentTermCollector, // field type 2, e.g. u64
+    accessor_idx: usize,
+}
+impl SegmentAggregationCollector for SegmentTermCollectorComposite {
+    fn add_intermediate_aggregation_result(
+        self: Box<Self>,
+        agg_with_accessor: &AggregationsWithAccessor,
+        results: &mut IntermediateAggregationResults,
+    ) -> crate::Result<()> {
+        let name = agg_with_accessor.aggs.keys[self.accessor_idx].to_string();
+        let agg_with_accessor = &agg_with_accessor.aggs.values[self.accessor_idx];
+
+        let bucket = self
+            .term_agg1
+            .into_intermediate_bucket_result(agg_with_accessor)?;
+        results.push(
+            name.to_string(),
+            IntermediateAggregationResult::Bucket(bucket),
+        )?;
+        let bucket = self
+            .term_agg2
+            .into_intermediate_bucket_result(agg_with_accessor)?;
+        results.push(name, IntermediateAggregationResult::Bucket(bucket))?;
+
+        Ok(())
+    }
+
+    #[inline]
+    fn collect(
+        &mut self,
+        doc: crate::DocId,
+        agg_with_accessor: &mut AggregationsWithAccessor,
+    ) -> crate::Result<()> {
+        self.term_agg1.collect_block(&[doc], agg_with_accessor)?;
+        self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
+        self.term_agg2.collect_block(&[doc], agg_with_accessor)?;
+        self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
+        Ok(())
+    }
+
+    #[inline]
+    fn collect_block(
+        &mut self,
+        docs: &[crate::DocId],
+        agg_with_accessor: &mut AggregationsWithAccessor,
+    ) -> crate::Result<()> {
+        self.term_agg1.collect_block(docs, agg_with_accessor)?;
+        self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
+        self.term_agg2.collect_block(docs, agg_with_accessor)?;
+        self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
+
+        Ok(())
+    }
+
+    fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> {
+        self.term_agg1.flush(agg_with_accessor)?;
+        self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
+        self.term_agg2.flush(agg_with_accessor)?;
+        self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
+
+        Ok(())
+    }
+}
+
+impl SegmentTermCollectorComposite {
+    /// Swaps the accessor and field type with the second accessor and field type.
+    /// This way we can use the same code for both aggregations.
+    fn swap_accessor(&self, aggregations: &mut AggregationWithAccessor) {
+        if let Some(accessor) = aggregations.accessor2.as_mut() {
+            std::mem::swap(&mut accessor.0, &mut aggregations.accessor);
+            std::mem::swap(&mut accessor.1, &mut aggregations.field_type);
+        }
+    }
+
+    pub(crate) fn from_req_and_validate(
+        req: &TermsAggregation,
+        sub_aggregations: &mut AggregationsWithAccessor,
+        field_type: ColumnType,
+        field_type2: ColumnType,
+        accessor_idx: usize,
+    ) -> crate::Result<Self> {
+        Ok(Self {
+            term_agg1: SegmentTermCollector::from_req_and_validate(
+                req,
+                sub_aggregations,
+                field_type,
+                accessor_idx,
+            )?,
+            term_agg2: SegmentTermCollector::from_req_and_validate(
+                req,
+                sub_aggregations,
+                field_type2,
+                accessor_idx,
+            )?,
+            accessor_idx,
+        })
+    }
+}
+
 /// The collector puts values from the fast field into the correct buckets and does a conversion to
 /// the correct datatype.
 #[derive(Clone, Debug)]
diff --git a/src/aggregation/segment_agg_result.rs b/src/aggregation/segment_agg_result.rs
index 27df21314..2c5e604f6 100644
--- a/src/aggregation/segment_agg_result.rs
+++ b/src/aggregation/segment_agg_result.rs
@@ -15,6 +15,7 @@ use super::metric::{
    SegmentPercentilesCollector, SegmentStatsCollector, SegmentStatsType, StatsAggregation,
    SumAggregation,
 };
+use crate::aggregation::bucket::SegmentTermCollectorComposite;
 
 pub(crate) trait SegmentAggregationCollector: CollectorClone + Debug {
    fn add_intermediate_aggregation_result(
@@ -80,12 +81,26 @@ pub(crate) fn build_single_agg_segment_collector(
 ) -> crate::Result<Box<dyn SegmentAggregationCollector>> {
    use AggregationVariants::*;
    match &req.agg.agg {
-        Terms(terms_req) => Ok(Box::new(SegmentTermCollector::from_req_and_validate(
-            terms_req,
-            &mut req.sub_aggregation,
-            req.field_type,
-            accessor_idx,
-        )?)),
+        Terms(terms_req) => {
+            if let Some(acc2) = req.accessor2.as_ref() {
+                Ok(Box::new(
+                    SegmentTermCollectorComposite::from_req_and_validate(
+                        terms_req,
+                        &mut req.sub_aggregation,
+                        req.field_type,
+                        acc2.1,
+                        accessor_idx,
+                    )?,
+                ))
+            } else {
+                Ok(Box::new(SegmentTermCollector::from_req_and_validate(
+                    terms_req,
+                    &mut req.sub_aggregation,
+                    req.field_type,
+                    accessor_idx,
+                )?))
+            }
+        }
        Range(range_req) => Ok(Box::new(SegmentRangeCollector::from_req_and_validate(
            range_req,
            &mut req.sub_aggregation,
diff --git a/src/fastfield/readers.rs b/src/fastfield/readers.rs
index 38df104a4..fa3c44038 100644
--- a/src/fastfield/readers.rs
+++ b/src/fastfield/readers.rs
@@ -278,6 +278,34 @@ impl FastFieldReaders {
        Ok(None)
    }
 
+    /// Returns all `u64` columns used to represent any `u64`-mapped type (String/Bytes term
+    /// ids, i64, u64, f64, DateTime).
+    ///
+    /// In the case of JSON, there may be two columns: one for terms and one for numeric types.
+    /// (This may change later to three types if JSON handles DateTime.)
+    #[doc(hidden)]
+    pub fn u64_lenient_for_type_all(
+        &self,
+        type_white_list_opt: Option<&[ColumnType]>,
+        field_name: &str,
+    ) -> crate::Result<Vec<(Column<u64>, ColumnType)>> {
+        let mut columns_and_types = Vec::new();
+        let Some(resolved_field_name) = self.resolve_field(field_name)? else {
+            return Ok(columns_and_types);
+        };
+        for col in self.columnar.read_columns(&resolved_field_name)? {
+            if let Some(type_white_list) = type_white_list_opt {
+                if !type_white_list.contains(&col.column_type()) {
+                    continue;
+                }
+            }
+            if let Some(col_u64) = col.open_u64_lenient()? {
+                columns_and_types.push((col_u64, col.column_type()));
+            }
+        }
+        Ok(columns_and_types)
+    }
+
    /// Returns the `u64` column used to represent any `u64`-mapped typed (i64, u64, f64, DateTime).
    ///
    /// Returns Ok(None) for empty columns
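
Usage sketch (not part of the patch): a minimal end-to-end example of what this change enables, mirroring the updated test expectation and the new `json.mixed_type` benchmark above. It assumes this tantivy version's aggregation API (`AggregationCollector::from_aggs` taking an `AggregationLimits`, and `doc!` accepting a `serde_json` value for a JSON fast field, as the benchmark does); names and values are illustrative only.

use tantivy::aggregation::agg_req::Aggregations;
use tantivy::aggregation::AggregationCollector;
use tantivy::query::AllQuery;
use tantivy::schema::{Schema, FAST};
use tantivy::{doc, Index};

fn main() -> tantivy::Result<()> {
    // A JSON fast field that receives both numeric and string values under
    // the same path "mixed_type".
    let mut schema_builder = Schema::builder();
    let json = schema_builder.add_json_field("json", FAST);
    let index = Index::create_in_ram(schema_builder.build());

    let mut writer = index.writer(50_000_000)?;
    writer.add_document(doc!(json => serde_json::json!({"mixed_type": 10.0})))?;
    writer.add_document(doc!(json => serde_json::json!({"mixed_type": "blue"})))?;
    writer.commit()?;

    // Terms aggregation over the mixed-type path. With the composite term
    // collector, the numeric column and the string column of
    // `json.mixed_type` both contribute buckets (10.0 and "blue").
    let aggs: Aggregations = serde_json::from_value(serde_json::json!({
        "my_terms": { "terms": { "field": "json.mixed_type" } }
    }))
    .unwrap();
    let collector = AggregationCollector::from_aggs(aggs, Default::default());
    let agg_res = index.reader()?.searcher().search(&AllQuery, &collector)?;
    println!("{}", serde_json::to_string_pretty(&agg_res).unwrap());
    Ok(())
}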