Compare commits

..

3 Commits

Author SHA1 Message Date
cong.xie
c69835dc91 fix format 2026-02-04 09:03:42 -05:00
cong.xie
037f387817 feat(aggregation): expose sketches for percentiles and cardinality
This change extends the multi-step query support to percentiles and
cardinality aggregations by exposing their underlying sketches.

Changes:
- Add CardinalityMetricResult struct with value and HLL sketch
- Update PercentilesMetricResult to include DDSketch
- Update MetricResult::Cardinality to use CardinalityMetricResult
- Update finalization to include sketches in results
- Add tests verifying sketch data is present in results

JSON output changes:

Percentiles:
  Before: { "values": {...} }
  After:  { "values": {...}, "sketch": {...} }

Cardinality:
  Before: { "value": 10.0 }
  After:  { "value": 10.0, "sketch": {...} }

The sketch fields enable downstream systems to merge results across
multiple query steps using the raw sketch data.
2026-02-03 11:56:51 -05:00
cong.xie
06c67b656c feat(aggregation): expose sum and count in Average metric result
This change modifies the Average aggregation to return sum and count
alongside the computed average value, enabling downstream systems to
properly merge results across multiple query steps.

Changes:
- Add AverageMetricResult struct with value, sum, and count fields
- Add sum() and count() getter methods to IntermediateAverage
- Update MetricResult::Average to use AverageMetricResult
- Update finalization to populate sum/count from intermediate result
- Update tests to expect new JSON format

JSON output changes from:
  { "value": 2.5 }
to:
  { "value": 2.5, "sum": 15.0, "count": 6 }

This is a breaking change for JSON consumers expecting the old format.
2026-02-03 10:31:54 -05:00
9 changed files with 218 additions and 34 deletions

View File

