mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-27 10:20:38 +00:00
@@ -459,7 +459,7 @@ impl ExecutionPlan for ScalarCalculateExec {
|
||||
input,
|
||||
have_multi_series: false,
|
||||
done: false,
|
||||
batch: None,
|
||||
batches: Vec::new(),
|
||||
tag_value: None,
|
||||
}))
|
||||
}
|
||||
@@ -518,7 +518,7 @@ struct ScalarCalculateStream {
|
||||
project_index: (usize, usize),
|
||||
have_multi_series: bool,
|
||||
done: bool,
|
||||
batch: Option<RecordBatch>,
|
||||
batches: Vec<RecordBatch>,
|
||||
tag_value: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
@@ -577,17 +577,18 @@ impl ScalarCalculateStream {
|
||||
|
||||
fn append_batch(&mut self, input_batch: RecordBatch) -> DataFusionResult<()> {
|
||||
let ts_column = input_batch.column(self.project_index.0).clone();
|
||||
let val_column = cast_with_options(
|
||||
input_batch.column(self.project_index.1),
|
||||
&DataType::Float64,
|
||||
&CastOptions::default(),
|
||||
)?;
|
||||
let val_column =
|
||||
if input_batch.column(self.project_index.1).data_type() == &DataType::Float64 {
|
||||
input_batch.column(self.project_index.1).clone()
|
||||
} else {
|
||||
cast_with_options(
|
||||
input_batch.column(self.project_index.1),
|
||||
&DataType::Float64,
|
||||
&CastOptions::default(),
|
||||
)?
|
||||
};
|
||||
let input_batch = RecordBatch::try_new(self.schema.clone(), vec![ts_column, val_column])?;
|
||||
if let Some(batch) = &self.batch {
|
||||
self.batch = Some(concat_batches(&self.schema, vec![batch, &input_batch])?);
|
||||
} else {
|
||||
self.batch = Some(input_batch);
|
||||
}
|
||||
self.batches.push(input_batch);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -609,8 +610,14 @@ impl Stream for ScalarCalculateStream {
|
||||
// inner is done, producing output
|
||||
None => {
|
||||
self.done = true;
|
||||
return match self.batch.take() {
|
||||
Some(batch) if !self.have_multi_series => {
|
||||
return match (!self.have_multi_series).then(|| self.batches.split_off(0)) {
|
||||
Some(mut batches) if !batches.is_empty() => {
|
||||
let batch = if batches.len() == 1 {
|
||||
batches.pop().unwrap()
|
||||
} else {
|
||||
let refs = batches.iter().collect::<Vec<_>>();
|
||||
concat_batches(&self.schema, refs)?
|
||||
};
|
||||
self.metric.record_output(batch.num_rows());
|
||||
Poll::Ready(Some(Ok(batch)))
|
||||
}
|
||||
|
||||
@@ -77,6 +77,16 @@ pub(crate) fn linear_regression(
|
||||
times: &TimestampMillisecondArray,
|
||||
values: &Float64Array,
|
||||
intercept_time: i64,
|
||||
) -> (Option<f64>, Option<f64>) {
|
||||
linear_regression_slice(times.values(), values, 0, values.len(), intercept_time)
|
||||
}
|
||||
|
||||
pub(crate) fn linear_regression_slice(
|
||||
times: &[i64],
|
||||
values: &Float64Array,
|
||||
offset: usize,
|
||||
len: usize,
|
||||
intercept_time: i64,
|
||||
) -> (Option<f64>, Option<f64>) {
|
||||
let mut count: f64 = 0.0;
|
||||
let mut sum_x: f64 = 0.0;
|
||||
@@ -89,15 +99,16 @@ pub(crate) fn linear_regression(
|
||||
let mut comp_x2: f64 = 0.0;
|
||||
|
||||
let mut const_y = true;
|
||||
let init_y: f64 = values.value(0);
|
||||
let mut init_y = None;
|
||||
|
||||
for (i, value) in values.iter().enumerate() {
|
||||
let time = times.value(i) as f64;
|
||||
for (i, value) in values.iter().skip(offset).take(len).enumerate() {
|
||||
let time = times[offset + i] as f64;
|
||||
if value.is_none() {
|
||||
continue;
|
||||
}
|
||||
let value = value.unwrap();
|
||||
if const_y && i > 0 && value != init_y {
|
||||
let initial = init_y.get_or_insert(value);
|
||||
if const_y && count > 0.0 && value != *initial {
|
||||
const_y = false;
|
||||
}
|
||||
count += 1.0;
|
||||
@@ -113,6 +124,7 @@ pub(crate) fn linear_regression(
|
||||
}
|
||||
|
||||
if const_y {
|
||||
let init_y = init_y.unwrap();
|
||||
if !init_y.is_finite() {
|
||||
return (None, None);
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use datafusion::arrow::array::Float64Array;
|
||||
use datafusion::arrow::array::{Float64Array, Float64Builder};
|
||||
use datafusion::arrow::datatypes::TimeUnit;
|
||||
use datafusion::common::DataFusionError;
|
||||
use datafusion::logical_expr::{ScalarUDF, Volatility};
|
||||
@@ -177,38 +177,43 @@ impl DoubleExponentialSmoothing {
|
||||
)),
|
||||
)?;
|
||||
|
||||
// calculation
|
||||
let mut result_array = Vec::with_capacity(ts_range.len());
|
||||
let all_values = value_range
|
||||
.values()
|
||||
.as_any()
|
||||
.downcast_ref::<Float64Array>()
|
||||
.unwrap()
|
||||
.values();
|
||||
let mut result_builder = Float64Builder::with_capacity(ts_range.len());
|
||||
|
||||
let sf_iter = FactorIterator::new(sf_col, num_rows);
|
||||
let tf_iter = FactorIterator::new(tf_col, num_rows);
|
||||
|
||||
let iter = (0..num_rows)
|
||||
.map(|i| (ts_range.get(i), value_range.get(i)))
|
||||
.zip(sf_iter.zip(tf_iter));
|
||||
let iter = (0..num_rows).zip(sf_iter.zip(tf_iter));
|
||||
|
||||
for ((timestamps, values), (sf, tf)) in iter {
|
||||
let timestamps = timestamps.unwrap();
|
||||
let values = values.unwrap();
|
||||
let values = values
|
||||
.as_any()
|
||||
.downcast_ref::<Float64Array>()
|
||||
.unwrap()
|
||||
.values();
|
||||
for (index, (sf, tf)) in iter {
|
||||
let (_, ts_len) = ts_range.get_offset_length(index).unwrap();
|
||||
let (value_offset, value_len) = value_range.get_offset_length(index).unwrap();
|
||||
error::ensure(
|
||||
timestamps.len() == values.len(),
|
||||
ts_len == value_len,
|
||||
DataFusionError::Execution(format!(
|
||||
"{}: input arrays should have the same length, found {} and {}",
|
||||
Self::name(),
|
||||
timestamps.len(),
|
||||
values.len()
|
||||
ts_len,
|
||||
value_len
|
||||
)),
|
||||
)?;
|
||||
|
||||
result_array.push(double_exponential_smoothing_impl(values, sf, tf));
|
||||
match double_exponential_smoothing_impl(
|
||||
&all_values[value_offset..value_offset + value_len],
|
||||
sf,
|
||||
tf,
|
||||
) {
|
||||
Some(value) => result_builder.append_value(value),
|
||||
None => result_builder.append_null(),
|
||||
}
|
||||
}
|
||||
|
||||
let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array)));
|
||||
let result = ColumnarValue::Array(Arc::new(result_builder.finish()));
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
@@ -240,8 +245,6 @@ fn double_exponential_smoothing_impl(values: &[f64], sf: f64, tf: f64) -> Option
|
||||
return Some(f64::NAN);
|
||||
}
|
||||
|
||||
let values = values.to_vec();
|
||||
|
||||
let mut s0 = 0.0;
|
||||
let mut s1 = values[0];
|
||||
let mut b = values[1] - values[0];
|
||||
|
||||
@@ -32,7 +32,7 @@
|
||||
use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
|
||||
use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray};
|
||||
use datafusion::arrow::array::{Float64Array, Float64Builder, TimestampMillisecondArray};
|
||||
use datafusion::arrow::datatypes::TimeUnit;
|
||||
use datafusion::common::{DataFusionError, Result as DfResult};
|
||||
use datafusion::logical_expr::{ScalarUDF, Volatility};
|
||||
@@ -121,7 +121,7 @@ impl<const IS_COUNTER: bool, const IS_RATE: bool> ExtrapolatedRate<IS_COUNTER, I
|
||||
.unwrap();
|
||||
|
||||
// calculation
|
||||
let mut result_array = Vec::with_capacity(ts_range.len());
|
||||
let mut result_builder = Float64Builder::with_capacity(ts_range.len());
|
||||
|
||||
let all_timestamps = ts_range
|
||||
.values()
|
||||
@@ -144,7 +144,7 @@ impl<const IS_COUNTER: bool, const IS_RATE: bool> ExtrapolatedRate<IS_COUNTER, I
|
||||
let values = &all_values[offset..offset + length];
|
||||
|
||||
if values.len() < 2 {
|
||||
result_array.push(None);
|
||||
result_builder.append_null();
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -173,10 +173,10 @@ impl<const IS_COUNTER: bool, const IS_RATE: bool> ExtrapolatedRate<IS_COUNTER, I
|
||||
factor /= self.range_length as f64 / 1000.0;
|
||||
}
|
||||
|
||||
result_array.push(Some(result_value * factor));
|
||||
result_builder.append_value(result_value * factor);
|
||||
}
|
||||
|
||||
let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array)));
|
||||
let result = ColumnarValue::Array(Arc::new(result_builder.finish()));
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
|
||||
use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray};
|
||||
use datafusion::arrow::array::{Float64Array, Float64Builder, TimestampMillisecondArray};
|
||||
use datafusion::arrow::datatypes::TimeUnit;
|
||||
use datafusion::common::DataFusionError;
|
||||
use datafusion::logical_expr::{ScalarUDF, Volatility};
|
||||
@@ -94,49 +94,60 @@ impl<const IS_RATE: bool> IDelta<IS_RATE> {
|
||||
)),
|
||||
)?;
|
||||
|
||||
// calculation
|
||||
let mut result_array = Vec::with_capacity(ts_range.len());
|
||||
let ts_values = ts_range.values();
|
||||
let ts_values = ts_values
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondArray>()
|
||||
.unwrap()
|
||||
.values();
|
||||
|
||||
let value_values = value_range.values();
|
||||
let value_values = value_values
|
||||
.as_any()
|
||||
.downcast_ref::<Float64Array>()
|
||||
.unwrap()
|
||||
.values();
|
||||
|
||||
let mut result_builder = Float64Builder::with_capacity(ts_range.len());
|
||||
|
||||
for index in 0..ts_range.len() {
|
||||
let timestamps = ts_range.get(index).unwrap();
|
||||
let timestamps = timestamps
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondArray>()
|
||||
.unwrap()
|
||||
.values();
|
||||
|
||||
let values = value_range.get(index).unwrap();
|
||||
let values = values
|
||||
.as_any()
|
||||
.downcast_ref::<Float64Array>()
|
||||
.unwrap()
|
||||
.values();
|
||||
let Some((ts_offset, len)) = ts_range.get_offset_length(index) else {
|
||||
result_builder.append_null();
|
||||
continue;
|
||||
};
|
||||
let Some((value_offset, value_len)) = value_range.get_offset_length(index) else {
|
||||
result_builder.append_null();
|
||||
continue;
|
||||
};
|
||||
error::ensure(
|
||||
timestamps.len() == values.len(),
|
||||
len == value_len,
|
||||
DataFusionError::Execution(format!(
|
||||
"{}: input arrays should have the same length, found {} and {}",
|
||||
Self::name(),
|
||||
timestamps.len(),
|
||||
values.len()
|
||||
len,
|
||||
value_len
|
||||
)),
|
||||
)?;
|
||||
|
||||
let len = timestamps.len();
|
||||
if len < 2 {
|
||||
result_array.push(None);
|
||||
result_builder.append_null();
|
||||
continue;
|
||||
}
|
||||
|
||||
// if is delta
|
||||
let last_offset = ts_offset + len - 1;
|
||||
let prev_offset = last_offset - 1;
|
||||
let sampled_interval =
|
||||
(ts_values[last_offset] - ts_values[prev_offset]) as f64 / 1000.0;
|
||||
|
||||
let last_value_offset = value_offset + len - 1;
|
||||
let prev_value_offset = last_value_offset - 1;
|
||||
let last_value = value_values[last_value_offset];
|
||||
let prev_value = value_values[prev_value_offset];
|
||||
|
||||
if !IS_RATE {
|
||||
result_array.push(Some(values[len - 1] - values[len - 2]));
|
||||
result_builder.append_value(last_value - prev_value);
|
||||
continue;
|
||||
}
|
||||
|
||||
// else is rate
|
||||
let sampled_interval = (timestamps[len - 1] - timestamps[len - 2]) as f64 / 1000.0;
|
||||
let last_value = values[len - 1];
|
||||
let prev_value = values[len - 2];
|
||||
let result_value = if last_value < prev_value {
|
||||
// counter reset
|
||||
last_value
|
||||
@@ -144,10 +155,10 @@ impl<const IS_RATE: bool> IDelta<IS_RATE> {
|
||||
last_value - prev_value
|
||||
};
|
||||
|
||||
result_array.push(Some(result_value / sampled_interval as f64));
|
||||
result_builder.append_value(result_value / sampled_interval);
|
||||
}
|
||||
|
||||
let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array)));
|
||||
let result = ColumnarValue::Array(Arc::new(result_builder.finish()));
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray};
|
||||
use datafusion::arrow::array::{Float64Array, Float64Builder, TimestampMillisecondArray};
|
||||
use datafusion::arrow::datatypes::TimeUnit;
|
||||
use datafusion::common::DataFusionError;
|
||||
use datafusion::logical_expr::{ScalarUDF, Volatility};
|
||||
@@ -28,7 +28,7 @@ use datatypes::arrow::array::Array;
|
||||
use datatypes::arrow::datatypes::DataType;
|
||||
|
||||
use crate::error;
|
||||
use crate::functions::{extract_array, linear_regression};
|
||||
use crate::functions::{extract_array, linear_regression_slice};
|
||||
use crate::range_array::RangeArray;
|
||||
|
||||
pub struct PredictLinear;
|
||||
@@ -130,68 +130,75 @@ impl PredictLinear {
|
||||
Box::new(t_array.iter())
|
||||
}
|
||||
};
|
||||
let mut result_array = Vec::with_capacity(ts_range.len());
|
||||
let all_timestamps = ts_range
|
||||
.values()
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondArray>()
|
||||
.unwrap()
|
||||
.values();
|
||||
let all_values = value_range
|
||||
.values()
|
||||
.as_any()
|
||||
.downcast_ref::<Float64Array>()
|
||||
.unwrap();
|
||||
let mut result_builder = Float64Builder::with_capacity(ts_range.len());
|
||||
for (index, t) in t_iter.enumerate() {
|
||||
let (timestamps, values) = get_ts_values(&ts_range, &value_range, index, Self::name())?;
|
||||
let ret = predict_linear_impl(×tamps, &values, t.unwrap());
|
||||
result_array.push(ret);
|
||||
match predict_linear_impl(
|
||||
&ts_range,
|
||||
&value_range,
|
||||
all_timestamps,
|
||||
all_values,
|
||||
index,
|
||||
t.unwrap(),
|
||||
Self::name(),
|
||||
)? {
|
||||
Some(value) => result_builder.append_value(value),
|
||||
None => result_builder.append_null(),
|
||||
}
|
||||
}
|
||||
|
||||
let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array)));
|
||||
let result = ColumnarValue::Array(Arc::new(result_builder.finish()));
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
fn get_ts_values(
|
||||
fn predict_linear_impl(
|
||||
ts_range: &RangeArray,
|
||||
value_range: &RangeArray,
|
||||
all_timestamps: &[i64],
|
||||
all_values: &Float64Array,
|
||||
index: usize,
|
||||
t: i64,
|
||||
func_name: &str,
|
||||
) -> Result<(TimestampMillisecondArray, Float64Array), DataFusionError> {
|
||||
let timestamps = ts_range
|
||||
.get(index)
|
||||
.unwrap()
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondArray>()
|
||||
.unwrap()
|
||||
.clone();
|
||||
let values = value_range
|
||||
.get(index)
|
||||
.unwrap()
|
||||
.as_any()
|
||||
.downcast_ref::<Float64Array>()
|
||||
.unwrap()
|
||||
.clone();
|
||||
) -> Result<Option<f64>, DataFusionError> {
|
||||
let (ts_offset, ts_len) = ts_range.get_offset_length(index).unwrap();
|
||||
let (value_offset, value_len) = value_range.get_offset_length(index).unwrap();
|
||||
error::ensure(
|
||||
timestamps.len() == values.len(),
|
||||
ts_len == value_len,
|
||||
DataFusionError::Execution(format!(
|
||||
"{}: time and value arrays in a group should have the same length, found {} and {}",
|
||||
func_name,
|
||||
timestamps.len(),
|
||||
values.len()
|
||||
func_name, ts_len, value_len
|
||||
)),
|
||||
)?;
|
||||
Ok((timestamps, values))
|
||||
}
|
||||
|
||||
fn predict_linear_impl(
|
||||
timestamps: &TimestampMillisecondArray,
|
||||
values: &Float64Array,
|
||||
t: i64,
|
||||
) -> Option<f64> {
|
||||
if timestamps.len() < 2 {
|
||||
return None;
|
||||
if ts_len < 2 {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// last timestamp is evaluation timestamp
|
||||
let evaluate_ts = timestamps.value(timestamps.len() - 1);
|
||||
let (slope, intercept) = linear_regression(timestamps, values, evaluate_ts);
|
||||
let evaluate_ts = all_timestamps[ts_offset + ts_len - 1];
|
||||
let (slope, intercept) = linear_regression_slice(
|
||||
all_timestamps,
|
||||
all_values,
|
||||
value_offset,
|
||||
value_len,
|
||||
evaluate_ts,
|
||||
);
|
||||
|
||||
if slope.is_none() || intercept.is_none() {
|
||||
return None;
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
Some(slope.unwrap() * t as f64 + intercept.unwrap())
|
||||
Ok(Some(slope.unwrap() * t as f64 + intercept.unwrap()))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use datafusion::arrow::array::Float64Array;
|
||||
use datafusion::arrow::array::{Float64Array, Float64Builder};
|
||||
use datafusion::arrow::datatypes::TimeUnit;
|
||||
use datafusion::common::DataFusionError;
|
||||
use datafusion::logical_expr::{ScalarUDF, Volatility};
|
||||
@@ -93,8 +93,13 @@ impl QuantileOverTime {
|
||||
)),
|
||||
)?;
|
||||
|
||||
// calculation
|
||||
let mut result_array = Vec::with_capacity(ts_range.len());
|
||||
let all_values = value_range
|
||||
.values()
|
||||
.as_any()
|
||||
.downcast_ref::<Float64Array>()
|
||||
.unwrap()
|
||||
.values();
|
||||
let mut result_builder = Float64Builder::with_capacity(ts_range.len());
|
||||
|
||||
match quantile_col {
|
||||
ColumnarValue::Scalar(quantile_scalar) => {
|
||||
@@ -107,25 +112,25 @@ impl QuantileOverTime {
|
||||
};
|
||||
|
||||
for index in 0..ts_range.len() {
|
||||
let timestamps = ts_range.get(index).unwrap();
|
||||
let values = value_range.get(index).unwrap();
|
||||
let values = values
|
||||
.as_any()
|
||||
.downcast_ref::<Float64Array>()
|
||||
.unwrap()
|
||||
.values();
|
||||
let (_, ts_len) = ts_range.get_offset_length(index).unwrap();
|
||||
let (value_offset, value_len) = value_range.get_offset_length(index).unwrap();
|
||||
error::ensure(
|
||||
timestamps.len() == values.len(),
|
||||
ts_len == value_len,
|
||||
DataFusionError::Execution(format!(
|
||||
"{}: time and value arrays in a group should have the same length, found {} and {}",
|
||||
Self::name(),
|
||||
timestamps.len(),
|
||||
values.len()
|
||||
ts_len,
|
||||
value_len
|
||||
)),
|
||||
)?;
|
||||
|
||||
let result = quantile_impl(values, quantile);
|
||||
result_array.push(result);
|
||||
match quantile_impl(
|
||||
&all_values[value_offset..value_offset + value_len],
|
||||
quantile,
|
||||
) {
|
||||
Some(value) => result_builder.append_value(value),
|
||||
None => result_builder.append_null(),
|
||||
}
|
||||
}
|
||||
}
|
||||
ColumnarValue::Array(quantile_array) => {
|
||||
@@ -150,20 +155,15 @@ impl QuantileOverTime {
|
||||
)),
|
||||
)?;
|
||||
for index in 0..ts_range.len() {
|
||||
let timestamps = ts_range.get(index).unwrap();
|
||||
let values = value_range.get(index).unwrap();
|
||||
let values = values
|
||||
.as_any()
|
||||
.downcast_ref::<Float64Array>()
|
||||
.unwrap()
|
||||
.values();
|
||||
let (_, ts_len) = ts_range.get_offset_length(index).unwrap();
|
||||
let (value_offset, value_len) = value_range.get_offset_length(index).unwrap();
|
||||
error::ensure(
|
||||
timestamps.len() == values.len(),
|
||||
ts_len == value_len,
|
||||
DataFusionError::Execution(format!(
|
||||
"{}: time and value arrays in a group should have the same length, found {} and {}",
|
||||
Self::name(),
|
||||
timestamps.len(),
|
||||
values.len()
|
||||
ts_len,
|
||||
value_len
|
||||
)),
|
||||
)?;
|
||||
let quantile = if quantile_array.is_null(index) {
|
||||
@@ -171,13 +171,18 @@ impl QuantileOverTime {
|
||||
} else {
|
||||
quantile_array.value(index)
|
||||
};
|
||||
let result = quantile_impl(values, quantile);
|
||||
result_array.push(result);
|
||||
match quantile_impl(
|
||||
&all_values[value_offset..value_offset + value_len],
|
||||
quantile,
|
||||
) {
|
||||
Some(value) => result_builder.append_value(value),
|
||||
None => result_builder.append_null(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array)));
|
||||
let result = ColumnarValue::Array(Arc::new(result_builder.finish()));
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -76,13 +76,19 @@ impl RangeArray {
|
||||
}
|
||||
|
||||
pub fn try_new(dict: DictionaryArray<Int64Type>) -> Result<Self> {
|
||||
let ranges_iter = dict
|
||||
.keys()
|
||||
.iter()
|
||||
.map(|compound_key| compound_key.map(unpack))
|
||||
.collect::<Option<Vec<_>>>()
|
||||
.context(EmptyRangeSnafu)?;
|
||||
Self::check_ranges(dict.values().len(), ranges_iter)?;
|
||||
let value_len = dict.values().len();
|
||||
for compound_key in dict.keys().iter() {
|
||||
let compound_key = compound_key.context(EmptyRangeSnafu)?;
|
||||
let (offset, length) = unpack(compound_key);
|
||||
ensure!(
|
||||
offset as usize + length as usize <= value_len,
|
||||
IllegalRangeSnafu {
|
||||
offset,
|
||||
length,
|
||||
len: value_len
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
Ok(Self { array: dict })
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user