improve performance

This commit is contained in:
Pascal Seitz
2022-03-14 20:28:08 +08:00
parent 564fa38085
commit 1aa88b0c51
5 changed files with 99 additions and 93 deletions

View File

@@ -12,8 +12,8 @@ use serde::{Deserialize, Serialize};
use super::bucket::generate_buckets;
use super::intermediate_agg_result::{
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
IntermediateHistogramBucketEntry, IntermediateMetricResult, IntermediateRangeBucketEntry,
IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
IntermediateMetricResult, IntermediateRangeBucketEntry,
};
use super::metric::{SingleMetricResult, Stats};
use super::Key;
@@ -25,9 +25,16 @@ pub struct AggregationResults(pub HashMap<String, AggregationResult>);
impl From<IntermediateAggregationResults> for AggregationResults {
fn from(tree: IntermediateAggregationResults) -> Self {
Self(
tree.0
tree.buckets
.unwrap_or_default()
.into_iter()
.map(|(key, agg)| (key, agg.into()))
.map(|(key, bucket)| (key, AggregationResult::BucketResult(bucket.into())))
.chain(
tree.metrics
.unwrap_or_default()
.into_iter()
.map(|(key, metric)| (key, AggregationResult::MetricResult(metric.into()))),
)
.collect(),
)
}
@@ -42,18 +49,6 @@ pub enum AggregationResult {
/// Metric result variant.
MetricResult(MetricResult),
}
impl From<IntermediateAggregationResult> for AggregationResult {
fn from(tree: IntermediateAggregationResult) -> Self {
match tree {
IntermediateAggregationResult::Bucket(bucket) => {
AggregationResult::BucketResult(bucket.into())
}
IntermediateAggregationResult::Metric(metric) => {
AggregationResult::MetricResult(metric.into())
}
}
}
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]

View File

@@ -141,30 +141,26 @@ impl SegmentHistogramCollector {
.filter(|bucket| bucket.doc_count != 0)
.count(),
);
// Below we remove empty buckets for two reasons
// 1. To reduce the size of the intermediate result, which may be passed on the wire.
// 2. To mimic elasticsearch, there are no empty buckets at the start and end.
//
// Empty buckets may be added later again in the final result, depending on the request.
if let Some(sub_aggregations) = self.sub_aggregations {
buckets.extend(
self.buckets
.into_iter()
.zip(sub_aggregations.into_iter())
// Here we remove the empty buckets for two reasons
// 1. To reduce the size of the intermediate result, which may be passed on the wire.
// 2. To mimic elasticsearch, there are no empty buckets at the start and end.
//
// Empty buckets may be added later again in the final result, depending on the request.
.filter(|(bucket, _sub_aggregation)| bucket.doc_count != 0)
.map(|(bucket, sub_aggregation)| (bucket, Some(sub_aggregation)).into()),
.map(|(bucket, sub_aggregation)| (bucket, sub_aggregation).into()),
)
} else {
buckets.extend(
self.buckets
.into_iter()
// Here we remove the empty buckets for two reasons
// 1. To reduce the size of the intermediate result, which may be passed on the wire.
// 2. To mimic elasticsearch, there are no empty buckets at the start and end.
//
// Empty buckets may be added later again in the final result, depending on the request.
.filter(|bucket| bucket.doc_count != 0)
.map(|bucket| (bucket, None).into()),
.map(|bucket| bucket.into()),
);
};

View File

@@ -19,18 +19,26 @@ use super::{Key, MergeFruits, SerializedKey, VecWithNames};
/// Contains the intermediate aggregation result, which is optimized to be merged with other
/// intermediate results.
#[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct IntermediateAggregationResults(pub(crate) VecWithNames<IntermediateAggregationResult>);
pub struct IntermediateAggregationResults {
pub(crate) metrics: Option<VecWithNames<IntermediateMetricResult>>,
pub(crate) buckets: Option<VecWithNames<IntermediateBucketResult>>,
}
impl From<SegmentAggregationResultsCollector> for IntermediateAggregationResults {
fn from(tree: SegmentAggregationResultsCollector) -> Self {
let mut data = vec![];
for (key, bucket) in tree.buckets.into_iter() {
data.push((key, IntermediateAggregationResult::Bucket(bucket.into())));
}
for (key, metric) in tree.metrics.into_iter() {
data.push((key, IntermediateAggregationResult::Metric(metric.into())));
}
Self(VecWithNames::from_entries(data))
let metrics = if tree.metrics.is_empty() {
None
} else {
Some(VecWithNames::from_other(tree.metrics))
};
let buckets = if tree.buckets.is_empty() {
None
} else {
Some(VecWithNames::from_other(tree.buckets))
};
Self { metrics, buckets }
}
}
@@ -40,8 +48,18 @@ impl IntermediateAggregationResults {
/// The order of the values need to be the same on both results. This is ensured when the same
/// (key values) are present on the underlying VecWithNames struct.
pub fn merge_fruits(&mut self, other: &IntermediateAggregationResults) {
for (tree_left, tree_right) in self.0.values_mut().zip(other.0.values()) {
tree_left.merge_fruits(tree_right);
if let (Some(buckets_left), Some(buckets_right)) = (&mut self.buckets, &other.buckets) {
for (bucket_left, bucket_right) in buckets_left.values_mut().zip(buckets_right.values())
{
bucket_left.merge_fruits(bucket_right);
}
}
if let (Some(metrics_left), Some(metrics_right)) = (&mut self.metrics, &other.metrics) {
for (metric_left, metric_right) in metrics_left.values_mut().zip(metrics_right.values())
{
metric_left.merge_fruits(metric_right);
}
}
}
}
@@ -55,28 +73,6 @@ pub enum IntermediateAggregationResult {
Metric(IntermediateMetricResult),
}
impl IntermediateAggregationResult {
fn merge_fruits(&mut self, other: &IntermediateAggregationResult) {
match (self, other) {
(
IntermediateAggregationResult::Bucket(res_left),
IntermediateAggregationResult::Bucket(res_right),
) => {
res_left.merge_fruits(res_right);
}
(
IntermediateAggregationResult::Metric(res_left),
IntermediateAggregationResult::Metric(res_right),
) => {
res_left.merge_fruits(res_right);
}
_ => {
panic!("incompatible types in aggregation tree on merge fruits");
}
}
}
}
/// Holds the intermediate data for metric results
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum IntermediateMetricResult {
@@ -196,13 +192,6 @@ impl IntermediateBucketResult {
}
}
// fn merge_sorted_vecs<V: MergeFruits + Clone>(entries_left: &mut Vec<V>, entries_right: &Vec<V>) {
// for el in entries_left
//.iter_mut()
//.merge_join_by(entries_right.iter(), |left, right| left.key.cmp(right.key))
//{}
//}
fn merge_maps<V: MergeFruits + Clone>(
entries_left: &mut FnvHashMap<SerializedKey, V>,
entries_right: &FnvHashMap<SerializedKey, V>,
@@ -232,25 +221,32 @@ pub struct IntermediateHistogramBucketEntry {
pub sub_aggregation: IntermediateAggregationResults,
}
impl From<SegmentHistogramBucketEntry> for IntermediateHistogramBucketEntry {
fn from(entry: SegmentHistogramBucketEntry) -> Self {
IntermediateHistogramBucketEntry {
key: entry.key,
doc_count: entry.doc_count,
sub_aggregation: Default::default(),
}
}
}
impl
From<(
SegmentHistogramBucketEntry,
Option<SegmentAggregationResultsCollector>,
SegmentAggregationResultsCollector,
)> for IntermediateHistogramBucketEntry
{
fn from(
entry: (
SegmentHistogramBucketEntry,
Option<SegmentAggregationResultsCollector>,
SegmentAggregationResultsCollector,
),
) -> Self {
IntermediateHistogramBucketEntry {
key: entry.0.key,
doc_count: entry.0.doc_count,
sub_aggregation: entry
.1
.map(|sub_aggregations| sub_aggregations.into())
.unwrap_or_default(),
sub_aggregation: entry.1.into(),
}
}
}
@@ -333,9 +329,12 @@ mod tests {
}
map.insert(
"my_agg_level2".to_string(),
IntermediateAggregationResult::Bucket(IntermediateBucketResult::Range(buckets)),
IntermediateBucketResult::Range(buckets),
);
IntermediateAggregationResults(VecWithNames::from_entries(map.into_iter().collect()))
IntermediateAggregationResults {
buckets: Some(VecWithNames::from_entries(map.into_iter().collect())),
metrics: Default::default(),
}
}
fn get_test_tree(data: &[(String, u64, String, u64)]) -> IntermediateAggregationResults {
@@ -359,9 +358,12 @@ mod tests {
}
map.insert(
"my_agg_level1".to_string(),
IntermediateAggregationResult::Bucket(IntermediateBucketResult::Range(buckets)),
IntermediateBucketResult::Range(buckets),
);
IntermediateAggregationResults(VecWithNames::from_entries(map.into_iter().collect()))
IntermediateAggregationResults {
buckets: Some(VecWithNames::from_entries(map.into_iter().collect())),
metrics: Default::default(),
}
}
#[test]

View File

@@ -176,6 +176,7 @@ pub(crate) struct VecWithNames<T: Clone> {
values: Vec<T>,
keys: Vec<String>,
}
impl<T: Clone> Default for VecWithNames<T> {
fn default() -> Self {
Self {
@@ -198,6 +199,15 @@ impl<T: Clone> From<HashMap<String, T>> for VecWithNames<T> {
}
impl<T: Clone> VecWithNames<T> {
fn from_other<K: Clone + Into<T>>(entries: VecWithNames<K>) -> Self {
let mut values = Vec::with_capacity(entries.len());
values.extend(entries.values.into_iter().map(Into::into));
Self {
keys: entries.keys,
values,
}
}
fn from_entries(mut entries: Vec<(String, T)>) -> Self {
// Sort to ensure order of elements match across multiple instances
entries.sort_by(|left, right| left.0.cmp(&right.0));
@@ -230,6 +240,9 @@ impl<T: Clone> VecWithNames<T> {
fn entries(&self) -> impl Iterator<Item = (&str, &T)> + '_ {
self.keys().zip(self.values.iter())
}
fn len(&self) -> usize {
self.values.len()
}
fn is_empty(&self) -> bool {
self.keys.is_empty()
}
@@ -384,7 +397,7 @@ mod tests {
merge_segments: bool,
use_distributed_collector: bool,
) -> crate::Result<()> {
let index = get_test_index_with_num_docs(merge_segments, 300)?;
let index = get_test_index_with_num_docs(merge_segments, 80)?;
let reader = index.reader()?;
let text_field = reader.searcher().schema().get_field("text").unwrap();
@@ -394,12 +407,12 @@ mod tests {
IndexRecordOption::Basic,
);
assert_eq!(DOC_BLOCK_SIZE, 256);
assert_eq!(DOC_BLOCK_SIZE, 64);
// In the tree we cache Documents of DOC_BLOCK_SIZE, before passing them down as one block.
//
// Build a request so that on the first level we have one full cache, which is then flushed.
// The same cache should have some residue docs at the end, which are flushed (Range 0-266)
// -> 266 docs
// The same cache should have some residue docs at the end, which are flushed (Range 0-70)
// -> 70 docs
//
// The second level should also have some residue docs in the cache that are flushed at the
// end.
@@ -412,13 +425,13 @@ mod tests {
"bucketsL1": {
"range": {
"field": "score",
"ranges": [ { "to": 3.0f64 }, { "from": 3.0f64, "to": 266.0f64 }, { "from": 266.0f64 } ]
"ranges": [ { "to": 3.0f64 }, { "from": 3.0f64, "to": 70.0f64 }, { "from": 70.0f64 } ]
},
"aggs": {
"bucketsL2": {
"range": {
"field": "score",
"ranges": [ { "to": 100.0f64 }, { "from": 100.0f64, "to": 266.0f64 }, { "from": 266.0f64 } ]
"ranges": [ { "to": 30.0f64 }, { "from": 30.0f64, "to": 70.0f64 }, { "from": 70.0f64 } ]
}
}
}
@@ -426,14 +439,14 @@ mod tests {
"histogram_test":{
"histogram": {
"field": "score",
"interval": 263.0,
"interval": 70.0,
"offset": 3.0,
},
"aggs": {
"bucketsL2": {
"histogram": {
"field": "score",
"interval": 263.0
"interval": 70.0
}
}
}
@@ -463,15 +476,15 @@ mod tests {
res["bucketsL1"]["buckets"][0]["bucketsL2"]["buckets"][0]["doc_count"],
3
);
assert_eq!(res["bucketsL1"]["buckets"][1]["key"], "3-266");
assert_eq!(res["bucketsL1"]["buckets"][1]["doc_count"], 266 - 3);
assert_eq!(res["bucketsL1"]["buckets"][1]["key"], "3-70");
assert_eq!(res["bucketsL1"]["buckets"][1]["doc_count"], 70 - 3);
assert_eq!(
res["bucketsL1"]["buckets"][1]["bucketsL2"]["buckets"][0]["doc_count"],
97
27
);
assert_eq!(
res["bucketsL1"]["buckets"][1]["bucketsL2"]["buckets"][1]["doc_count"],
166
40
);
assert_eq!(
res["bucketsL1"]["buckets"][1]["bucketsL2"]["buckets"][2]["doc_count"],
@@ -479,9 +492,9 @@ mod tests {
);
assert_eq!(
res["bucketsL1"]["buckets"][2]["bucketsL2"]["buckets"][2]["doc_count"],
300 - 266
80 - 70
);
assert_eq!(res["bucketsL1"]["buckets"][2]["doc_count"], 300 - 266);
assert_eq!(res["bucketsL1"]["buckets"][2]["doc_count"], 80 - 70);
Ok(())
}

View File

@@ -17,7 +17,7 @@ use super::{Key, VecWithNames};
use crate::aggregation::agg_req::BucketAggregationType;
use crate::DocId;
pub(crate) const DOC_BLOCK_SIZE: usize = 256;
pub(crate) const DOC_BLOCK_SIZE: usize = 64;
pub(crate) type DocBlock = [DocId; DOC_BLOCK_SIZE];
#[derive(Clone, PartialEq)]