@@ -10,7 +10,8 @@ use serde::{Deserialize, Serialize};
use super::bucket::GetDocCount;
use super::metric::{
ExtendedStats, PercentilesMetricResult, SingleMetricResult, Stats, TopHitsMetricResult,
AverageMetricResult, CardinalityMetricResult, ExtendedStats, PercentilesMetricResult,
SingleMetricResult, Stats, TopHitsMetricResult,
};
use super::{AggregationError, Key};
use crate::TantivyError;
@@ -81,8 +82,8 @@ impl AggregationResult {
#[serde(untagged)]
/// MetricResult
pub enum MetricResult {
/// Average metric result.
Average(SingleMetricResult),
/// Average metric result with sum and count for multi-step merging.
Average(AverageMetricResult),
/// Count metric result.
Count(SingleMetricResult),
/// Max metric result.
@@ -99,8 +100,8 @@ pub enum MetricResult {
Percentiles(PercentilesMetricResult),
/// Top hits metric result
TopHits(TopHitsMetricResult),
/// Cardinality metric result
Cardinality(SingleMetricResult),
/// Cardinality metric result with HLL sketch for multi-step merging.
Cardinality(CardinalityMetricResult),
}
impl MetricResult {
@@ -119,7 +120,7 @@ impl MetricResult {
MetricResult::TopHits(_) => Err(TantivyError::AggregationError(
AggregationError::InvalidRequest("top_hits can't be used to order".to_string()),
)),
MetricResult::Cardinality(card) => Ok(card.value),
MetricResult::Cardinality(card) => Ok(card.value), // CardinalityMetricResult.value
}
}
}

View File

@@ -1359,10 +1359,10 @@ fn test_aggregation_on_json_object_mixed_types() {
&serde_json::json!({
"rangeagg": {
"buckets": [
{ "average_in_range": { "value": -20.5 }, "doc_count": 1, "key": "*-3", "to": 3.0 },
{ "average_in_range": { "value": 10.0 }, "doc_count": 1, "from": 3.0, "key": "3-19", "to": 19.0 },
{ "average_in_range": { "value": null }, "doc_count": 0, "from": 19.0, "key": "19-20", "to": 20.0 },
{ "average_in_range": { "value": null }, "doc_count": 0, "from": 20.0, "key": "20-*" }
{ "average_in_range": { "value": -20.5, "sum": -20.5, "count": 1 }, "doc_count": 1, "key": "*-3", "to": 3.0 },
{ "average_in_range": { "value": 10.0, "sum": 10.0, "count": 1 }, "doc_count": 1, "from": 3.0, "key": "3-19", "to": 19.0 },
{ "average_in_range": { "value": null, "sum": 0.0, "count": 0 }, "doc_count": 0, "from": 19.0, "key": "19-20", "to": 20.0 },
{ "average_in_range": { "value": null, "sum": 0.0, "count": 0 }, "doc_count": 0, "from": 20.0, "key": "20-*" }
]
},
"termagg": {

View File

@@ -838,7 +838,7 @@ mod tests {
let expected = json!({
"electronics": {
"doc_count": 2,
"avg_price": { "value": 899.0 } // (999 + 799) / 2
"avg_price": { "value": 899.0, "sum": 1798.0, "count": 2 } // (999 + 799) / 2
}
});
@@ -868,7 +868,7 @@ mod tests {
let expected = json!({
"furniture": {
"doc_count": 0,
"avg_price": { "value": null }
"avg_price": { "value": null, "sum": 0.0, "count": 0 }
}
});
@@ -904,7 +904,7 @@ mod tests {
let expected = json!({
"electronics": {
"doc_count": 2,
"avg_price": { "value": 899.0 }
"avg_price": { "value": 899.0, "sum": 1798.0, "count": 2 }
},
"in_stock": {
"doc_count": 3, // apple, samsung, penguin
@@ -1000,7 +1000,7 @@ mod tests {
let expected = json!({
"premium_electronics": {
"doc_count": 1, // Only apple (999) is >= 800 in tantivy's range semantics
"avg_rating": { "value": 4.5 }
"avg_rating": { "value": 4.5, "sum": 4.5, "count": 1 }
}
});
@@ -1032,7 +1032,7 @@ mod tests {
let expected = json!({
"in_stock": {
"doc_count": 3, // apple, samsung, penguin
"avg_price": { "value": 607.67 } // (999 + 799 + 25) / 3 ≈ 607.67
"avg_price": { "value": 607.67, "sum": 1823.0, "count": 3 } // (999 + 799 + 25) / 3 ≈ 607.67
},
"out_of_stock": {
"doc_count": 1, // nike
@@ -1183,7 +1183,7 @@ mod tests {
"doc_count": 4,
"electronics_branch": {
"doc_count": 2,
"avg_price": { "value": 899.0 }
"avg_price": { "value": 899.0, "sum": 1798.0, "count": 2 }
},
"in_stock_branch": {
"doc_count": 3,
@@ -1259,7 +1259,7 @@ mod tests {
"doc_count": 2, // apple (999), samsung (799)
"electronics": {
"doc_count": 2, // both are electronics
"avg_rating": { "value": 4.35 } // (4.5 + 4.2) / 2
"avg_rating": { "value": 4.35, "sum": 8.7, "count": 2 } // (4.5 + 4.2) / 2
},
"in_stock": {
"doc_count": 2, // both are in stock
@@ -1321,12 +1321,12 @@ mod tests {
{
"key": "samsung",
"doc_count": 1,
"avg_price": { "value": 799.0 }
"avg_price": { "value": 799.0, "sum": 799.0, "count": 1 }
},
{
"key": "apple",
"doc_count": 1,
"avg_price": { "value": 999.0 }
"avg_price": { "value": 999.0, "sum": 999.0, "count": 1 }
}
],
"sum_other_doc_count": 0,
@@ -1370,7 +1370,7 @@ mod tests {
"sum": 1798.0,
"avg": 899.0
},
"rating_avg": { "value": 4.35 },
"rating_avg": { "value": 4.35, "sum": 8.7, "count": 2 },
"count": { "value": 2.0 }
}
});
@@ -1411,7 +1411,7 @@ mod tests {
let expected = json!({
"electronics": {
"doc_count": 0,
"avg_price": { "value": null }
"avg_price": { "value": null, "sum": 0.0, "count": 0 }
}
});
@@ -1698,13 +1698,15 @@ mod tests {
let filter_expected = json!({
"electronics": {
"doc_count": 2,
"avg_price": { "value": 899.0 }
"avg_price": { "value": 899.0, "sum": 1798.0, "count": 2 }
}
});
let separate_expected = json!({
"result": {
"value": 899.0
"value": 899.0,
"sum": 1798.0,
"count": 2
}
});

View File

@@ -1222,7 +1222,9 @@ mod tests {
res["histogram"]["buckets"][0],
json!({
"avg": {
"value": Value::Null
"value": Value::Null,
"sum": 0.0,
"count": 0
},
"doc_count": 0,
"key": 2.0,

View File

@@ -19,8 +19,9 @@ use super::bucket::{
GetDocCount, Order, OrderTarget, RangeAggregation, TermsAggregation,
};
use super::metric::{
IntermediateAverage, IntermediateCount, IntermediateExtendedStats, IntermediateMax,
IntermediateMin, IntermediateStats, IntermediateSum, PercentilesCollector, TopHitsTopNComputer,
AverageMetricResult, CardinalityMetricResult, IntermediateAverage, IntermediateCount,
IntermediateExtendedStats, IntermediateMax, IntermediateMin, IntermediateStats,
IntermediateSum, PercentilesCollector, TopHitsTopNComputer,
};
use super::segment_agg_result::AggregationLimitsGuard;
use super::{format_date, AggregationError, Key, SerializedKey};
@@ -325,7 +326,11 @@ impl IntermediateMetricResult {
fn into_final_metric_result(self, req: &Aggregation) -> MetricResult {
match self {
IntermediateMetricResult::Average(intermediate_avg) => {
MetricResult::Average(intermediate_avg.finalize().into())
MetricResult::Average(AverageMetricResult {
value: intermediate_avg.finalize(),
sum: intermediate_avg.sum(),
count: intermediate_avg.count(),
})
}
IntermediateMetricResult::Count(intermediate_count) => {
MetricResult::Count(intermediate_count.finalize().into())
@@ -353,7 +358,11 @@ impl IntermediateMetricResult {
MetricResult::TopHits(top_hits.into_final_result())
}
IntermediateMetricResult::Cardinality(cardinality) => {
MetricResult::Cardinality(cardinality.finalize().into())
let value = cardinality.finalize();
MetricResult::Cardinality(CardinalityMetricResult {
value,
sketch: Some(cardinality),
})
}
}
}

View File

@@ -63,6 +63,16 @@ impl IntermediateAverage {
pub fn finalize(&self) -> Option<f64> {
// Finalize the accumulated stats and keep only the average; the result is an
// `Option` because the finalized stats expose `avg` as an optional value.
self.stats.finalize().avg
}
/// Returns the sum of all collected values.
///
/// Reads the `sum` field of the underlying stats directly, so it is available
/// without calling [`Self::finalize`]. Together with [`Self::count`] it provides the
/// raw inputs from which the average can be recomputed or merged.
pub fn sum(&self) -> f64 {
self.stats.sum
}
/// Returns the count of all collected values.
///
/// Reads the `count` field of the underlying stats directly, so it is available
/// without calling [`Self::finalize`]. Together with [`Self::sum`] it provides the
/// raw inputs from which the average can be recomputed or merged.
pub fn count(&self) -> u64 {
self.stats.count
}
}
#[cfg(test)]

View File

@@ -340,7 +340,7 @@ impl PartialEq for CardinalityCollector {
impl CardinalityCollector {
/// Compute the final cardinality estimate.
pub fn finalize(self) -> Option<f64> {
pub fn finalize(&self) -> Option<f64> {
Some(self.sketch.clone().count().trunc())
}

View File

@@ -93,6 +93,41 @@ impl From<Option<f64>> for SingleMetricResult {
}
}
/// Average metric result with intermediate data for merging.
///
/// Unlike [`SingleMetricResult`], this struct includes the raw `sum` and `count`
/// values in addition to the computed average, so that a downstream system can
/// combine several partial results (for example across multiple query steps)
/// instead of only seeing the final value.
///
/// Serialized shape: `{ "value": 2.5, "sum": 15.0, "count": 6 }`.
/// For an empty bucket it is `{ "value": null, "sum": 0.0, "count": 0 }`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct AverageMetricResult {
/// The computed average value. `None` (serialized as `null`) if no documents matched.
pub value: Option<f64>,
/// The sum of all values (for multi-step merging); `0.0` if no documents matched.
pub sum: f64,
/// The count of all values (for multi-step merging); `0` if no documents matched.
pub count: u64,
}
/// Cardinality metric result with computed value and raw sketch for multi-step merging.
///
/// The `value` field contains the computed cardinality estimate.
/// The `sketch` field carries the [`CardinalityCollector`] that produced the estimate
/// (a HyperLogLog++ sketch, per the original documentation) so that results can be
/// merged across multiple query steps.
///
/// Serialized shape: `{ "value": 10.0, "sketch": { ... } }`. The `sketch` key is left
/// out of the output entirely when the field is `None`.
///
/// Note: `PartialEq` is implemented by hand for this type and compares `value` only.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CardinalityMetricResult {
/// The computed cardinality estimate.
pub value: Option<f64>,
/// The collector holding the sketch, for multi-step merging.
/// Not emitted during serialization when it is `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub sketch: Option<CardinalityCollector>,
}
impl PartialEq for CardinalityMetricResult {
    /// Two results are considered equal when their cardinality estimates match.
    ///
    /// The attached sketch is intentionally left out of the comparison.
    fn eq(&self, rhs: &Self) -> bool {
        let (lhs_estimate, rhs_estimate) = (&self.value, &rhs.value);
        lhs_estimate == rhs_estimate
    }
}
/// This is the wrapper of percentile entries, which can be vector or hashmap
/// depending on if it's keyed or not.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@@ -111,13 +146,26 @@ pub struct PercentileValuesVecEntry {
value: f64,
}
/// Single-metric aggregations use this common result structure.
/// Percentiles metric result with computed values and raw sketch for multi-step merging.
///
/// Main reason to wrap it in value is to match elasticsearch output structure.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
/// The `values` field contains the computed percentile values.
/// The `sketch` field contains the serialized DDSketch that can be used for merging
/// results across multiple query steps.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PercentilesMetricResult {
/// The result of the percentile metric.
/// The computed percentile values.
pub values: PercentileValues,
/// The serialized DDSketch for multi-step merging.
/// This is the raw sketch data that can be deserialized and merged with other sketches.
#[serde(skip_serializing_if = "Option::is_none")]
pub sketch: Option<PercentilesCollector>,
}
impl PartialEq for PercentilesMetricResult {
    /// Two results are considered equal when their computed percentile values match.
    ///
    /// The attached sketch is intentionally left out of the comparison.
    fn eq(&self, rhs: &Self) -> bool {
        let (lhs_values, rhs_values) = (&self.values, &rhs.values);
        lhs_values == rhs_values
    }
}
/// The top_hits metric results entry
@@ -198,4 +246,105 @@ mod tests {
assert_eq!(aggregations_res_json["price_min"]["value"], 0.0);
assert_eq!(aggregations_res_json["price_sum"]["value"], 15.0);
}
#[test]
fn test_average_returns_sum_and_count() {
    // Index with a single fast f64 field to aggregate over.
    let mut builder = Schema::builder();
    let price = builder.add_f64_field("price", NumericOptions::default().set_fast());
    let index = Index::create_in_ram(builder.build());

    // Index the values 0, 1, 2, 3, 4, 5 => sum = 15, count = 6, avg = 2.5.
    let mut writer: IndexWriter = index.writer_for_tests().unwrap();
    for n in 0..6u32 {
        writer.add_document(doc!(price => f64::from(n))).unwrap();
    }
    writer.commit().unwrap();

    // Run a plain `avg` aggregation over every document.
    let request: Aggregations =
        serde_json::from_str(r#"{ "price_avg": { "avg": { "field": "price" } } }"#).unwrap();
    let agg_collector = AggregationCollector::from_aggs(request, Default::default());
    let reader = index.reader().unwrap();
    let searcher = reader.searcher();
    let results: AggregationResults = searcher.search(&AllQuery, &agg_collector).unwrap();
    let results_json = serde_json::to_value(results).unwrap();

    // The average result must expose the value together with its sum and count.
    let avg = &results_json["price_avg"];
    assert_eq!(avg["value"], 2.5);
    assert_eq!(avg["sum"], 15.0);
    assert_eq!(avg["count"], 6);
}
#[test]
fn test_percentiles_returns_sketch() {
    // Index with a single fast f64 field holding latency samples.
    let mut builder = Schema::builder();
    let latency = builder.add_f64_field("latency", NumericOptions::default().set_fast());
    let index = Index::create_in_ram(builder.build());

    // Index the latency values 0.0 through 99.0.
    let mut writer: IndexWriter = index.writer_for_tests().unwrap();
    for n in 0..100u32 {
        writer.add_document(doc!(latency => f64::from(n))).unwrap();
    }
    writer.commit().unwrap();

    // Run a default `percentiles` aggregation over every document.
    let request: Aggregations = serde_json::from_str(
        r#"{ "latency_percentiles": { "percentiles": { "field": "latency" } } }"#,
    )
    .unwrap();
    let agg_collector = AggregationCollector::from_aggs(request, Default::default());
    let reader = index.reader().unwrap();
    let searcher = reader.searcher();
    let results: AggregationResults = searcher.search(&AllQuery, &agg_collector).unwrap();
    let results_json = serde_json::to_value(results).unwrap();

    let percentiles = &results_json["latency_percentiles"];
    // The computed percentile values are present ...
    assert!(percentiles["values"].is_object());
    // ... and so is the sketch they were computed from.
    assert!(percentiles["sketch"].is_object());
}
#[test]
fn test_cardinality_returns_sketch() {
    // Index with a single fast u64 field to count distinct values of.
    let mut schema_builder = Schema::builder();
    let field_options = NumericOptions::default().set_fast();
    let field = schema_builder.add_u64_field("user_id", field_options);
    let index = Index::create_in_ram(schema_builder.build());
    let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();

    // Add 50 documents that cycle through 10 unique user_ids (0..=9).
    // Iterating over `u64` directly avoids a cast on the loop variable.
    for i in 0..50u64 {
        index_writer.add_document(doc!(field => i % 10)).unwrap();
    }
    index_writer.commit().unwrap();

    let aggregations_json = r#"{ "unique_users": { "cardinality": { "field": "user_id" } } }"#;
    let aggregations: Aggregations = serde_json::from_str(aggregations_json).unwrap();
    let collector = AggregationCollector::from_aggs(aggregations, Default::default());
    let reader = index.reader().unwrap();
    let searcher = reader.searcher();
    let aggregations_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
    let aggregations_res_json = serde_json::to_value(aggregations_res).unwrap();

    // Verify cardinality value is present and approximately correct.
    // The estimate is approximate, so accept anything within 10 +/- 1.
    let cardinality = aggregations_res_json["unique_users"]["value"]
        .as_f64()
        .unwrap();
    assert!((9.0..=11.0).contains(&cardinality));

    // Verify the sketch is present in the serialized result.
    assert!(aggregations_res_json["unique_users"]["sketch"].is_object());
}
}

View File

@@ -178,6 +178,9 @@ fn format_percentile(percentile: f64) -> String {
impl PercentilesCollector {
/// Convert result into final result. This will query the quantiles from the underlying
/// quantile collector.
///
/// The result includes both the computed percentile values and the raw DDSketch
/// for multi-step query merging.
pub fn into_final_result(self, req: &PercentilesAggregationReq) -> PercentilesMetricResult {
let percentiles: &[f64] = req
.percents
@@ -210,7 +213,15 @@ impl PercentilesCollector {
.collect(),
)
};
PercentilesMetricResult { values }
PercentilesMetricResult {
values,
sketch: Some(self),
}
}
/// Returns a reference to the underlying DDSketch.
///
/// The collector is only borrowed, so it remains usable after the call.
pub fn sketch(&self) -> &sketches_ddsketch::DDSketch {
&self.sketch
}
fn new() -> Self {