diff --git a/columnar/Cargo.toml b/columnar/Cargo.toml index 1f56ec6b5..1616701c8 100644 --- a/columnar/Cargo.toml +++ b/columnar/Cargo.toml @@ -17,6 +17,7 @@ stacker = { path = "../stacker", package="tantivy-stacker"} sstable = { path = "../sstable", package = "tantivy-sstable" } common = { path = "../common", package = "tantivy-common" } tantivy-bitpacker = { version= "0.3", path = "../bitpacker/" } +serde = "1.0.152" [dev-dependencies] proptest = "1" diff --git a/columnar/src/columnar/column_type.rs b/columnar/src/columnar/column_type.rs index 597d4c265..16da5e36b 100644 --- a/columnar/src/columnar/column_type.rs +++ b/columnar/src/columnar/column_type.rs @@ -1,12 +1,14 @@ use std::fmt::Debug; use std::net::Ipv6Addr; +use serde::{Deserialize, Serialize}; + use crate::value::NumericalType; use crate::InvalidData; /// The column type represents the column type. /// Any changes need to be propagated to `COLUMN_TYPES`. -#[derive(Hash, Eq, PartialEq, Debug, Clone, Copy, Ord, PartialOrd)] +#[derive(Hash, Eq, PartialEq, Debug, Clone, Copy, Ord, PartialOrd, Serialize, Deserialize)] #[repr(u8)] pub enum ColumnType { I64 = 0u8, diff --git a/examples/aggregation.rs b/examples/aggregation.rs index d5bbda416..61d4ba2f8 100644 --- a/examples/aggregation.rs +++ b/examples/aggregation.rs @@ -192,7 +192,7 @@ fn main() -> tantivy::Result<()> { // let agg_req: Aggregations = serde_json::from_str(agg_req_str)?; - let collector = AggregationCollector::from_aggs(agg_req, None, index.schema()); + let collector = AggregationCollector::from_aggs(agg_req, None); let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap(); let res2: Value = serde_json::to_value(agg_res)?; @@ -239,7 +239,7 @@ fn main() -> tantivy::Result<()> { .into_iter() .collect(); - let collector = AggregationCollector::from_aggs(agg_req, None, index.schema()); + let collector = AggregationCollector::from_aggs(agg_req, None); // We use the `AllQuery` which will pass all documents to the AggregationCollector. let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap(); @@ -287,7 +287,7 @@ fn main() -> tantivy::Result<()> { let agg_req: Aggregations = serde_json::from_str(agg_req_str)?; - let collector = AggregationCollector::from_aggs(agg_req, None, index.schema()); + let collector = AggregationCollector::from_aggs(agg_req, None); let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap(); let res: Value = serde_json::to_value(agg_res)?; diff --git a/src/aggregation/agg_req_with_accessor.rs b/src/aggregation/agg_req_with_accessor.rs index 39d5aa402..0d8720605 100644 --- a/src/aggregation/agg_req_with_accessor.rs +++ b/src/aggregation/agg_req_with_accessor.rs @@ -3,7 +3,7 @@ use std::rc::Rc; use std::sync::atomic::AtomicU32; -use columnar::{Column, StrColumn}; +use columnar::{Column, ColumnType, StrColumn}; use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation}; use super::bucket::{HistogramAggregation, RangeAggregation, TermsAggregation}; @@ -13,7 +13,6 @@ use super::metric::{ }; use super::segment_agg_result::BucketCount; use super::VecWithNames; -use crate::schema::Type; use crate::{SegmentReader, TantivyError}; #[derive(Clone, Default)] @@ -41,7 +40,7 @@ pub struct BucketAggregationWithAccessor { /// based on search terms. So eventually this needs to be Option or moved. pub(crate) accessor: Column, pub(crate) str_dict_column: Option, - pub(crate) field_type: Type, + pub(crate) field_type: ColumnType, pub(crate) bucket_agg: BucketAggregationType, pub(crate) sub_aggregation: AggregationsWithAccessor, pub(crate) bucket_count: BucketCount, @@ -94,7 +93,7 @@ impl BucketAggregationWithAccessor { #[derive(Clone)] pub struct MetricAggregationWithAccessor { pub metric: MetricAggregation, - pub field_type: Type, + pub field_type: ColumnType, pub accessor: Column, } @@ -158,22 +157,12 @@ pub(crate) fn get_aggs_with_accessor_and_validate( fn get_ff_reader_and_validate( reader: &SegmentReader, field_name: &str, -) -> crate::Result<(columnar::Column, Type)> { - let field = reader.schema().get_field(field_name)?; - // TODO we should get type metadata from columnar - let field_type = reader - .schema() - .get_field_entry(field) - .field_type() - .value_type(); - // TODO Do validation - +) -> crate::Result<(columnar::Column, ColumnType)> { let ff_fields = reader.fast_fields(); - let ff_field = ff_fields.u64_lenient(field_name)?.ok_or_else(|| { - TantivyError::InvalidArgument(format!( - "No numerical fast field found for field: {}", - field_name - )) - })?; - Ok((ff_field, field_type)) + let ff_field_with_type = ff_fields + .u64_lenient_with_type(field_name)? + .ok_or_else(|| { + TantivyError::InvalidArgument(format!("No fast field found for field: {}", field_name)) + })?; + Ok(ff_field_with_type) } diff --git a/src/aggregation/agg_result.rs b/src/aggregation/agg_result.rs index 7122f3bee..6ce7e6749 100644 --- a/src/aggregation/agg_result.rs +++ b/src/aggregation/agg_result.rs @@ -12,7 +12,6 @@ use super::bucket::GetDocCount; use super::intermediate_agg_result::{IntermediateBucketResult, IntermediateMetricResult}; use super::metric::{SingleMetricResult, Stats}; use super::Key; -use crate::schema::Schema; use crate::TantivyError; #[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] @@ -154,12 +153,9 @@ pub enum BucketResult { } impl BucketResult { - pub(crate) fn empty_from_req( - req: &BucketAggregationInternal, - schema: &Schema, - ) -> crate::Result { + pub(crate) fn empty_from_req(req: &BucketAggregationInternal) -> crate::Result { let empty_bucket = IntermediateBucketResult::empty_from_req(&req.bucket_agg); - empty_bucket.into_final_bucket_result(req, schema) + empty_bucket.into_final_bucket_result(req) } } diff --git a/src/aggregation/agg_tests.rs b/src/aggregation/agg_tests.rs new file mode 100644 index 000000000..ab05b1fdd --- /dev/null +++ b/src/aggregation/agg_tests.rs @@ -0,0 +1,1128 @@ +#[cfg(test)] +mod tests { + + use serde_json::Value; + + use crate::aggregation::agg_req::{ + get_term_dict_field_names, Aggregation, Aggregations, BucketAggregation, + BucketAggregationType, MetricAggregation, + }; + use crate::aggregation::agg_result::AggregationResults; + use crate::aggregation::bucket::{RangeAggregation, TermsAggregation}; + use crate::aggregation::buf_collector::DOC_BLOCK_SIZE; + use crate::aggregation::collector::AggregationCollector; + use crate::aggregation::intermediate_agg_result::IntermediateAggregationResults; + use crate::aggregation::metric::AverageAggregation; + use crate::aggregation::tests::{ + get_test_index_2_segments, get_test_index_from_values_and_terms, + }; + use crate::aggregation::DistributedAggregationCollector; + use crate::query::{AllQuery, TermQuery}; + use crate::schema::IndexRecordOption; + use crate::Term; + + fn get_avg_req(field_name: &str) -> Aggregation { + Aggregation::Metric(MetricAggregation::Average( + AverageAggregation::from_field_name(field_name.to_string()), + )) + } + + // *** EVERY BUCKET-TYPE SHOULD BE TESTED HERE *** + fn test_aggregation_flushing( + merge_segments: bool, + use_distributed_collector: bool, + ) -> crate::Result<()> { + let mut values_and_terms = (0..80) + .map(|val| vec![(val as f64, "terma".to_string())]) + .collect::>(); + values_and_terms.last_mut().unwrap()[0].1 = "termb".to_string(); + let index = get_test_index_from_values_and_terms(merge_segments, &values_and_terms)?; + + let reader = index.reader()?; + + assert_eq!(DOC_BLOCK_SIZE, 64); + // In the tree we cache Documents of DOC_BLOCK_SIZE, before passing them down as one block. + // + // Build a request so that on the first level we have one full cache, which is then flushed. + // The same cache should have some residue docs at the end, which are flushed (Range 0-70) + // -> 70 docs + // + // The second level should also have some residue docs in the cache that are flushed at the + // end. + // + // A second bucket on the first level should have the cache unfilled + + // let elasticsearch_compatible_json_req = r#" + let elasticsearch_compatible_json = json!( + { + "bucketsL1": { + "range": { + "field": "score", + "ranges": [ { "to": 3.0f64 }, { "from": 3.0f64, "to": 70.0f64 }, { "from": 70.0f64 } ] + }, + "aggs": { + "bucketsL2": { + "range": { + "field": "score", + "ranges": [ { "to": 30.0f64 }, { "from": 30.0f64, "to": 70.0f64 }, { "from": 70.0f64 } ] + } + } + } + }, + "histogram_test":{ + "histogram": { + "field": "score", + "interval": 70.0, + "offset": 3.0 + }, + "aggs": { + "bucketsL2": { + "histogram": { + "field": "score", + "interval": 70.0 + } + } + } + }, + "term_agg_test":{ + "terms": { + "field": "string_id" + }, + "aggs": { + "bucketsL2": { + "histogram": { + "field": "score", + "interval": 70.0 + } + } + } + } + }); + + let agg_req: Aggregations = + serde_json::from_str(&serde_json::to_string(&elasticsearch_compatible_json).unwrap()) + .unwrap(); + + let agg_res: AggregationResults = if use_distributed_collector { + let collector = DistributedAggregationCollector::from_aggs(agg_req.clone(), None); + + let searcher = reader.searcher(); + let intermediate_agg_result = searcher.search(&AllQuery, &collector).unwrap(); + intermediate_agg_result + .into_final_bucket_result(agg_req) + .unwrap() + } else { + let collector = AggregationCollector::from_aggs(agg_req, None); + + let searcher = reader.searcher(); + searcher.search(&AllQuery, &collector).unwrap() + }; + + let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?; + + assert_eq!(res["bucketsL1"]["buckets"][0]["doc_count"], 3); + assert_eq!( + res["bucketsL1"]["buckets"][0]["bucketsL2"]["buckets"][0]["doc_count"], + 3 + ); + assert_eq!(res["bucketsL1"]["buckets"][1]["key"], "3-70"); + assert_eq!(res["bucketsL1"]["buckets"][1]["doc_count"], 70 - 3); + assert_eq!( + res["bucketsL1"]["buckets"][1]["bucketsL2"]["buckets"][0]["doc_count"], + 27 + ); + assert_eq!( + res["bucketsL1"]["buckets"][1]["bucketsL2"]["buckets"][1]["doc_count"], + 40 + ); + assert_eq!( + res["bucketsL1"]["buckets"][1]["bucketsL2"]["buckets"][2]["doc_count"], + 0 + ); + assert_eq!( + res["bucketsL1"]["buckets"][2]["bucketsL2"]["buckets"][2]["doc_count"], + 80 - 70 + ); + assert_eq!(res["bucketsL1"]["buckets"][2]["doc_count"], 80 - 70); + + assert_eq!( + res["term_agg_test"], + json!( + { + "buckets": [ + { + "bucketsL2": { + "buckets": [ + { + "doc_count": 70, + "key": 0.0 + }, + { + "doc_count": 9, + "key": 70.0 + } + ] + }, + "doc_count": 79, + "key": "terma" + }, + { + "bucketsL2": { + "buckets": [ + { + "doc_count": 1, + "key": 70.0 + } + ] + }, + "doc_count": 1, + "key": "termb" + } + ], + "doc_count_error_upper_bound": 0, + "sum_other_doc_count": 0 + } + ) + ); + + Ok(()) + } + + #[test] + fn test_aggregation_flushing_variants() { + test_aggregation_flushing(false, false).unwrap(); + test_aggregation_flushing(false, true).unwrap(); + test_aggregation_flushing(true, false).unwrap(); + test_aggregation_flushing(true, true).unwrap(); + } + + #[test] + fn test_aggregation_level1() -> crate::Result<()> { + let index = get_test_index_2_segments(true)?; + + let reader = index.reader()?; + let text_field = reader.searcher().schema().get_field("text").unwrap(); + + let term_query = TermQuery::new( + Term::from_field_text(text_field, "cool"), + IndexRecordOption::Basic, + ); + + let agg_req_1: Aggregations = vec![ + ("average_i64".to_string(), get_avg_req("score_i64")), + ("average_f64".to_string(), get_avg_req("score_f64")), + ("average".to_string(), get_avg_req("score")), + ( + "range".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Range(RangeAggregation { + field: "score".to_string(), + ranges: vec![(3f64..7f64).into(), (7f64..20f64).into()], + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + ), + ( + "rangef64".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Range(RangeAggregation { + field: "score_f64".to_string(), + ranges: vec![(3f64..7f64).into(), (7f64..20f64).into()], + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + ), + ( + "rangei64".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Range(RangeAggregation { + field: "score_i64".to_string(), + ranges: vec![(3f64..7f64).into(), (7f64..20f64).into()], + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + ), + ] + .into_iter() + .collect(); + + let collector = AggregationCollector::from_aggs(agg_req_1, None); + + let searcher = reader.searcher(); + let agg_res: AggregationResults = searcher.search(&term_query, &collector).unwrap(); + + let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?; + assert_eq!(res["average"]["value"], 12.142857142857142); + assert_eq!(res["average_f64"]["value"], 12.214285714285714); + assert_eq!(res["average_i64"]["value"], 12.142857142857142); + assert_eq!( + res["range"]["buckets"], + json!( + [ + { + "key": "*-3", + "doc_count": 1, + "to": 3.0 + }, + { + "key": "3-7", + "doc_count": 2, + "from": 3.0, + "to": 7.0 + }, + { + "key": "7-20", + "doc_count": 3, + "from": 7.0, + "to": 20.0 + }, + { + "key": "20-*", + "doc_count": 1, + "from": 20.0 + } + ]) + ); + + Ok(()) + } + + fn test_aggregation_level2( + merge_segments: bool, + use_distributed_collector: bool, + use_elastic_json_req: bool, + ) -> crate::Result<()> { + let index = get_test_index_2_segments(merge_segments)?; + + let reader = index.reader()?; + let text_field = reader.searcher().schema().get_field("text").unwrap(); + + let term_query = TermQuery::new( + Term::from_field_text(text_field, "cool"), + IndexRecordOption::Basic, + ); + + let query_with_no_hits = TermQuery::new( + Term::from_field_text(text_field, "thistermdoesnotexist"), + IndexRecordOption::Basic, + ); + + let sub_agg_req: Aggregations = vec![ + ("average_in_range".to_string(), get_avg_req("score")), + ( + "term_agg".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Terms(TermsAggregation { + field: "text".to_string(), + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + ), + ] + .into_iter() + .collect(); + let agg_req: Aggregations = if use_elastic_json_req { + let elasticsearch_compatible_json_req = r#" +{ + "rangef64": { + "range": { + "field": "score_f64", + "ranges": [ + { "to": 3.0 }, + { "from": 3.0, "to": 7.0 }, + { "from": 7.0, "to": 19.0 }, + { "from": 19.0, "to": 20.0 }, + { "from": 20.0 } + ] + }, + "aggs": { + "average_in_range": { "avg": { "field": "score" } }, + "term_agg": { "terms": { "field": "text" } } + } + }, + "rangei64": { + "range": { + "field": "score_i64", + "ranges": [ + { "to": 3.0 }, + { "from": 3.0, "to": 7.0 }, + { "from": 7.0, "to": 19.0 }, + { "from": 19.0, "to": 20.0 }, + { "from": 20.0 } + ] + }, + "aggs": { + "average_in_range": { "avg": { "field": "score" } }, + "term_agg": { "terms": { "field": "text" } } + } + }, + "average": { + "avg": { "field": "score" } + }, + "range": { + "range": { + "field": "score", + "ranges": [ + { "to": 3.0 }, + { "from": 3.0, "to": 7.0 }, + { "from": 7.0, "to": 19.0 }, + { "from": 19.0, "to": 20.0 }, + { "from": 20.0 } + ] + }, + "aggs": { + "average_in_range": { "avg": { "field": "score" } }, + "term_agg": { "terms": { "field": "text" } } + } + } +} +"#; + let value: Aggregations = + serde_json::from_str(elasticsearch_compatible_json_req).unwrap(); + value + } else { + let agg_req: Aggregations = vec![ + ("average".to_string(), get_avg_req("score")), + ( + "range".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Range(RangeAggregation { + field: "score".to_string(), + ranges: vec![ + (3f64..7f64).into(), + (7f64..19f64).into(), + (19f64..20f64).into(), + ], + ..Default::default() + }), + sub_aggregation: sub_agg_req.clone(), + }), + ), + ( + "rangef64".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Range(RangeAggregation { + field: "score_f64".to_string(), + ranges: vec![ + (3f64..7f64).into(), + (7f64..19f64).into(), + (19f64..20f64).into(), + ], + ..Default::default() + }), + sub_aggregation: sub_agg_req.clone(), + }), + ), + ( + "rangei64".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Range(RangeAggregation { + field: "score_i64".to_string(), + ranges: vec![ + (3f64..7f64).into(), + (7f64..19f64).into(), + (19f64..20f64).into(), + ], + ..Default::default() + }), + sub_aggregation: sub_agg_req, + }), + ), + ] + .into_iter() + .collect(); + agg_req + }; + + let field_names = get_term_dict_field_names(&agg_req); + assert_eq!(field_names, vec!["text".to_string()].into_iter().collect()); + + let agg_res: AggregationResults = if use_distributed_collector { + let collector = DistributedAggregationCollector::from_aggs(agg_req.clone(), None); + + let searcher = reader.searcher(); + let res = searcher.search(&term_query, &collector).unwrap(); + // Test de/serialization roundtrip on intermediate_agg_result + let res: IntermediateAggregationResults = + serde_json::from_str(&serde_json::to_string(&res).unwrap()).unwrap(); + res.into_final_bucket_result(agg_req.clone()).unwrap() + } else { + let collector = AggregationCollector::from_aggs(agg_req.clone(), None); + + let searcher = reader.searcher(); + searcher.search(&term_query, &collector).unwrap() + }; + + let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?; + + assert_eq!(res["range"]["buckets"][1]["key"], "3-7"); + assert_eq!(res["range"]["buckets"][1]["doc_count"], 2u64); + assert_eq!(res["rangef64"]["buckets"][1]["doc_count"], 2u64); + assert_eq!(res["rangei64"]["buckets"][1]["doc_count"], 2u64); + + assert_eq!(res["average"]["value"], 12.142857142857142f64); + assert_eq!(res["range"]["buckets"][2]["key"], "7-19"); + assert_eq!(res["range"]["buckets"][2]["doc_count"], 3u64); + assert_eq!(res["rangef64"]["buckets"][2]["doc_count"], 3u64); + assert_eq!(res["rangei64"]["buckets"][2]["doc_count"], 3u64); + assert_eq!(res["rangei64"]["buckets"][5], serde_json::Value::Null); + + assert_eq!(res["range"]["buckets"][4]["key"], "20-*"); + assert_eq!(res["range"]["buckets"][4]["doc_count"], 1u64); + assert_eq!(res["rangef64"]["buckets"][4]["doc_count"], 1u64); + assert_eq!(res["rangei64"]["buckets"][4]["doc_count"], 1u64); + + assert_eq!(res["range"]["buckets"][3]["key"], "19-20"); + assert_eq!(res["range"]["buckets"][3]["doc_count"], 0u64); + assert_eq!(res["rangef64"]["buckets"][3]["doc_count"], 0u64); + assert_eq!(res["rangei64"]["buckets"][3]["doc_count"], 0u64); + + assert_eq!( + res["range"]["buckets"][3]["average_in_range"]["value"], + serde_json::Value::Null + ); + + assert_eq!( + res["range"]["buckets"][4]["average_in_range"]["value"], + 44.0f64 + ); + assert_eq!( + res["rangef64"]["buckets"][4]["average_in_range"]["value"], + 44.0f64 + ); + assert_eq!( + res["rangei64"]["buckets"][4]["average_in_range"]["value"], + 44.0f64 + ); + + assert_eq!( + res["range"]["7-19"]["average_in_range"]["value"], + res["rangef64"]["7-19"]["average_in_range"]["value"] + ); + assert_eq!( + res["range"]["7-19"]["average_in_range"]["value"], + res["rangei64"]["7-19"]["average_in_range"]["value"] + ); + + // Test empty result set + let collector = AggregationCollector::from_aggs(agg_req, None); + let searcher = reader.searcher(); + searcher.search(&query_with_no_hits, &collector).unwrap(); + + Ok(()) + } + + #[test] + fn test_aggregation_level2_multi_segments() -> crate::Result<()> { + test_aggregation_level2(false, false, false) + } + + #[test] + fn test_aggregation_level2_single_segment() -> crate::Result<()> { + test_aggregation_level2(true, false, false) + } + + #[test] + fn test_aggregation_level2_multi_segments_distributed_collector() -> crate::Result<()> { + test_aggregation_level2(false, true, false) + } + + #[test] + fn test_aggregation_level2_single_segment_distributed_collector() -> crate::Result<()> { + test_aggregation_level2(true, true, false) + } + + #[test] + fn test_aggregation_level2_multi_segments_use_json() -> crate::Result<()> { + test_aggregation_level2(false, false, true) + } + + #[test] + fn test_aggregation_level2_single_segment_use_json() -> crate::Result<()> { + test_aggregation_level2(true, false, true) + } + + #[test] + fn test_aggregation_level2_multi_segments_distributed_collector_use_json() -> crate::Result<()> + { + test_aggregation_level2(false, true, true) + } + + #[test] + fn test_aggregation_level2_single_segment_distributed_collector_use_json() -> crate::Result<()> + { + test_aggregation_level2(true, true, true) + } + + #[test] + fn test_aggregation_invalid_requests() -> crate::Result<()> { + let index = get_test_index_2_segments(false)?; + + let reader = index.reader()?; + + let avg_on_field = |field_name: &str| { + let agg_req_1: Aggregations = vec![( + "average".to_string(), + Aggregation::Metric(MetricAggregation::Average( + AverageAggregation::from_field_name(field_name.to_string()), + )), + )] + .into_iter() + .collect(); + + let collector = AggregationCollector::from_aggs(agg_req_1, None); + + let searcher = reader.searcher(); + + searcher.search(&AllQuery, &collector).unwrap_err() + }; + + let agg_res = avg_on_field("dummy_text"); + assert_eq!( + format!("{:?}", agg_res), + r#"InvalidArgument("No fast field found for field: dummy_text")"# + ); + + let agg_res = avg_on_field("not_exist_field"); + assert_eq!( + format!("{:?}", agg_res), + r#"InvalidArgument("No fast field found for field: not_exist_field")"# + ); + + let agg_res = avg_on_field("ip_addr"); + assert_eq!( + format!("{:?}", agg_res), + r#"InvalidArgument("No fast field found for field: ip_addr")"# + ); + + Ok(()) + } + + #[cfg(all(test, feature = "unstable"))] + mod bench { + + use rand::prelude::SliceRandom; + use rand::{thread_rng, Rng}; + use test::{self, Bencher}; + + use super::*; + use crate::aggregation::bucket::{ + CustomOrder, HistogramAggregation, HistogramBounds, Order, OrderTarget, + TermsAggregation, + }; + use crate::aggregation::metric::StatsAggregation; + use crate::query::AllQuery; + use crate::schema::{Schema, TextFieldIndexing, FAST, STRING}; + use crate::Index; + + fn get_test_index_bench(_merge_segments: bool) -> crate::Result { + let mut schema_builder = Schema::builder(); + let text_fieldtype = crate::schema::TextOptions::default() + .set_indexing_options( + TextFieldIndexing::default().set_index_option(IndexRecordOption::WithFreqs), + ) + .set_stored(); + let text_field = schema_builder.add_text_field("text", text_fieldtype); + let text_field_many_terms = + schema_builder.add_text_field("text_many_terms", STRING | FAST); + let text_field_few_terms = + schema_builder.add_text_field("text_few_terms", STRING | FAST); + let score_fieldtype = crate::schema::NumericOptions::default().set_fast(); + let score_field = schema_builder.add_u64_field("score", score_fieldtype.clone()); + let score_field_f64 = + schema_builder.add_f64_field("score_f64", score_fieldtype.clone()); + let score_field_i64 = schema_builder.add_i64_field("score_i64", score_fieldtype); + let index = Index::create_from_tempdir(schema_builder.build())?; + let few_terms_data = vec!["INFO", "ERROR", "WARN", "DEBUG"]; + let many_terms_data = (0..150_000) + .map(|num| format!("author{}", num)) + .collect::>(); + { + let mut rng = thread_rng(); + let mut index_writer = index.writer_with_num_threads(1, 100_000_000)?; + // writing the segment + for _ in 0..1_000_000 { + let val: f64 = rng.gen_range(0.0..1_000_000.0); + index_writer.add_document(doc!( + text_field => "cool", + text_field_many_terms => many_terms_data.choose(&mut rng).unwrap().to_string(), + text_field_few_terms => few_terms_data.choose(&mut rng).unwrap().to_string(), + score_field => val as u64, + score_field_f64 => val, + score_field_i64 => val as i64, + ))?; + } + index_writer.commit()?; + } + + Ok(index) + } + + #[bench] + fn bench_aggregation_average_u64(b: &mut Bencher) { + let index = get_test_index_bench(false).unwrap(); + let reader = index.reader().unwrap(); + let text_field = reader.searcher().schema().get_field("text").unwrap(); + + b.iter(|| { + let term_query = TermQuery::new( + Term::from_field_text(text_field, "cool"), + IndexRecordOption::Basic, + ); + + let agg_req_1: Aggregations = vec![( + "average".to_string(), + Aggregation::Metric(MetricAggregation::Average( + AverageAggregation::from_field_name("score".to_string()), + )), + )] + .into_iter() + .collect(); + + let collector = AggregationCollector::from_aggs(agg_req_1, None); + + let searcher = reader.searcher(); + searcher.search(&term_query, &collector).unwrap() + }); + } + + #[bench] + fn bench_aggregation_stats_f64(b: &mut Bencher) { + let index = get_test_index_bench(false).unwrap(); + let reader = index.reader().unwrap(); + let text_field = reader.searcher().schema().get_field("text").unwrap(); + + b.iter(|| { + let term_query = TermQuery::new( + Term::from_field_text(text_field, "cool"), + IndexRecordOption::Basic, + ); + + let agg_req_1: Aggregations = vec![( + "average_f64".to_string(), + Aggregation::Metric(MetricAggregation::Stats( + StatsAggregation::from_field_name("score_f64".to_string()), + )), + )] + .into_iter() + .collect(); + + let collector = AggregationCollector::from_aggs(agg_req_1, None); + + let searcher = reader.searcher(); + searcher.search(&term_query, &collector).unwrap() + }); + } + + #[bench] + fn bench_aggregation_average_f64(b: &mut Bencher) { + let index = get_test_index_bench(false).unwrap(); + let reader = index.reader().unwrap(); + let text_field = reader.searcher().schema().get_field("text").unwrap(); + + b.iter(|| { + let term_query = TermQuery::new( + Term::from_field_text(text_field, "cool"), + IndexRecordOption::Basic, + ); + + let agg_req_1: Aggregations = vec![( + "average_f64".to_string(), + Aggregation::Metric(MetricAggregation::Average( + AverageAggregation::from_field_name("score_f64".to_string()), + )), + )] + .into_iter() + .collect(); + + let collector = AggregationCollector::from_aggs(agg_req_1, None); + + let searcher = reader.searcher(); + searcher.search(&term_query, &collector).unwrap() + }); + } + + #[bench] + fn bench_aggregation_average_u64_and_f64(b: &mut Bencher) { + let index = get_test_index_bench(false).unwrap(); + let reader = index.reader().unwrap(); + let text_field = reader.searcher().schema().get_field("text").unwrap(); + + b.iter(|| { + let term_query = TermQuery::new( + Term::from_field_text(text_field, "cool"), + IndexRecordOption::Basic, + ); + + let agg_req_1: Aggregations = vec![ + ( + "average_f64".to_string(), + Aggregation::Metric(MetricAggregation::Average( + AverageAggregation::from_field_name("score_f64".to_string()), + )), + ), + ( + "average".to_string(), + Aggregation::Metric(MetricAggregation::Average( + AverageAggregation::from_field_name("score".to_string()), + )), + ), + ] + .into_iter() + .collect(); + + let collector = AggregationCollector::from_aggs(agg_req_1, None); + + let searcher = reader.searcher(); + searcher.search(&term_query, &collector).unwrap() + }); + } + + #[bench] + fn bench_aggregation_terms_few(b: &mut Bencher) { + let index = get_test_index_bench(false).unwrap(); + let reader = index.reader().unwrap(); + + b.iter(|| { + let agg_req: Aggregations = vec![( + "my_texts".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Terms(TermsAggregation { + field: "text_few_terms".to_string(), + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let collector = AggregationCollector::from_aggs(agg_req, None); + + let searcher = reader.searcher(); + searcher.search(&AllQuery, &collector).unwrap() + }); + } + + #[bench] + fn bench_aggregation_terms_many_with_sub_agg(b: &mut Bencher) { + let index = get_test_index_bench(false).unwrap(); + let reader = index.reader().unwrap(); + + b.iter(|| { + let sub_agg_req: Aggregations = vec![( + "average_f64".to_string(), + Aggregation::Metric(MetricAggregation::Average( + AverageAggregation::from_field_name("score_f64".to_string()), + )), + )] + .into_iter() + .collect(); + + let agg_req: Aggregations = vec![( + "my_texts".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Terms(TermsAggregation { + field: "text_many_terms".to_string(), + ..Default::default() + }), + sub_aggregation: sub_agg_req, + }), + )] + .into_iter() + .collect(); + + let collector = AggregationCollector::from_aggs(agg_req, None); + + let searcher = reader.searcher(); + searcher.search(&AllQuery, &collector).unwrap() + }); + } + + #[bench] + fn bench_aggregation_terms_many2(b: &mut Bencher) { + let index = get_test_index_bench(false).unwrap(); + let reader = index.reader().unwrap(); + + b.iter(|| { + let agg_req: Aggregations = vec![( + "my_texts".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Terms(TermsAggregation { + field: "text_many_terms".to_string(), + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let collector = AggregationCollector::from_aggs(agg_req, None); + + let searcher = reader.searcher(); + searcher.search(&AllQuery, &collector).unwrap() + }); + } + + #[bench] + fn bench_aggregation_terms_many_order_by_term(b: &mut Bencher) { + let index = get_test_index_bench(false).unwrap(); + let reader = index.reader().unwrap(); + + b.iter(|| { + let agg_req: Aggregations = vec![( + "my_texts".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Terms(TermsAggregation { + field: "text_many_terms".to_string(), + order: Some(CustomOrder { + order: Order::Desc, + target: OrderTarget::Key, + }), + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let collector = AggregationCollector::from_aggs(agg_req, None); + + let searcher = reader.searcher(); + searcher.search(&AllQuery, &collector).unwrap() + }); + } + + #[bench] + fn bench_aggregation_range_only(b: &mut Bencher) { + let index = get_test_index_bench(false).unwrap(); + let reader = index.reader().unwrap(); + + b.iter(|| { + let agg_req_1: Aggregations = vec![( + "rangef64".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Range(RangeAggregation { + field: "score_f64".to_string(), + ranges: vec![ + (3f64..7000f64).into(), + (7000f64..20000f64).into(), + (20000f64..30000f64).into(), + (30000f64..40000f64).into(), + (40000f64..50000f64).into(), + (50000f64..60000f64).into(), + ], + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let collector = AggregationCollector::from_aggs(agg_req_1, None); + + let searcher = reader.searcher(); + searcher.search(&AllQuery, &collector).unwrap() + }); + } + + #[bench] + fn bench_aggregation_range_with_avg(b: &mut Bencher) { + let index = get_test_index_bench(false).unwrap(); + let reader = index.reader().unwrap(); + + b.iter(|| { + let sub_agg_req: Aggregations = vec![( + "average_f64".to_string(), + Aggregation::Metric(MetricAggregation::Average( + AverageAggregation::from_field_name("score_f64".to_string()), + )), + )] + .into_iter() + .collect(); + + let agg_req_1: Aggregations = vec![( + "rangef64".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Range(RangeAggregation { + field: "score_f64".to_string(), + ranges: vec![ + (3f64..7000f64).into(), + (7000f64..20000f64).into(), + (20000f64..30000f64).into(), + (30000f64..40000f64).into(), + (40000f64..50000f64).into(), + (50000f64..60000f64).into(), + ], + ..Default::default() + }), + sub_aggregation: sub_agg_req, + }), + )] + .into_iter() + .collect(); + + let collector = AggregationCollector::from_aggs(agg_req_1, None); + + let searcher = reader.searcher(); + searcher.search(&AllQuery, &collector).unwrap() + }); + } + + // hard bounds has a different algorithm, because it actually limits collection range + #[bench] + fn bench_aggregation_histogram_only_hard_bounds(b: &mut Bencher) { + let index = get_test_index_bench(false).unwrap(); + let reader = index.reader().unwrap(); + + b.iter(|| { + let agg_req_1: Aggregations = vec![( + "rangef64".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 100f64, + hard_bounds: Some(HistogramBounds { + min: 1000.0, + max: 300_000.0, + }), + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let collector = AggregationCollector::from_aggs(agg_req_1, None); + + let searcher = reader.searcher(); + searcher.search(&AllQuery, &collector).unwrap() + }); + } + + #[bench] + fn bench_aggregation_histogram_with_avg(b: &mut Bencher) { + let index = get_test_index_bench(false).unwrap(); + let reader = index.reader().unwrap(); + + b.iter(|| { + let sub_agg_req: Aggregations = vec![( + "average_f64".to_string(), + Aggregation::Metric(MetricAggregation::Average( + AverageAggregation::from_field_name("score_f64".to_string()), + )), + )] + .into_iter() + .collect(); + + let agg_req_1: Aggregations = vec![( + "rangef64".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 100f64, // 1000 buckets + ..Default::default() + }), + sub_aggregation: sub_agg_req, + }), + )] + .into_iter() + .collect(); + + let collector = AggregationCollector::from_aggs(agg_req_1, None); + + let searcher = reader.searcher(); + searcher.search(&AllQuery, &collector).unwrap() + }); + } + + #[bench] + fn bench_aggregation_histogram_only(b: &mut Bencher) { + let index = get_test_index_bench(false).unwrap(); + let reader = index.reader().unwrap(); + + b.iter(|| { + let agg_req_1: Aggregations = vec![( + "rangef64".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 100f64, // 1000 buckets + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let collector = AggregationCollector::from_aggs(agg_req_1, None); + + let searcher = reader.searcher(); + searcher.search(&AllQuery, &collector).unwrap() + }); + } + + #[bench] + fn bench_aggregation_avg_and_range_with_avg(b: &mut Bencher) { + let index = get_test_index_bench(false).unwrap(); + let reader = index.reader().unwrap(); + let text_field = reader.searcher().schema().get_field("text").unwrap(); + + b.iter(|| { + let term_query = TermQuery::new( + Term::from_field_text(text_field, "cool"), + IndexRecordOption::Basic, + ); + + let sub_agg_req_1: Aggregations = vec![( + "average_in_range".to_string(), + Aggregation::Metric(MetricAggregation::Average( + AverageAggregation::from_field_name("score".to_string()), + )), + )] + .into_iter() + .collect(); + + let agg_req_1: Aggregations = vec![ + ( + "average".to_string(), + Aggregation::Metric(MetricAggregation::Average( + AverageAggregation::from_field_name("score".to_string()), + )), + ), + ( + "rangef64".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Range(RangeAggregation { + field: "score_f64".to_string(), + ranges: vec![ + (3f64..7000f64).into(), + (7000f64..20000f64).into(), + (20000f64..60000f64).into(), + ], + ..Default::default() + }), + sub_aggregation: sub_agg_req_1, + }), + ), + ] + .into_iter() + .collect(); + + let collector = AggregationCollector::from_aggs(agg_req_1, None); + + let searcher = reader.searcher(); + searcher.search(&term_query, &collector).unwrap() + }); + } + } +} diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs index b052818d9..adc18fa9e 100644 --- a/src/aggregation/bucket/histogram/histogram.rs +++ b/src/aggregation/bucket/histogram/histogram.rs @@ -1,7 +1,7 @@ use std::cmp::Ordering; use std::fmt::Display; -use columnar::Column; +use columnar::{Column, ColumnType}; use itertools::Itertools; use serde::{Deserialize, Serialize}; @@ -17,7 +17,6 @@ use crate::aggregation::segment_agg_result::{ build_segment_agg_collector, SegmentAggregationCollector, }; use crate::aggregation::{f64_from_fastfield_u64, format_date, VecWithNames}; -use crate::schema::{Schema, Type}; use crate::{DocId, TantivyError}; /// Histogram is a bucket aggregation, where buckets are created dynamically for given `interval`. @@ -204,7 +203,7 @@ pub struct SegmentHistogramCollector { /// The buckets containing the aggregation data. buckets: Vec, sub_aggregations: Option>>, - field_type: Type, + column_type: ColumnType, interval: f64, offset: f64, min_doc_count: u64, @@ -350,13 +349,16 @@ impl SegmentHistogramCollector { ); }; - Ok(IntermediateBucketResult::Histogram { buckets }) + Ok(IntermediateBucketResult::Histogram { + buckets, + column_type: Some(self.column_type), + }) } pub(crate) fn from_req_and_validate( req: &HistogramAggregation, sub_aggregation: &AggregationsWithAccessor, - field_type: Type, + field_type: ColumnType, accessor: &Column, accessor_idx: usize, ) -> crate::Result { @@ -396,7 +398,7 @@ impl SegmentHistogramCollector { Ok(Self { buckets, - field_type, + column_type: field_type, interval: req.interval, offset: req.offset.unwrap_or(0.0), first_bucket_num, @@ -443,7 +445,7 @@ impl SegmentHistogramCollector { } fn f64_from_fastfield_u64(&self, val: u64) -> f64 { - f64_from_fastfield_u64(val, &self.field_type) + f64_from_fastfield_u64(val, &self.column_type) } } @@ -463,7 +465,6 @@ fn intermediate_buckets_to_final_buckets_fill_gaps( buckets: Vec, histogram_req: &HistogramAggregation, sub_aggregation: &AggregationsInternal, - schema: &Schema, ) -> crate::Result> { // Generate the full list of buckets without gaps. // @@ -504,43 +505,33 @@ fn intermediate_buckets_to_final_buckets_fill_gaps( sub_aggregation: empty_sub_aggregation.clone(), }, }) - .map(|intermediate_bucket| { - intermediate_bucket.into_final_bucket_entry(sub_aggregation, schema) - }) + .map(|intermediate_bucket| intermediate_bucket.into_final_bucket_entry(sub_aggregation)) .collect::>>() } // Convert to BucketEntry pub(crate) fn intermediate_histogram_buckets_to_final_buckets( buckets: Vec, + column_type: Option, histogram_req: &HistogramAggregation, sub_aggregation: &AggregationsInternal, - schema: &Schema, ) -> crate::Result> { let mut buckets = if histogram_req.min_doc_count() == 0 { // With min_doc_count != 0, we may need to add buckets, so that there are no // gaps, since intermediate result does not contain empty buckets (filtered to // reduce serialization size). - intermediate_buckets_to_final_buckets_fill_gaps( - buckets, - histogram_req, - sub_aggregation, - schema, - )? + intermediate_buckets_to_final_buckets_fill_gaps(buckets, histogram_req, sub_aggregation)? } else { buckets .into_iter() .filter(|histogram_bucket| histogram_bucket.doc_count >= histogram_req.min_doc_count()) - .map(|histogram_bucket| { - histogram_bucket.into_final_bucket_entry(sub_aggregation, schema) - }) + .map(|histogram_bucket| histogram_bucket.into_final_bucket_entry(sub_aggregation)) .collect::>>()? }; // If we have a date type on the histogram buckets, we add the `key_as_string` field as rfc339 - let field = schema.get_field(&histogram_req.field)?; - if schema.get_field_entry(field).field_type().is_date() { + if column_type == Some(ColumnType::DateTime) { for bucket in buckets.iter_mut() { if let crate::aggregation::Key::F64(val) = bucket.key { let key_as_string = format_date(val as i64)?; diff --git a/src/aggregation/bucket/range.rs b/src/aggregation/bucket/range.rs index b61e6062f..043a12523 100644 --- a/src/aggregation/bucket/range.rs +++ b/src/aggregation/bucket/range.rs @@ -1,7 +1,7 @@ use std::fmt::Debug; use std::ops::Range; -use columnar::MonotonicallyMappableToU64; +use columnar::{ColumnType, MonotonicallyMappableToU64}; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; @@ -16,7 +16,6 @@ use crate::aggregation::segment_agg_result::{ use crate::aggregation::{ f64_from_fastfield_u64, f64_to_fastfield_u64, format_date, Key, SerializedKey, VecWithNames, }; -use crate::schema::Type; use crate::TantivyError; /// Provide user-defined buckets to aggregate on. @@ -127,7 +126,7 @@ pub(crate) struct SegmentRangeAndBucketEntry { pub struct SegmentRangeCollector { /// The buckets containing the aggregation data. buckets: Vec, - field_type: Type, + column_type: ColumnType, pub(crate) accessor_idx: usize, } @@ -179,7 +178,7 @@ impl SegmentAggregationCollector for SegmentRangeCollector { self: Box, agg_with_accessor: &AggregationsWithAccessor, ) -> crate::Result { - let field_type = self.field_type; + let field_type = self.column_type; let name = agg_with_accessor.buckets.keys[self.accessor_idx].to_string(); let sub_agg = &agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation; @@ -196,7 +195,10 @@ impl SegmentAggregationCollector for SegmentRangeCollector { }) .collect::>()?; - let bucket = IntermediateBucketResult::Range(IntermediateRangeBucketResult { buckets }); + let bucket = IntermediateBucketResult::Range(IntermediateRangeBucketResult { + buckets, + column_type: Some(self.column_type), + }); let buckets = Some(VecWithNames::from_entries(vec![(name, bucket)])); @@ -257,7 +259,7 @@ impl SegmentRangeCollector { req: &RangeAggregation, sub_aggregation: &AggregationsWithAccessor, bucket_count: &BucketCount, - field_type: Type, + field_type: ColumnType, accessor_idx: usize, ) -> crate::Result { // The range input on the request is f64. @@ -305,7 +307,7 @@ impl SegmentRangeCollector { Ok(SegmentRangeCollector { buckets, - field_type, + column_type: field_type, accessor_idx, }) } @@ -335,7 +337,7 @@ impl SegmentRangeCollector { /// more computational expensive when many documents are hit. fn to_u64_range( range: &RangeAggregationRange, - field_type: &Type, + field_type: &ColumnType, ) -> crate::Result { let start = if let Some(from) = range.from { f64_to_fastfield_u64(from, field_type) @@ -361,7 +363,7 @@ fn to_u64_range( /// beginning and end and filling gaps. fn extend_validate_ranges( buckets: &[RangeAggregationRange], - field_type: &Type, + field_type: &ColumnType, ) -> crate::Result> { let mut converted_buckets = buckets .iter() @@ -403,13 +405,16 @@ fn extend_validate_ranges( Ok(converted_buckets) } -pub(crate) fn range_to_string(range: &Range, field_type: &Type) -> crate::Result { +pub(crate) fn range_to_string( + range: &Range, + field_type: &ColumnType, +) -> crate::Result { // is_start is there for malformed requests, e.g. ig the user passes the range u64::MIN..0.0, // it should be rendered as "*-0" and not "*-*" let to_str = |val: u64, is_start: bool| { if (is_start && val == u64::MIN) || (!is_start && val == u64::MAX) { Ok("*".to_string()) - } else if *field_type == Type::Date { + } else if *field_type == ColumnType::DateTime { let val = i64::from_u64(val); format_date(val) } else { @@ -424,7 +429,7 @@ pub(crate) fn range_to_string(range: &Range, field_type: &Type) -> crate::R )) } -pub(crate) fn range_to_key(range: &Range, field_type: &Type) -> crate::Result { +pub(crate) fn range_to_key(range: &Range, field_type: &ColumnType) -> crate::Result { Ok(Key::Str(range_to_string(range, field_type)?)) } @@ -446,7 +451,7 @@ mod tests { pub fn get_collector_from_ranges( ranges: Vec, - field_type: Type, + field_type: ColumnType, ) -> SegmentRangeCollector { let req = RangeAggregation { field: "dummy".to_string(), @@ -736,7 +741,7 @@ mod tests { #[test] fn bucket_test_extend_range_hole() { let buckets = vec![(10f64..20f64).into(), (30f64..40f64).into()]; - let collector = get_collector_from_ranges(buckets, Type::F64); + let collector = get_collector_from_ranges(buckets, ColumnType::F64); let buckets = collector.buckets; assert_eq!(buckets[0].range.start, u64::MIN); @@ -759,7 +764,7 @@ mod tests { (10f64..20f64).into(), (20f64..f64::MAX).into(), ]; - let collector = get_collector_from_ranges(buckets, Type::F64); + let collector = get_collector_from_ranges(buckets, ColumnType::F64); let buckets = collector.buckets; assert_eq!(buckets[0].range.start, u64::MIN); @@ -774,7 +779,7 @@ mod tests { #[test] fn bucket_range_test_negative_vals() { let buckets = vec![(-10f64..-1f64).into()]; - let collector = get_collector_from_ranges(buckets, Type::F64); + let collector = get_collector_from_ranges(buckets, ColumnType::F64); let buckets = collector.buckets; assert_eq!(&buckets[0].bucket.key.to_string(), "*--10"); @@ -783,7 +788,7 @@ mod tests { #[test] fn bucket_range_test_positive_vals() { let buckets = vec![(0f64..10f64).into()]; - let collector = get_collector_from_ranges(buckets, Type::F64); + let collector = get_collector_from_ranges(buckets, ColumnType::F64); let buckets = collector.buckets; assert_eq!(&buckets[0].bucket.key.to_string(), "*-0"); @@ -793,7 +798,7 @@ mod tests { #[test] fn range_binary_search_test_u64() { let check_ranges = |ranges: Vec| { - let collector = get_collector_from_ranges(ranges, Type::U64); + let collector = get_collector_from_ranges(ranges, ColumnType::U64); let search = |val: u64| collector.get_bucket_pos(val); assert_eq!(search(u64::MIN), 0); @@ -839,7 +844,7 @@ mod tests { fn range_binary_search_test_f64() { let ranges = vec![(10.0..100.0).into()]; - let collector = get_collector_from_ranges(ranges, Type::F64); + let collector = get_collector_from_ranges(ranges, ColumnType::F64); let search = |val: u64| collector.get_bucket_pos(val); assert_eq!(search(u64::MIN), 0); @@ -874,7 +879,7 @@ mod bench { buckets.push((bucket_start..bucket_start + bucket_size as f64).into()) } - get_collector_from_ranges(buckets, Type::U64) + get_collector_from_ranges(buckets, ColumnType::U64) } fn get_rand_docs(total_docs: u64, num_docs_returned: u64) -> Vec { diff --git a/src/aggregation/collector.rs b/src/aggregation/collector.rs index b8887f2c8..852b2053b 100644 --- a/src/aggregation/collector.rs +++ b/src/aggregation/collector.rs @@ -7,7 +7,6 @@ use super::intermediate_agg_result::IntermediateAggregationResults; use super::segment_agg_result::{build_segment_agg_collector, SegmentAggregationCollector}; use crate::aggregation::agg_req_with_accessor::get_aggs_with_accessor_and_validate; use crate::collector::{Collector, SegmentCollector}; -use crate::schema::Schema; use crate::{SegmentReader, TantivyError}; /// The default max bucket count, before the aggregation fails. @@ -17,7 +16,6 @@ pub const MAX_BUCKET_COUNT: u32 = 65000; /// /// The collector collects all aggregations by the underlying aggregation request. pub struct AggregationCollector { - schema: Schema, agg: Aggregations, max_bucket_count: u32, } @@ -27,9 +25,8 @@ impl AggregationCollector { /// /// Aggregation fails when the total bucket count is higher than max_bucket_count. /// max_bucket_count will default to `MAX_BUCKET_COUNT` (65000) when unset - pub fn from_aggs(agg: Aggregations, max_bucket_count: Option, schema: Schema) -> Self { + pub fn from_aggs(agg: Aggregations, max_bucket_count: Option) -> Self { Self { - schema, agg, max_bucket_count: max_bucket_count.unwrap_or(MAX_BUCKET_COUNT), } @@ -116,7 +113,7 @@ impl Collector for AggregationCollector { segment_fruits: Vec<::Fruit>, ) -> crate::Result { let res = merge_fruits(segment_fruits)?; - res.into_final_bucket_result(self.agg.clone(), &self.schema) + res.into_final_bucket_result(self.agg.clone()) } } diff --git a/src/aggregation/intermediate_agg_result.rs b/src/aggregation/intermediate_agg_result.rs index b972bde2d..5f26d627b 100644 --- a/src/aggregation/intermediate_agg_result.rs +++ b/src/aggregation/intermediate_agg_result.rs @@ -4,6 +4,7 @@ use std::cmp::Ordering; +use columnar::ColumnType; use itertools::Itertools; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; @@ -24,7 +25,6 @@ use super::metric::{ use super::{format_date, Key, SerializedKey, VecWithNames}; use crate::aggregation::agg_result::{AggregationResults, BucketEntries, BucketEntry}; use crate::aggregation::bucket::TermsAggregationInternal; -use crate::schema::Schema; /// Contains the intermediate aggregation result, which is optimized to be merged with other /// intermediate results. @@ -38,12 +38,8 @@ pub struct IntermediateAggregationResults { impl IntermediateAggregationResults { /// Convert intermediate result and its aggregation request to the final result. - pub fn into_final_bucket_result( - self, - req: Aggregations, - schema: &Schema, - ) -> crate::Result { - self.into_final_bucket_result_internal(&(req.into()), schema) + pub fn into_final_bucket_result(self, req: Aggregations) -> crate::Result { + self.into_final_bucket_result_internal(&(req.into())) } /// Convert intermediate result and its aggregation request to the final result. @@ -53,7 +49,6 @@ impl IntermediateAggregationResults { pub(crate) fn into_final_bucket_result_internal( self, req: &AggregationsInternal, - schema: &Schema, ) -> crate::Result { // Important assumption: // When the tree contains buckets/metric, we expect it to have all buckets/metrics from the @@ -61,11 +56,11 @@ impl IntermediateAggregationResults { let mut results: FxHashMap = FxHashMap::default(); if let Some(buckets) = self.buckets { - convert_and_add_final_buckets_to_result(&mut results, buckets, &req.buckets, schema)? + convert_and_add_final_buckets_to_result(&mut results, buckets, &req.buckets)? } else { // When there are no buckets, we create empty buckets, so that the serialized json // format is constant - add_empty_final_buckets_to_result(&mut results, &req.buckets, schema)? + add_empty_final_buckets_to_result(&mut results, &req.buckets)? }; if let Some(metrics) = self.metrics { @@ -166,12 +161,10 @@ fn add_empty_final_metrics_to_result( fn add_empty_final_buckets_to_result( results: &mut FxHashMap, req_buckets: &VecWithNames, - schema: &Schema, ) -> crate::Result<()> { let requested_buckets = req_buckets.iter(); for (key, req) in requested_buckets { - let empty_bucket = - AggregationResult::BucketResult(BucketResult::empty_from_req(req, schema)?); + let empty_bucket = AggregationResult::BucketResult(BucketResult::empty_from_req(req)?); results.insert(key.to_string(), empty_bucket); } Ok(()) @@ -181,13 +174,12 @@ fn convert_and_add_final_buckets_to_result( results: &mut FxHashMap, buckets: VecWithNames, req_buckets: &VecWithNames, - schema: &Schema, ) -> crate::Result<()> { assert_eq!(buckets.len(), req_buckets.len()); let buckets_with_request = buckets.into_iter().zip(req_buckets.values()); for ((key, bucket), req) in buckets_with_request { - let result = AggregationResult::BucketResult(bucket.into_final_bucket_result(req, schema)?); + let result = AggregationResult::BucketResult(bucket.into_final_bucket_result(req)?); results.insert(key, result); } Ok(()) @@ -282,6 +274,8 @@ pub enum IntermediateBucketResult { /// This is the histogram entry for a bucket, which contains a key, count, and optionally /// sub_aggregations. Histogram { + /// The column_type of the underlying `Column` + column_type: Option, /// The buckets buckets: Vec, }, @@ -293,7 +287,6 @@ impl IntermediateBucketResult { pub(crate) fn into_final_bucket_result( self, req: &BucketAggregationInternal, - schema: &Schema, ) -> crate::Result { match self { IntermediateBucketResult::Range(range_res) => { @@ -303,9 +296,9 @@ impl IntermediateBucketResult { .map(|bucket| { bucket.into_final_bucket_entry( &req.sub_aggregation, - schema, req.as_range() .expect("unexpected aggregation, expected histogram aggregation"), + range_res.column_type, ) }) .collect::>>()?; @@ -332,13 +325,16 @@ impl IntermediateBucketResult { }; Ok(BucketResult::Range { buckets }) } - IntermediateBucketResult::Histogram { buckets } => { + IntermediateBucketResult::Histogram { + column_type, + buckets, + } => { let buckets = intermediate_histogram_buckets_to_final_buckets( buckets, + column_type, req.as_histogram() .expect("unexpected aggregation, expected histogram aggregation"), &req.sub_aggregation, - schema, )?; let buckets = if req.as_histogram().unwrap().keyed { @@ -357,7 +353,6 @@ impl IntermediateBucketResult { req.as_term() .expect("unexpected aggregation, expected term aggregation"), &req.sub_aggregation, - schema, ), } } @@ -366,9 +361,10 @@ impl IntermediateBucketResult { match req { BucketAggregationType::Terms(_) => IntermediateBucketResult::Terms(Default::default()), BucketAggregationType::Range(_) => IntermediateBucketResult::Range(Default::default()), - BucketAggregationType::Histogram(_) => { - IntermediateBucketResult::Histogram { buckets: vec![] } - } + BucketAggregationType::Histogram(_) => IntermediateBucketResult::Histogram { + buckets: vec![], + column_type: None, + }, } } fn merge_fruits(&mut self, other: IntermediateBucketResult) { @@ -433,6 +429,7 @@ impl IntermediateBucketResult { /// Range aggregation including error counts pub struct IntermediateRangeBucketResult { pub(crate) buckets: FxHashMap, + pub(crate) column_type: Option, } #[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -448,7 +445,6 @@ impl IntermediateTermBucketResult { self, req: &TermsAggregation, sub_aggregation_req: &AggregationsInternal, - schema: &Schema, ) -> crate::Result { let req = TermsAggregationInternal::from_req(req); let mut buckets: Vec = self @@ -462,7 +458,7 @@ impl IntermediateTermBucketResult { doc_count: entry.doc_count, sub_aggregation: entry .sub_aggregation - .into_final_bucket_result_internal(sub_aggregation_req, schema)?, + .into_final_bucket_result_internal(sub_aggregation_req)?, }) }) .collect::>()?; @@ -567,7 +563,6 @@ impl IntermediateHistogramBucketEntry { pub(crate) fn into_final_bucket_entry( self, req: &AggregationsInternal, - schema: &Schema, ) -> crate::Result { Ok(BucketEntry { key_as_string: None, @@ -575,7 +570,7 @@ impl IntermediateHistogramBucketEntry { doc_count: self.doc_count, sub_aggregation: self .sub_aggregation - .into_final_bucket_result_internal(req, schema)?, + .into_final_bucket_result_internal(req)?, }) } } @@ -612,15 +607,15 @@ impl IntermediateRangeBucketEntry { pub(crate) fn into_final_bucket_entry( self, req: &AggregationsInternal, - schema: &Schema, - range_req: &RangeAggregation, + _range_req: &RangeAggregation, + column_type: Option, ) -> crate::Result { let mut range_bucket_entry = RangeBucketEntry { key: self.key, doc_count: self.doc_count, sub_aggregation: self .sub_aggregation - .into_final_bucket_result_internal(req, schema)?, + .into_final_bucket_result_internal(req)?, to: self.to, from: self.from, to_as_string: None, @@ -629,8 +624,7 @@ impl IntermediateRangeBucketEntry { // If we have a date type on the histogram buckets, we add the `key_as_string` field as // rfc339 - let field = schema.get_field(&range_req.field)?; - if schema.get_field_entry(field).field_type().is_date() { + if column_type == Some(ColumnType::DateTime) { if let Some(val) = range_bucket_entry.to { let key_as_string = format_date(val as i64)?; range_bucket_entry.to_as_string = Some(key_as_string); @@ -701,7 +695,10 @@ mod tests { } map.insert( "my_agg_level2".to_string(), - IntermediateBucketResult::Range(IntermediateRangeBucketResult { buckets }), + IntermediateBucketResult::Range(IntermediateRangeBucketResult { + buckets, + column_type: None, + }), ); IntermediateAggregationResults { buckets: Some(VecWithNames::from_entries(map.into_iter().collect())), @@ -731,7 +728,10 @@ mod tests { } map.insert( "my_agg_level1".to_string(), - IntermediateBucketResult::Range(IntermediateRangeBucketResult { buckets }), + IntermediateBucketResult::Range(IntermediateRangeBucketResult { + buckets, + column_type: None, + }), ); IntermediateAggregationResults { buckets: Some(VecWithNames::from_entries(map.into_iter().collect())), diff --git a/src/aggregation/metric/mod.rs b/src/aggregation/metric/mod.rs index a13994209..fd2e6ae23 100644 --- a/src/aggregation/metric/mod.rs +++ b/src/aggregation/metric/mod.rs @@ -81,7 +81,7 @@ mod tests { "price_sum": { "sum": { "field": "price" } } }"#; let aggregations: Aggregations = serde_json::from_str(aggregations_json).unwrap(); - let collector = AggregationCollector::from_aggs(aggregations, None, index.schema()); + let collector = AggregationCollector::from_aggs(aggregations, None); let reader = index.reader().unwrap(); let searcher = reader.searcher(); let aggregations_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap(); diff --git a/src/aggregation/metric/stats.rs b/src/aggregation/metric/stats.rs index af26c7a18..b4b1690c9 100644 --- a/src/aggregation/metric/stats.rs +++ b/src/aggregation/metric/stats.rs @@ -1,4 +1,4 @@ -use columnar::{Cardinality, Column}; +use columnar::{Cardinality, Column, ColumnType}; use serde::{Deserialize, Serialize}; use super::*; @@ -8,7 +8,6 @@ use crate::aggregation::intermediate_agg_result::{ }; use crate::aggregation::segment_agg_result::SegmentAggregationCollector; use crate::aggregation::{f64_from_fastfield_u64, VecWithNames}; -use crate::schema::Type; use crate::{DocId, TantivyError}; /// A multi-value metric aggregation that computes a collection of statistics on numeric values that @@ -153,7 +152,7 @@ pub(crate) enum SegmentStatsType { #[derive(Clone, Debug, PartialEq)] pub(crate) struct SegmentStatsCollector { - field_type: Type, + field_type: ColumnType, pub(crate) collecting_for: SegmentStatsType, pub(crate) stats: IntermediateStats, pub(crate) accessor_idx: usize, @@ -161,7 +160,7 @@ pub(crate) struct SegmentStatsCollector { impl SegmentStatsCollector { pub fn from_req( - field_type: Type, + field_type: ColumnType, collecting_for: SegmentStatsType, accessor_idx: usize, ) -> Self { @@ -290,7 +289,7 @@ mod tests { .into_iter() .collect(); - let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema()); + let collector = AggregationCollector::from_aggs(agg_req_1, None); let reader = index.reader()?; let searcher = reader.searcher(); @@ -327,7 +326,7 @@ mod tests { .into_iter() .collect(); - let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema()); + let collector = AggregationCollector::from_aggs(agg_req_1, None); let reader = index.reader()?; let searcher = reader.searcher(); @@ -404,7 +403,7 @@ mod tests { .into_iter() .collect(); - let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema()); + let collector = AggregationCollector::from_aggs(agg_req_1, None); let searcher = reader.searcher(); let agg_res: AggregationResults = searcher.search(&term_query, &collector).unwrap(); diff --git a/src/aggregation/mod.rs b/src/aggregation/mod.rs index b92c99947..24a02f19c 100644 --- a/src/aggregation/mod.rs +++ b/src/aggregation/mod.rs @@ -56,10 +56,9 @@ //! use tantivy::query::AllQuery; //! use tantivy::aggregation::agg_result::AggregationResults; //! use tantivy::IndexReader; -//! use tantivy::schema::Schema; //! //! # #[allow(dead_code)] -//! fn aggregate_on_index(reader: &IndexReader, schema: Schema) { +//! fn aggregate_on_index(reader: &IndexReader) { //! let agg_req: Aggregations = vec![ //! ( //! "average".to_string(), @@ -71,7 +70,7 @@ //! .into_iter() //! .collect(); //! -//! let collector = AggregationCollector::from_aggs(agg_req, None, schema); +//! let collector = AggregationCollector::from_aggs(agg_req, None); //! //! let searcher = reader.searcher(); //! let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap(); @@ -169,17 +168,18 @@ mod segment_agg_result; use std::collections::HashMap; use std::fmt::Display; +#[cfg(test)] +mod agg_tests; + pub use collector::{ AggregationCollector, AggregationSegmentCollector, DistributedAggregationCollector, MAX_BUCKET_COUNT, }; -use columnar::MonotonicallyMappableToU64; +use columnar::{ColumnType, MonotonicallyMappableToU64}; pub(crate) use date::format_date; use itertools::Itertools; use serde::{Deserialize, Serialize}; -use crate::schema::Type; - /// Represents an associative array `(key => values)` in a very efficient manner. #[derive(Clone, PartialEq, Serialize, Deserialize)] pub(crate) struct VecWithNames { @@ -285,11 +285,11 @@ impl Display for Key { /// /// # Panics /// Only `u64`, `f64`, `date`, and `i64` are supported. -pub(crate) fn f64_from_fastfield_u64(val: u64, field_type: &Type) -> f64 { +pub(crate) fn f64_from_fastfield_u64(val: u64, field_type: &ColumnType) -> f64 { match field_type { - Type::U64 => val as f64, - Type::I64 | Type::Date => i64::from_u64(val) as f64, - Type::F64 => f64::from_u64(val), + ColumnType::U64 => val as f64, + ColumnType::I64 | ColumnType::DateTime => i64::from_u64(val) as f64, + ColumnType::F64 => f64::from_u64(val), _ => { panic!("unexpected type {:?}. This should not happen", field_type) } @@ -305,11 +305,11 @@ pub(crate) fn f64_from_fastfield_u64(val: u64, field_type: &Type) -> f64 { /// A `f64` value of e.g. `2.0` needs to be converted using the same monotonic /// conversion function, so that the value matches the `u64` value stored in the fast /// field. -pub(crate) fn f64_to_fastfield_u64(val: f64, field_type: &Type) -> Option { +pub(crate) fn f64_to_fastfield_u64(val: f64, field_type: &ColumnType) -> Option { match field_type { - Type::U64 => Some(val as u64), - Type::I64 | Type::Date => Some((val as i64).to_u64()), - Type::F64 => Some(val.to_u64()), + ColumnType::U64 => Some(val as u64), + ColumnType::I64 | ColumnType::DateTime => Some((val as i64).to_u64()), + ColumnType::F64 => Some(val.to_u64()), _ => None, } } @@ -318,31 +318,16 @@ pub(crate) fn f64_to_fastfield_u64(val: f64, field_type: &Type) -> Option { mod tests { use std::net::Ipv6Addr; + use columnar::DateTime; use serde_json::Value; use time::OffsetDateTime; - use super::agg_req::{Aggregation, Aggregations, BucketAggregation}; - use super::bucket::RangeAggregation; - use super::collector::AggregationCollector; - use super::metric::AverageAggregation; - use crate::aggregation::agg_req::{ - get_term_dict_field_names, BucketAggregationType, MetricAggregation, - }; - use crate::aggregation::agg_result::AggregationResults; - use crate::aggregation::bucket::TermsAggregation; - use crate::aggregation::buf_collector::DOC_BLOCK_SIZE; - use crate::aggregation::intermediate_agg_result::IntermediateAggregationResults; - use crate::aggregation::DistributedAggregationCollector; + use super::agg_req::Aggregations; + use super::*; use crate::indexer::NoMergePolicy; use crate::query::{AllQuery, TermQuery}; use crate::schema::{IndexRecordOption, Schema, TextFieldIndexing, FAST, STRING}; - use crate::{DateTime, Index, Term}; - - fn get_avg_req(field_name: &str) -> Aggregation { - Aggregation::Metric(MetricAggregation::Average( - AverageAggregation::from_field_name(field_name.to_string()), - )) - } + use crate::{Index, Term}; pub fn get_test_index_with_num_docs( merge_segments: bool, @@ -362,7 +347,7 @@ mod tests { index: &Index, query: Option<(&str, &str)>, ) -> crate::Result { - let collector = AggregationCollector::from_aggs(agg_req, None, index.schema()); + let collector = AggregationCollector::from_aggs(agg_req, None); let reader = index.reader()?; let searcher = reader.searcher(); @@ -474,175 +459,6 @@ mod tests { Ok(index) } - // *** EVERY BUCKET-TYPE SHOULD BE TESTED HERE *** - fn test_aggregation_flushing( - merge_segments: bool, - use_distributed_collector: bool, - ) -> crate::Result<()> { - let mut values_and_terms = (0..80) - .map(|val| vec![(val as f64, "terma".to_string())]) - .collect::>(); - values_and_terms.last_mut().unwrap()[0].1 = "termb".to_string(); - let index = get_test_index_from_values_and_terms(merge_segments, &values_and_terms)?; - - let reader = index.reader()?; - - assert_eq!(DOC_BLOCK_SIZE, 64); - // In the tree we cache Documents of DOC_BLOCK_SIZE, before passing them down as one block. - // - // Build a request so that on the first level we have one full cache, which is then flushed. - // The same cache should have some residue docs at the end, which are flushed (Range 0-70) - // -> 70 docs - // - // The second level should also have some residue docs in the cache that are flushed at the - // end. - // - // A second bucket on the first level should have the cache unfilled - - // let elasticsearch_compatible_json_req = r#" - let elasticsearch_compatible_json = json!( - { - "bucketsL1": { - "range": { - "field": "score", - "ranges": [ { "to": 3.0f64 }, { "from": 3.0f64, "to": 70.0f64 }, { "from": 70.0f64 } ] - }, - "aggs": { - "bucketsL2": { - "range": { - "field": "score", - "ranges": [ { "to": 30.0f64 }, { "from": 30.0f64, "to": 70.0f64 }, { "from": 70.0f64 } ] - } - } - } - }, - "histogram_test":{ - "histogram": { - "field": "score", - "interval": 70.0, - "offset": 3.0 - }, - "aggs": { - "bucketsL2": { - "histogram": { - "field": "score", - "interval": 70.0 - } - } - } - }, - "term_agg_test":{ - "terms": { - "field": "string_id" - }, - "aggs": { - "bucketsL2": { - "histogram": { - "field": "score", - "interval": 70.0 - } - } - } - } - }); - - let agg_req: Aggregations = - serde_json::from_str(&serde_json::to_string(&elasticsearch_compatible_json).unwrap()) - .unwrap(); - - let agg_res: AggregationResults = if use_distributed_collector { - let collector = DistributedAggregationCollector::from_aggs(agg_req.clone(), None); - - let searcher = reader.searcher(); - let intermediate_agg_result = searcher.search(&AllQuery, &collector).unwrap(); - intermediate_agg_result - .into_final_bucket_result(agg_req, &index.schema()) - .unwrap() - } else { - let collector = AggregationCollector::from_aggs(agg_req, None, index.schema()); - - let searcher = reader.searcher(); - searcher.search(&AllQuery, &collector).unwrap() - }; - - let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?; - - assert_eq!(res["bucketsL1"]["buckets"][0]["doc_count"], 3); - assert_eq!( - res["bucketsL1"]["buckets"][0]["bucketsL2"]["buckets"][0]["doc_count"], - 3 - ); - assert_eq!(res["bucketsL1"]["buckets"][1]["key"], "3-70"); - assert_eq!(res["bucketsL1"]["buckets"][1]["doc_count"], 70 - 3); - assert_eq!( - res["bucketsL1"]["buckets"][1]["bucketsL2"]["buckets"][0]["doc_count"], - 27 - ); - assert_eq!( - res["bucketsL1"]["buckets"][1]["bucketsL2"]["buckets"][1]["doc_count"], - 40 - ); - assert_eq!( - res["bucketsL1"]["buckets"][1]["bucketsL2"]["buckets"][2]["doc_count"], - 0 - ); - assert_eq!( - res["bucketsL1"]["buckets"][2]["bucketsL2"]["buckets"][2]["doc_count"], - 80 - 70 - ); - assert_eq!(res["bucketsL1"]["buckets"][2]["doc_count"], 80 - 70); - - assert_eq!( - res["term_agg_test"], - json!( - { - "buckets": [ - { - "bucketsL2": { - "buckets": [ - { - "doc_count": 70, - "key": 0.0 - }, - { - "doc_count": 9, - "key": 70.0 - } - ] - }, - "doc_count": 79, - "key": "terma" - }, - { - "bucketsL2": { - "buckets": [ - { - "doc_count": 1, - "key": 70.0 - } - ] - }, - "doc_count": 1, - "key": "termb" - } - ], - "doc_count_error_upper_bound": 0, - "sum_other_doc_count": 0 - } - ) - ); - - Ok(()) - } - - #[test] - fn test_aggregation_flushing_variants() { - test_aggregation_flushing(false, false).unwrap(); - test_aggregation_flushing(false, true).unwrap(); - test_aggregation_flushing(true, false).unwrap(); - test_aggregation_flushing(true, true).unwrap(); - } - pub fn get_test_index_2_segments(merge_segments: bool) -> crate::Result { let mut schema_builder = Schema::builder(); let text_fieldtype = crate::schema::TextOptions::default() @@ -755,933 +571,4 @@ mod tests { Ok(index) } - - #[test] - fn test_aggregation_level1() -> crate::Result<()> { - let index = get_test_index_2_segments(true)?; - - let reader = index.reader()?; - let text_field = reader.searcher().schema().get_field("text").unwrap(); - - let term_query = TermQuery::new( - Term::from_field_text(text_field, "cool"), - IndexRecordOption::Basic, - ); - - let agg_req_1: Aggregations = vec![ - ("average_i64".to_string(), get_avg_req("score_i64")), - ("average_f64".to_string(), get_avg_req("score_f64")), - ("average".to_string(), get_avg_req("score")), - ( - "range".to_string(), - Aggregation::Bucket(BucketAggregation { - bucket_agg: BucketAggregationType::Range(RangeAggregation { - field: "score".to_string(), - ranges: vec![(3f64..7f64).into(), (7f64..20f64).into()], - ..Default::default() - }), - sub_aggregation: Default::default(), - }), - ), - ( - "rangef64".to_string(), - Aggregation::Bucket(BucketAggregation { - bucket_agg: BucketAggregationType::Range(RangeAggregation { - field: "score_f64".to_string(), - ranges: vec![(3f64..7f64).into(), (7f64..20f64).into()], - ..Default::default() - }), - sub_aggregation: Default::default(), - }), - ), - ( - "rangei64".to_string(), - Aggregation::Bucket(BucketAggregation { - bucket_agg: BucketAggregationType::Range(RangeAggregation { - field: "score_i64".to_string(), - ranges: vec![(3f64..7f64).into(), (7f64..20f64).into()], - ..Default::default() - }), - sub_aggregation: Default::default(), - }), - ), - ] - .into_iter() - .collect(); - - let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema()); - - let searcher = reader.searcher(); - let agg_res: AggregationResults = searcher.search(&term_query, &collector).unwrap(); - - let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?; - assert_eq!(res["average"]["value"], 12.142857142857142); - assert_eq!(res["average_f64"]["value"], 12.214285714285714); - assert_eq!(res["average_i64"]["value"], 12.142857142857142); - assert_eq!( - res["range"]["buckets"], - json!( - [ - { - "key": "*-3", - "doc_count": 1, - "to": 3.0 - }, - { - "key": "3-7", - "doc_count": 2, - "from": 3.0, - "to": 7.0 - }, - { - "key": "7-20", - "doc_count": 3, - "from": 7.0, - "to": 20.0 - }, - { - "key": "20-*", - "doc_count": 1, - "from": 20.0 - } - ]) - ); - - Ok(()) - } - - fn test_aggregation_level2( - merge_segments: bool, - use_distributed_collector: bool, - use_elastic_json_req: bool, - ) -> crate::Result<()> { - let index = get_test_index_2_segments(merge_segments)?; - - let reader = index.reader()?; - let text_field = reader.searcher().schema().get_field("text").unwrap(); - - let term_query = TermQuery::new( - Term::from_field_text(text_field, "cool"), - IndexRecordOption::Basic, - ); - - let query_with_no_hits = TermQuery::new( - Term::from_field_text(text_field, "thistermdoesnotexist"), - IndexRecordOption::Basic, - ); - - let sub_agg_req: Aggregations = vec![ - ("average_in_range".to_string(), get_avg_req("score")), - ( - "term_agg".to_string(), - Aggregation::Bucket(BucketAggregation { - bucket_agg: BucketAggregationType::Terms(TermsAggregation { - field: "text".to_string(), - ..Default::default() - }), - sub_aggregation: Default::default(), - }), - ), - ] - .into_iter() - .collect(); - let agg_req: Aggregations = if use_elastic_json_req { - let elasticsearch_compatible_json_req = r#" -{ - "rangef64": { - "range": { - "field": "score_f64", - "ranges": [ - { "to": 3.0 }, - { "from": 3.0, "to": 7.0 }, - { "from": 7.0, "to": 19.0 }, - { "from": 19.0, "to": 20.0 }, - { "from": 20.0 } - ] - }, - "aggs": { - "average_in_range": { "avg": { "field": "score" } }, - "term_agg": { "terms": { "field": "text" } } - } - }, - "rangei64": { - "range": { - "field": "score_i64", - "ranges": [ - { "to": 3.0 }, - { "from": 3.0, "to": 7.0 }, - { "from": 7.0, "to": 19.0 }, - { "from": 19.0, "to": 20.0 }, - { "from": 20.0 } - ] - }, - "aggs": { - "average_in_range": { "avg": { "field": "score" } }, - "term_agg": { "terms": { "field": "text" } } - } - }, - "average": { - "avg": { "field": "score" } - }, - "range": { - "range": { - "field": "score", - "ranges": [ - { "to": 3.0 }, - { "from": 3.0, "to": 7.0 }, - { "from": 7.0, "to": 19.0 }, - { "from": 19.0, "to": 20.0 }, - { "from": 20.0 } - ] - }, - "aggs": { - "average_in_range": { "avg": { "field": "score" } }, - "term_agg": { "terms": { "field": "text" } } - } - } -} -"#; - let value: Aggregations = - serde_json::from_str(elasticsearch_compatible_json_req).unwrap(); - value - } else { - let agg_req: Aggregations = vec![ - ("average".to_string(), get_avg_req("score")), - ( - "range".to_string(), - Aggregation::Bucket(BucketAggregation { - bucket_agg: BucketAggregationType::Range(RangeAggregation { - field: "score".to_string(), - ranges: vec![ - (3f64..7f64).into(), - (7f64..19f64).into(), - (19f64..20f64).into(), - ], - ..Default::default() - }), - sub_aggregation: sub_agg_req.clone(), - }), - ), - ( - "rangef64".to_string(), - Aggregation::Bucket(BucketAggregation { - bucket_agg: BucketAggregationType::Range(RangeAggregation { - field: "score_f64".to_string(), - ranges: vec![ - (3f64..7f64).into(), - (7f64..19f64).into(), - (19f64..20f64).into(), - ], - ..Default::default() - }), - sub_aggregation: sub_agg_req.clone(), - }), - ), - ( - "rangei64".to_string(), - Aggregation::Bucket(BucketAggregation { - bucket_agg: BucketAggregationType::Range(RangeAggregation { - field: "score_i64".to_string(), - ranges: vec![ - (3f64..7f64).into(), - (7f64..19f64).into(), - (19f64..20f64).into(), - ], - ..Default::default() - }), - sub_aggregation: sub_agg_req, - }), - ), - ] - .into_iter() - .collect(); - agg_req - }; - - let field_names = get_term_dict_field_names(&agg_req); - assert_eq!(field_names, vec!["text".to_string()].into_iter().collect()); - - let agg_res: AggregationResults = if use_distributed_collector { - let collector = DistributedAggregationCollector::from_aggs(agg_req.clone(), None); - - let searcher = reader.searcher(); - let res = searcher.search(&term_query, &collector).unwrap(); - // Test de/serialization roundtrip on intermediate_agg_result - let res: IntermediateAggregationResults = - serde_json::from_str(&serde_json::to_string(&res).unwrap()).unwrap(); - res.into_final_bucket_result(agg_req.clone(), &index.schema()) - .unwrap() - } else { - let collector = AggregationCollector::from_aggs(agg_req.clone(), None, index.schema()); - - let searcher = reader.searcher(); - searcher.search(&term_query, &collector).unwrap() - }; - - let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?; - - assert_eq!(res["range"]["buckets"][1]["key"], "3-7"); - assert_eq!(res["range"]["buckets"][1]["doc_count"], 2u64); - assert_eq!(res["rangef64"]["buckets"][1]["doc_count"], 2u64); - assert_eq!(res["rangei64"]["buckets"][1]["doc_count"], 2u64); - - assert_eq!(res["average"]["value"], 12.142857142857142f64); - assert_eq!(res["range"]["buckets"][2]["key"], "7-19"); - assert_eq!(res["range"]["buckets"][2]["doc_count"], 3u64); - assert_eq!(res["rangef64"]["buckets"][2]["doc_count"], 3u64); - assert_eq!(res["rangei64"]["buckets"][2]["doc_count"], 3u64); - assert_eq!(res["rangei64"]["buckets"][5], serde_json::Value::Null); - - assert_eq!(res["range"]["buckets"][4]["key"], "20-*"); - assert_eq!(res["range"]["buckets"][4]["doc_count"], 1u64); - assert_eq!(res["rangef64"]["buckets"][4]["doc_count"], 1u64); - assert_eq!(res["rangei64"]["buckets"][4]["doc_count"], 1u64); - - assert_eq!(res["range"]["buckets"][3]["key"], "19-20"); - assert_eq!(res["range"]["buckets"][3]["doc_count"], 0u64); - assert_eq!(res["rangef64"]["buckets"][3]["doc_count"], 0u64); - assert_eq!(res["rangei64"]["buckets"][3]["doc_count"], 0u64); - - assert_eq!( - res["range"]["buckets"][3]["average_in_range"]["value"], - serde_json::Value::Null - ); - - assert_eq!( - res["range"]["buckets"][4]["average_in_range"]["value"], - 44.0f64 - ); - assert_eq!( - res["rangef64"]["buckets"][4]["average_in_range"]["value"], - 44.0f64 - ); - assert_eq!( - res["rangei64"]["buckets"][4]["average_in_range"]["value"], - 44.0f64 - ); - - assert_eq!( - res["range"]["7-19"]["average_in_range"]["value"], - res["rangef64"]["7-19"]["average_in_range"]["value"] - ); - assert_eq!( - res["range"]["7-19"]["average_in_range"]["value"], - res["rangei64"]["7-19"]["average_in_range"]["value"] - ); - - // Test empty result set - let collector = AggregationCollector::from_aggs(agg_req, None, index.schema()); - let searcher = reader.searcher(); - searcher.search(&query_with_no_hits, &collector).unwrap(); - - Ok(()) - } - - #[test] - fn test_aggregation_level2_multi_segments() -> crate::Result<()> { - test_aggregation_level2(false, false, false) - } - - #[test] - fn test_aggregation_level2_single_segment() -> crate::Result<()> { - test_aggregation_level2(true, false, false) - } - - #[test] - fn test_aggregation_level2_multi_segments_distributed_collector() -> crate::Result<()> { - test_aggregation_level2(false, true, false) - } - - #[test] - fn test_aggregation_level2_single_segment_distributed_collector() -> crate::Result<()> { - test_aggregation_level2(true, true, false) - } - - #[test] - fn test_aggregation_level2_multi_segments_use_json() -> crate::Result<()> { - test_aggregation_level2(false, false, true) - } - - #[test] - fn test_aggregation_level2_single_segment_use_json() -> crate::Result<()> { - test_aggregation_level2(true, false, true) - } - - #[test] - fn test_aggregation_level2_multi_segments_distributed_collector_use_json() -> crate::Result<()> - { - test_aggregation_level2(false, true, true) - } - - #[test] - fn test_aggregation_level2_single_segment_distributed_collector_use_json() -> crate::Result<()> - { - test_aggregation_level2(true, true, true) - } - - #[test] - fn test_aggregation_invalid_requests() -> crate::Result<()> { - let index = get_test_index_2_segments(false)?; - - let reader = index.reader()?; - - let avg_on_field = |field_name: &str| { - let agg_req_1: Aggregations = vec![( - "average".to_string(), - Aggregation::Metric(MetricAggregation::Average( - AverageAggregation::from_field_name(field_name.to_string()), - )), - )] - .into_iter() - .collect(); - - let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema()); - - let searcher = reader.searcher(); - - searcher.search(&AllQuery, &collector).unwrap_err() - }; - - let agg_res = avg_on_field("dummy_text"); - assert_eq!( - format!("{:?}", agg_res), - r#"InvalidArgument("No numerical fast field found for field: dummy_text")"# - ); - - let agg_res = avg_on_field("not_exist_field"); - assert_eq!( - format!("{:?}", agg_res), - r#"FieldNotFound("not_exist_field")"# - ); - - let agg_res = avg_on_field("ip_addr"); - assert_eq!( - format!("{:?}", agg_res), - r#"InvalidArgument("No numerical fast field found for field: ip_addr")"# - ); - - Ok(()) - } - - #[cfg(all(test, feature = "unstable"))] - mod bench { - - use rand::prelude::SliceRandom; - use rand::{thread_rng, Rng}; - use test::{self, Bencher}; - - use super::*; - use crate::aggregation::bucket::{ - CustomOrder, HistogramAggregation, HistogramBounds, Order, OrderTarget, - TermsAggregation, - }; - use crate::aggregation::metric::StatsAggregation; - use crate::query::AllQuery; - - fn get_test_index_bench(_merge_segments: bool) -> crate::Result { - let mut schema_builder = Schema::builder(); - let text_fieldtype = crate::schema::TextOptions::default() - .set_indexing_options( - TextFieldIndexing::default().set_index_option(IndexRecordOption::WithFreqs), - ) - .set_stored(); - let text_field = schema_builder.add_text_field("text", text_fieldtype); - let text_field_many_terms = - schema_builder.add_text_field("text_many_terms", STRING | FAST); - let text_field_few_terms = - schema_builder.add_text_field("text_few_terms", STRING | FAST); - let score_fieldtype = crate::schema::NumericOptions::default().set_fast(); - let score_field = schema_builder.add_u64_field("score", score_fieldtype.clone()); - let score_field_f64 = - schema_builder.add_f64_field("score_f64", score_fieldtype.clone()); - let score_field_i64 = schema_builder.add_i64_field("score_i64", score_fieldtype); - let index = Index::create_from_tempdir(schema_builder.build())?; - let few_terms_data = vec!["INFO", "ERROR", "WARN", "DEBUG"]; - let many_terms_data = (0..150_000) - .map(|num| format!("author{}", num)) - .collect::>(); - { - let mut rng = thread_rng(); - let mut index_writer = index.writer_with_num_threads(1, 100_000_000)?; - // writing the segment - for _ in 0..1_000_000 { - let val: f64 = rng.gen_range(0.0..1_000_000.0); - index_writer.add_document(doc!( - text_field => "cool", - text_field_many_terms => many_terms_data.choose(&mut rng).unwrap().to_string(), - text_field_few_terms => few_terms_data.choose(&mut rng).unwrap().to_string(), - score_field => val as u64, - score_field_f64 => val, - score_field_i64 => val as i64, - ))?; - } - index_writer.commit()?; - } - - Ok(index) - } - - #[bench] - fn bench_aggregation_average_u64(b: &mut Bencher) { - let index = get_test_index_bench(false).unwrap(); - let reader = index.reader().unwrap(); - let text_field = reader.searcher().schema().get_field("text").unwrap(); - - b.iter(|| { - let term_query = TermQuery::new( - Term::from_field_text(text_field, "cool"), - IndexRecordOption::Basic, - ); - - let agg_req_1: Aggregations = vec![( - "average".to_string(), - Aggregation::Metric(MetricAggregation::Average( - AverageAggregation::from_field_name("score".to_string()), - )), - )] - .into_iter() - .collect(); - - let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema()); - - let searcher = reader.searcher(); - searcher.search(&term_query, &collector).unwrap() - }); - } - - #[bench] - fn bench_aggregation_stats_f64(b: &mut Bencher) { - let index = get_test_index_bench(false).unwrap(); - let reader = index.reader().unwrap(); - let text_field = reader.searcher().schema().get_field("text").unwrap(); - - b.iter(|| { - let term_query = TermQuery::new( - Term::from_field_text(text_field, "cool"), - IndexRecordOption::Basic, - ); - - let agg_req_1: Aggregations = vec![( - "average_f64".to_string(), - Aggregation::Metric(MetricAggregation::Stats( - StatsAggregation::from_field_name("score_f64".to_string()), - )), - )] - .into_iter() - .collect(); - - let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema()); - - let searcher = reader.searcher(); - searcher.search(&term_query, &collector).unwrap() - }); - } - - #[bench] - fn bench_aggregation_average_f64(b: &mut Bencher) { - let index = get_test_index_bench(false).unwrap(); - let reader = index.reader().unwrap(); - let text_field = reader.searcher().schema().get_field("text").unwrap(); - - b.iter(|| { - let term_query = TermQuery::new( - Term::from_field_text(text_field, "cool"), - IndexRecordOption::Basic, - ); - - let agg_req_1: Aggregations = vec![( - "average_f64".to_string(), - Aggregation::Metric(MetricAggregation::Average( - AverageAggregation::from_field_name("score_f64".to_string()), - )), - )] - .into_iter() - .collect(); - - let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema()); - - let searcher = reader.searcher(); - searcher.search(&term_query, &collector).unwrap() - }); - } - - #[bench] - fn bench_aggregation_average_u64_and_f64(b: &mut Bencher) { - let index = get_test_index_bench(false).unwrap(); - let reader = index.reader().unwrap(); - let text_field = reader.searcher().schema().get_field("text").unwrap(); - - b.iter(|| { - let term_query = TermQuery::new( - Term::from_field_text(text_field, "cool"), - IndexRecordOption::Basic, - ); - - let agg_req_1: Aggregations = vec![ - ( - "average_f64".to_string(), - Aggregation::Metric(MetricAggregation::Average( - AverageAggregation::from_field_name("score_f64".to_string()), - )), - ), - ( - "average".to_string(), - Aggregation::Metric(MetricAggregation::Average( - AverageAggregation::from_field_name("score".to_string()), - )), - ), - ] - .into_iter() - .collect(); - - let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema()); - - let searcher = reader.searcher(); - searcher.search(&term_query, &collector).unwrap() - }); - } - - #[bench] - fn bench_aggregation_terms_few(b: &mut Bencher) { - let index = get_test_index_bench(false).unwrap(); - let reader = index.reader().unwrap(); - - b.iter(|| { - let agg_req: Aggregations = vec![( - "my_texts".to_string(), - Aggregation::Bucket(BucketAggregation { - bucket_agg: BucketAggregationType::Terms(TermsAggregation { - field: "text_few_terms".to_string(), - ..Default::default() - }), - sub_aggregation: Default::default(), - }), - )] - .into_iter() - .collect(); - - let collector = AggregationCollector::from_aggs(agg_req, None, index.schema()); - - let searcher = reader.searcher(); - searcher.search(&AllQuery, &collector).unwrap() - }); - } - - #[bench] - fn bench_aggregation_terms_many_with_sub_agg(b: &mut Bencher) { - let index = get_test_index_bench(false).unwrap(); - let reader = index.reader().unwrap(); - - b.iter(|| { - let sub_agg_req: Aggregations = vec![( - "average_f64".to_string(), - Aggregation::Metric(MetricAggregation::Average( - AverageAggregation::from_field_name("score_f64".to_string()), - )), - )] - .into_iter() - .collect(); - - let agg_req: Aggregations = vec![( - "my_texts".to_string(), - Aggregation::Bucket(BucketAggregation { - bucket_agg: BucketAggregationType::Terms(TermsAggregation { - field: "text_many_terms".to_string(), - ..Default::default() - }), - sub_aggregation: sub_agg_req, - }), - )] - .into_iter() - .collect(); - - let collector = AggregationCollector::from_aggs(agg_req, None, index.schema()); - - let searcher = reader.searcher(); - searcher.search(&AllQuery, &collector).unwrap() - }); - } - - #[bench] - fn bench_aggregation_terms_many2(b: &mut Bencher) { - let index = get_test_index_bench(false).unwrap(); - let reader = index.reader().unwrap(); - - b.iter(|| { - let agg_req: Aggregations = vec![( - "my_texts".to_string(), - Aggregation::Bucket(BucketAggregation { - bucket_agg: BucketAggregationType::Terms(TermsAggregation { - field: "text_many_terms".to_string(), - ..Default::default() - }), - sub_aggregation: Default::default(), - }), - )] - .into_iter() - .collect(); - - let collector = AggregationCollector::from_aggs(agg_req, None, index.schema()); - - let searcher = reader.searcher(); - searcher.search(&AllQuery, &collector).unwrap() - }); - } - - #[bench] - fn bench_aggregation_terms_many_order_by_term(b: &mut Bencher) { - let index = get_test_index_bench(false).unwrap(); - let reader = index.reader().unwrap(); - - b.iter(|| { - let agg_req: Aggregations = vec![( - "my_texts".to_string(), - Aggregation::Bucket(BucketAggregation { - bucket_agg: BucketAggregationType::Terms(TermsAggregation { - field: "text_many_terms".to_string(), - order: Some(CustomOrder { - order: Order::Desc, - target: OrderTarget::Key, - }), - ..Default::default() - }), - sub_aggregation: Default::default(), - }), - )] - .into_iter() - .collect(); - - let collector = AggregationCollector::from_aggs(agg_req, None, index.schema()); - - let searcher = reader.searcher(); - searcher.search(&AllQuery, &collector).unwrap() - }); - } - - #[bench] - fn bench_aggregation_range_only(b: &mut Bencher) { - let index = get_test_index_bench(false).unwrap(); - let reader = index.reader().unwrap(); - - b.iter(|| { - let agg_req_1: Aggregations = vec![( - "rangef64".to_string(), - Aggregation::Bucket(BucketAggregation { - bucket_agg: BucketAggregationType::Range(RangeAggregation { - field: "score_f64".to_string(), - ranges: vec![ - (3f64..7000f64).into(), - (7000f64..20000f64).into(), - (20000f64..30000f64).into(), - (30000f64..40000f64).into(), - (40000f64..50000f64).into(), - (50000f64..60000f64).into(), - ], - ..Default::default() - }), - sub_aggregation: Default::default(), - }), - )] - .into_iter() - .collect(); - - let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema()); - - let searcher = reader.searcher(); - searcher.search(&AllQuery, &collector).unwrap() - }); - } - - #[bench] - fn bench_aggregation_range_with_avg(b: &mut Bencher) { - let index = get_test_index_bench(false).unwrap(); - let reader = index.reader().unwrap(); - - b.iter(|| { - let sub_agg_req: Aggregations = vec![( - "average_f64".to_string(), - Aggregation::Metric(MetricAggregation::Average( - AverageAggregation::from_field_name("score_f64".to_string()), - )), - )] - .into_iter() - .collect(); - - let agg_req_1: Aggregations = vec![( - "rangef64".to_string(), - Aggregation::Bucket(BucketAggregation { - bucket_agg: BucketAggregationType::Range(RangeAggregation { - field: "score_f64".to_string(), - ranges: vec![ - (3f64..7000f64).into(), - (7000f64..20000f64).into(), - (20000f64..30000f64).into(), - (30000f64..40000f64).into(), - (40000f64..50000f64).into(), - (50000f64..60000f64).into(), - ], - ..Default::default() - }), - sub_aggregation: sub_agg_req, - }), - )] - .into_iter() - .collect(); - - let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema()); - - let searcher = reader.searcher(); - searcher.search(&AllQuery, &collector).unwrap() - }); - } - - // hard bounds has a different algorithm, because it actually limits collection range - #[bench] - fn bench_aggregation_histogram_only_hard_bounds(b: &mut Bencher) { - let index = get_test_index_bench(false).unwrap(); - let reader = index.reader().unwrap(); - - b.iter(|| { - let agg_req_1: Aggregations = vec![( - "rangef64".to_string(), - Aggregation::Bucket(BucketAggregation { - bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { - field: "score_f64".to_string(), - interval: 100f64, - hard_bounds: Some(HistogramBounds { - min: 1000.0, - max: 300_000.0, - }), - ..Default::default() - }), - sub_aggregation: Default::default(), - }), - )] - .into_iter() - .collect(); - - let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema()); - - let searcher = reader.searcher(); - searcher.search(&AllQuery, &collector).unwrap() - }); - } - - #[bench] - fn bench_aggregation_histogram_with_avg(b: &mut Bencher) { - let index = get_test_index_bench(false).unwrap(); - let reader = index.reader().unwrap(); - - b.iter(|| { - let sub_agg_req: Aggregations = vec![( - "average_f64".to_string(), - Aggregation::Metric(MetricAggregation::Average( - AverageAggregation::from_field_name("score_f64".to_string()), - )), - )] - .into_iter() - .collect(); - - let agg_req_1: Aggregations = vec![( - "rangef64".to_string(), - Aggregation::Bucket(BucketAggregation { - bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { - field: "score_f64".to_string(), - interval: 100f64, // 1000 buckets - ..Default::default() - }), - sub_aggregation: sub_agg_req, - }), - )] - .into_iter() - .collect(); - - let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema()); - - let searcher = reader.searcher(); - searcher.search(&AllQuery, &collector).unwrap() - }); - } - - #[bench] - fn bench_aggregation_histogram_only(b: &mut Bencher) { - let index = get_test_index_bench(false).unwrap(); - let reader = index.reader().unwrap(); - - b.iter(|| { - let agg_req_1: Aggregations = vec![( - "rangef64".to_string(), - Aggregation::Bucket(BucketAggregation { - bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { - field: "score_f64".to_string(), - interval: 100f64, // 1000 buckets - ..Default::default() - }), - sub_aggregation: Default::default(), - }), - )] - .into_iter() - .collect(); - - let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema()); - - let searcher = reader.searcher(); - searcher.search(&AllQuery, &collector).unwrap() - }); - } - - #[bench] - fn bench_aggregation_avg_and_range_with_avg(b: &mut Bencher) { - let index = get_test_index_bench(false).unwrap(); - let reader = index.reader().unwrap(); - let text_field = reader.searcher().schema().get_field("text").unwrap(); - - b.iter(|| { - let term_query = TermQuery::new( - Term::from_field_text(text_field, "cool"), - IndexRecordOption::Basic, - ); - - let sub_agg_req_1: Aggregations = vec![( - "average_in_range".to_string(), - Aggregation::Metric(MetricAggregation::Average( - AverageAggregation::from_field_name("score".to_string()), - )), - )] - .into_iter() - .collect(); - - let agg_req_1: Aggregations = vec![ - ( - "average".to_string(), - Aggregation::Metric(MetricAggregation::Average( - AverageAggregation::from_field_name("score".to_string()), - )), - ), - ( - "rangef64".to_string(), - Aggregation::Bucket(BucketAggregation { - bucket_agg: BucketAggregationType::Range(RangeAggregation { - field: "score_f64".to_string(), - ranges: vec![ - (3f64..7000f64).into(), - (7000f64..20000f64).into(), - (20000f64..60000f64).into(), - ], - ..Default::default() - }), - sub_aggregation: sub_agg_req_1, - }), - ), - ] - .into_iter() - .collect(); - - let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema()); - - let searcher = reader.searcher(); - searcher.search(&term_query, &collector).unwrap() - }); - } - } } diff --git a/src/fastfield/readers.rs b/src/fastfield/readers.rs index 617f1c76d..97523335f 100644 --- a/src/fastfield/readers.rs +++ b/src/fastfield/readers.rs @@ -171,6 +171,20 @@ impl FastFieldReaders { Ok(None) } + /// Returns the `u64` column used to represent any `u64`-mapped typed (i64, u64, f64, DateTime). + #[doc(hidden)] + pub fn u64_lenient_with_type( + &self, + field_name: &str, + ) -> crate::Result, ColumnType)>> { + for col in self.columnar.read_columns(field_name)? { + if let Some(col_u64) = col.open_u64_lenient()? { + return Ok(Some((col_u64, col.column_type()))); + } + } + Ok(None) + } + /// Returns the `i64` fast field reader reader associated with `field`. /// /// If `field` is not a i64 fast field, this method returns an Error.