mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-31 12:20:38 +00:00
build(deps): update datafusion to latest and arrow to 51.0 (#3661)
* chore: update datafusion * update sqlness case of time.sql Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * fix: adjust range query partition * fix: hisogram incorrect result Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * fix: ignore filter pushdown temporarily Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * fix: update limit sqlness result Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * fix: histogram with wrong distribution Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * fix: update negative ordinal sqlness case Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * feat: bump df to cd7a00b Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * resolve conflicts * ignore test_range_filter Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * fix promql exec panic * fix "select count(*)" exec error * re-enable the "test_range_filter" test since the filter push down seems not necessary to be removed * fix: range query schema error * update sqlness results Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * resolve conflicts * update datafusion, again * fix pyo3 compile error, and update some sqlness results * update decimal sqlness cases Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * fix: promql literal * fix udaf tests * fix filter pushdown sqlness tests * fix?: test_cast * fix: rspy test fail due to datafusion `sin` signature change * rebase main to see if there are any failed tests * debug ci * debug ci * debug ci * enforce input partition Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * debug ci * fix ci * fix ci * debug ci * debug ci * debug ci * fix sqlness * feat: do not return error while creating a filter * chore: remove array from error * chore: replace todo with unimplemented * Update src/flow/clippy.toml Co-authored-by: Yingwen <realevenyag@gmail.com> --------- Signed-off-by: Ruihang Xia <waynestxia@gmail.com> Co-authored-by: Ruihang Xia <waynestxia@gmail.com> Co-authored-by: WUJingdi <taylor-lagrange@qq.com> Co-authored-by: discord9 <discord9@163.com> Co-authored-by: evenyag <realevenyag@gmail.com> Co-authored-by: tison <wander4096@gmail.com>
This commit is contained in:
@@ -11,6 +11,7 @@ python = [
|
||||
"dep:datafusion",
|
||||
"dep:datafusion-common",
|
||||
"dep:datafusion-expr",
|
||||
"dep:datafusion-functions",
|
||||
"dep:datafusion-physical-expr",
|
||||
"dep:rustpython-vm",
|
||||
"dep:rustpython-parser",
|
||||
@@ -45,6 +46,7 @@ crossbeam-utils = "0.8.14"
|
||||
datafusion = { workspace = true, optional = true }
|
||||
datafusion-common = { workspace = true, optional = true }
|
||||
datafusion-expr = { workspace = true, optional = true }
|
||||
datafusion-functions = { workspace = true, optional = true }
|
||||
datafusion-physical-expr = { workspace = true, optional = true }
|
||||
datatypes.workspace = true
|
||||
futures.workspace = true
|
||||
@@ -54,7 +56,7 @@ paste = { workspace = true, optional = true }
|
||||
prometheus.workspace = true
|
||||
query.workspace = true
|
||||
# TODO(discord9): This is a forked and tweaked version of RustPython, please update it to newest original RustPython After RustPython support GC
|
||||
pyo3 = { version = "0.19", optional = true, features = ["abi3", "abi3-py37"] }
|
||||
pyo3 = { version = "0.20", optional = true, features = ["abi3", "abi3-py37"] }
|
||||
rustpython-codegen = { git = "https://github.com/discord9/RustPython", optional = true, rev = "9ed5137412" }
|
||||
rustpython-compiler = { git = "https://github.com/discord9/RustPython", optional = true, rev = "9ed5137412" }
|
||||
rustpython-compiler-core = { git = "https://github.com/discord9/RustPython", optional = true, rev = "9ed5137412" }
|
||||
|
||||
@@ -126,6 +126,10 @@ impl Function for PyUDF {
|
||||
}
|
||||
|
||||
fn signature(&self) -> common_query::prelude::Signature {
|
||||
if self.copr.arg_types.is_empty() {
|
||||
return Signature::any(0, Volatility::Volatile);
|
||||
}
|
||||
|
||||
// try our best to get a type signature
|
||||
let mut arg_types = Vec::with_capacity(self.copr.arg_types.len());
|
||||
let mut know_all_types = true;
|
||||
|
||||
@@ -21,8 +21,10 @@ use std::sync::{Arc, Weak};
|
||||
|
||||
use common_query::OutputData;
|
||||
use common_recordbatch::{RecordBatch, RecordBatches};
|
||||
use datafusion_common::ScalarValue;
|
||||
use datatypes::arrow::compute;
|
||||
use datatypes::data_type::{ConcreteDataType, DataType};
|
||||
use datatypes::prelude::Value;
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
|
||||
use datatypes::vectors::{Helper, VectorRef};
|
||||
// use crate::python::builtins::greptime_builtin;
|
||||
@@ -42,7 +44,9 @@ use vm::{pyclass as rspyclass, PyObjectRef, PyPayload, PyResult, VirtualMachine}
|
||||
|
||||
use super::py_recordbatch::PyRecordBatch;
|
||||
use crate::engine::EvalContext;
|
||||
use crate::python::error::{ensure, ArrowSnafu, OtherSnafu, Result, TypeCastSnafu};
|
||||
use crate::python::error::{
|
||||
ensure, ArrowSnafu, DataFusionSnafu, OtherSnafu, Result, TypeCastSnafu,
|
||||
};
|
||||
use crate::python::ffi_types::PyVector;
|
||||
#[cfg(feature = "pyo3_backend")]
|
||||
use crate::python::pyo3::pyo3_exec_parsed;
|
||||
@@ -179,6 +183,25 @@ impl Coprocessor {
|
||||
|
||||
/// check if real types and annotation types(if have) is the same, if not try cast columns to annotated type
|
||||
pub(crate) fn check_and_cast_type(&self, cols: &mut [VectorRef]) -> Result<()> {
|
||||
for col in cols.iter_mut() {
|
||||
if let ConcreteDataType::List(x) = col.data_type() {
|
||||
let values =
|
||||
ScalarValue::convert_array_to_scalar_vec(col.to_arrow_array().as_ref())
|
||||
.context(DataFusionSnafu)?
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.map(Value::try_from)
|
||||
.collect::<std::result::Result<Vec<_>, _>>()
|
||||
.context(TypeCastSnafu)?;
|
||||
|
||||
let mut builder = x.item_type().create_mutable_vector(values.len());
|
||||
for v in values.iter() {
|
||||
builder.push_value_ref(v.as_value_ref());
|
||||
}
|
||||
*col = builder.to_vector();
|
||||
}
|
||||
}
|
||||
|
||||
let return_types = &self.return_types;
|
||||
// allow ignore Return Type Annotation
|
||||
if return_types.is_empty() {
|
||||
|
||||
@@ -209,6 +209,7 @@ fn eval_pyo3(case: CodeBlockTestCase) {
|
||||
let res_vec = locals
|
||||
.get_item("ret")
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.extract::<PyVector>()
|
||||
.map_err(|e| {
|
||||
dbg!(&case.script);
|
||||
|
||||
@@ -525,17 +525,12 @@ pub fn val_to_pyobj(val: value::Value, vm: &VirtualMachine) -> PyResult {
|
||||
// FIXME(dennis): lose the timestamp unit here
|
||||
Value::Timestamp(v) => vm.ctx.new_int(v.value()).into(),
|
||||
value::Value::List(list) => {
|
||||
let list = list.items().as_ref();
|
||||
match list {
|
||||
Some(list) => {
|
||||
let list: Vec<_> = list
|
||||
.iter()
|
||||
.map(|v| val_to_pyobj(v.clone(), vm))
|
||||
.collect::<Result<_, _>>()?;
|
||||
vm.ctx.new_list(list).into()
|
||||
}
|
||||
None => vm.ctx.new_list(Vec::new()).into(),
|
||||
}
|
||||
let list: Vec<_> = list
|
||||
.items()
|
||||
.iter()
|
||||
.map(|v| val_to_pyobj(v.clone(), vm))
|
||||
.collect::<Result<_, _>>()?;
|
||||
vm.ctx.new_list(list).into()
|
||||
}
|
||||
#[allow(unreachable_patterns)]
|
||||
_ => return Err(vm.new_type_error(format!("Convert from {val:?} is not supported yet"))),
|
||||
|
||||
@@ -19,7 +19,7 @@ use common_function::function_registry::FUNCTION_REGISTRY;
|
||||
use datafusion::arrow::array::{ArrayRef, NullArray};
|
||||
use datafusion::physical_plan::expressions;
|
||||
use datafusion_expr::ColumnarValue;
|
||||
use datafusion_physical_expr::{math_expressions, AggregateExpr};
|
||||
use datafusion_physical_expr::AggregateExpr;
|
||||
use datatypes::vectors::VectorRef;
|
||||
use pyo3::exceptions::{PyKeyError, PyValueError};
|
||||
use pyo3::prelude::*;
|
||||
@@ -133,7 +133,7 @@ fn get_globals(py: Python) -> PyResult<&PyDict> {
|
||||
fn dataframe(py: Python) -> PyResult<PyDataFrame> {
|
||||
let globals = get_globals(py)?;
|
||||
let df = globals
|
||||
.get_item("__dataframe__")
|
||||
.get_item("__dataframe__")?
|
||||
.ok_or_else(|| PyKeyError::new_err("No __dataframe__ variable is found"))?
|
||||
.extract::<PyDataFrame>()?;
|
||||
Ok(df)
|
||||
@@ -144,7 +144,7 @@ fn dataframe(py: Python) -> PyResult<PyDataFrame> {
|
||||
pub(crate) fn query_engine(py: Python) -> PyResult<PyQueryEngine> {
|
||||
let globals = get_globals(py)?;
|
||||
let query = globals
|
||||
.get_item("__query__")
|
||||
.get_item("__query__")?
|
||||
.ok_or_else(|| PyKeyError::new_err("No __query__ variable is found"))?
|
||||
.extract::<PyQueryEngine>()?;
|
||||
Ok(query)
|
||||
@@ -237,7 +237,9 @@ macro_rules! bind_call_unary_math_function {
|
||||
fn $DF_FUNC(py: Python<'_>, val: PyObject) -> PyResult<PyObject> {
|
||||
let args =
|
||||
&[all_to_f64(try_into_columnar_value(py, val)?).map_err(PyValueError::new_err)?];
|
||||
let res = math_expressions::$DF_FUNC(args).map_err(|e| PyValueError::new_err(format!("{e:?}")))?;
|
||||
let res = datafusion_functions::math::$DF_FUNC()
|
||||
.invoke(args)
|
||||
.map_err(|e| PyValueError::new_err(format!("{e:?}")))?;
|
||||
columnar_value_to_py_any(py, res)
|
||||
}
|
||||
)*
|
||||
@@ -293,18 +295,19 @@ fn random(py: Python<'_>, len: usize) -> PyResult<PyObject> {
|
||||
// more info at: https://doc.rust-lang.org/reference/procedural-macros.html#procedural-macro-hygiene
|
||||
let arg = NullArray::new(len);
|
||||
let args = &[ColumnarValue::Array(std::sync::Arc::new(arg) as _)];
|
||||
let res =
|
||||
math_expressions::random(args).map_err(|e| PyValueError::new_err(format!("{e:?}")))?;
|
||||
|
||||
let res = datafusion_functions::math::random()
|
||||
.invoke(args)
|
||||
.map_err(|e| PyValueError::new_err(format!("{e:?}")))?;
|
||||
columnar_value_to_py_any(py, res)
|
||||
}
|
||||
|
||||
#[pyfunction]
|
||||
fn round(py: Python<'_>, val: PyObject) -> PyResult<PyObject> {
|
||||
let value = try_into_columnar_value(py, val)?;
|
||||
let array = value.into_array(1);
|
||||
let result =
|
||||
math_expressions::round(&[array]).map_err(|e| PyValueError::new_err(format!("{e:?}")))?;
|
||||
let result = datafusion_functions::math::round()
|
||||
.invoke(&[value])
|
||||
.and_then(|x| x.into_array(1))
|
||||
.map_err(|e| PyValueError::new_err(format!("{e:?}")))?;
|
||||
columnar_value_to_py_any(py, ColumnarValue::Array(result))
|
||||
}
|
||||
|
||||
@@ -368,7 +371,19 @@ fn approx_percentile_cont(py: Python<'_>, values: &PyVector, percent: f64) -> Py
|
||||
)
|
||||
}
|
||||
|
||||
bind_aggr_expr!(array_agg, ArrayAgg,[v0], v0, expr0=>0);
|
||||
#[pyfunction]
|
||||
fn array_agg(py: Python<'_>, v: &PyVector) -> PyResult<PyObject> {
|
||||
eval_df_aggr_expr(
|
||||
py,
|
||||
expressions::ArrayAgg::new(
|
||||
Arc::new(expressions::Column::new("expr0", 0)) as _,
|
||||
"ArrayAgg",
|
||||
v.arrow_data_type(),
|
||||
true,
|
||||
),
|
||||
&[v.to_arrow_array()],
|
||||
)
|
||||
}
|
||||
|
||||
bind_aggr_expr!(avg, Avg,[v0], v0, expr0=>0);
|
||||
|
||||
|
||||
@@ -144,7 +144,9 @@ coprocessor = copr
|
||||
// could generate a call in python code and use Python::run to run it, just like in RustPython
|
||||
// Expect either: a PyVector Or a List/Tuple of PyVector
|
||||
py.run(&script, Some(globals), Some(locals))?;
|
||||
let result = locals.get_item("_return_from_coprocessor").ok_or_else(|| PyValueError::new_err("Can't find return value of coprocessor function"))?;
|
||||
let result = locals.get_item("_return_from_coprocessor")?.ok_or_else(||
|
||||
PyValueError::new_err(format!("cannot find the return value of script '{script}'"))
|
||||
)?;
|
||||
|
||||
let col_len = rb.as_ref().map(|rb| rb.num_rows()).unwrap_or(1);
|
||||
py_any_to_vec(result, col_len)
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::Mutex;
|
||||
|
||||
use arrow::pyarrow::PyArrowException;
|
||||
use common_telemetry::info;
|
||||
@@ -27,7 +27,7 @@ use pyo3::exceptions::PyValueError;
|
||||
use pyo3::prelude::*;
|
||||
use pyo3::types::{PyBool, PyFloat, PyInt, PyList, PyTuple};
|
||||
|
||||
use crate::python::ffi_types::utils::{collect_diff_types_string, new_item_field};
|
||||
use crate::python::ffi_types::utils::collect_diff_types_string;
|
||||
use crate::python::ffi_types::PyVector;
|
||||
use crate::python::metric;
|
||||
use crate::python::pyo3::builtins::greptime_builtins;
|
||||
@@ -75,9 +75,10 @@ pub fn val_to_py_any(py: Python<'_>, val: Value) -> PyResult<PyObject> {
|
||||
Value::DateTime(val) => val.val().to_object(py),
|
||||
Value::Timestamp(val) => val.value().to_object(py),
|
||||
Value::List(val) => {
|
||||
let list = val.items().clone().unwrap_or(Default::default());
|
||||
let list = list
|
||||
.into_iter()
|
||||
let list = val
|
||||
.items()
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(|v| val_to_py_any(py, v))
|
||||
.collect::<PyResult<Vec<_>>>()?;
|
||||
list.to_object(py)
|
||||
@@ -211,9 +212,13 @@ pub fn scalar_value_to_py_any(py: Python<'_>, val: ScalarValue) -> PyResult<PyOb
|
||||
match val{
|
||||
ScalarValue::Null => Ok(py.None()),
|
||||
$(ScalarValue::$scalar_ty(Some(v)) => Ok(v.to_object(py)),)*
|
||||
ScalarValue::List(Some(col), _) => {
|
||||
ScalarValue::List(array) => {
|
||||
let col = ScalarValue::convert_array_to_scalar_vec(array.as_ref()).map_err(|e|
|
||||
PyValueError::new_err(format!("{e}"))
|
||||
)?;
|
||||
let list:Vec<PyObject> = col
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.map(|v| scalar_value_to_py_any(py, v))
|
||||
.collect::<PyResult<_>>()?;
|
||||
let list = PyList::new(py, list);
|
||||
@@ -281,8 +286,7 @@ pub fn try_into_columnar_value(py: Python<'_>, obj: PyObject) -> PyResult<Column
|
||||
|
||||
if ret.is_empty() {
|
||||
return Ok(ColumnarValue::Scalar(ScalarValue::List(
|
||||
None,
|
||||
Arc::new(new_item_field(ArrowDataType::Null)),
|
||||
ScalarValue::new_list(&[], &ArrowDataType::Null),
|
||||
)));
|
||||
}
|
||||
let ty = ret[0].data_type();
|
||||
@@ -294,8 +298,7 @@ pub fn try_into_columnar_value(py: Python<'_>, obj: PyObject) -> PyResult<Column
|
||||
)));
|
||||
}
|
||||
Ok(ColumnarValue::Scalar(ScalarValue::List(
|
||||
Some(ret),
|
||||
Arc::new(new_item_field(ty)),
|
||||
ScalarValue::new_list(ret.as_slice(), &ty),
|
||||
)))
|
||||
} else {
|
||||
to_rust_types!(obj,
|
||||
|
||||
@@ -17,14 +17,12 @@
|
||||
#[cfg(test)]
|
||||
mod test;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use datafusion_common::{DataFusionError, ScalarValue};
|
||||
use datafusion_expr::ColumnarValue as DFColValue;
|
||||
use datafusion_physical_expr::AggregateExpr;
|
||||
use datatypes::arrow::array::ArrayRef;
|
||||
use datatypes::arrow::compute;
|
||||
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field};
|
||||
use datatypes::arrow::datatypes::DataType as ArrowDataType;
|
||||
use datatypes::vectors::Helper as HelperVec;
|
||||
use rustpython_vm::builtins::{PyBaseExceptionRef, PyBool, PyFloat, PyInt, PyList, PyStr};
|
||||
use rustpython_vm::{pymodule, AsObject, PyObjectRef, PyPayload, PyResult, VirtualMachine};
|
||||
@@ -63,10 +61,6 @@ fn collect_diff_types_string(values: &[ScalarValue], ty: &ArrowDataType) -> Stri
|
||||
.unwrap_or_else(|| "Nothing".to_string())
|
||||
}
|
||||
|
||||
fn new_item_field(data_type: ArrowDataType) -> Field {
|
||||
Field::new("item", data_type, false)
|
||||
}
|
||||
|
||||
/// try to turn a Python Object into a PyVector or a scalar that can be use for calculate
|
||||
///
|
||||
/// supported scalar are(leftside is python data type, right side is rust type):
|
||||
@@ -119,8 +113,7 @@ pub fn try_into_columnar_value(obj: PyObjectRef, vm: &VirtualMachine) -> PyResul
|
||||
if ret.is_empty() {
|
||||
// TODO(dennis): empty list, we set type as null.
|
||||
return Ok(DFColValue::Scalar(ScalarValue::List(
|
||||
None,
|
||||
Arc::new(new_item_field(ArrowDataType::Null)),
|
||||
ScalarValue::new_list(&[], &ArrowDataType::Null),
|
||||
)));
|
||||
}
|
||||
|
||||
@@ -132,8 +125,7 @@ pub fn try_into_columnar_value(obj: PyObjectRef, vm: &VirtualMachine) -> PyResul
|
||||
)));
|
||||
}
|
||||
Ok(DFColValue::Scalar(ScalarValue::List(
|
||||
Some(ret),
|
||||
Arc::new(new_item_field(ty)),
|
||||
ScalarValue::new_list(&ret, &ty),
|
||||
)))
|
||||
} else {
|
||||
Err(vm.new_type_error(format!(
|
||||
@@ -176,9 +168,11 @@ fn scalar_val_try_into_py_obj(val: ScalarValue, vm: &VirtualMachine) -> PyResult
|
||||
ScalarValue::Float64(Some(v)) => Ok(PyFloat::from(v).into_pyobject(vm)),
|
||||
ScalarValue::Int64(Some(v)) => Ok(PyInt::from(v).into_pyobject(vm)),
|
||||
ScalarValue::UInt64(Some(v)) => Ok(PyInt::from(v).into_pyobject(vm)),
|
||||
ScalarValue::List(Some(col), _) => {
|
||||
let list = col
|
||||
ScalarValue::List(list) => {
|
||||
let list = ScalarValue::convert_array_to_scalar_vec(list.as_ref())
|
||||
.map_err(|e| from_df_err(e, vm))?
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.map(|v| scalar_val_try_into_py_obj(v, vm))
|
||||
.collect::<Result<_, _>>()?;
|
||||
let list = vm.ctx.new_list(list);
|
||||
@@ -228,9 +222,10 @@ macro_rules! bind_call_unary_math_function {
|
||||
($DF_FUNC: ident, $vm: ident $(,$ARG: ident)*) => {
|
||||
fn inner_fn($($ARG: PyObjectRef,)* vm: &VirtualMachine) -> PyResult<PyObjectRef> {
|
||||
let args = &[$(all_to_f64(try_into_columnar_value($ARG, vm)?, vm)?,)*];
|
||||
let res = math_expressions::$DF_FUNC(args).map_err(|err| from_df_err(err, vm))?;
|
||||
let ret = try_into_py_obj(res, vm)?;
|
||||
Ok(ret)
|
||||
datafusion_functions::math::$DF_FUNC()
|
||||
.invoke(args)
|
||||
.map_err(|e| from_df_err(e, vm))
|
||||
.and_then(|x| try_into_py_obj(x, vm))
|
||||
}
|
||||
return inner_fn($($ARG,)* $vm);
|
||||
};
|
||||
@@ -295,7 +290,6 @@ pub(crate) mod greptime_builtin {
|
||||
use datafusion::dataframe::DataFrame as DfDataFrame;
|
||||
use datafusion::physical_plan::expressions;
|
||||
use datafusion_expr::{ColumnarValue as DFColValue, Expr as DfExpr};
|
||||
use datafusion_physical_expr::math_expressions;
|
||||
use datatypes::arrow::array::{ArrayRef, Int64Array, NullArray};
|
||||
use datatypes::arrow::error::ArrowError;
|
||||
use datatypes::arrow::{self, compute};
|
||||
@@ -548,8 +542,10 @@ pub(crate) mod greptime_builtin {
|
||||
#[pyfunction]
|
||||
fn round(val: PyObjectRef, vm: &VirtualMachine) -> PyResult<PyObjectRef> {
|
||||
let value = try_into_columnar_value(val, vm)?;
|
||||
let array = value.into_array(1);
|
||||
let result = math_expressions::round(&[array]).map_err(|e| from_df_err(e, vm))?;
|
||||
let result = datafusion_functions::math::round()
|
||||
.invoke(&[value])
|
||||
.and_then(|x| x.into_array(1))
|
||||
.map_err(|e| from_df_err(e, vm))?;
|
||||
try_into_py_obj(DFColValue::Array(result), vm)
|
||||
}
|
||||
|
||||
@@ -604,7 +600,9 @@ pub(crate) mod greptime_builtin {
|
||||
// more info at: https://doc.rust-lang.org/reference/procedural-macros.html#procedural-macro-hygiene
|
||||
let arg = NullArray::new(len);
|
||||
let args = &[DFColValue::Array(std::sync::Arc::new(arg) as _)];
|
||||
let res = math_expressions::random(args).map_err(|err| from_df_err(err, vm))?;
|
||||
let res = datafusion_functions::math::random()
|
||||
.invoke(args)
|
||||
.map_err(|err| from_df_err(err, vm))?;
|
||||
let ret = try_into_py_obj(res, vm)?;
|
||||
Ok(ret)
|
||||
}
|
||||
@@ -673,13 +671,16 @@ pub(crate) mod greptime_builtin {
|
||||
/// effectively equals to `list(vector)`
|
||||
#[pyfunction]
|
||||
fn array_agg(values: PyVectorRef, vm: &VirtualMachine) -> PyResult<PyObjectRef> {
|
||||
bind_aggr_fn!(
|
||||
ArrayAgg,
|
||||
vm,
|
||||
eval_aggr_fn(
|
||||
expressions::ArrayAgg::new(
|
||||
Arc::new(expressions::Column::new("expr0", 0)) as _,
|
||||
"ArrayAgg",
|
||||
values.arrow_data_type(),
|
||||
false,
|
||||
),
|
||||
&[values.to_arrow_array()],
|
||||
values.arrow_data_type(),
|
||||
expr0
|
||||
);
|
||||
vm,
|
||||
)
|
||||
}
|
||||
|
||||
/// directly port from datafusion's `avg` function
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::io::Read;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::array::Array;
|
||||
use common_telemetry::{error, info};
|
||||
use datatypes::arrow::array::{Float64Array, Int64Array};
|
||||
use datatypes::arrow::compute;
|
||||
@@ -68,18 +69,18 @@ fn convert_scalar_to_py_obj_and_back() {
|
||||
} else {
|
||||
panic!("Convert errors, expect 1")
|
||||
}
|
||||
let col = DFColValue::Scalar(ScalarValue::List(
|
||||
Some(vec![
|
||||
ScalarValue::Int64(Some(1)),
|
||||
ScalarValue::Int64(Some(2)),
|
||||
]),
|
||||
Arc::new(Field::new("item", ArrowDataType::Int64, false)),
|
||||
));
|
||||
let col = DFColValue::Scalar(ScalarValue::List(ScalarValue::new_list(
|
||||
&[ScalarValue::Int64(Some(1)), ScalarValue::Int64(Some(2))],
|
||||
&ArrowDataType::Int64,
|
||||
)));
|
||||
let to = try_into_py_obj(col, vm).unwrap();
|
||||
let back = try_into_columnar_value(to, vm).unwrap();
|
||||
if let DFColValue::Scalar(ScalarValue::List(Some(list), field)) = back {
|
||||
assert_eq!(list.len(), 2);
|
||||
assert_eq!(*field.data_type(), ArrowDataType::Int64);
|
||||
if let DFColValue::Scalar(ScalarValue::List(list)) = back {
|
||||
assert_eq!(list.len(), 1);
|
||||
assert_eq!(
|
||||
list.data_type(),
|
||||
&ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int64, true)))
|
||||
);
|
||||
}
|
||||
let list: Vec<PyObjectRef> = vec![vm.ctx.new_int(1).into(), vm.ctx.new_int(2).into()];
|
||||
let nested_list: Vec<PyObjectRef> =
|
||||
|
||||
@@ -707,7 +707,7 @@ from greptime import *
|
||||
sin(num)"#,
|
||||
expect: Ok((
|
||||
ty: Float64,
|
||||
value: Float(0.8414709848078965)
|
||||
value: FloatVec([0.8414709848078965])
|
||||
))
|
||||
),
|
||||
TestCase(
|
||||
@@ -722,7 +722,7 @@ from greptime import *
|
||||
sin(num)"#,
|
||||
expect: Ok((
|
||||
ty: Float64,
|
||||
value: Float(0.8414709848078965)
|
||||
value: FloatVec([0.8414709848078965])
|
||||
))
|
||||
),
|
||||
TestCase(
|
||||
@@ -732,7 +732,7 @@ from greptime import *
|
||||
sin(True)"#,
|
||||
expect: Ok((
|
||||
ty: Float64,
|
||||
value: Float(0.8414709848078965)
|
||||
value: FloatVec([0.8414709848078965])
|
||||
))
|
||||
),
|
||||
TestCase(
|
||||
@@ -747,7 +747,7 @@ from greptime import *
|
||||
sin(num)"#,
|
||||
expect: Ok((
|
||||
ty: Float64,
|
||||
value: Float(0.0)
|
||||
value: FloatVec([0.0])
|
||||
))
|
||||
),
|
||||
// test if string returns error correctly
|
||||
|
||||
@@ -14,12 +14,13 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::array::ArrayRef;
|
||||
use datafusion_common::ScalarValue;
|
||||
use datafusion_expr::ColumnarValue as DFColValue;
|
||||
use datatypes::prelude::ScalarVector;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{
|
||||
BooleanVector, Float64Vector, Helper, Int64Vector, NullVector, StringVector, VectorRef,
|
||||
BooleanVector, Float64Vector, Helper, Int64Vector, StringVector, VectorRef,
|
||||
};
|
||||
use rustpython_vm::builtins::{PyBaseExceptionRef, PyBool, PyFloat, PyInt, PyList, PyStr};
|
||||
use rustpython_vm::object::PyObjectPayload;
|
||||
@@ -134,15 +135,9 @@ pub fn py_obj_to_vec(
|
||||
try_into_columnar_value(obj.clone(), vm).map_err(|e| format_py_error(e, vm))?;
|
||||
|
||||
match columnar_value {
|
||||
DFColValue::Scalar(ScalarValue::List(scalars, _datatype)) => match scalars {
|
||||
Some(scalars) => {
|
||||
let array =
|
||||
ScalarValue::iter_to_array(scalars).context(error::DataFusionSnafu)?;
|
||||
|
||||
Helper::try_into_vector(array).context(error::TypeCastSnafu)
|
||||
}
|
||||
None => Ok(Arc::new(NullVector::new(0))),
|
||||
},
|
||||
DFColValue::Scalar(ScalarValue::List(array)) => {
|
||||
Helper::try_into_vector(array as ArrayRef).context(error::TypeCastSnafu)
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user