Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-22 22:20:02 +00:00
feat: run histogram quantile in safe mode for incomplete data (#7297)
* initial impl
* sqlness test and fix
* correct sqlness case
* simplification
* refine code and comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
@@ -13,14 +13,15 @@
// limitations under the License.

use std::any::Any;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::task::Poll;
use std::time::Instant;

use common_telemetry::warn;
use datafusion::arrow::array::AsArray;
use datafusion::arrow::compute::{self, SortOptions, concat_batches};
use datafusion::arrow::array::{Array, AsArray, StringArray};
use datafusion::arrow::compute::{SortOptions, concat_batches};
use datafusion::arrow::datatypes::{DataType, Float64Type, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::stats::Precision;
@@ -40,8 +41,8 @@ use datafusion::physical_plan::{
};
use datafusion::prelude::{Column, Expr};
use datatypes::prelude::{ConcreteDataType, DataType as GtDataType};
use datatypes::value::{OrderedF64, ValueRef};
use datatypes::vectors::{Helper, MutableVector};
use datatypes::value::{OrderedF64, Value, ValueRef};
use datatypes::vectors::{Helper, MutableVector, VectorRef};
use futures::{Stream, StreamExt, ready};

/// `HistogramFold` will fold the conventional (non-native) histogram ([1]) for later
@@ -358,6 +359,9 @@ impl ExecutionPlan for HistogramFoldExec {
            input_buffer: vec![],
            input,
            output_schema,
            input_schema: self.input.schema(),
            mode: FoldMode::Optimistic,
            safe_group: None,
            metric: baseline_metric,
            batch_size,
            input_buffered_rows: 0,
@@ -430,6 +434,12 @@ impl DisplayAs for HistogramFoldExec {
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FoldMode {
    Optimistic,
    Safe,
}

pub struct HistogramFoldStream {
    // internal states
    le_column_index: usize,
@@ -441,6 +451,9 @@ pub struct HistogramFoldStream {
    /// Expected output batch size
    batch_size: usize,
    output_schema: SchemaRef,
    input_schema: SchemaRef,
    mode: FoldMode,
    safe_group: Option<SafeGroup>,

    // buffers
    input_buffer: Vec<RecordBatch>,
@@ -453,6 +466,13 @@ pub struct HistogramFoldStream {
    metric: BaselineMetrics,
}

#[derive(Debug, Default)]
struct SafeGroup {
    tag_values: Vec<Value>,
    buckets: Vec<f64>,
    counters: Vec<f64>,
}

impl RecordBatchStream for HistogramFoldStream {
    fn schema(&self) -> SchemaRef {
        self.output_schema.clone()
@@ -478,7 +498,10 @@ impl Stream for HistogramFoldStream {
                    self.metric.elapsed_compute().add_elapsed(timer);
                    break Poll::Ready(Some(result));
                }
                None => break Poll::Ready(self.take_output_buf()?.map(Ok)),
                None => {
                    self.flush_remaining()?;
                    break Poll::Ready(self.take_output_buf()?.map(Ok));
                }
            }
        };
        self.metric.record_poll(poll)
@@ -491,22 +514,28 @@ impl HistogramFoldStream {
        &mut self,
        input: RecordBatch,
    ) -> DataFusionResult<Option<DataFusionResult<RecordBatch>>> {
        let Some(bucket_num) = self.calculate_bucket_num(&input)? else {
            return Ok(None);
        };
        match self.mode {
            FoldMode::Safe => {
                self.push_input_buf(input);
                self.process_safe_mode_buffer()?;
            }
            FoldMode::Optimistic => {
                self.push_input_buf(input);
                let Some(bucket_num) = self.calculate_bucket_num_from_buffer()? else {
                    return Ok(None);
                };
                self.bucket_size = Some(bucket_num);

        if self.input_buffered_rows + input.num_rows() < bucket_num {
            // not enough rows to fold
            self.push_input_buf(input);
            return Ok(None);
                if self.input_buffered_rows < bucket_num {
                    // not enough rows to fold
                    return Ok(None);
                }

                self.fold_buf(bucket_num)?;
            }
        }

        self.fold_buf(bucket_num, input)?;
        if self.output_buffered_rows >= self.batch_size {
            return Ok(self.take_output_buf()?.map(Ok));
        }

        Ok(None)
        self.maybe_take_output()
    }

    /// Generate a group of empty [MutableVector]s from the output schema.
@@ -532,55 +561,100 @@ impl HistogramFoldStream {
        Ok(builders)
    }

    fn calculate_bucket_num(&mut self, batch: &RecordBatch) -> DataFusionResult<Option<usize>> {
    /// Determines bucket count using buffered batches, concatenating them to
    /// detect the first complete bucket that may span batch boundaries.
    fn calculate_bucket_num_from_buffer(&mut self) -> DataFusionResult<Option<usize>> {
        if let Some(size) = self.bucket_size {
            return Ok(Some(size));
        }

        let inf_pos = self.find_positive_inf(batch)?;
        if inf_pos == batch.num_rows() {
            // no positive inf found, append to buffer and wait for next batch
            self.push_input_buf(batch.clone());
        if self.input_buffer.is_empty() {
            return Ok(None);
        }

        // else we found the positive inf.
        // calculate the bucket size
        let bucket_size = inf_pos + self.input_buffered_rows + 1;
        Ok(Some(bucket_size))
        let batch_refs: Vec<&RecordBatch> = self.input_buffer.iter().collect();
        let batch = concat_batches(&self.input_schema, batch_refs)?;
        self.find_first_complete_bucket(&batch)
    }

    fn find_first_complete_bucket(&self, batch: &RecordBatch) -> DataFusionResult<Option<usize>> {
        if batch.num_rows() == 0 {
            return Ok(None);
        }

        let vectors = Helper::try_into_vectors(batch.columns())
            .map_err(|e| DataFusionError::Execution(e.to_string()))?;
        let le_array = batch.column(self.le_column_index).as_string::<i32>();

        let mut tag_values_buf = Vec::with_capacity(self.normal_indices.len());
        self.collect_tag_values(&vectors, 0, &mut tag_values_buf);
        let mut group_start = 0usize;

        for row in 0..batch.num_rows() {
            if !self.is_same_group(&vectors, row, &tag_values_buf) {
                // new group begins
                self.collect_tag_values(&vectors, row, &mut tag_values_buf);
                group_start = row;
            }

            if Self::is_positive_infinity(le_array, row) {
                return Ok(Some(row - group_start + 1));
            }
        }

        Ok(None)
    }

    /// Fold record batches from input buffer and put to output buffer
    fn fold_buf(&mut self, bucket_num: usize, input: RecordBatch) -> DataFusionResult<()> {
        self.push_input_buf(input);
        // TODO(ruihang): this concat is avoidable.
        let batch = concat_batches(&self.input.schema(), self.input_buffer.drain(..).as_ref())?;
    fn fold_buf(&mut self, bucket_num: usize) -> DataFusionResult<()> {
        let batch = concat_batches(&self.input_schema, self.input_buffer.drain(..).as_ref())?;
        let mut remaining_rows = self.input_buffered_rows;
        let mut cursor = 0;

        // TODO(LFC): Try to get rid of the Arrow array to vector conversion here.
        let vectors = Helper::try_into_vectors(batch.columns())
            .map_err(|e| DataFusionError::Execution(e.to_string()))?;
        let le_array = batch.column(self.le_column_index);
        let le_array = le_array.as_string::<i32>();
        let field_array = batch.column(self.field_column_index);
        let field_array = field_array.as_primitive::<Float64Type>();
        let mut tag_values_buf = Vec::with_capacity(self.normal_indices.len());

        while remaining_rows >= bucket_num && self.mode == FoldMode::Optimistic {
            self.collect_tag_values(&vectors, cursor, &mut tag_values_buf);
            if !self.validate_optimistic_group(
                &vectors,
                le_array,
                cursor,
                bucket_num,
                &tag_values_buf,
            ) {
                let remaining_input_batch = batch.slice(cursor, remaining_rows);
                self.switch_to_safe_mode(remaining_input_batch)?;
                return Ok(());
            }

        while remaining_rows >= bucket_num {
            // "sample" normal columns
            for normal_index in &self.normal_indices {
                let val = vectors[*normal_index].get(cursor);
                self.output_buffer[*normal_index].push_value_ref(&val.as_value_ref());
            for (idx, value) in self.normal_indices.iter().zip(tag_values_buf.iter()) {
                self.output_buffer[*idx].push_value_ref(value);
            }
            // "fold" `le` and field columns
            let le_array = batch.column(self.le_column_index);
            let le_array = le_array.as_string::<i32>();
            let field_array = batch.column(self.field_column_index);
            let field_array = field_array.as_primitive::<Float64Type>();
            let mut bucket = vec![];
            let mut counters = vec![];
            let mut bucket = Vec::with_capacity(bucket_num);
            let mut counters = Vec::with_capacity(bucket_num);
            for bias in 0..bucket_num {
                let le_str = le_array.value(cursor + bias);
                let le = le_str.parse::<f64>().unwrap();
                let position = cursor + bias;
                let le = if le_array.is_valid(position) {
                    le_array.value(position).parse::<f64>().unwrap_or(f64::NAN)
                } else {
                    f64::NAN
                };
                bucket.push(le);

                let counter = field_array.value(cursor + bias);
                let counter = if field_array.is_valid(position) {
                    field_array.value(position)
                } else {
                    f64::NAN
                };
                counters.push(counter);
            }
            // ignore invalid data
@@ -593,7 +667,9 @@ impl HistogramFoldStream {

        let remaining_input_batch = batch.slice(cursor, remaining_rows);
        self.input_buffered_rows = remaining_input_batch.num_rows();
        self.input_buffer.push(remaining_input_batch);
        if self.input_buffered_rows > 0 {
            self.input_buffer.push(remaining_input_batch);
        }

        Ok(())
    }
@@ -603,6 +679,170 @@ impl HistogramFoldStream {
        self.input_buffer.push(batch);
    }

    fn maybe_take_output(&mut self) -> DataFusionResult<Option<DataFusionResult<RecordBatch>>> {
        if self.output_buffered_rows >= self.batch_size {
            return Ok(self.take_output_buf()?.map(Ok));
        }
        Ok(None)
    }

    fn switch_to_safe_mode(&mut self, remaining_batch: RecordBatch) -> DataFusionResult<()> {
        self.mode = FoldMode::Safe;
        self.bucket_size = None;
        self.input_buffer.clear();
        self.input_buffered_rows = remaining_batch.num_rows();

        if self.input_buffered_rows > 0 {
            self.input_buffer.push(remaining_batch);
            self.process_safe_mode_buffer()?;
        }

        Ok(())
    }

    fn collect_tag_values<'a>(
        &self,
        vectors: &'a [VectorRef],
        row: usize,
        tag_values: &mut Vec<ValueRef<'a>>,
    ) {
        tag_values.clear();
        for idx in self.normal_indices.iter() {
            tag_values.push(vectors[*idx].get_ref(row));
        }
    }

    fn validate_optimistic_group(
        &self,
        vectors: &[VectorRef],
        le_array: &StringArray,
        cursor: usize,
        bucket_num: usize,
        tag_values: &[ValueRef<'_>],
    ) -> bool {
        let inf_index = cursor + bucket_num - 1;
        if !Self::is_positive_infinity(le_array, inf_index) {
            return false;
        }

        for offset in 1..bucket_num {
            let row = cursor + offset;
            for (idx, expected) in self.normal_indices.iter().zip(tag_values.iter()) {
                if vectors[*idx].get_ref(row) != *expected {
                    return false;
                }
            }
        }
        true
    }

    /// Checks whether a row belongs to the current group (same series).
    fn is_same_group(
        &self,
        vectors: &[VectorRef],
        row: usize,
        tag_values: &[ValueRef<'_>],
    ) -> bool {
        self.normal_indices
            .iter()
            .zip(tag_values.iter())
            .all(|(idx, expected)| vectors[*idx].get_ref(row) == *expected)
    }

    fn push_output_row(&mut self, tag_values: &[ValueRef<'_>], result: f64) {
        debug_assert_eq!(self.normal_indices.len(), tag_values.len());
        for (idx, value) in self.normal_indices.iter().zip(tag_values.iter()) {
            self.output_buffer[*idx].push_value_ref(value);
        }
        self.output_buffer[self.field_column_index].push_value_ref(&ValueRef::from(result));
        self.output_buffered_rows += 1;
    }

    fn finalize_safe_group(&mut self) -> DataFusionResult<()> {
        if let Some(group) = self.safe_group.take() {
            if group.tag_values.is_empty() {
                return Ok(());
            }

            let has_inf = group
                .buckets
                .last()
                .map(|v| v.is_infinite() && v.is_sign_positive())
                .unwrap_or(false);
            let result = if group.buckets.len() < 2 || !has_inf {
                f64::NAN
            } else {
                Self::evaluate_row(self.quantile, &group.buckets, &group.counters)
                    .unwrap_or(f64::NAN)
            };
            let mut tag_value_refs = Vec::with_capacity(group.tag_values.len());
            tag_value_refs.extend(group.tag_values.iter().map(|v| v.as_value_ref()));
            self.push_output_row(&tag_value_refs, result);
        }
        Ok(())
    }

    fn process_safe_mode_buffer(&mut self) -> DataFusionResult<()> {
        if self.input_buffer.is_empty() {
            self.input_buffered_rows = 0;
            return Ok(());
        }

        let batch = concat_batches(&self.input_schema, self.input_buffer.drain(..).as_ref())?;
        self.input_buffered_rows = 0;
        let vectors = Helper::try_into_vectors(batch.columns())
            .map_err(|e| DataFusionError::Execution(e.to_string()))?;
        let le_array = batch.column(self.le_column_index).as_string::<i32>();
        let field_array = batch
            .column(self.field_column_index)
            .as_primitive::<Float64Type>();
        let mut tag_values_buf = Vec::with_capacity(self.normal_indices.len());

        for row in 0..batch.num_rows() {
            self.collect_tag_values(&vectors, row, &mut tag_values_buf);
            let should_start_new_group = self
                .safe_group
                .as_ref()
                .is_none_or(|group| !Self::tag_values_equal(&group.tag_values, &tag_values_buf));
            if should_start_new_group {
                self.finalize_safe_group()?;
                self.safe_group = Some(SafeGroup {
                    tag_values: tag_values_buf.iter().cloned().map(Value::from).collect(),
                    buckets: Vec::new(),
                    counters: Vec::new(),
                });
            }

            let Some(group) = self.safe_group.as_mut() else {
                continue;
            };

            let bucket = if le_array.is_valid(row) {
                le_array.value(row).parse::<f64>().unwrap_or(f64::NAN)
            } else {
                f64::NAN
            };
            let counter = if field_array.is_valid(row) {
                field_array.value(row)
            } else {
                f64::NAN
            };

            group.buckets.push(bucket);
            group.counters.push(counter);
        }

        Ok(())
    }

    fn tag_values_equal(group_values: &[Value], current: &[ValueRef<'_>]) -> bool {
        group_values.len() == current.len()
            && group_values
                .iter()
                .zip(current.iter())
                .all(|(group, now)| group.as_value_ref() == *now)
    }

    /// Compute result from output buffer
    fn take_output_buf(&mut self) -> DataFusionResult<Option<RecordBatch>> {
        if self.output_buffered_rows == 0 {
@@ -630,41 +870,31 @@ impl HistogramFoldStream {
            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
    }

    /// Find the first `+Inf` which indicates the end of the bucket group.
    ///
    /// If the return value equals the batch's num_rows, it means `+Inf` is not found
    /// in this batch.
    fn find_positive_inf(&self, batch: &RecordBatch) -> DataFusionResult<usize> {
        // fuse this function. It should not be called when the
        // bucket size is already known.
        if let Some(bucket_size) = self.bucket_size {
            return Ok(bucket_size);
        }
        let string_le_array = batch.column(self.le_column_index);
        let float_le_array = compute::cast(&string_le_array, &DataType::Float64).map_err(|e| {
            DataFusionError::Execution(format!(
                "cannot cast {} array to float64 array: {:?}",
                string_le_array.data_type(),
                e
            ))
        })?;
        let le_as_f64_array = float_le_array
            .as_primitive_opt::<Float64Type>()
            .ok_or_else(|| {
                DataFusionError::Execution(format!(
                    "expect a float64 array, but found {}",
                    float_le_array.data_type()
                ))
            })?;
        for (i, v) in le_as_f64_array.iter().enumerate() {
            if let Some(v) = v
                && v == f64::INFINITY
            {
                return Ok(i);
    fn flush_remaining(&mut self) -> DataFusionResult<()> {
        if self.mode == FoldMode::Optimistic && self.input_buffered_rows > 0 {
            let buffered_batches: Vec<_> = self.input_buffer.drain(..).collect();
            if !buffered_batches.is_empty() {
                let batch = concat_batches(&self.input_schema, buffered_batches.as_slice())?;
                self.switch_to_safe_mode(batch)?;
            } else {
                self.input_buffered_rows = 0;
            }
        }

        Ok(batch.num_rows())
        if self.mode == FoldMode::Safe {
            self.process_safe_mode_buffer()?;
            self.finalize_safe_group()?;
        }

        Ok(())
    }

    fn is_positive_infinity(le_array: &StringArray, index: usize) -> bool {
        le_array.is_valid(index)
            && matches!(
                le_array.value(index).parse::<f64>(),
                Ok(value) if value.is_infinite() && value.is_sign_positive()
            )
    }

    /// Evaluate the field column and return the result
@@ -693,8 +923,28 @@ impl HistogramFoldStream {
        }

        // check input value
        debug_assert!(bucket.windows(2).all(|w| w[0] <= w[1]), "{bucket:?}");
        debug_assert!(counter.windows(2).all(|w| w[0] <= w[1]), "{counter:?}");
        if !bucket.windows(2).all(|w| w[0] <= w[1]) {
            return Ok(f64::NAN);
        }
        let counter = {
            let needs_fix =
                counter.iter().any(|v| !v.is_finite()) || !counter.windows(2).all(|w| w[0] <= w[1]);
            if !needs_fix {
                Cow::Borrowed(counter)
            } else {
                let mut fixed = Vec::with_capacity(counter.len());
                let mut prev = 0.0;
                for (idx, &v) in counter.iter().enumerate() {
                    let mut val = if v.is_finite() { v } else { prev };
                    if idx > 0 && val < prev {
                        val = prev;
                    }
                    fixed.push(val);
                    prev = val;
                }
                Cow::Owned(fixed)
            }
        };

        let total = *counter.last().unwrap();
        let expected_pos = total * quantile;
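With this repair, a non-finite counter is replaced by the previous value and a counter that decreases is clamped up to it, so a series such as [0.1, 0.2, 0.4, 0.17, 0.5] is evaluated as if it were [0.1, 0.2, 0.4, 0.4, 0.5]; the `evaluate_non_monotonic_counter` and `evaluate_nan_counter` tests later in this diff exercise exactly this behavior.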
@@ -713,6 +963,9 @@ impl HistogramFoldStream {
            lower_bound = bucket[fit_bucket_pos - 1];
            lower_count = counter[fit_bucket_pos - 1];
        }
        if (upper_count - lower_count).abs() < 1e-10 {
            return Ok(f64::NAN);
        }
        Ok(lower_bound
            + (upper_bound - lower_bound) / (upper_count - lower_count)
                * (expected_pos - lower_count))
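The interpolation above is the usual histogram_quantile estimate: find the first bucket whose cumulative counter reaches `total * quantile`, then interpolate linearly inside it. Below is a minimal, self-contained sketch of that folding rule together with the NaN fallback that safe mode applies to incomplete groups. The `fold_group` helper and `Bucket` alias are hypothetical illustrations, and the `(0.0, 0.0)` lower-bound default and the handling of a hit in the `+Inf` bucket are assumptions consistent with the tests in this diff, not code from the patch itself.

/// One histogram bucket: (`le` upper bound, cumulative counter). Hypothetical alias.
type Bucket = (f64, f64);

/// Fold one group of buckets into a quantile estimate (illustrative sketch).
/// Groups with fewer than two buckets or without a trailing `+Inf` bucket
/// yield NaN instead of a potentially misleading value.
fn fold_group(quantile: f64, buckets: &[Bucket]) -> f64 {
    let has_inf = match buckets.last() {
        Some(&(le, _)) => le.is_infinite() && le.is_sign_positive(),
        None => false,
    };
    if buckets.len() < 2 || !has_inf {
        return f64::NAN;
    }

    let total = buckets.last().unwrap().1;
    let expected_pos = total * quantile;
    // first bucket whose cumulative counter reaches the expected position
    let pos = buckets
        .iter()
        .position(|&(_, c)| c >= expected_pos)
        .unwrap_or(buckets.len() - 1);
    if pos == buckets.len() - 1 {
        // the quantile falls into the `+Inf` bucket: report the largest finite bound
        return buckets[buckets.len() - 2].0;
    }

    let (upper_bound, upper_count) = buckets[pos];
    // assumption: interpolate from (0.0, 0.0) when the hit is the first bucket
    let (lower_bound, lower_count) = if pos == 0 { (0.0, 0.0) } else { buckets[pos - 1] };
    if (upper_count - lower_count).abs() < 1e-10 {
        return f64::NAN;
    }
    lower_bound
        + (upper_bound - lower_bound) / (upper_count - lower_count)
            * (expected_pos - lower_count)
}

fn main() {
    // complete group: interpolates inside the (0.1, 1.0] bucket -> 0.55
    let complete = [(0.1, 50.0), (1.0, 70.0), (5.0, 110.0), (f64::INFINITY, 120.0)];
    println!("{}", fold_group(0.5, &complete));

    // incomplete group without `+Inf`: NaN instead of a fabricated value
    let incomplete = [(0.1, 1.0), (1.0, 2.0)];
    println!("{}", fold_group(0.9, &incomplete));
}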
@@ -724,8 +977,8 @@ impl HistogramFoldStream {
mod test {
    use std::sync::Arc;

    use datafusion::arrow::array::Float64Array;
    use datafusion::arrow::datatypes::{Field, Schema};
    use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray};
    use datafusion::arrow::datatypes::{Field, Schema, SchemaRef, TimeUnit};
    use datafusion::common::ToDFSchema;
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::datasource::source::DataSourceExec;
@@ -792,6 +1045,43 @@ mod test {
        ))
    }

    fn build_fold_exec_from_batches(
        batches: Vec<RecordBatch>,
        schema: SchemaRef,
        quantile: f64,
        ts_column_index: usize,
    ) -> Arc<HistogramFoldExec> {
        let memory_exec = Arc::new(DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[batches], schema.clone(), None).unwrap(),
        )));
        let output_schema: SchemaRef = Arc::new(
            HistogramFold::convert_schema(
                &Arc::new(memory_exec.schema().to_dfschema().unwrap()),
                "le",
            )
            .unwrap()
            .as_arrow()
            .clone(),
        );
        let properties = PlanProperties::new(
            EquivalenceProperties::new(output_schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );

        Arc::new(HistogramFoldExec {
            le_column_index: 1,
            field_column_index: 2,
            quantile,
            ts_column_index,
            input: memory_exec,
            output_schema,
            metric: ExecutionPlanMetricsSet::new(),
            properties,
        })
    }

    #[tokio::test]
    async fn fold_overall() {
        let memory_exec = Arc::new(prepare_test_data());
@@ -863,6 +1153,187 @@ mod test {
        assert_eq!(actual, expected_output_schema)
    }

    #[tokio::test]
    async fn fallback_to_safe_mode_on_missing_inf() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new("le", DataType::Utf8, true),
            Field::new("val", DataType::Float64, true),
        ]));
        let host_column = Arc::new(StringArray::from(vec!["a", "a", "a", "a", "b", "b"])) as _;
        let le_column = Arc::new(StringArray::from(vec![
            "0.1", "+Inf", "0.1", "1.0", "0.1", "+Inf",
        ])) as _;
        let val_column = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 3.0, 1.0, 5.0])) as _;
        let batch =
            RecordBatch::try_new(schema.clone(), vec![host_column, le_column, val_column]).unwrap();
        let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0);
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+------+-----+
| host | val |
+------+-----+
| a    | 0.1 |
| a    | NaN |
| b    | 0.1 |
+------+-----+",
        );
        assert_eq!(result_literal, expected);
    }

    #[tokio::test]
    async fn emit_nan_when_no_inf_present() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new("le", DataType::Utf8, true),
            Field::new("val", DataType::Float64, true),
        ]));
        let host_column = Arc::new(StringArray::from(vec!["c", "c"])) as _;
        let le_column = Arc::new(StringArray::from(vec!["0.1", "1.0"])) as _;
        let val_column = Arc::new(Float64Array::from(vec![1.0, 2.0])) as _;
        let batch =
            RecordBatch::try_new(schema.clone(), vec![host_column, le_column, val_column]).unwrap();
        let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.9, 0);
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+------+-----+
| host | val |
+------+-----+
| c    | NaN |
+------+-----+",
        );
        assert_eq!(result_literal, expected);
    }

    #[tokio::test]
    async fn safe_mode_handles_misaligned_groups() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("le", DataType::Utf8, true),
            Field::new("val", DataType::Float64, true),
        ]));

        let ts_column = Arc::new(TimestampMillisecondArray::from(vec![
            2900000, 2900000, 2900000, 3000000, 3000000, 3000000, 3000000, 3005000, 3005000,
            3010000, 3010000, 3010000, 3010000, 3010000,
        ])) as _;
        let le_column = Arc::new(StringArray::from(vec![
            "0.1", "1", "5", "0.1", "1", "5", "+Inf", "0.1", "+Inf", "0.1", "1", "3", "5", "+Inf",
        ])) as _;
        let val_column = Arc::new(Float64Array::from(vec![
            0.0, 0.0, 0.0, 50.0, 70.0, 110.0, 120.0, 10.0, 30.0, 10.0, 20.0, 30.0, 40.0, 50.0,
        ])) as _;
        let batch =
            RecordBatch::try_new(schema.clone(), vec![ts_column, le_column, val_column]).unwrap();
        let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0);
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
            .await
            .unwrap();

        let mut values = Vec::new();
        for batch in result {
            let array = batch.column(1).as_primitive::<Float64Type>();
            values.extend(array.iter().map(|v| v.unwrap()));
        }

        assert_eq!(values.len(), 4);
        assert!(values[0].is_nan());
        assert!((values[1] - 0.55).abs() < 1e-10);
        assert!((values[2] - 0.1).abs() < 1e-10);
        assert!((values[3] - 2.0).abs() < 1e-10);
    }

    #[tokio::test]
    async fn missing_buckets_at_first_timestamp() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("le", DataType::Utf8, true),
            Field::new("val", DataType::Float64, true),
        ]));

        let ts_column = Arc::new(TimestampMillisecondArray::from(vec![
            2_900_000, 3_000_000, 3_000_000, 3_000_000, 3_000_000, 3_005_000, 3_005_000, 3_010_000,
            3_010_000, 3_010_000, 3_010_000, 3_010_000,
        ])) as _;
        let le_column = Arc::new(StringArray::from(vec![
            "0.1", "0.1", "1", "5", "+Inf", "0.1", "+Inf", "0.1", "1", "3", "5", "+Inf",
        ])) as _;
        let val_column = Arc::new(Float64Array::from(vec![
            0.0, 50.0, 70.0, 110.0, 120.0, 10.0, 30.0, 10.0, 20.0, 30.0, 40.0, 50.0,
        ])) as _;

        let batch =
            RecordBatch::try_new(schema.clone(), vec![ts_column, le_column, val_column]).unwrap();
        let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0);
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
            .await
            .unwrap();

        let mut values = Vec::new();
        for batch in result {
            let array = batch.column(1).as_primitive::<Float64Type>();
            values.extend(array.iter().map(|v| v.unwrap()));
        }

        assert_eq!(values.len(), 4);
        assert!(values[0].is_nan());
        assert!((values[1] - 0.55).abs() < 1e-10);
        assert!((values[2] - 0.1).abs() < 1e-10);
        assert!((values[3] - 2.0).abs() < 1e-10);
    }

    #[tokio::test]
    async fn missing_inf_in_first_group() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("le", DataType::Utf8, true),
            Field::new("val", DataType::Float64, true),
        ]));

        let ts_column = Arc::new(TimestampMillisecondArray::from(vec![
            1000, 1000, 1000, 2000, 2000, 2000, 2000,
        ])) as _;
        let le_column = Arc::new(StringArray::from(vec![
            "0.1", "1", "5", "0.1", "1", "5", "+Inf",
        ])) as _;
        let val_column = Arc::new(Float64Array::from(vec![
            0.0, 0.0, 0.0, 10.0, 20.0, 30.0, 30.0,
        ])) as _;
        let batch =
            RecordBatch::try_new(schema.clone(), vec![ts_column, le_column, val_column]).unwrap();
        let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0);
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
            .await
            .unwrap();

        let mut values = Vec::new();
        for batch in result {
            let array = batch.column(1).as_primitive::<Float64Type>();
            values.extend(array.iter().map(|v| v.unwrap()));
        }

        assert_eq!(values.len(), 2);
        assert!(values[0].is_nan());
        assert!((values[1] - 0.55).abs() < 1e-10, "{values:?}");
    }

    #[test]
    fn evaluate_row_normal_case() {
        let bucket = [0.0, 1.0, 2.0, 3.0, 4.0, f64::INFINITY];
@@ -935,11 +1406,11 @@ mod test {
    }

    #[test]
    #[should_panic]
    fn evaluate_out_of_order_input() {
        let bucket = [0.0, 1.0, 2.0, 3.0, 4.0, f64::INFINITY];
        let counters = [5.0, 4.0, 3.0, 2.0, 1.0, 0.0];
        HistogramFoldStream::evaluate_row(0.5, &bucket, &counters).unwrap();
        let result = HistogramFoldStream::evaluate_row(0.5, &bucket, &counters).unwrap();
        assert_eq!(0.0, result);
    }

    #[test]
@@ -957,4 +1428,20 @@ mod test {
        let result = HistogramFoldStream::evaluate_row(0.5, &bucket, &counters).unwrap();
        assert_eq!(3.0, result);
    }

    #[test]
    fn evaluate_non_monotonic_counter() {
        let bucket = [0.0, 1.0, 2.0, 3.0, f64::INFINITY];
        let counters = [0.1, 0.2, 0.4, 0.17, 0.5];
        let result = HistogramFoldStream::evaluate_row(0.5, &bucket, &counters).unwrap();
        assert!((result - 1.25).abs() < 1e-10, "{result}");
    }

    #[test]
    fn evaluate_nan_counter() {
        let bucket = [0.0, 1.0, 2.0, 3.0, f64::INFINITY];
        let counters = [f64::NAN, 1.0, 2.0, 3.0, 3.0];
        let result = HistogramFoldStream::evaluate_row(0.5, &bucket, &counters).unwrap();
        assert!((result - 1.5).abs() < 1e-10, "{result}");
    }
}
@@ -363,3 +363,52 @@ drop table greptime_servers_postgres_query_elapsed_no_le;

Affected Rows: 0

-- test case with some missing buckets
create table histogram5_bucket (
    ts timestamp time index,
    le string,
    s string,
    val double,
    primary key (s, le),
);

Affected Rows: 0

insert into histogram5_bucket values
    (3000000, "0.1", "a", 0),
    -- (3000000, "1", "a", 0),
    -- (3000000, "5", "a", 0),
    -- (3000000, "+Inf", "a", 0),
    (3005000, "0.1", "a", 50),
    (3005000, "1", "a", 70),
    (3005000, "5", "a", 110),
    (3005000, "+Inf", "a", 120),
    (3010000, "0.1", "a", 10),
    -- (3010000, "1", "a", 20),
    -- (3010000, "5", "a", 20),
    (3010000, "+Inf", "a", 30),
    (3015000, "0.1", "a", 10),
    (3015000, "1", "a", 10),
    (3015000, "3", "a", 20), --
    (3015000, "5", "a", 30),
    (3015000, "+Inf", "a", 50);

Affected Rows: 12

tql eval (3000, 3015, '3s') histogram_quantile(0.5, histogram5_bucket);

+---------------------+---+--------------------+
| ts                  | s | val                |
+---------------------+---+--------------------+
| 1970-01-01T00:50:00 | a | NaN                |
| 1970-01-01T00:50:03 | a | NaN                |
| 1970-01-01T00:50:06 | a | 0.5499999999999999 |
| 1970-01-01T00:50:09 | a | 0.5499999999999999 |
| 1970-01-01T00:50:12 | a | 0.775              |
| 1970-01-01T00:50:15 | a | 4.0                |
+---------------------+---+--------------------+

drop table histogram5_bucket;

Affected Rows: 0
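For reference, the 0.5499999999999999 rows follow from the buckets written at ts 3005000, which the 00:50:06 and 00:50:09 steps read: cumulative counts 50, 70, 110, 120 for le 0.1, 1, 5, +Inf. The 0.5 quantile targets position 120 * 0.5 = 60, which falls in the (0.1, 1] bucket, so the estimate is 0.1 + (1 - 0.1) / (70 - 50) * (60 - 50) = 0.55, up to floating-point rounding. The NaN rows come from the ts 3000000 group, which has only the 0.1 bucket and no +Inf.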
@@ -204,3 +204,36 @@ tql eval(0, 10, '10s') histogram_quantile(0.99, sum by(pod,instance, le) (rate(g
tql eval(0, 10, '10s') histogram_quantile(0.99, sum by(pod,instance, fbf) (rate(greptime_servers_postgres_query_elapsed_no_le{instance=~"xxx"}[1m])));

drop table greptime_servers_postgres_query_elapsed_no_le;

-- test case with some missing buckets
create table histogram5_bucket (
    ts timestamp time index,
    le string,
    s string,
    val double,
    primary key (s, le),
);

insert into histogram5_bucket values
    (3000000, "0.1", "a", 0),
    -- (3000000, "1", "a", 0),
    -- (3000000, "5", "a", 0),
    -- (3000000, "+Inf", "a", 0),
    (3005000, "0.1", "a", 50),
    (3005000, "1", "a", 70),
    (3005000, "5", "a", 110),
    (3005000, "+Inf", "a", 120),
    (3010000, "0.1", "a", 10),
    -- (3010000, "1", "a", 20),
    -- (3010000, "5", "a", 20),
    (3010000, "+Inf", "a", 30),
    (3015000, "0.1", "a", 10),
    (3015000, "1", "a", 10),
    (3015000, "3", "a", 20), --
    (3015000, "5", "a", 30),
    (3015000, "+Inf", "a", 50);

tql eval (3000, 3015, '3s') histogram_quantile(0.5, histogram5_bucket);

drop table histogram5_bucket;