From 60f752d3067e4b8118c1ed96da48c98398753842 Mon Sep 17 00:00:00 2001
From: Ruihang Xia
Date: Fri, 5 Dec 2025 17:19:21 +0800
Subject: [PATCH] feat: run histogram quantile in safe mode for incomplete data
 (#7297)

* initial impl

Signed-off-by: Ruihang Xia

* sqlness test and fix

Signed-off-by: Ruihang Xia

* correct sqlness case

Signed-off-by: Ruihang Xia

* simplification

Signed-off-by: Ruihang Xia

* refine code and comment

Signed-off-by: Ruihang Xia

---------

Signed-off-by: Ruihang Xia
---
 .../src/extension_plan/histogram_fold.rs      | 653 +++++++++++++++---
 .../common/promql/simple_histogram.result     |  49 ++
 .../common/promql/simple_histogram.sql        |  33 +
 3 files changed, 652 insertions(+), 83 deletions(-)

diff --git a/src/promql/src/extension_plan/histogram_fold.rs b/src/promql/src/extension_plan/histogram_fold.rs
index 369754899f..7c657e6c58 100644
--- a/src/promql/src/extension_plan/histogram_fold.rs
+++ b/src/promql/src/extension_plan/histogram_fold.rs
@@ -13,14 +13,15 @@
 // limitations under the License.
 
 use std::any::Any;
+use std::borrow::Cow;
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use std::task::Poll;
 use std::time::Instant;
 
 use common_telemetry::warn;
-use datafusion::arrow::array::AsArray;
-use datafusion::arrow::compute::{self, SortOptions, concat_batches};
+use datafusion::arrow::array::{Array, AsArray, StringArray};
+use datafusion::arrow::compute::{SortOptions, concat_batches};
 use datafusion::arrow::datatypes::{DataType, Float64Type, SchemaRef};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::common::stats::Precision;
@@ -40,8 +41,8 @@ use datafusion::physical_plan::{
 };
 use datafusion::prelude::{Column, Expr};
 use datatypes::prelude::{ConcreteDataType, DataType as GtDataType};
-use datatypes::value::{OrderedF64, ValueRef};
-use datatypes::vectors::{Helper, MutableVector};
+use datatypes::value::{OrderedF64, Value, ValueRef};
+use datatypes::vectors::{Helper, MutableVector, VectorRef};
 use futures::{Stream, StreamExt, ready};
 
 /// `HistogramFold` will fold the conventional (non-native) histogram ([1]) for later
@@ -358,6 +359,9 @@ impl ExecutionPlan for HistogramFoldExec {
             input_buffer: vec![],
             input,
             output_schema,
+            input_schema: self.input.schema(),
+            mode: FoldMode::Optimistic,
+            safe_group: None,
             metric: baseline_metric,
             batch_size,
             input_buffered_rows: 0,
@@ -430,6 +434,12 @@ impl DisplayAs for HistogramFoldExec {
     }
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum FoldMode {
+    Optimistic,
+    Safe,
+}
+
 pub struct HistogramFoldStream {
     // internal states
     le_column_index: usize,
@@ -441,6 +451,9 @@ pub struct HistogramFoldStream {
     /// Expected output batch size
     batch_size: usize,
     output_schema: SchemaRef,
+    input_schema: SchemaRef,
+    mode: FoldMode,
+    safe_group: Option<SafeGroup>,
 
     // buffers
     input_buffer: Vec<RecordBatch>,
@@ -453,6 +466,13 @@ pub struct HistogramFoldStream {
     metric: BaselineMetrics,
 }
 
+#[derive(Debug, Default)]
+struct SafeGroup {
+    tag_values: Vec<Value>,
+    buckets: Vec<f64>,
+    counters: Vec<f64>,
+}
+
 impl RecordBatchStream for HistogramFoldStream {
     fn schema(&self) -> SchemaRef {
         self.output_schema.clone()
@@ -478,7 +498,10 @@ impl Stream for HistogramFoldStream {
                     self.metric.elapsed_compute().add_elapsed(timer);
                     break Poll::Ready(Some(result));
                 }
-                None => break Poll::Ready(self.take_output_buf()?.map(Ok)),
+                None => {
+                    self.flush_remaining()?;
+                    break Poll::Ready(self.take_output_buf()?.map(Ok));
+                }
             }
         };
         self.metric.record_poll(poll)
@@ -491,22 +514,28 @@ impl HistogramFoldStream {
         &mut self,
         input: RecordBatch,
     ) -> DataFusionResult<Option<DataFusionResult<RecordBatch>>> {
-        let Some(bucket_num) = self.calculate_bucket_num(&input)? else {
-            return Ok(None);
-        };
+        match self.mode {
+            FoldMode::Safe => {
+                self.push_input_buf(input);
+                self.process_safe_mode_buffer()?;
+            }
+            FoldMode::Optimistic => {
+                self.push_input_buf(input);
+                let Some(bucket_num) = self.calculate_bucket_num_from_buffer()? else {
+                    return Ok(None);
+                };
+                self.bucket_size = Some(bucket_num);
 
-        if self.input_buffered_rows + input.num_rows() < bucket_num {
-            // not enough rows to fold
-            self.push_input_buf(input);
-            return Ok(None);
+                if self.input_buffered_rows < bucket_num {
+                    // not enough rows to fold
+                    return Ok(None);
+                }
+
+                self.fold_buf(bucket_num)?;
+            }
         }
 
-        self.fold_buf(bucket_num, input)?;
-
-        if self.output_buffered_rows >= self.batch_size {
-            return Ok(self.take_output_buf()?.map(Ok));
-        }
-
-        Ok(None)
+        self.maybe_take_output()
     }
 
     /// Generate a group of empty [MutableVector]s from the output schema.
@@ -532,55 +561,100 @@ impl HistogramFoldStream {
         Ok(builders)
     }
 
-    fn calculate_bucket_num(&mut self, batch: &RecordBatch) -> DataFusionResult<Option<usize>> {
+    /// Determines bucket count using buffered batches, concatenating them to
+    /// detect the first complete bucket that may span batch boundaries.
+    fn calculate_bucket_num_from_buffer(&mut self) -> DataFusionResult<Option<usize>> {
         if let Some(size) = self.bucket_size {
             return Ok(Some(size));
         }
 
-        let inf_pos = self.find_positive_inf(batch)?;
-        if inf_pos == batch.num_rows() {
-            // no positive inf found, append to buffer and wait for next batch
-            self.push_input_buf(batch.clone());
+        if self.input_buffer.is_empty() {
             return Ok(None);
         }
 
-        // else we found the positive inf.
-        // calculate the bucket size
-        let bucket_size = inf_pos + self.input_buffered_rows + 1;
-        Ok(Some(bucket_size))
+        let batch_refs: Vec<&RecordBatch> = self.input_buffer.iter().collect();
+        let batch = concat_batches(&self.input_schema, batch_refs)?;
+        self.find_first_complete_bucket(&batch)
+    }
+
+    fn find_first_complete_bucket(&self, batch: &RecordBatch) -> DataFusionResult<Option<usize>> {
+        if batch.num_rows() == 0 {
+            return Ok(None);
+        }
+
+        let vectors = Helper::try_into_vectors(batch.columns())
+            .map_err(|e| DataFusionError::Execution(e.to_string()))?;
+        let le_array = batch.column(self.le_column_index).as_string::<i32>();
+
+        let mut tag_values_buf = Vec::with_capacity(self.normal_indices.len());
+        self.collect_tag_values(&vectors, 0, &mut tag_values_buf);
+        let mut group_start = 0usize;
+
+        for row in 0..batch.num_rows() {
+            if !self.is_same_group(&vectors, row, &tag_values_buf) {
+                // new group begins
+                self.collect_tag_values(&vectors, row, &mut tag_values_buf);
+                group_start = row;
+            }
+
+            if Self::is_positive_infinity(le_array, row) {
+                return Ok(Some(row - group_start + 1));
+            }
+        }
+
+        Ok(None)
     }
 
     /// Fold record batches from input buffer and put to output buffer
-    fn fold_buf(&mut self, bucket_num: usize, input: RecordBatch) -> DataFusionResult<()> {
-        self.push_input_buf(input);
-        // TODO(ruihang): this concat is avoidable.
-        let batch = concat_batches(&self.input.schema(), self.input_buffer.drain(..).as_ref())?;
+    fn fold_buf(&mut self, bucket_num: usize) -> DataFusionResult<()> {
+        let batch = concat_batches(&self.input_schema, self.input_buffer.drain(..).as_ref())?;
         let mut remaining_rows = self.input_buffered_rows;
         let mut cursor = 0;
 
         // TODO(LFC): Try to get rid of the Arrow array to vector conversion here.
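+        // Optimistic fast path: each iteration of the loop below folds one
+        // bucket group, after validating that the group keeps the same tag
+        // values and ends with a `+Inf` bucket; the first mismatch hands the
+        // remaining rows over to safe mode.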
         let vectors = Helper::try_into_vectors(batch.columns())
             .map_err(|e| DataFusionError::Execution(e.to_string()))?;
+        let le_array = batch.column(self.le_column_index);
+        let le_array = le_array.as_string::<i32>();
+        let field_array = batch.column(self.field_column_index);
+        let field_array = field_array.as_primitive::<Float64Type>();
+        let mut tag_values_buf = Vec::with_capacity(self.normal_indices.len());
+
+        while remaining_rows >= bucket_num && self.mode == FoldMode::Optimistic {
+            self.collect_tag_values(&vectors, cursor, &mut tag_values_buf);
+            if !self.validate_optimistic_group(
+                &vectors,
+                le_array,
+                cursor,
+                bucket_num,
+                &tag_values_buf,
+            ) {
+                let remaining_input_batch = batch.slice(cursor, remaining_rows);
+                self.switch_to_safe_mode(remaining_input_batch)?;
+                return Ok(());
+            }
 
-        while remaining_rows >= bucket_num {
             // "sample" normal columns
-            for normal_index in &self.normal_indices {
-                let val = vectors[*normal_index].get(cursor);
-                self.output_buffer[*normal_index].push_value_ref(&val.as_value_ref());
+            for (idx, value) in self.normal_indices.iter().zip(tag_values_buf.iter()) {
+                self.output_buffer[*idx].push_value_ref(value);
             }
 
             // "fold" `le` and field columns
-            let le_array = batch.column(self.le_column_index);
-            let le_array = le_array.as_string::<i32>();
-            let field_array = batch.column(self.field_column_index);
-            let field_array = field_array.as_primitive::<Float64Type>();
-
-            let mut bucket = vec![];
-            let mut counters = vec![];
+            let mut bucket = Vec::with_capacity(bucket_num);
+            let mut counters = Vec::with_capacity(bucket_num);
             for bias in 0..bucket_num {
-                let le_str = le_array.value(cursor + bias);
-                let le = le_str.parse::<f64>().unwrap();
+                let position = cursor + bias;
+                let le = if le_array.is_valid(position) {
+                    le_array.value(position).parse::<f64>().unwrap_or(f64::NAN)
+                } else {
+                    f64::NAN
+                };
                 bucket.push(le);
 
-                let counter = field_array.value(cursor + bias);
+                let counter = if field_array.is_valid(position) {
+                    field_array.value(position)
+                } else {
+                    f64::NAN
+                };
                 counters.push(counter);
             }
             // ignore invalid data
@@ -593,7 +667,9 @@ impl HistogramFoldStream {
 
         let remaining_input_batch = batch.slice(cursor, remaining_rows);
         self.input_buffered_rows = remaining_input_batch.num_rows();
-        self.input_buffer.push(remaining_input_batch);
+        if self.input_buffered_rows > 0 {
+            self.input_buffer.push(remaining_input_batch);
+        }
 
         Ok(())
     }
@@ -603,6 +679,170 @@ impl HistogramFoldStream {
         self.input_buffer.push(batch);
     }
 
+    fn maybe_take_output(&mut self) -> DataFusionResult<Option<DataFusionResult<RecordBatch>>> {
+        if self.output_buffered_rows >= self.batch_size {
+            return Ok(self.take_output_buf()?.map(Ok));
+        }
+        Ok(None)
+    }
+
+    fn switch_to_safe_mode(&mut self, remaining_batch: RecordBatch) -> DataFusionResult<()> {
+        self.mode = FoldMode::Safe;
+        self.bucket_size = None;
+        self.input_buffer.clear();
+        self.input_buffered_rows = remaining_batch.num_rows();
+
+        if self.input_buffered_rows > 0 {
+            self.input_buffer.push(remaining_batch);
+            self.process_safe_mode_buffer()?;
+        }
+
+        Ok(())
+    }
+
+    fn collect_tag_values<'a>(
+        &self,
+        vectors: &'a [VectorRef],
+        row: usize,
+        tag_values: &mut Vec<ValueRef<'a>>,
+    ) {
+        tag_values.clear();
+        for idx in self.normal_indices.iter() {
+            tag_values.push(vectors[*idx].get_ref(row));
+        }
+    }
+
+    fn validate_optimistic_group(
+        &self,
+        vectors: &[VectorRef],
+        le_array: &StringArray,
+        cursor: usize,
+        bucket_num: usize,
+        tag_values: &[ValueRef<'_>],
+    ) -> bool {
+        let inf_index = cursor + bucket_num - 1;
+        if !Self::is_positive_infinity(le_array, inf_index) {
+            return false;
+        }
+
+        for offset in 1..bucket_num {
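+            // Offset 0 is `cursor` itself, where `tag_values` was sampled,
+            // so only the remaining rows of the group need the tag comparison.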
+            let row = cursor + offset;
+            for (idx, expected) in self.normal_indices.iter().zip(tag_values.iter()) {
+                if vectors[*idx].get_ref(row) != *expected {
+                    return false;
+                }
+            }
+        }
+        true
+    }
+
+    /// Checks whether a row belongs to the current group (same series).
+    fn is_same_group(
+        &self,
+        vectors: &[VectorRef],
+        row: usize,
+        tag_values: &[ValueRef<'_>],
+    ) -> bool {
+        self.normal_indices
+            .iter()
+            .zip(tag_values.iter())
+            .all(|(idx, expected)| vectors[*idx].get_ref(row) == *expected)
+    }
+
+    fn push_output_row(&mut self, tag_values: &[ValueRef<'_>], result: f64) {
+        debug_assert_eq!(self.normal_indices.len(), tag_values.len());
+        for (idx, value) in self.normal_indices.iter().zip(tag_values.iter()) {
+            self.output_buffer[*idx].push_value_ref(value);
+        }
+        self.output_buffer[self.field_column_index].push_value_ref(&ValueRef::from(result));
+        self.output_buffered_rows += 1;
+    }
+
+    fn finalize_safe_group(&mut self) -> DataFusionResult<()> {
+        if let Some(group) = self.safe_group.take() {
+            if group.tag_values.is_empty() {
+                return Ok(());
+            }
+
+            let has_inf = group
+                .buckets
+                .last()
+                .map(|v| v.is_infinite() && v.is_sign_positive())
+                .unwrap_or(false);
+            let result = if group.buckets.len() < 2 || !has_inf {
+                f64::NAN
+            } else {
+                Self::evaluate_row(self.quantile, &group.buckets, &group.counters)
+                    .unwrap_or(f64::NAN)
+            };
+            let mut tag_value_refs = Vec::with_capacity(group.tag_values.len());
+            tag_value_refs.extend(group.tag_values.iter().map(|v| v.as_value_ref()));
+            self.push_output_row(&tag_value_refs, result);
+        }
+        Ok(())
+    }
+
+    fn process_safe_mode_buffer(&mut self) -> DataFusionResult<()> {
+        if self.input_buffer.is_empty() {
+            self.input_buffered_rows = 0;
+            return Ok(());
+        }
+
+        let batch = concat_batches(&self.input_schema, self.input_buffer.drain(..).as_ref())?;
+        self.input_buffered_rows = 0;
+        let vectors = Helper::try_into_vectors(batch.columns())
+            .map_err(|e| DataFusionError::Execution(e.to_string()))?;
+        let le_array = batch.column(self.le_column_index).as_string::<i32>();
+        let field_array = batch
+            .column(self.field_column_index)
+            .as_primitive::<Float64Type>();
+        let mut tag_values_buf = Vec::with_capacity(self.normal_indices.len());
+
+        for row in 0..batch.num_rows() {
+            self.collect_tag_values(&vectors, row, &mut tag_values_buf);
+            let should_start_new_group = self
+                .safe_group
+                .as_ref()
+                .is_none_or(|group| !Self::tag_values_equal(&group.tag_values, &tag_values_buf));
+            if should_start_new_group {
+                self.finalize_safe_group()?;
+                self.safe_group = Some(SafeGroup {
+                    tag_values: tag_values_buf.iter().cloned().map(Value::from).collect(),
+                    buckets: Vec::new(),
+                    counters: Vec::new(),
+                });
+            }
+
+            let Some(group) = self.safe_group.as_mut() else {
+                continue;
+            };
+
+            let bucket = if le_array.is_valid(row) {
+                le_array.value(row).parse::<f64>().unwrap_or(f64::NAN)
+            } else {
+                f64::NAN
+            };
+            let counter = if field_array.is_valid(row) {
+                field_array.value(row)
+            } else {
+                f64::NAN
+            };
+
+            group.buckets.push(bucket);
+            group.counters.push(counter);
+        }
+
+        Ok(())
+    }
+
+    fn tag_values_equal(group_values: &[Value], current: &[ValueRef<'_>]) -> bool {
+        group_values.len() == current.len()
+            && group_values
+                .iter()
+                .zip(current.iter())
+                .all(|(group, now)| group.as_value_ref() == *now)
+    }
+
     /// Compute result from output buffer
     fn take_output_buf(&mut self) -> DataFusionResult<Option<RecordBatch>> {
         if self.output_buffered_rows == 0 {
@@ -630,41 +870,31 @@ impl HistogramFoldStream {
             .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
     }
 
-    /// Find the first `+Inf` which indicates the end of the bucket group
-    ///
-    /// If the return value equals to batch's num_rows means the it's not found
-    /// in this batch
-    fn find_positive_inf(&self, batch: &RecordBatch) -> DataFusionResult<usize> {
-        // fuse this function. It should not be called when the
-        // bucket size is already know.
-        if let Some(bucket_size) = self.bucket_size {
-            return Ok(bucket_size);
-        }
-        let string_le_array = batch.column(self.le_column_index);
-        let float_le_array = compute::cast(&string_le_array, &DataType::Float64).map_err(|e| {
-            DataFusionError::Execution(format!(
-                "cannot cast {} array to float64 array: {:?}",
-                string_le_array.data_type(),
-                e
-            ))
-        })?;
-        let le_as_f64_array = float_le_array
-            .as_primitive_opt::<Float64Type>()
-            .ok_or_else(|| {
-                DataFusionError::Execution(format!(
-                    "expect a float64 array, but found {}",
-                    float_le_array.data_type()
-                ))
-            })?;
-        for (i, v) in le_as_f64_array.iter().enumerate() {
-            if let Some(v) = v
-                && v == f64::INFINITY
-            {
-                return Ok(i);
+    fn flush_remaining(&mut self) -> DataFusionResult<()> {
+        if self.mode == FoldMode::Optimistic && self.input_buffered_rows > 0 {
+            let buffered_batches: Vec<_> = self.input_buffer.drain(..).collect();
+            if !buffered_batches.is_empty() {
+                let batch = concat_batches(&self.input_schema, buffered_batches.as_slice())?;
+                self.switch_to_safe_mode(batch)?;
+            } else {
+                self.input_buffered_rows = 0;
             }
         }
 
-        Ok(batch.num_rows())
+        if self.mode == FoldMode::Safe {
+            self.process_safe_mode_buffer()?;
+            self.finalize_safe_group()?;
+        }
+
+        Ok(())
+    }
+
+    fn is_positive_infinity(le_array: &StringArray, index: usize) -> bool {
+        le_array.is_valid(index)
+            && matches!(
+                le_array.value(index).parse::<f64>(),
+                Ok(value) if value.is_infinite() && value.is_sign_positive()
+            )
     }
 
     /// Evaluate the field column and return the result
@@ -693,8 +923,28 @@ impl HistogramFoldStream {
         }
 
         // check input value
-        debug_assert!(bucket.windows(2).all(|w| w[0] <= w[1]), "{bucket:?}");
-        debug_assert!(counter.windows(2).all(|w| w[0] <= w[1]), "{counter:?}");
+        if !bucket.windows(2).all(|w| w[0] <= w[1]) {
+            return Ok(f64::NAN);
+        }
+        let counter = {
+            let needs_fix =
+                counter.iter().any(|v| !v.is_finite()) || !counter.windows(2).all(|w| w[0] <= w[1]);
+            if !needs_fix {
+                Cow::Borrowed(counter)
+            } else {
+                let mut fixed = Vec::with_capacity(counter.len());
+                let mut prev = 0.0;
+                for (idx, &v) in counter.iter().enumerate() {
+                    let mut val = if v.is_finite() { v } else { prev };
+                    if idx > 0 && val < prev {
+                        val = prev;
+                    }
+                    fixed.push(val);
+                    prev = val;
+                }
+                Cow::Owned(fixed)
+            }
+        };
 
         let total = *counter.last().unwrap();
         let expected_pos = total * quantile;
@@ -713,6 +963,9 @@ impl HistogramFoldStream {
             lower_bound = bucket[fit_bucket_pos - 1];
             lower_count = counter[fit_bucket_pos - 1];
         }
+        if (upper_count - lower_count).abs() < 1e-10 {
+            return Ok(f64::NAN);
+        }
         Ok(lower_bound
             + (upper_bound - lower_bound) / (upper_count - lower_count)
                 * (expected_pos - lower_count))
@@ -724,8 +977,8 @@ mod test {
     use std::sync::Arc;
 
-    use datafusion::arrow::array::Float64Array;
-    use datafusion::arrow::datatypes::{Field, Schema};
+    use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray};
+    use datafusion::arrow::datatypes::{Field, Schema, SchemaRef, TimeUnit};
     use datafusion::common::ToDFSchema;
     use datafusion::datasource::memory::MemorySourceConfig;
     use datafusion::datasource::source::DataSourceExec;
@@ -792,6 +1045,43 @@ mod test {
         ))
     }
 
+    fn build_fold_exec_from_batches(
+        batches: Vec<RecordBatch>,
+        schema: SchemaRef,
+        quantile: f64,
+        ts_column_index: usize,
+    ) -> Arc<HistogramFoldExec> {
+        let memory_exec = Arc::new(DataSourceExec::new(Arc::new(
+            MemorySourceConfig::try_new(&[batches], schema.clone(), None).unwrap(),
+        )));
+        let output_schema: SchemaRef = Arc::new(
+            HistogramFold::convert_schema(
+                &Arc::new(memory_exec.schema().to_dfschema().unwrap()),
+                "le",
+            )
+            .unwrap()
+            .as_arrow()
+            .clone(),
+        );
+        let properties = PlanProperties::new(
+            EquivalenceProperties::new(output_schema.clone()),
+            Partitioning::UnknownPartitioning(1),
+            EmissionType::Incremental,
+            Boundedness::Bounded,
+        );
+
+        Arc::new(HistogramFoldExec {
+            le_column_index: 1,
+            field_column_index: 2,
+            quantile,
+            ts_column_index,
+            input: memory_exec,
+            output_schema,
+            metric: ExecutionPlanMetricsSet::new(),
+            properties,
+        })
+    }
+
     #[tokio::test]
     async fn fold_overall() {
         let memory_exec = Arc::new(prepare_test_data());
@@ -863,6 +1153,187 @@ mod test {
         assert_eq!(actual, expected_output_schema)
     }
 
+    #[tokio::test]
+    async fn fallback_to_safe_mode_on_missing_inf() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("host", DataType::Utf8, true),
+            Field::new("le", DataType::Utf8, true),
+            Field::new("val", DataType::Float64, true),
+        ]));
+        let host_column = Arc::new(StringArray::from(vec!["a", "a", "a", "a", "b", "b"])) as _;
+        let le_column = Arc::new(StringArray::from(vec![
+            "0.1", "+Inf", "0.1", "1.0", "0.1", "+Inf",
+        ])) as _;
+        let val_column = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 3.0, 1.0, 5.0])) as _;
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![host_column, le_column, val_column]).unwrap();
+        let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0);
+        let session_context = SessionContext::default();
+        let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
+            .await
+            .unwrap();
+        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
+            .unwrap()
+            .to_string();
+
+        let expected = String::from(
+            "+------+-----+
+| host | val |
++------+-----+
+| a    | 0.1 |
+| a    | NaN |
+| b    | 0.1 |
++------+-----+",
+        );
+        assert_eq!(result_literal, expected);
+    }
+
+    #[tokio::test]
+    async fn emit_nan_when_no_inf_present() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("host", DataType::Utf8, true),
+            Field::new("le", DataType::Utf8, true),
+            Field::new("val", DataType::Float64, true),
+        ]));
+        let host_column = Arc::new(StringArray::from(vec!["c", "c"])) as _;
+        let le_column = Arc::new(StringArray::from(vec!["0.1", "1.0"])) as _;
+        let val_column = Arc::new(Float64Array::from(vec![1.0, 2.0])) as _;
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![host_column, le_column, val_column]).unwrap();
+        let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.9, 0);
+        let session_context = SessionContext::default();
+        let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
+            .await
+            .unwrap();
+        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
+            .unwrap()
+            .to_string();
+
+        let expected = String::from(
+            "+------+-----+
+| host | val |
++------+-----+
+| c    | NaN |
++------+-----+",
+        );
+        assert_eq!(result_literal, expected);
+    }
+
+    #[tokio::test]
+    async fn safe_mode_handles_misaligned_groups() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
+            Field::new("le", DataType::Utf8, true),
+            Field::new("val", DataType::Float64, true),
+        ]));
+
+        let ts_column = Arc::new(TimestampMillisecondArray::from(vec![
+            2900000, 2900000, 2900000, 3000000, 3000000, 3000000, 3000000, 3005000, 3005000,
+            3010000, 3010000, 3010000, 3010000, 3010000,
+        ])) as _;
+        let le_column = Arc::new(StringArray::from(vec![
+            "0.1", "1", "5", "0.1", "1", "5", "+Inf", "0.1", "+Inf", "0.1", "1", "3", "5", "+Inf",
+        ])) as _;
+        let val_column = Arc::new(Float64Array::from(vec![
+            0.0, 0.0, 0.0, 50.0, 70.0, 110.0, 120.0, 10.0, 30.0, 10.0, 20.0, 30.0, 40.0, 50.0,
+        ])) as _;
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![ts_column, le_column, val_column]).unwrap();
+        let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0);
+        let session_context = SessionContext::default();
+        let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
+            .await
+            .unwrap();
+
+        let mut values = Vec::new();
+        for batch in result {
+            let array = batch.column(1).as_primitive::<Float64Type>();
+            values.extend(array.iter().map(|v| v.unwrap()));
+        }
+
+        assert_eq!(values.len(), 4);
+        assert!(values[0].is_nan());
+        assert!((values[1] - 0.55).abs() < 1e-10);
+        assert!((values[2] - 0.1).abs() < 1e-10);
+        assert!((values[3] - 2.0).abs() < 1e-10);
+    }
+
+    #[tokio::test]
+    async fn missing_buckets_at_first_timestamp() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
+            Field::new("le", DataType::Utf8, true),
+            Field::new("val", DataType::Float64, true),
+        ]));
+
+        let ts_column = Arc::new(TimestampMillisecondArray::from(vec![
+            2_900_000, 3_000_000, 3_000_000, 3_000_000, 3_000_000, 3_005_000, 3_005_000, 3_010_000,
+            3_010_000, 3_010_000, 3_010_000, 3_010_000,
+        ])) as _;
+        let le_column = Arc::new(StringArray::from(vec![
+            "0.1", "0.1", "1", "5", "+Inf", "0.1", "+Inf", "0.1", "1", "3", "5", "+Inf",
+        ])) as _;
+        let val_column = Arc::new(Float64Array::from(vec![
+            0.0, 50.0, 70.0, 110.0, 120.0, 10.0, 30.0, 10.0, 20.0, 30.0, 40.0, 50.0,
+        ])) as _;
+
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![ts_column, le_column, val_column]).unwrap();
+        let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0);
+        let session_context = SessionContext::default();
+        let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
+            .await
+            .unwrap();
+
+        let mut values = Vec::new();
+        for batch in result {
+            let array = batch.column(1).as_primitive::<Float64Type>();
+            values.extend(array.iter().map(|v| v.unwrap()));
+        }
+
+        assert_eq!(values.len(), 4);
+        assert!(values[0].is_nan());
+        assert!((values[1] - 0.55).abs() < 1e-10);
+        assert!((values[2] - 0.1).abs() < 1e-10);
+        assert!((values[3] - 2.0).abs() < 1e-10);
+    }
+
+    #[tokio::test]
+    async fn missing_inf_in_first_group() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
+            Field::new("le", DataType::Utf8, true),
+            Field::new("val", DataType::Float64, true),
+        ]));
+
+        let ts_column = Arc::new(TimestampMillisecondArray::from(vec![
+            1000, 1000, 1000, 2000, 2000, 2000, 2000,
+        ])) as _;
+        let le_column = Arc::new(StringArray::from(vec![
+            "0.1", "1", "5", "0.1", "1", "5", "+Inf",
+        ])) as _;
+        let val_column = Arc::new(Float64Array::from(vec![
+            0.0, 0.0, 0.0, 10.0, 20.0, 30.0, 30.0,
+        ])) as _;
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![ts_column, le_column, val_column]).unwrap();
+        let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0);
+        let session_context = SessionContext::default();
+        let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
+            .await
+            .unwrap();
+
+        let mut values = Vec::new();
+        for batch in result {
+            let array = batch.column(1).as_primitive::<Float64Type>();
+            values.extend(array.iter().map(|v| v.unwrap()));
+        }
+
+        assert_eq!(values.len(), 2);
+        assert!(values[0].is_nan());
+        assert!((values[1] - 0.55).abs() < 1e-10, "{values:?}");
+    }
+
     #[test]
     fn evaluate_row_normal_case() {
         let bucket = [0.0, 1.0, 2.0, 3.0, 4.0, f64::INFINITY];
@@ -935,11 +1406,11 @@ mod test {
     }
 
     #[test]
-    #[should_panic]
     fn evaluate_out_of_order_input() {
         let bucket = [0.0, 1.0, 2.0, 3.0, 4.0, f64::INFINITY];
         let counters = [5.0, 4.0, 3.0, 2.0, 1.0, 0.0];
-        HistogramFoldStream::evaluate_row(0.5, &bucket, &counters).unwrap();
+        let result = HistogramFoldStream::evaluate_row(0.5, &bucket, &counters).unwrap();
+        assert_eq!(0.0, result);
     }
 
     #[test]
@@ -957,4 +1428,20 @@ mod test {
         let result = HistogramFoldStream::evaluate_row(0.5, &bucket, &counters).unwrap();
         assert_eq!(3.0, result);
     }
+
+    #[test]
+    fn evaluate_non_monotonic_counter() {
+        let bucket = [0.0, 1.0, 2.0, 3.0, f64::INFINITY];
+        let counters = [0.1, 0.2, 0.4, 0.17, 0.5];
+        let result = HistogramFoldStream::evaluate_row(0.5, &bucket, &counters).unwrap();
+        assert!((result - 1.25).abs() < 1e-10, "{result}");
+    }
+
+    #[test]
+    fn evaluate_nan_counter() {
+        let bucket = [0.0, 1.0, 2.0, 3.0, f64::INFINITY];
+        let counters = [f64::NAN, 1.0, 2.0, 3.0, 3.0];
+        let result = HistogramFoldStream::evaluate_row(0.5, &bucket, &counters).unwrap();
+        assert!((result - 1.5).abs() < 1e-10, "{result}");
+    }
 }
diff --git a/tests/cases/standalone/common/promql/simple_histogram.result b/tests/cases/standalone/common/promql/simple_histogram.result
index 1409e7834a..60a1d42b68 100644
--- a/tests/cases/standalone/common/promql/simple_histogram.result
+++ b/tests/cases/standalone/common/promql/simple_histogram.result
@@ -363,3 +363,52 @@ drop table greptime_servers_postgres_query_elapsed_no_le;
 
 Affected Rows: 0
 
+-- test case with some missing buckets
+create table histogram5_bucket (
+    ts timestamp time index,
+    le string,
+    s string,
+    val double,
+    primary key (s, le),
+);
+
+Affected Rows: 0
+
+insert into histogram5_bucket values
+    (3000000, "0.1", "a", 0),
+    -- (3000000, "1", "a", 0),
+    -- (3000000, "5", "a", 0),
+    -- (3000000, "+Inf", "a", 0),
+    (3005000, "0.1", "a", 50),
+    (3005000, "1", "a", 70),
+    (3005000, "5", "a", 110),
+    (3005000, "+Inf", "a", 120),
+    (3010000, "0.1", "a", 10),
+    -- (3010000, "1", "a", 20),
+    -- (3010000, "5", "a", 20),
+    (3010000, "+Inf", "a", 30),
+    (3015000, "0.1", "a", 10),
+    (3015000, "1", "a", 10),
+    (3015000, "3", "a", 20), --
+    (3015000, "5", "a", 30),
+    (3015000, "+Inf", "a", 50);
+
+Affected Rows: 12
+
+tql eval (3000, 3015, '3s') histogram_quantile(0.5, histogram5_bucket);
+
++---------------------+---+--------------------+
+| ts                  | s | val                |
++---------------------+---+--------------------+
+| 1970-01-01T00:50:00 | a | NaN                |
+| 1970-01-01T00:50:03 | a | NaN                |
+| 1970-01-01T00:50:06 | a | 0.5499999999999999 |
+| 1970-01-01T00:50:09 | a | 0.5499999999999999 |
+| 1970-01-01T00:50:12 | a | 0.775              |
+| 1970-01-01T00:50:15 | a | 4.0                |
++---------------------+---+--------------------+
+
+drop table histogram5_bucket;
+
+Affected Rows: 0
+
diff --git a/tests/cases/standalone/common/promql/simple_histogram.sql b/tests/cases/standalone/common/promql/simple_histogram.sql
index d6dde4cb69..daeae79254 100644
--- a/tests/cases/standalone/common/promql/simple_histogram.sql
+++ b/tests/cases/standalone/common/promql/simple_histogram.sql
@@ -204,3 +204,36 @@ tql eval(0, 10, '10s') histogram_quantile(0.99, sum by(pod,instance, le) (rate(g
 tql eval(0, 10, '10s') histogram_quantile(0.99, sum by(pod,instance, fbf) (rate(greptime_servers_postgres_query_elapsed_no_le{instance=~"xxx"}[1m])));
 
 drop table greptime_servers_postgres_query_elapsed_no_le;
+
+-- test case with some missing buckets
+create table histogram5_bucket (
+    ts timestamp time index,
+    le string,
+    s string,
+    val double,
+    primary key (s, le),
+);
+
+insert into histogram5_bucket values
+    (3000000, "0.1", "a", 0),
+    -- (3000000, "1", "a", 0),
+    -- (3000000, "5", "a", 0),
+    -- (3000000, "+Inf", "a", 0),
+    (3005000, "0.1", "a", 50),
+    (3005000, "1", "a", 70),
+    (3005000, "5", "a", 110),
+    (3005000, "+Inf", "a", 120),
+    (3010000, "0.1", "a", 10),
+    -- (3010000, "1", "a", 20),
+    -- (3010000, "5", "a", 20),
+    (3010000, "+Inf", "a", 30),
+    (3015000, "0.1", "a", 10),
+    (3015000, "1", "a", 10),
+    (3015000, "3", "a", 20), --
+    (3015000, "5", "a", 30),
+    (3015000, "+Inf", "a", 50);
+
+
+tql eval (3000, 3015, '3s') histogram_quantile(0.5, histogram5_bucket);
+
+drop table histogram5_bucket;
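
--
For reference, a minimal standalone sketch of the PromQL-style interpolation
that `evaluate_row` performs once buckets and counters have been sanitized
(monotonic, at least two buckets, ending in `+Inf`). The helper name
`quantile_sketch` is illustrative and not part of this patch:

    fn quantile_sketch(quantile: f64, buckets: &[f64], counters: &[f64]) -> f64 {
        // total observations and the target cumulative rank
        let total = *counters.last().unwrap();
        let target = total * quantile;
        // first bucket whose cumulative counter reaches the target
        let pos = counters.iter().position(|&c| c >= target).unwrap();
        if pos >= buckets.len() - 1 {
            // target falls into the `+Inf` bucket: report the largest finite bound
            return buckets[buckets.len() - 2];
        }
        let (lower_bound, lower_count) = if pos == 0 {
            (0.0, 0.0)
        } else {
            (buckets[pos - 1], counters[pos - 1])
        };
        // linear interpolation inside the matching bucket
        lower_bound
            + (buckets[pos] - lower_bound) / (counters[pos] - lower_count)
                * (target - lower_count)
    }

With buckets [0.1, 1.0, 5.0, +Inf] and counters [50.0, 70.0, 110.0, 120.0] at
quantile 0.5, the target rank is 60, landing in the second bucket, and the
sketch yields 0.1 + (1.0 - 0.1) / (70 - 50) * (60 - 50) = 0.55, matching the
expected values in the tests above.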