From 44107f3f33f59e17115acc35c10c7835cb8f8e58 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sat, 21 Mar 2026 15:36:12 +0800 Subject: [PATCH] init impl Signed-off-by: Ruihang Xia --- .../src/extension_plan/scalar_calculate.rs | 35 +++++--- src/promql/src/functions.rs | 20 ++++- .../functions/double_exponential_smoothing.rs | 45 +++++----- src/promql/src/functions/extrapolate_rate.rs | 10 +-- src/promql/src/functions/idelta.rs | 71 ++++++++------- src/promql/src/functions/predict_linear.rs | 89 ++++++++++--------- src/promql/src/functions/quantile.rs | 61 +++++++------ src/promql/src/range_array.rs | 20 +++-- 8 files changed, 201 insertions(+), 150 deletions(-) diff --git a/src/promql/src/extension_plan/scalar_calculate.rs b/src/promql/src/extension_plan/scalar_calculate.rs index 8aaaa46144..54179c6951 100644 --- a/src/promql/src/extension_plan/scalar_calculate.rs +++ b/src/promql/src/extension_plan/scalar_calculate.rs @@ -459,7 +459,7 @@ impl ExecutionPlan for ScalarCalculateExec { input, have_multi_series: false, done: false, - batch: None, + batches: Vec::new(), tag_value: None, })) } @@ -518,7 +518,7 @@ struct ScalarCalculateStream { project_index: (usize, usize), have_multi_series: bool, done: bool, - batch: Option, + batches: Vec, tag_value: Option>, } @@ -577,17 +577,18 @@ impl ScalarCalculateStream { fn append_batch(&mut self, input_batch: RecordBatch) -> DataFusionResult<()> { let ts_column = input_batch.column(self.project_index.0).clone(); - let val_column = cast_with_options( - input_batch.column(self.project_index.1), - &DataType::Float64, - &CastOptions::default(), - )?; + let val_column = + if input_batch.column(self.project_index.1).data_type() == &DataType::Float64 { + input_batch.column(self.project_index.1).clone() + } else { + cast_with_options( + input_batch.column(self.project_index.1), + &DataType::Float64, + &CastOptions::default(), + )? + }; let input_batch = RecordBatch::try_new(self.schema.clone(), vec![ts_column, val_column])?; - if let Some(batch) = &self.batch { - self.batch = Some(concat_batches(&self.schema, vec![batch, &input_batch])?); - } else { - self.batch = Some(input_batch); - } + self.batches.push(input_batch); Ok(()) } } @@ -609,8 +610,14 @@ impl Stream for ScalarCalculateStream { // inner is done, producing output None => { self.done = true; - return match self.batch.take() { - Some(batch) if !self.have_multi_series => { + return match (!self.have_multi_series).then(|| self.batches.split_off(0)) { + Some(mut batches) if !batches.is_empty() => { + let batch = if batches.len() == 1 { + batches.pop().unwrap() + } else { + let refs = batches.iter().collect::>(); + concat_batches(&self.schema, refs)? + }; self.metric.record_output(batch.num_rows()); Poll::Ready(Some(Ok(batch))) } diff --git a/src/promql/src/functions.rs b/src/promql/src/functions.rs index e392d7dcf5..65da884920 100644 --- a/src/promql/src/functions.rs +++ b/src/promql/src/functions.rs @@ -77,6 +77,16 @@ pub(crate) fn linear_regression( times: &TimestampMillisecondArray, values: &Float64Array, intercept_time: i64, +) -> (Option, Option) { + linear_regression_slice(times.values(), values, 0, values.len(), intercept_time) +} + +pub(crate) fn linear_regression_slice( + times: &[i64], + values: &Float64Array, + offset: usize, + len: usize, + intercept_time: i64, ) -> (Option, Option) { let mut count: f64 = 0.0; let mut sum_x: f64 = 0.0; @@ -89,15 +99,16 @@ pub(crate) fn linear_regression( let mut comp_x2: f64 = 0.0; let mut const_y = true; - let init_y: f64 = values.value(0); + let mut init_y = None; - for (i, value) in values.iter().enumerate() { - let time = times.value(i) as f64; + for (i, value) in values.iter().skip(offset).take(len).enumerate() { + let time = times[offset + i] as f64; if value.is_none() { continue; } let value = value.unwrap(); - if const_y && i > 0 && value != init_y { + let initial = init_y.get_or_insert(value); + if const_y && count > 0.0 && value != *initial { const_y = false; } count += 1.0; @@ -113,6 +124,7 @@ pub(crate) fn linear_regression( } if const_y { + let init_y = init_y.unwrap(); if !init_y.is_finite() { return (None, None); } diff --git a/src/promql/src/functions/double_exponential_smoothing.rs b/src/promql/src/functions/double_exponential_smoothing.rs index b6768d47a1..7e3ce81cc7 100644 --- a/src/promql/src/functions/double_exponential_smoothing.rs +++ b/src/promql/src/functions/double_exponential_smoothing.rs @@ -17,7 +17,7 @@ use std::sync::Arc; -use datafusion::arrow::array::Float64Array; +use datafusion::arrow::array::{Float64Array, Float64Builder}; use datafusion::arrow::datatypes::TimeUnit; use datafusion::common::DataFusionError; use datafusion::logical_expr::{ScalarUDF, Volatility}; @@ -177,38 +177,43 @@ impl DoubleExponentialSmoothing { )), )?; - // calculation - let mut result_array = Vec::with_capacity(ts_range.len()); + let all_values = value_range + .values() + .as_any() + .downcast_ref::() + .unwrap() + .values(); + let mut result_builder = Float64Builder::with_capacity(ts_range.len()); let sf_iter = FactorIterator::new(sf_col, num_rows); let tf_iter = FactorIterator::new(tf_col, num_rows); - let iter = (0..num_rows) - .map(|i| (ts_range.get(i), value_range.get(i))) - .zip(sf_iter.zip(tf_iter)); + let iter = (0..num_rows).zip(sf_iter.zip(tf_iter)); - for ((timestamps, values), (sf, tf)) in iter { - let timestamps = timestamps.unwrap(); - let values = values.unwrap(); - let values = values - .as_any() - .downcast_ref::() - .unwrap() - .values(); + for (index, (sf, tf)) in iter { + let (_, ts_len) = ts_range.get_offset_length(index).unwrap(); + let (value_offset, value_len) = value_range.get_offset_length(index).unwrap(); error::ensure( - timestamps.len() == values.len(), + ts_len == value_len, DataFusionError::Execution(format!( "{}: input arrays should have the same length, found {} and {}", Self::name(), - timestamps.len(), - values.len() + ts_len, + value_len )), )?; - result_array.push(double_exponential_smoothing_impl(values, sf, tf)); + match double_exponential_smoothing_impl( + &all_values[value_offset..value_offset + value_len], + sf, + tf, + ) { + Some(value) => result_builder.append_value(value), + None => result_builder.append_null(), + } } - let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array))); + let result = ColumnarValue::Array(Arc::new(result_builder.finish())); Ok(result) } } @@ -240,8 +245,6 @@ fn double_exponential_smoothing_impl(values: &[f64], sf: f64, tf: f64) -> Option return Some(f64::NAN); } - let values = values.to_vec(); - let mut s0 = 0.0; let mut s1 = values[0]; let mut b = values[1] - values[0]; diff --git a/src/promql/src/functions/extrapolate_rate.rs b/src/promql/src/functions/extrapolate_rate.rs index 126a506bc9..76f48536ec 100644 --- a/src/promql/src/functions/extrapolate_rate.rs +++ b/src/promql/src/functions/extrapolate_rate.rs @@ -32,7 +32,7 @@ use std::fmt::Display; use std::sync::Arc; -use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray}; +use datafusion::arrow::array::{Float64Array, Float64Builder, TimestampMillisecondArray}; use datafusion::arrow::datatypes::TimeUnit; use datafusion::common::{DataFusionError, Result as DfResult}; use datafusion::logical_expr::{ScalarUDF, Volatility}; @@ -121,7 +121,7 @@ impl ExtrapolatedRate ExtrapolatedRate ExtrapolatedRate IDelta { )), )?; - // calculation - let mut result_array = Vec::with_capacity(ts_range.len()); + let ts_values = ts_range.values(); + let ts_values = ts_values + .as_any() + .downcast_ref::() + .unwrap() + .values(); + + let value_values = value_range.values(); + let value_values = value_values + .as_any() + .downcast_ref::() + .unwrap() + .values(); + + let mut result_builder = Float64Builder::with_capacity(ts_range.len()); for index in 0..ts_range.len() { - let timestamps = ts_range.get(index).unwrap(); - let timestamps = timestamps - .as_any() - .downcast_ref::() - .unwrap() - .values(); - - let values = value_range.get(index).unwrap(); - let values = values - .as_any() - .downcast_ref::() - .unwrap() - .values(); + let Some((ts_offset, len)) = ts_range.get_offset_length(index) else { + result_builder.append_null(); + continue; + }; + let Some((value_offset, value_len)) = value_range.get_offset_length(index) else { + result_builder.append_null(); + continue; + }; error::ensure( - timestamps.len() == values.len(), + len == value_len, DataFusionError::Execution(format!( "{}: input arrays should have the same length, found {} and {}", Self::name(), - timestamps.len(), - values.len() + len, + value_len )), )?; - - let len = timestamps.len(); if len < 2 { - result_array.push(None); + result_builder.append_null(); continue; } - // if is delta + let last_offset = ts_offset + len - 1; + let prev_offset = last_offset - 1; + let sampled_interval = + (ts_values[last_offset] - ts_values[prev_offset]) as f64 / 1000.0; + + let last_value_offset = value_offset + len - 1; + let prev_value_offset = last_value_offset - 1; + let last_value = value_values[last_value_offset]; + let prev_value = value_values[prev_value_offset]; + if !IS_RATE { - result_array.push(Some(values[len - 1] - values[len - 2])); + result_builder.append_value(last_value - prev_value); continue; } - // else is rate - let sampled_interval = (timestamps[len - 1] - timestamps[len - 2]) as f64 / 1000.0; - let last_value = values[len - 1]; - let prev_value = values[len - 2]; let result_value = if last_value < prev_value { // counter reset last_value @@ -144,10 +155,10 @@ impl IDelta { last_value - prev_value }; - result_array.push(Some(result_value / sampled_interval as f64)); + result_builder.append_value(result_value / sampled_interval); } - let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array))); + let result = ColumnarValue::Array(Arc::new(result_builder.finish())); Ok(result) } } diff --git a/src/promql/src/functions/predict_linear.rs b/src/promql/src/functions/predict_linear.rs index 09a46ed48f..f362a607ba 100644 --- a/src/promql/src/functions/predict_linear.rs +++ b/src/promql/src/functions/predict_linear.rs @@ -17,7 +17,7 @@ use std::sync::Arc; -use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray}; +use datafusion::arrow::array::{Float64Array, Float64Builder, TimestampMillisecondArray}; use datafusion::arrow::datatypes::TimeUnit; use datafusion::common::DataFusionError; use datafusion::logical_expr::{ScalarUDF, Volatility}; @@ -28,7 +28,7 @@ use datatypes::arrow::array::Array; use datatypes::arrow::datatypes::DataType; use crate::error; -use crate::functions::{extract_array, linear_regression}; +use crate::functions::{extract_array, linear_regression_slice}; use crate::range_array::RangeArray; pub struct PredictLinear; @@ -130,68 +130,75 @@ impl PredictLinear { Box::new(t_array.iter()) } }; - let mut result_array = Vec::with_capacity(ts_range.len()); + let all_timestamps = ts_range + .values() + .as_any() + .downcast_ref::() + .unwrap() + .values(); + let all_values = value_range + .values() + .as_any() + .downcast_ref::() + .unwrap(); + let mut result_builder = Float64Builder::with_capacity(ts_range.len()); for (index, t) in t_iter.enumerate() { - let (timestamps, values) = get_ts_values(&ts_range, &value_range, index, Self::name())?; - let ret = predict_linear_impl(×tamps, &values, t.unwrap()); - result_array.push(ret); + match predict_linear_impl( + &ts_range, + &value_range, + all_timestamps, + all_values, + index, + t.unwrap(), + Self::name(), + )? { + Some(value) => result_builder.append_value(value), + None => result_builder.append_null(), + } } - let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array))); + let result = ColumnarValue::Array(Arc::new(result_builder.finish())); Ok(result) } } -fn get_ts_values( +fn predict_linear_impl( ts_range: &RangeArray, value_range: &RangeArray, + all_timestamps: &[i64], + all_values: &Float64Array, index: usize, + t: i64, func_name: &str, -) -> Result<(TimestampMillisecondArray, Float64Array), DataFusionError> { - let timestamps = ts_range - .get(index) - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - .clone(); - let values = value_range - .get(index) - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - .clone(); +) -> Result, DataFusionError> { + let (ts_offset, ts_len) = ts_range.get_offset_length(index).unwrap(); + let (value_offset, value_len) = value_range.get_offset_length(index).unwrap(); error::ensure( - timestamps.len() == values.len(), + ts_len == value_len, DataFusionError::Execution(format!( "{}: time and value arrays in a group should have the same length, found {} and {}", - func_name, - timestamps.len(), - values.len() + func_name, ts_len, value_len )), )?; - Ok((timestamps, values)) -} - -fn predict_linear_impl( - timestamps: &TimestampMillisecondArray, - values: &Float64Array, - t: i64, -) -> Option { - if timestamps.len() < 2 { - return None; + if ts_len < 2 { + return Ok(None); } // last timestamp is evaluation timestamp - let evaluate_ts = timestamps.value(timestamps.len() - 1); - let (slope, intercept) = linear_regression(timestamps, values, evaluate_ts); + let evaluate_ts = all_timestamps[ts_offset + ts_len - 1]; + let (slope, intercept) = linear_regression_slice( + all_timestamps, + all_values, + value_offset, + value_len, + evaluate_ts, + ); if slope.is_none() || intercept.is_none() { - return None; + return Ok(None); } - Some(slope.unwrap() * t as f64 + intercept.unwrap()) + Ok(Some(slope.unwrap() * t as f64 + intercept.unwrap())) } #[cfg(test)] diff --git a/src/promql/src/functions/quantile.rs b/src/promql/src/functions/quantile.rs index f368d5908c..34f2e68439 100644 --- a/src/promql/src/functions/quantile.rs +++ b/src/promql/src/functions/quantile.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use datafusion::arrow::array::Float64Array; +use datafusion::arrow::array::{Float64Array, Float64Builder}; use datafusion::arrow::datatypes::TimeUnit; use datafusion::common::DataFusionError; use datafusion::logical_expr::{ScalarUDF, Volatility}; @@ -93,8 +93,13 @@ impl QuantileOverTime { )), )?; - // calculation - let mut result_array = Vec::with_capacity(ts_range.len()); + let all_values = value_range + .values() + .as_any() + .downcast_ref::() + .unwrap() + .values(); + let mut result_builder = Float64Builder::with_capacity(ts_range.len()); match quantile_col { ColumnarValue::Scalar(quantile_scalar) => { @@ -107,25 +112,25 @@ impl QuantileOverTime { }; for index in 0..ts_range.len() { - let timestamps = ts_range.get(index).unwrap(); - let values = value_range.get(index).unwrap(); - let values = values - .as_any() - .downcast_ref::() - .unwrap() - .values(); + let (_, ts_len) = ts_range.get_offset_length(index).unwrap(); + let (value_offset, value_len) = value_range.get_offset_length(index).unwrap(); error::ensure( - timestamps.len() == values.len(), + ts_len == value_len, DataFusionError::Execution(format!( "{}: time and value arrays in a group should have the same length, found {} and {}", Self::name(), - timestamps.len(), - values.len() + ts_len, + value_len )), )?; - let result = quantile_impl(values, quantile); - result_array.push(result); + match quantile_impl( + &all_values[value_offset..value_offset + value_len], + quantile, + ) { + Some(value) => result_builder.append_value(value), + None => result_builder.append_null(), + } } } ColumnarValue::Array(quantile_array) => { @@ -150,20 +155,15 @@ impl QuantileOverTime { )), )?; for index in 0..ts_range.len() { - let timestamps = ts_range.get(index).unwrap(); - let values = value_range.get(index).unwrap(); - let values = values - .as_any() - .downcast_ref::() - .unwrap() - .values(); + let (_, ts_len) = ts_range.get_offset_length(index).unwrap(); + let (value_offset, value_len) = value_range.get_offset_length(index).unwrap(); error::ensure( - timestamps.len() == values.len(), + ts_len == value_len, DataFusionError::Execution(format!( "{}: time and value arrays in a group should have the same length, found {} and {}", Self::name(), - timestamps.len(), - values.len() + ts_len, + value_len )), )?; let quantile = if quantile_array.is_null(index) { @@ -171,13 +171,18 @@ impl QuantileOverTime { } else { quantile_array.value(index) }; - let result = quantile_impl(values, quantile); - result_array.push(result); + match quantile_impl( + &all_values[value_offset..value_offset + value_len], + quantile, + ) { + Some(value) => result_builder.append_value(value), + None => result_builder.append_null(), + } } } } - let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array))); + let result = ColumnarValue::Array(Arc::new(result_builder.finish())); Ok(result) } } diff --git a/src/promql/src/range_array.rs b/src/promql/src/range_array.rs index 4af5217a76..26eab89859 100644 --- a/src/promql/src/range_array.rs +++ b/src/promql/src/range_array.rs @@ -76,13 +76,19 @@ impl RangeArray { } pub fn try_new(dict: DictionaryArray) -> Result { - let ranges_iter = dict - .keys() - .iter() - .map(|compound_key| compound_key.map(unpack)) - .collect::>>() - .context(EmptyRangeSnafu)?; - Self::check_ranges(dict.values().len(), ranges_iter)?; + let value_len = dict.values().len(); + for compound_key in dict.keys().iter() { + let compound_key = compound_key.context(EmptyRangeSnafu)?; + let (offset, length) = unpack(compound_key); + ensure!( + offset as usize + length as usize <= value_len, + IllegalRangeSnafu { + offset, + length, + len: value_len + } + ); + } Ok(Self { array: dict }) }