fix: Fix compiler errors in script crate (#749)

* fix: Fix compiler errors in state.rs

* fix: fix compiler errors in state

* feat: upgrade sqlparser to 0.26

* fix: fix datafusion engine compiler errors

* fix: Fix some tests in query crate

* fix: Fix all warnings in tests

* feat: Remove `Type` from timestamp's type name

* fix: fix query tests

Now datafusion already supports median, so this commit also removes the
median function

* style: Fix clippy

* feat: Remove RecordBatch::pretty_print

* chore: Address CR comments

* feat: Add column_by_name to RecordBatch

* feat: modify select_from_rb

* feat: Fix some compiler errors in vector.rs

* feat: Fix more compiler errors in vector.rs

* fix: fix table.rs

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: Fix compiler errors in coprocessor

* fix: Fix some compiler errors

* fix: Fix compiler errors in script

* chore: Remove unused imports and format code

* test: disable interval tests

* test: Fix test_compile_execute test

* style: Fix clippy

* feat: Support interval

* feat: Add RecordBatch::columns and fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Yingwen authored on 2022-12-15 14:20:35 +08:00, committed by GitHub
parent ce6d1cb7d1
commit 142dee41d6
11 changed files with 484 additions and 592 deletions

View File

@@ -79,11 +79,21 @@ impl RecordBatch {
self.df_record_batch
}
#[inline]
pub fn columns(&self) -> &[VectorRef] {
&self.columns
}
#[inline]
pub fn column(&self, idx: usize) -> &VectorRef {
&self.columns[idx]
}
pub fn column_by_name(&self, name: &str) -> Option<&VectorRef> {
let idx = self.schema.column_index_by_name(name)?;
Some(&self.columns[idx])
}
#[inline]
pub fn num_columns(&self) -> usize {
self.columns.len()
@@ -179,18 +189,23 @@ mod tests {
]));
let schema = Arc::new(Schema::try_from(arrow_schema).unwrap());
let v = Arc::new(UInt32Vector::from_slice(&[1, 2, 3]));
let columns: Vec<VectorRef> = vec![v.clone(), v.clone()];
let c1 = Arc::new(UInt32Vector::from_slice(&[1, 2, 3]));
let c2 = Arc::new(UInt32Vector::from_slice(&[4, 5, 6]));
let columns: Vec<VectorRef> = vec![c1, c2];
let batch = RecordBatch::new(schema.clone(), columns).unwrap();
let batch = RecordBatch::new(schema.clone(), columns.clone()).unwrap();
assert_eq!(3, batch.num_rows());
for i in 0..batch.num_columns() {
assert_eq!(&columns, batch.columns());
for (i, expect) in columns.iter().enumerate().take(batch.num_columns()) {
let column = batch.column(i);
let actual = column.as_any().downcast_ref::<UInt32Vector>().unwrap();
assert_eq!(&*v, actual);
assert_eq!(expect, column);
}
assert_eq!(schema, batch.schema);
assert_eq!(columns[0], *batch.column_by_name("c1").unwrap());
assert_eq!(columns[1], *batch.column_by_name("c2").unwrap());
assert!(batch.column_by_name("c3").is_none());
let converted =
RecordBatch::try_from_df_record_batch(schema, batch.df_record_batch().clone()).unwrap();
assert_eq!(batch, converted);
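
For reference, a minimal usage sketch of the accessors this hunk introduces (`columns`, `column`, `column_by_name`); the `inspect` function is hypothetical, and the crate paths are the ones used elsewhere in this diff.

```rust
use datatypes::vectors::{Vector, VectorRef};

fn inspect(batch: &common_recordbatch::RecordBatch) {
    let cols: &[VectorRef] = batch.columns(); // all columns as a slice
    assert_eq!(cols.len(), batch.num_columns());
    let first: &VectorRef = batch.column(0); // positional access
    assert_eq!(first.len(), batch.num_rows());
    // Name-based access returns None for unknown column names.
    assert!(batch.column_by_name("no_such_column").is_none());
}
```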

View File

@@ -8,6 +8,7 @@ license = "Apache-2.0"
default = ["python"]
python = [
"dep:datafusion",
"dep:datafusion-common",
"dep:datafusion-expr",
"dep:datafusion-physical-expr",
"dep:rustpython-vm",

View File

@@ -21,10 +21,9 @@ mod test;
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_expr::ColumnarValue as DFColValue;
use datafusion_physical_expr::AggregateExpr;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::compute::cast::CastOptions;
use datatypes::arrow::datatypes::DataType;
use datatypes::arrow::compute;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field};
use datatypes::vectors::Helper as HelperVec;
use rustpython_vm::builtins::{PyBaseExceptionRef, PyBool, PyFloat, PyInt, PyList, PyStr};
use rustpython_vm::{pymodule, AsObject, PyObjectRef, PyPayload, PyResult, VirtualMachine};
@@ -37,7 +36,7 @@ fn type_cast_error(name: &str, ty: &str, vm: &VirtualMachine) -> PyBaseException
vm.new_type_error(format!("Can't cast operand of type `{name}` into `{ty}`."))
}
fn collect_diff_types_string(values: &[ScalarValue], ty: &DataType) -> String {
fn collect_diff_types_string(values: &[ScalarValue], ty: &ArrowDataType) -> String {
values
.iter()
.enumerate()
@@ -56,6 +55,10 @@ fn collect_diff_types_string(values: &[ScalarValue], ty: &DataType) -> String {
.unwrap_or_else(|| "Nothing".to_string())
}
fn new_item_field(data_type: ArrowDataType) -> Field {
Field::new("item", data_type, false)
}
/// try to turn a Python Object into a PyVector or a scalar that can be used for calculation
///
/// supported scalars are (left side is the Python data type, right side is the Rust type):
@@ -109,7 +112,7 @@ pub fn try_into_columnar_value(obj: PyObjectRef, vm: &VirtualMachine) -> PyResul
// TODO(dennis): empty list, we set type as null.
return Ok(DFColValue::Scalar(ScalarValue::List(
None,
Box::new(DataType::Null),
Box::new(new_item_field(ArrowDataType::Null)),
)));
}
@@ -121,8 +124,8 @@ pub fn try_into_columnar_value(obj: PyObjectRef, vm: &VirtualMachine) -> PyResul
)));
}
Ok(DFColValue::Scalar(ScalarValue::List(
Some(Box::new(ret)),
Box::new(ty),
Some(ret),
Box::new(new_item_field(ty)),
)))
} else {
Err(vm.new_type_error(format!(
@@ -184,22 +187,14 @@ fn scalar_val_try_into_py_obj(val: ScalarValue, vm: &VirtualMachine) -> PyResult
fn all_to_f64(col: DFColValue, vm: &VirtualMachine) -> PyResult<DFColValue> {
match col {
DFColValue::Array(arr) => {
let res = arrow::compute::cast::cast(
arr.as_ref(),
&DataType::Float64,
CastOptions {
wrapped: true,
partial: true,
},
)
.map_err(|err| {
let res = compute::cast(&arr, &ArrowDataType::Float64).map_err(|err| {
vm.new_type_error(format!(
"Arrow Type Cast Fail(from {:#?} to {:#?}): {err:#?}",
arr.data_type(),
DataType::Float64
ArrowDataType::Float64
))
})?;
Ok(DFColValue::Array(res.into()))
Ok(DFColValue::Array(res))
}
DFColValue::Scalar(val) => {
let val_in_f64 = match val {
@@ -210,7 +205,7 @@ fn all_to_f64(col: DFColValue, vm: &VirtualMachine) -> PyResult<DFColValue> {
return Err(vm.new_type_error(format!(
"Can't cast type {:#?} to {:#?}",
val.get_datatype(),
DataType::Float64
ArrowDataType::Float64
)))
}
};
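
A side-by-side sketch of the cast migration in this hunk: arrow2's `cast::cast(arr, ty, CastOptions { wrapped, partial })` becomes the plain `compute::cast` shown above. A minimal example, assuming the arrow version this PR moves to.

```rust
use std::sync::Arc;
use datatypes::arrow::array::{Array, ArrayRef, Int64Array};
use datatypes::arrow::compute;
use datatypes::arrow::datatypes::DataType as ArrowDataType;

fn main() {
    let ints: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    // arrow-rs returns an ArrayRef directly; no CastOptions are needed
    // for the default (safe) casting behaviour.
    let floats = compute::cast(&ints, &ArrowDataType::Float64).unwrap();
    assert_eq!(floats.data_type(), &ArrowDataType::Float64);
}
```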
@@ -284,17 +279,16 @@ pub(crate) mod greptime_builtin {
// P.S.: not extract to file because not-inlined proc macro attribute is *unstable*
use std::sync::Arc;
use arrow::compute::kernels::{aggregate, boolean, comparison};
use common_function::scalars::function::FunctionContext;
use common_function::scalars::math::PowFunction;
use common_function::scalars::{Function, FunctionRef, FUNCTION_REGISTRY};
use datafusion::arrow::compute::comparison::{gt_eq_scalar, lt_eq_scalar};
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::scalar::{PrimitiveScalar, Scalar};
use datafusion::arrow::datatypes::DataType as ArrowDataType;
use datafusion::physical_plan::expressions;
use datafusion_expr::ColumnarValue as DFColValue;
use datafusion_physical_expr::math_expressions;
use datatypes::arrow::array::{ArrayRef, NullArray};
use datatypes::arrow::array::{ArrayRef, Int64Array, NullArray};
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::{self, compute};
use datatypes::vectors::{ConstantVector, Float64Vector, Helper, Int64Vector, VectorRef};
use paste::paste;
@@ -387,11 +381,6 @@ pub(crate) mod greptime_builtin {
eval_func("clip", &[v0, v1, v2], vm)
}
#[pyfunction]
fn median(v: PyVectorRef, vm: &VirtualMachine) -> PyResult<PyObjectRef> {
eval_aggr_func("median", &[v], vm)
}
#[pyfunction]
fn diff(v: PyVectorRef, vm: &VirtualMachine) -> PyResult<PyObjectRef> {
eval_aggr_func("diff", &[v], vm)
@@ -553,7 +542,7 @@ pub(crate) mod greptime_builtin {
fn random(len: usize, vm: &VirtualMachine) -> PyResult<PyObjectRef> {
// This is in a proc macro so using full path to avoid strange things
// more info at: https://doc.rust-lang.org/reference/procedural-macros.html#procedural-macro-hygiene
let arg = NullArray::new(arrow::datatypes::DataType::Null, len);
let arg = NullArray::new(len);
let args = &[DFColValue::Array(std::sync::Arc::new(arg) as _)];
let res = math_expressions::random(args).map_err(|err| from_df_err(err, vm))?;
let ret = try_into_py_obj(res, vm)?;
@@ -572,6 +561,17 @@ pub(crate) mod greptime_builtin {
);
}
#[pyfunction]
fn median(values: PyVectorRef, vm: &VirtualMachine) -> PyResult<PyObjectRef> {
bind_aggr_fn!(
Median,
vm,
&[values.to_arrow_array()],
values.to_arrow_array().data_type(),
expr0
);
}
/// Not implemented in datafusion
/// TODO(discord9): use greptime's own impl instead
/*
@@ -808,12 +808,16 @@ pub(crate) mod greptime_builtin {
Ok(res.into())
}
fn gen_none_array(data_type: DataType, len: usize, vm: &VirtualMachine) -> PyResult<ArrayRef> {
fn gen_none_array(
data_type: ArrowDataType,
len: usize,
vm: &VirtualMachine,
) -> PyResult<ArrayRef> {
macro_rules! match_none_array {
($VAR:ident, $LEN: ident, [$($TY:ident),*]) => {
paste!{
match $VAR{
$(DataType::$TY => Arc::new(arrow::array::[<$TY Array>]::from(vec![None;$LEN])), )*
$(ArrowDataType::$TY => Arc::new(arrow::array::[<$TY Array>]::from(vec![None;$LEN])), )*
_ => return Err(vm.new_type_error(format!("gen_none_array() does not support {:?}", data_type)))
}
}
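
What one arm of `match_none_array!` expands to, written out for `Int64` as a hedged standalone sketch:

```rust
use std::sync::Arc;
use datatypes::arrow::array::{Array, ArrayRef, Int64Array};

// An all-null array of the requested length; `NullArray::new(len)`
// above covers the untyped case.
fn none_i64(len: usize) -> ArrayRef {
    Arc::new(Int64Array::from(vec![None::<i64>; len]))
}

fn main() {
    assert_eq!(none_i64(3).null_count(), 3);
}
```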
@@ -829,10 +833,10 @@ pub(crate) mod greptime_builtin {
#[pyfunction]
fn prev(cur: PyVectorRef, vm: &VirtualMachine) -> PyResult<PyVector> {
let cur: ArrayRef = cur.to_arrow_array();
let cur = cur.to_arrow_array();
if cur.len() == 0 {
let ret = cur.slice(0, 0);
let ret = Helper::try_into_vector(&*ret).map_err(|e| {
let ret = Helper::try_into_vector(ret.clone()).map_err(|e| {
vm.new_type_error(format!(
"Can't cast result into vector, result: {:?}, err: {:?}",
ret, e
@@ -842,10 +846,10 @@ pub(crate) mod greptime_builtin {
}
let cur = cur.slice(0, cur.len() - 1); // except the last one that is
let fill = gen_none_array(cur.data_type().to_owned(), 1, vm)?;
let ret = compute::concatenate::concatenate(&[&*fill, &*cur]).map_err(|err| {
let ret = compute::concat(&[&*fill, &*cur]).map_err(|err| {
vm.new_runtime_error(format!("Can't concat array[0] with array[0:-1]!{err:#?}"))
})?;
let ret = Helper::try_into_vector(&*ret).map_err(|e| {
let ret = Helper::try_into_vector(ret.clone()).map_err(|e| {
vm.new_type_error(format!(
"Can't cast result into vector, result: {:?}, err: {:?}",
ret, e
@@ -856,10 +860,10 @@ pub(crate) mod greptime_builtin {
#[pyfunction]
fn next(cur: PyVectorRef, vm: &VirtualMachine) -> PyResult<PyVector> {
let cur: ArrayRef = cur.to_arrow_array();
let cur = cur.to_arrow_array();
if cur.len() == 0 {
let ret = cur.slice(0, 0);
let ret = Helper::try_into_vector(&*ret).map_err(|e| {
let ret = Helper::try_into_vector(ret.clone()).map_err(|e| {
vm.new_type_error(format!(
"Can't cast result into vector, result: {:?}, err: {:?}",
ret, e
@@ -869,10 +873,10 @@ pub(crate) mod greptime_builtin {
}
let cur = cur.slice(1, cur.len() - 1); // except the last one that is
let fill = gen_none_array(cur.data_type().to_owned(), 1, vm)?;
let ret = compute::concatenate::concatenate(&[&*cur, &*fill]).map_err(|err| {
let ret = compute::concat(&[&*cur, &*fill]).map_err(|err| {
vm.new_runtime_error(format!("Can't concat array[0] with array[0:-1]!{err:#?}"))
})?;
let ret = Helper::try_into_vector(&*ret).map_err(|e| {
let ret = Helper::try_into_vector(ret.clone()).map_err(|e| {
vm.new_type_error(format!(
"Can't cast result into vector, result: {:?}, err: {:?}",
ret, e
@@ -881,55 +885,24 @@ pub(crate) mod greptime_builtin {
Ok(ret.into())
}
fn try_scalar_to_value(scalar: &dyn Scalar, vm: &VirtualMachine) -> PyResult<i64> {
let ty_error = |s: String| vm.new_type_error(s);
scalar
.as_any()
.downcast_ref::<PrimitiveScalar<i64>>()
.ok_or_else(|| {
ty_error(format!(
"expect scalar to be i64, found{:?}",
scalar.data_type()
))
})?
.value()
.ok_or_else(|| ty_error("All element is Null in a time series array".to_string()))
}
/// generate interval time points
fn gen_inteveral(
oldest: &dyn Scalar,
newest: &dyn Scalar,
oldest: i64,
newest: i64,
duration: i64,
vm: &VirtualMachine,
) -> PyResult<Vec<PrimitiveScalar<i64>>> {
use datatypes::arrow::datatypes::DataType;
match (oldest.data_type(), newest.data_type()) {
(DataType::Int64, DataType::Int64) => (),
_ => {
return Err(vm.new_type_error(format!(
"Expect int64, found {:?} and {:?}",
oldest.data_type(),
newest.data_type()
)));
}
}
let oldest = try_scalar_to_value(oldest, vm)?;
let newest = try_scalar_to_value(newest, vm)?;
) -> PyResult<Vec<i64>> {
if oldest > newest {
return Err(vm.new_value_error(format!("{oldest} is greater than {newest}")));
}
let ret = if duration > 0 {
(oldest..=newest)
if duration > 0 {
let ret = (oldest..=newest)
.step_by(duration as usize)
.map(|v| PrimitiveScalar::new(DataType::Int64, Some(v)))
.collect::<Vec<_>>()
.collect::<Vec<_>>();
Ok(ret)
} else {
return Err(vm.new_value_error(format!("duration: {duration} is not positive number.")));
};
Ok(ret)
Err(vm.new_value_error(format!("duration: {duration} is not positive number.")))
}
}
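
Since the boundaries are now plain `i64` values instead of `PrimitiveScalar<i64>`, the happy path of `gen_inteveral` reduces to a `step_by` over a range; a self-contained sketch:

```rust
fn boundaries(oldest: i64, newest: i64, duration: i64) -> Vec<i64> {
    // Mirrors the checks above: oldest <= newest and a positive duration.
    assert!(oldest <= newest && duration > 0);
    (oldest..=newest).step_by(duration as usize).collect()
}

fn main() {
    assert_eq!(boundaries(0, 10, 5), vec![0, 5, 10]);
}
```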
/// `func`: exec on a sliding window slice of the given `arr`; expect it to always return a PyVector of one element
@@ -952,12 +925,19 @@ pub(crate) mod greptime_builtin {
let arrow_error = |err: ArrowError| vm.new_runtime_error(format!("Arrow Error: {err:#?}"));
let datatype_error =
|err: datatypes::Error| vm.new_runtime_error(format!("DataType Errors!: {err:#?}"));
let ts: ArrayRef = ts.to_arrow_array();
let arr: ArrayRef = arr.to_arrow_array();
let ts_array_ref: ArrayRef = ts.to_arrow_array();
let ts = ts_array_ref
.as_any()
.downcast_ref::<Int64Array>()
.ok_or_else(|| {
vm.new_type_error(format!("ts must be int64, found: {:?}", ts_array_ref))
})?;
let slices = {
let oldest = compute::aggregate::min(&*ts).map_err(arrow_error)?;
let newest = compute::aggregate::max(&*ts).map_err(arrow_error)?;
gen_inteveral(&*oldest, &*newest, duration, vm)?
let oldest = aggregate::min(ts)
.ok_or_else(|| vm.new_runtime_error("ts must has min value".to_string()))?;
let newest = aggregate::max(ts)
.ok_or_else(|| vm.new_runtime_error("ts must has max value".to_string()))?;
gen_inteveral(oldest, newest, duration, vm)?
};
let windows = {
@@ -969,11 +949,15 @@ pub(crate) mod greptime_builtin {
it
})
.map(|(first, second)| {
compute::boolean::and(&gt_eq_scalar(&*ts, first), &lt_eq_scalar(&*ts, second))
.map_err(arrow_error)
let left = comparison::gt_eq_scalar(ts, *first).map_err(arrow_error)?;
let right = comparison::lt_eq_scalar(ts, *second).map_err(arrow_error)?;
boolean::and(&left, &right).map_err(arrow_error)
})
.map(|mask| match mask {
Ok(mask) => compute::filter::filter(&*arr, &mask).map_err(arrow_error),
Ok(mask) => {
let arrow_arr = arr.to_arrow_array();
compute::filter(&arrow_arr, &mask).map_err(arrow_error)
}
Err(e) => Err(e),
})
.collect::<Result<Vec<_>, _>>()?
@@ -1013,16 +997,17 @@ pub(crate) mod greptime_builtin {
.map(apply_interval_function)
.collect::<Result<Vec<_>, _>>()?;
// 3. get returen vector and concat them
let ret = fn_results
.into_iter()
.try_reduce(|acc, x| {
compute::concatenate::concatenate(&[acc.as_ref(), x.as_ref()]).map(Arc::from)
})
.map_err(arrow_error)?
.unwrap_or_else(|| Arc::from(arr.slice(0, 0)));
// 3. get returned vector and concat them
let result_arrays: Vec<_> = fn_results
.iter()
.map(|vector| vector.to_arrow_array())
.collect();
let result_dyn_arrays: Vec<_> = result_arrays.iter().map(|v| v.as_ref()).collect();
let concat_array = compute::concat(&result_dyn_arrays).map_err(arrow_error)?;
let vector = Helper::try_into_vector(concat_array).map_err(datatype_error)?;
// 4. return result vector
Ok(Helper::try_into_vector(ret).map_err(datatype_error)?.into())
Ok(PyVector::from(vector))
}
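
The concat step above in isolation, as a hedged helper (`concat_vectors` is hypothetical): the owned `ArrayRef`s must be kept alive in a `Vec` so the borrowed `&dyn Array` slice passed to `concat` stays valid.

```rust
use datatypes::arrow::compute;
use datatypes::vectors::{Helper, Vector, VectorRef};

fn concat_vectors(vectors: &[VectorRef]) -> VectorRef {
    let arrays: Vec<_> = vectors.iter().map(|v| v.to_arrow_array()).collect();
    let refs: Vec<_> = arrays.iter().map(|a| a.as_ref()).collect();
    let array = compute::concat(&refs).expect("mismatched column types");
    Helper::try_into_vector(array).expect("unsupported arrow type")
}
```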
/// return first element in a `PyVector` in sliced new `PyVector`, if vector's length is zero, return a zero sized slice instead
@@ -1033,7 +1018,7 @@ pub(crate) mod greptime_builtin {
0 => arr.slice(0, 0),
_ => arr.slice(0, 1),
};
let ret = Helper::try_into_vector(&*ret).map_err(|e| {
let ret = Helper::try_into_vector(ret.clone()).map_err(|e| {
vm.new_type_error(format!(
"Can't cast result into vector, result: {:?}, err: {:?}",
ret, e
@@ -1050,7 +1035,7 @@ pub(crate) mod greptime_builtin {
0 => arr.slice(0, 0),
_ => arr.slice(arr.len() - 1, 1),
};
let ret = Helper::try_into_vector(&*ret).map_err(|e| {
let ret = Helper::try_into_vector(ret.clone()).map_err(|e| {
vm.new_type_error(format!(
"Can't cast result into vector, result: {:?}, err: {:?}",
ret, e

View File

@@ -18,10 +18,10 @@ use std::io::Read;
use std::path::Path;
use std::sync::Arc;
use datatypes::arrow::array::{Float64Array, Int64Array, PrimitiveArray};
use datatypes::arrow::compute::cast::CastOptions;
use datatypes::arrow::datatypes::DataType;
use datatypes::vectors::VectorRef;
use datatypes::arrow::array::{Float64Array, Int64Array};
use datatypes::arrow::compute;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field};
use datatypes::vectors::{Float64Vector, Int64Vector, VectorRef};
use ron::from_str as from_ron_string;
use rustpython_vm::builtins::{PyFloat, PyInt, PyList};
use rustpython_vm::class::PyClassImpl;
@@ -68,17 +68,17 @@ fn convert_scalar_to_py_obj_and_back() {
panic!("Convert errors, expect 1")
}
let col = DFColValue::Scalar(ScalarValue::List(
Some(Box::new(vec![
Some(vec![
ScalarValue::Int64(Some(1)),
ScalarValue::Int64(Some(2)),
])),
Box::new(DataType::Int64),
]),
Box::new(Field::new("item", ArrowDataType::Int64, false)),
));
let to = try_into_py_obj(col, vm).unwrap();
let back = try_into_columnar_value(to, vm).unwrap();
if let DFColValue::Scalar(ScalarValue::List(Some(list), ty)) = back {
if let DFColValue::Scalar(ScalarValue::List(Some(list), field)) = back {
assert_eq!(list.len(), 2);
assert_eq!(ty.as_ref(), &DataType::Int64);
assert_eq!(*field.data_type(), ArrowDataType::Int64);
}
let list: Vec<PyObjectRef> = vec![vm.ctx.new_int(1).into(), vm.ctx.new_int(2).into()];
let nested_list: Vec<PyObjectRef> =
@@ -92,12 +92,10 @@ fn convert_scalar_to_py_obj_and_back() {
));
}
let list: PyVector = PyVector::from(
HelperVec::try_into_vector(
Arc::new(PrimitiveArray::from_slice([0.1f64, 0.2, 0.3, 0.4])) as ArrayRef,
)
.unwrap(),
);
let list: PyVector =
PyVector::from(
Arc::new(Float64Vector::from_slice([0.1f64, 0.2, 0.3, 0.4])) as VectorRef
);
let nested_list: Vec<PyObjectRef> = vec![list.into_pyobject(vm), vm.ctx.new_int(3).into()];
let list_obj = vm.ctx.new_list(nested_list).into();
let expect_err = try_into_columnar_value(list_obj, vm);
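
For clarity, the new list-scalar shape in one place: in this datafusion version `ScalarValue::List` carries its element `Field` rather than a bare `DataType`, and the element type is read back via `field.data_type()`. A minimal constructor sketch:

```rust
use datafusion_common::ScalarValue;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field};

fn int_list(values: Vec<i64>) -> ScalarValue {
    let items: Vec<ScalarValue> =
        values.into_iter().map(|v| ScalarValue::Int64(Some(v))).collect();
    ScalarValue::List(
        Some(items),
        Box::new(Field::new("item", ArrowDataType::Int64, false)),
    )
}
```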
@@ -115,7 +113,7 @@ struct TestCase {
#[derive(Debug, Serialize, Deserialize)]
struct Var {
value: PyValue,
ty: DataType,
ty: ArrowDataType,
}
/// for floating number comparison
@@ -189,25 +187,25 @@ impl PyValue {
}
}
fn is_float(ty: &DataType) -> bool {
fn is_float(ty: &ArrowDataType) -> bool {
matches!(
ty,
DataType::Float16 | DataType::Float32 | DataType::Float64
ArrowDataType::Float16 | ArrowDataType::Float32 | ArrowDataType::Float64
)
}
/// unsigned included
fn is_int(ty: &DataType) -> bool {
fn is_int(ty: &ArrowDataType) -> bool {
matches!(
ty,
DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64
| DataType::Int8
| DataType::Int16
| DataType::Int32
| DataType::Int64
ArrowDataType::UInt8
| ArrowDataType::UInt16
| ArrowDataType::UInt32
| ArrowDataType::UInt64
| ArrowDataType::Int8
| ArrowDataType::Int16
| ArrowDataType::Int32
| ArrowDataType::Int64
)
}
@@ -217,7 +215,7 @@ impl PyValue {
PyValue::FloatVec(v) => {
Arc::new(datatypes::vectors::Float64Vector::from_vec(v.clone()))
}
PyValue::IntVec(v) => Arc::new(datatypes::vectors::Int64Vector::from_vec(v.clone())),
PyValue::IntVec(v) => Arc::new(Int64Vector::from_vec(v.clone())),
PyValue::Int(v) => return Ok(vm.ctx.new_int(*v).into()),
PyValue::Float(v) => return Ok(vm.ctx.new_float(*v).into()),
Self::Bool(v) => return Ok(vm.ctx.new_bool(*v).into()),
@@ -234,16 +232,9 @@ impl PyValue {
let res = res.to_arrow_array();
let ty = res.data_type();
if is_float(ty) {
let vec_f64 = arrow::compute::cast::cast(
res.as_ref(),
&DataType::Float64,
CastOptions {
wrapped: true,
partial: true,
},
)
.map_err(|err| format!("{err:#?}"))?;
assert_eq!(vec_f64.data_type(), &DataType::Float64);
let vec_f64 = compute::cast(&res, &ArrowDataType::Float64)
.map_err(|err| format!("{err:#?}"))?;
assert_eq!(vec_f64.data_type(), &ArrowDataType::Float64);
let vec_f64 = vec_f64
.as_any()
.downcast_ref::<Float64Array>()
@@ -251,13 +242,6 @@ impl PyValue {
let ret = vec_f64
.into_iter()
.map(|v| v.map(|inner| inner.to_owned()))
/* .enumerate()
.map(|(idx, v)| {
v.ok_or(format!(
"No null element expected, found one in {idx} position"
))
.map(|v| v.to_owned())
})*/
.collect::<Vec<_>>();
if ret.iter().all(|x| x.is_some()) {
Ok(Self::FloatVec(
@@ -267,16 +251,9 @@ impl PyValue {
Ok(Self::FloatVecWithNull(ret))
}
} else if is_int(ty) {
let vec_int = arrow::compute::cast::cast(
res.as_ref(),
&DataType::Int64,
CastOptions {
wrapped: true,
partial: true,
},
)
.map_err(|err| format!("{err:#?}"))?;
assert_eq!(vec_int.data_type(), &DataType::Int64);
let vec_int = compute::cast(&res, &ArrowDataType::Int64)
.map_err(|err| format!("{err:#?}"))?;
assert_eq!(vec_int.data_type(), &ArrowDataType::Int64);
let vec_i64 = vec_int
.as_any()
.downcast_ref::<Int64Array>()
@@ -293,7 +270,7 @@ impl PyValue {
.collect::<Result<_, String>>()?;
Ok(Self::IntVec(ret))
} else {
Err(format!("unspupported DataType:{ty:#?}"))
Err(format!("unspupported ArrowDataType:{ty:#?}"))
}
} else if is_instance::<PyInt>(obj, vm) {
let res = obj

View File

@@ -16,19 +16,18 @@ pub mod compile;
pub mod parse;
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::result::Result as StdResult;
use std::sync::Arc;
use common_recordbatch::RecordBatch;
use common_telemetry::info;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::arrow;
use datatypes::arrow::array::{Array, ArrayRef};
use datatypes::arrow::compute::cast::CastOptions;
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datatypes::schema::Schema;
use datatypes::vectors::{BooleanVector, Helper, StringVector, Vector, VectorRef};
use datatypes::arrow::array::Array;
use datatypes::arrow::compute;
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::{Helper, VectorRef};
use rustpython_compiler_core::CodeObject;
use rustpython_vm as vm;
use rustpython_vm::class::PyClassImpl;
@@ -43,7 +42,8 @@ use vm::{Interpreter, PyObjectRef, VirtualMachine};
use crate::python::builtins::greptime_builtin;
use crate::python::coprocessor::parse::DecoratorArgs;
use crate::python::error::{
ensure, ret_other_error_with, ArrowSnafu, OtherSnafu, Result, TypeCastSnafu,
ensure, ret_other_error_with, ArrowSnafu, NewRecordBatchSnafu, OtherSnafu, Result,
TypeCastSnafu,
};
use crate::python::utils::{format_py_error, is_instance, py_vec_obj_to_array};
use crate::python::PyVector;
@@ -54,7 +54,8 @@ thread_local!(static INTERPRETER: RefCell<Option<Arc<Interpreter>>> = RefCell::n
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnnotationInfo {
/// if None, use types inferred by PyVector
pub datatype: Option<DataType>,
// TODO(yingwen): We should use our data type. i.e. ConcreteDataType.
pub datatype: Option<ArrowDataType>,
pub is_nullable: bool,
}
@@ -95,7 +96,7 @@ impl Coprocessor {
/// generate [`Schema`] according to return names and types;
/// if there is no annotation,
/// the datatypes of the actual columns are used directly
fn gen_schema(&self, cols: &[ArrayRef]) -> Result<Arc<ArrowSchema>> {
fn gen_schema(&self, cols: &[VectorRef]) -> Result<SchemaRef> {
let names = &self.deco_args.ret_names;
let anno = &self.return_types;
ensure!(
@@ -109,35 +110,38 @@ impl Coprocessor {
)
}
);
Ok(Arc::new(ArrowSchema::from(
names
.iter()
.enumerate()
.map(|(idx, name)| {
let real_ty = cols[idx].data_type().to_owned();
let AnnotationInfo {
datatype: ty,
is_nullable,
} = anno[idx].to_owned().unwrap_or_else(|| {
// default to be not nullable and use DataType inferred by PyVector itself
AnnotationInfo {
datatype: Some(real_ty.to_owned()),
is_nullable: false,
}
});
Field::new(
name,
// if type is like `_` or `_ | None`
ty.unwrap_or(real_ty),
is_nullable,
)
})
.collect::<Vec<Field>>(),
)))
let column_schemas = names
.iter()
.enumerate()
.map(|(idx, name)| {
let real_ty = cols[idx].data_type();
let AnnotationInfo {
datatype: ty,
is_nullable,
} = anno[idx].to_owned().unwrap_or_else(|| {
// default to be not nullable and use DataType inferred by PyVector itself
AnnotationInfo {
datatype: Some(real_ty.as_arrow_type()),
is_nullable: false,
}
});
let column_type = match ty {
Some(arrow_type) => {
ConcreteDataType::try_from(&arrow_type).context(TypeCastSnafu)?
}
// if type is like `_` or `_ | None`
None => real_ty,
};
Ok(ColumnSchema::new(name, column_type, is_nullable))
})
.collect::<Result<Vec<_>>>()?;
Ok(Arc::new(Schema::new(column_schemas)))
}
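
`gen_schema` now assembles greptime's own `Schema` from `ColumnSchema`s instead of an arrow `Schema`; stripped of the annotation handling, the construction looks like this (a sketch with made-up column names):

```rust
use std::sync::Arc;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};

fn two_column_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![
        ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), false),
        ColumnSchema::new("mem", ConcreteDataType::float64_datatype(), true),
    ]))
}
```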
/// check if real types and annotation types (if any) are the same; if not, try to cast columns to the annotated type
fn check_and_cast_type(&self, cols: &mut [ArrayRef]) -> Result<()> {
fn check_and_cast_type(&self, cols: &mut [VectorRef]) -> Result<()> {
let return_types = &self.return_types;
// allow ignore Return Type Annotation
if return_types.is_empty() {
@@ -161,21 +165,10 @@ impl Coprocessor {
{
let real_ty = col.data_type();
let anno_ty = datatype;
if real_ty != anno_ty {
{
// This`CastOption` allow for overflowly cast and int to float loosely cast etc..,
// check its doc for more information
*col = arrow::compute::cast::cast(
col.as_ref(),
anno_ty,
CastOptions {
wrapped: true,
partial: true,
},
)
.context(ArrowSnafu)?
.into();
}
if real_ty.as_arrow_type() != *anno_ty {
let array = col.to_arrow_array();
let array = compute::cast(&array, anno_ty).context(ArrowSnafu)?;
*col = Helper::try_into_vector(array).context(TypeCastSnafu)?;
}
}
}
@@ -183,47 +176,6 @@ impl Coprocessor {
}
}
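
The cast in `check_and_cast_type`, isolated as a hedged helper: round-trip through arrow only when the actual type differs from the annotated one.

```rust
use datatypes::arrow::compute;
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::data_type::DataType; // for as_arrow_type()
use datatypes::vectors::{Helper, Vector, VectorRef};

fn cast_if_needed(col: &mut VectorRef, anno_ty: &ArrowDataType) {
    if col.data_type().as_arrow_type() != *anno_ty {
        let array = col.to_arrow_array();
        let array = compute::cast(&array, anno_ty).expect("invalid cast");
        *col = Helper::try_into_vector(array).expect("unsupported type");
    }
}
```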
/// cast a `dyn Array` of type unsigned/int/float into a `dyn Vector`
fn try_into_vector<T: datatypes::types::Primitive>(arg: Arc<dyn Array>) -> Result<Arc<dyn Vector>> {
// wrap try_into_vector in here to convert `datatypes::error::Error` to `python::error::Error`
Helper::try_into_vector(arg).context(TypeCastSnafu)
}
/// convert a `Vec<ArrayRef>` into a `Vec<PyVector>` only when they are of supported types
/// PyVector now only support unsigned&int8/16/32/64, float32/64 and bool when doing meanful arithmetics operation
fn try_into_py_vector(fetch_args: Vec<ArrayRef>) -> Result<Vec<PyVector>> {
let mut args: Vec<PyVector> = Vec::with_capacity(fetch_args.len());
for (idx, arg) in fetch_args.into_iter().enumerate() {
let v: VectorRef = match arg.data_type() {
DataType::Float32 => try_into_vector::<f32>(arg)?,
DataType::Float64 => try_into_vector::<f64>(arg)?,
DataType::UInt8 => try_into_vector::<u8>(arg)?,
DataType::UInt16 => try_into_vector::<u16>(arg)?,
DataType::UInt32 => try_into_vector::<u32>(arg)?,
DataType::UInt64 => try_into_vector::<u64>(arg)?,
DataType::Int8 => try_into_vector::<i8>(arg)?,
DataType::Int16 => try_into_vector::<i16>(arg)?,
DataType::Int32 => try_into_vector::<i32>(arg)?,
DataType::Int64 => try_into_vector::<i64>(arg)?,
DataType::Utf8 => {
Arc::new(StringVector::try_from_arrow_array(arg).context(TypeCastSnafu)?) as _
}
DataType::Boolean => {
Arc::new(BooleanVector::try_from_arrow_array(arg).context(TypeCastSnafu)?) as _
}
_ => {
return ret_other_error_with(format!(
"Unsupported data type at column {idx}: {:?} for coprocessor",
arg.data_type()
))
.fail()
}
};
args.push(PyVector::from(v));
}
Ok(args)
}
/// convert a tuple of `PyVector` or one `PyVector`(wrapped in a Python Object Ref[`PyObjectRef`])
/// to a `Vec<VectorRef>`
/// by default, a constant(int/float/bool) gives a constant array of the same length as the input args
@@ -231,7 +183,7 @@ fn try_into_columns(
obj: &PyObjectRef,
vm: &VirtualMachine,
col_len: usize,
) -> Result<Vec<ArrayRef>> {
) -> Result<Vec<VectorRef>> {
if is_instance::<PyTuple>(obj, vm) {
let tuple = obj.payload::<PyTuple>().with_context(|| {
ret_other_error_with(format!("can't cast obj {:?} to PyTuple)", obj))
@@ -239,7 +191,7 @@ fn try_into_columns(
let cols = tuple
.iter()
.map(|obj| py_vec_obj_to_array(obj, vm, col_len))
.collect::<Result<Vec<ArrayRef>>>()?;
.collect::<Result<Vec<VectorRef>>>()?;
Ok(cols)
} else {
let col = py_vec_obj_to_array(obj, vm, col_len)?;
@@ -249,27 +201,16 @@ fn try_into_columns(
/// select columns according to `fetch_names` from `rb`
/// and cast them into a Vec of PyVector
fn select_from_rb(rb: &DfRecordBatch, fetch_names: &[String]) -> Result<Vec<PyVector>> {
let field_map: HashMap<&String, usize> = rb
.schema()
.fields
fn select_from_rb(rb: &RecordBatch, fetch_names: &[String]) -> Result<Vec<PyVector>> {
fetch_names
.iter()
.enumerate()
.map(|(idx, field)| (&field.name, idx))
.collect();
let fetch_idx: Vec<usize> = fetch_names
.iter()
.map(|field| {
field_map.get(field).copied().context(OtherSnafu {
reason: format!("Can't found field name {field}"),
})
.map(|name| {
let vector = rb.column_by_name(name).with_context(|| OtherSnafu {
reason: format!("Can't find field name {}", name),
})?;
Ok(PyVector::from(vector.clone()))
})
.collect::<Result<Vec<usize>>>()?;
let fetch_args: Vec<Arc<dyn Array>> = fetch_idx
.into_iter()
.map(|idx| rb.column(idx).clone())
.collect();
try_into_py_vector(fetch_args)
.collect()
}
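
A hypothetical standalone version of the name-based selection above; collecting into `Option` makes a single missing name yield `None`:

```rust
use datatypes::vectors::VectorRef;

fn select(rb: &common_recordbatch::RecordBatch, names: &[&str]) -> Option<Vec<VectorRef>> {
    names
        .iter()
        .map(|name| rb.column_by_name(name).cloned())
        .collect()
}
```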
/// match between arguments' real type and annotation types
@@ -277,12 +218,12 @@ fn select_from_rb(rb: &DfRecordBatch, fetch_names: &[String]) -> Result<Vec<PyVe
fn check_args_anno_real_type(
args: &[PyVector],
copr: &Coprocessor,
rb: &DfRecordBatch,
rb: &RecordBatch,
) -> Result<()> {
for (idx, arg) in args.iter().enumerate() {
let anno_ty = copr.arg_types[idx].to_owned();
let real_ty = arg.to_arrow_array().data_type().to_owned();
let is_nullable: bool = rb.schema().fields[idx].is_nullable;
let is_nullable: bool = rb.schema.column_schemas()[idx].is_nullable();
ensure!(
anno_ty
.to_owned()
@@ -323,31 +264,32 @@ fn set_items_in_scope(
/// The coprocessor function accept a python script and a Record Batch:
/// ## What it does
/// 1. it take a python script and a [`DfRecordBatch`], extract columns and annotation info according to `args` given in decorator in python script
/// 1. it takes a python script and a [`RecordBatch`], extracts columns and annotation info according to `args` given in the decorator in the python script
/// 2. execute python code and return a vector or a tuple of vectors,
/// 3. the returning vector(s) is assembled into a new [`DfRecordBatch`] according to `returns` in python decorator and return to caller
/// 3. the returned vector(s) are assembled into a new [`RecordBatch`] according to `returns` in the python decorator and returned to the caller
///
/// # Example
///
/// ```ignore
/// use std::sync::Arc;
/// use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
/// use datatypes::arrow::array::PrimitiveArray;
/// use datatypes::arrow::datatypes::{DataType, Field, Schema};
/// use common_recordbatch::RecordBatch;
/// use datatypes::prelude::*;
/// use datatypes::schema::{ColumnSchema, Schema};
/// use datatypes::vectors::{Float32Vector, Float64Vector};
/// use common_function::scalars::python::exec_coprocessor;
/// let python_source = r#"
/// @copr(args=["cpu", "mem"], returns=["perf", "what"])
/// def a(cpu, mem):
/// return cpu + mem, cpu - mem
/// "#;
/// let cpu_array = PrimitiveArray::from_slice([0.9f32, 0.8, 0.7, 0.6]);
/// let mem_array = PrimitiveArray::from_slice([0.1f64, 0.2, 0.3, 0.4]);
/// let schema = Arc::new(Schema::from(vec![
/// Field::new("cpu", DataType::Float32, false),
/// Field::new("mem", DataType::Float64, false),
/// let cpu_array = Float32Vector::from_slice([0.9f32, 0.8, 0.7, 0.6]);
/// let mem_array = Float64Vector::from_slice([0.1f64, 0.2, 0.3, 0.4]);
/// let schema = Arc::new(Schema::new(vec![
/// ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), false),
/// ColumnSchema::new("mem", ConcreteDataType::float64_datatype(), false),
/// ]));
/// let rb =
/// DfRecordBatch::try_new(schema, vec![Arc::new(cpu_array), Arc::new(mem_array)]).unwrap();
/// RecordBatch::new(schema, vec![Arc::new(cpu_array), Arc::new(mem_array)]).unwrap();
/// let ret = exec_coprocessor(python_source, &rb).unwrap();
/// assert_eq!(ret.column(0).len(), 4);
/// ```
@@ -357,7 +299,7 @@ fn set_items_in_scope(
///
/// Currently supported types are `u8`, `u16`, `u32`, `u64`, `i8`, `i16`, `i32`, `i64` and `f16`, `f32`, `f64`
///
/// use `f64 | None` to mark if returning column is nullable like in [`DfRecordBatch`]'s schema's [`Field`]'s is_nullable
/// use `f64 | None` to mark if returning column is nullable like in [`RecordBatch`]'s schema's [`ColumnSchema`]'s is_nullable
///
/// you can also use single underscore `_` to let coprocessor infer what type it is, so `_` and `_ | None` are both valid in type annotation.
/// Note: using `_` means not nullable column, using `_ | None` means nullable column
@@ -373,7 +315,7 @@ fn set_items_in_scope(
/// You can return constants in python code like `return 1, 1.0, True`,
/// which creates a constant array (with the same value; currently supporting int, float and bool) as the column on return
#[cfg(test)]
pub fn exec_coprocessor(script: &str, rb: &DfRecordBatch) -> Result<RecordBatch> {
pub fn exec_coprocessor(script: &str, rb: &RecordBatch) -> Result<RecordBatch> {
// 1. parse the script and check if it's only a function with `@coprocessor` decorator, and get `args` and `returns`,
// 2. also check for the existence of `args` in `rb`; if not found, return an error
// TODO(discord9): cache the result of parse_copr
@@ -383,7 +325,7 @@ pub fn exec_coprocessor(script: &str, rb: &DfRecordBatch) -> Result<RecordBatch>
pub(crate) fn exec_with_cached_vm(
copr: &Coprocessor,
rb: &DfRecordBatch,
rb: &RecordBatch,
args: Vec<PyVector>,
vm: &Arc<Interpreter>,
) -> Result<RecordBatch> {
@@ -401,7 +343,7 @@ pub(crate) fn exec_with_cached_vm(
// 5. get returns as either a PyVector or a PyTuple, and naming schema them according to `returns`
let col_len = rb.num_rows();
let mut cols: Vec<ArrayRef> = try_into_columns(&ret, vm, col_len)?;
let mut cols = try_into_columns(&ret, vm, col_len)?;
ensure!(
cols.len() == copr.deco_args.ret_names.len(),
OtherSnafu {
@@ -417,11 +359,7 @@ pub(crate) fn exec_with_cached_vm(
copr.check_and_cast_type(&mut cols)?;
// 6. return an assembled RecordBatch
let schema = copr.gen_schema(&cols)?;
let res_rb = DfRecordBatch::try_new(schema.clone(), cols).context(ArrowSnafu)?;
Ok(RecordBatch {
schema: Arc::new(Schema::try_from(schema).context(TypeCastSnafu)?),
df_recordbatch: res_rb,
})
RecordBatch::new(schema, cols).context(NewRecordBatchSnafu)
})
}
@@ -459,7 +397,7 @@ pub(crate) fn init_interpreter() -> Arc<Interpreter> {
}
/// using a parsed `Coprocessor` struct as input to execute python code
pub(crate) fn exec_parsed(copr: &Coprocessor, rb: &DfRecordBatch) -> Result<RecordBatch> {
pub(crate) fn exec_parsed(copr: &Coprocessor, rb: &RecordBatch) -> Result<RecordBatch> {
// 3. get args from `rb`, and cast them into PyVector
let args: Vec<PyVector> = select_from_rb(rb, &copr.deco_args.arg_names)?;
check_args_anno_real_type(&args, copr, rb)?;
@@ -477,7 +415,7 @@ pub(crate) fn exec_parsed(copr: &Coprocessor, rb: &DfRecordBatch) -> Result<Reco
#[allow(dead_code)]
pub fn exec_copr_print(
script: &str,
rb: &DfRecordBatch,
rb: &RecordBatch,
ln_offset: usize,
filename: &str,
) -> StdResult<RecordBatch, String> {

View File

@@ -59,7 +59,7 @@ impl Stream for CoprStream {
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(Ok(recordbatch))) => {
let batch = exec_parsed(&self.copr, &recordbatch.df_recordbatch)
let batch = exec_parsed(&self.copr, &recordbatch)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
@@ -149,8 +149,8 @@ mod tests {
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_recordbatch::util;
use datafusion_common::field_util::{FieldExt, SchemaExt};
use datatypes::arrow::array::{Float64Array, Int64Array};
use datatypes::prelude::ScalarVector;
use datatypes::vectors::{Float64Vector, Int64Vector};
use query::QueryEngineFactory;
use table::table::numbers::NumbersTable;
@@ -177,6 +177,7 @@ mod tests {
let script_engine = PyEngine::new(query_engine.clone());
// To avoid divide by zero, the script divides `add(a, b)` by `g.sqrt(c + 1)` instead of `g.sqrt(c)`
let script = r#"
import greptime as g
def add(a, b):
@@ -184,7 +185,7 @@ def add(a, b):
@copr(args=["a", "b", "c"], returns = ["r"], sql="select number as a,number as b,number as c from numbers limit 100")
def test(a, b, c):
return add(a, b) / g.sqrt(c)
return add(a, b) / g.sqrt(c + 1)
"#;
let script = script_engine
.compile(script, CompileContext::default())
@@ -197,15 +198,18 @@ def test(a, b, c):
assert_eq!(1, numbers.len());
let number = &numbers[0];
assert_eq!(number.df_recordbatch.num_columns(), 1);
assert_eq!("r", number.schema.arrow_schema().field(0).name());
assert_eq!(number.num_columns(), 1);
assert_eq!("r", number.schema.column_schemas()[0].name);
let columns = number.df_recordbatch.columns();
assert_eq!(1, columns.len());
assert_eq!(100, columns[0].len());
let rows = columns[0].as_any().downcast_ref::<Float64Array>().unwrap();
assert!(rows.value(0).is_nan());
assert_eq!((99f64 + 99f64) / 99f64.sqrt(), rows.value(99))
assert_eq!(1, number.num_columns());
assert_eq!(100, number.column(0).len());
let rows = number
.column(0)
.as_any()
.downcast_ref::<Float64Vector>()
.unwrap();
assert_eq!(0f64, rows.get_data(0).unwrap());
assert_eq!((99f64 + 99f64) / 100f64.sqrt(), rows.get_data(99).unwrap())
}
_ => unreachable!(),
}
@@ -229,15 +233,18 @@ def test(a):
assert_eq!(1, numbers.len());
let number = &numbers[0];
assert_eq!(number.df_recordbatch.num_columns(), 1);
assert_eq!("r", number.schema.arrow_schema().field(0).name());
assert_eq!(number.num_columns(), 1);
assert_eq!("r", number.schema.column_schemas()[0].name);
let columns = number.df_recordbatch.columns();
assert_eq!(1, columns.len());
assert_eq!(50, columns[0].len());
let rows = columns[0].as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(0, rows.value(0));
assert_eq!(98, rows.value(49))
assert_eq!(1, number.num_columns());
assert_eq!(50, number.column(0).len());
let rows = number
.column(0)
.as_any()
.downcast_ref::<Int64Vector>()
.unwrap();
assert_eq!(0, rows.get_data(0).unwrap());
assert_eq!(98, rows.get_data(49).unwrap())
}
_ => unreachable!(),
}
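
The test-side pattern both hunks above use, as a small sketch: reading cell values now goes through the vector types, downcasting the column and then calling `ScalarVector::get_data`.

```rust
use datatypes::prelude::ScalarVector;
use datatypes::vectors::{Float64Vector, Vector};

fn first_value(batch: &common_recordbatch::RecordBatch) -> Option<f64> {
    let floats = batch.column(0).as_any().downcast_ref::<Float64Vector>()?;
    floats.get_data(0)
}
```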

View File

@@ -105,6 +105,12 @@ pub enum Error {
#[snafu(backtrace)]
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to create record batch, source: {}", source))]
NewRecordBatch {
#[snafu(backtrace)]
source: common_recordbatch::error::Error,
},
}
impl From<QueryError> for Error {
@@ -121,7 +127,9 @@ impl ErrorExt for Error {
| Error::PyRuntime { .. }
| Error::Other { .. } => StatusCode::Internal,
Error::RecordBatch { source } => source.status_code(),
Error::RecordBatch { source } | Error::NewRecordBatch { source } => {
source.status_code()
}
Error::DatabaseQuery { source } => source.status_code(),
Error::TypeCast { source } => source.status_code(),

View File

@@ -20,10 +20,12 @@ use std::io::prelude::*;
use std::path::Path;
use std::sync::Arc;
use common_recordbatch::RecordBatch;
use console::style;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::arrow::array::PrimitiveArray;
use datatypes::arrow::datatypes::{DataType, Field, Schema};
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{Float32Vector, Float64Vector, Int64Vector, VectorRef};
use ron::from_str as from_ron_string;
use rustpython_parser::parser;
use serde::{Deserialize, Serialize};
@@ -62,19 +64,26 @@ enum Predicate {
#[derive(Serialize, Deserialize, Debug)]
struct ColumnInfo {
pub ty: DataType,
pub ty: ArrowDataType,
pub len: usize,
}
fn create_sample_recordbatch() -> DfRecordBatch {
let cpu_array = PrimitiveArray::from_slice([0.9f32, 0.8, 0.7, 0.6]);
let mem_array = PrimitiveArray::from_slice([0.1f64, 0.2, 0.3, 0.4]);
let schema = Arc::new(Schema::from(vec![
Field::new("cpu", DataType::Float32, false),
Field::new("mem", DataType::Float64, false),
fn create_sample_recordbatch() -> RecordBatch {
let cpu_array = Float32Vector::from_slice([0.9f32, 0.8, 0.7, 0.6]);
let mem_array = Float64Vector::from_slice([0.1f64, 0.2, 0.3, 0.4]);
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), false),
ColumnSchema::new("mem", ConcreteDataType::float64_datatype(), false),
]));
DfRecordBatch::try_new(schema, vec![Arc::new(cpu_array), Arc::new(mem_array)]).unwrap()
RecordBatch::new(
schema,
[
Arc::new(cpu_array) as VectorRef,
Arc::new(mem_array) as VectorRef,
],
)
.unwrap()
}
/// test cases which read from a .ron file, deser,
@@ -102,8 +111,7 @@ fn run_ron_testcases() {
Predicate::ParseIsErr { reason } => {
let copr = parse_and_compile_copr(&testcase.code);
if copr.is_ok() {
eprintln!("Expect to be err, found{copr:#?}");
panic!()
panic!("Expect to be err, found{copr:#?}");
}
let res = &copr.unwrap_err();
println!(
@@ -113,24 +121,18 @@ fn run_ron_testcases() {
let (res, _) = get_error_reason_loc(res);
if !res.contains(&reason) {
eprintln!("{}", testcase.code);
eprintln!("Parse Error, expect \"{reason}\" in \"{res}\", but not found.");
panic!()
panic!("Parse Error, expect \"{reason}\" in \"{res}\", but not found.");
}
}
Predicate::ExecIsOk { fields, columns } => {
let rb = create_sample_recordbatch();
let res = coprocessor::exec_coprocessor(&testcase.code, &rb);
if res.is_err() {
dbg!(&res);
}
assert!(res.is_ok());
let res = res.unwrap();
let res = coprocessor::exec_coprocessor(&testcase.code, &rb).unwrap();
fields
.iter()
.zip(&res.schema.arrow_schema().fields)
.zip(res.schema.column_schemas())
.map(|(anno, real)| {
if !(anno.datatype.clone().unwrap() == real.data_type
&& anno.is_nullable == real.is_nullable)
if !(anno.datatype.as_ref().unwrap() == &real.data_type.as_arrow_type()
&& anno.is_nullable == real.is_nullable())
{
eprintln!("fields expect to be {anno:#?}, found to be {real:#?}.");
panic!()
@@ -139,17 +141,18 @@ fn run_ron_testcases() {
.count();
columns
.iter()
.zip(res.df_recordbatch.columns())
.map(|(anno, real)| {
if !(&anno.ty == real.data_type() && anno.len == real.len()) {
eprintln!(
.enumerate()
.map(|(i, anno)| {
let real = res.column(i);
if !(anno.ty == real.data_type().as_arrow_type() && anno.len == real.len())
{
panic!(
"Unmatch type or length!Expect [{:#?}; {}], found [{:#?}; {}]",
anno.ty,
anno.len,
real.data_type(),
real.len()
);
panic!()
}
})
.count();
@@ -166,17 +169,15 @@ fn run_ron_testcases() {
);
let (reason, _) = get_error_reason_loc(&res);
if !reason.contains(&part_reason) {
eprintln!(
panic!(
"{}\nExecute error, expect \"{reason}\" in \"{res}\", but not found.",
testcase.code,
reason = style(reason).green(),
res = style(res).red()
);
panic!()
}
} else {
eprintln!("{:#?}\nExpect Err(...), found Ok(...)", res);
panic!();
panic!("{:#?}\nExpect Err(...), found Ok(...)", res);
}
}
}
@@ -235,7 +236,7 @@ def calc_rvs(open_time, close):
rv_180d = vector([calc_rv(close, open_time, timepoint, datetime("180d"))])
return rv_7d, rv_15d, rv_30d, rv_60d, rv_90d, rv_180d
"#;
let close_array = PrimitiveArray::from_slice([
let close_array = Float32Vector::from_slice([
10106.79f32,
10106.09,
10108.73,
@@ -248,17 +249,20 @@ def calc_rvs(open_time, close):
10117.08,
10120.43,
]);
let open_time_array = PrimitiveArray::from_slice([
let open_time_array = Int64Vector::from_slice([
300i64, 900i64, 1200i64, 1800i64, 2400i64, 3000i64, 3600i64, 4200i64, 4800i64, 5400i64,
6000i64,
]);
let schema = Arc::new(Schema::from(vec![
Field::new("close", DataType::Float32, false),
Field::new("open_time", DataType::Int64, false),
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("close", ConcreteDataType::float32_datatype(), false),
ColumnSchema::new("open_time", ConcreteDataType::int64_datatype(), false),
]));
let rb = DfRecordBatch::try_new(
let rb = RecordBatch::new(
schema,
vec![Arc::new(close_array), Arc::new(open_time_array)],
[
Arc::new(close_array) as VectorRef,
Arc::new(open_time_array) as VectorRef,
],
)
.unwrap();
let ret = coprocessor::exec_coprocessor(python_source, &rb);
@@ -297,14 +301,20 @@ def a(cpu, mem):
ref = log2(fed/prev(fed))
return (0.5 < cpu) & ~( cpu >= 0.75)
"#;
let cpu_array = PrimitiveArray::from_slice([0.9f32, 0.8, 0.7, 0.3]);
let mem_array = PrimitiveArray::from_slice([0.1f64, 0.2, 0.3, 0.4]);
let schema = Arc::new(Schema::from(vec![
Field::new("cpu", DataType::Float32, false),
Field::new("mem", DataType::Float64, false),
let cpu_array = Float32Vector::from_slice([0.9f32, 0.8, 0.7, 0.3]);
let mem_array = Float64Vector::from_slice([0.1f64, 0.2, 0.3, 0.4]);
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), false),
ColumnSchema::new("mem", ConcreteDataType::float64_datatype(), false),
]));
let rb =
DfRecordBatch::try_new(schema, vec![Arc::new(cpu_array), Arc::new(mem_array)]).unwrap();
let rb = RecordBatch::new(
schema,
[
Arc::new(cpu_array) as VectorRef,
Arc::new(mem_array) as VectorRef,
],
)
.unwrap();
let ret = coprocessor::exec_coprocessor(python_source, &rb);
if let Err(Error::PyParse {
backtrace: _,

View File

@@ -14,10 +14,12 @@
use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, BooleanArray, NullArray, PrimitiveArray, Utf8Array};
use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue as DFColValue;
use datatypes::arrow::datatypes::DataType;
use datatypes::prelude::ScalarVector;
use datatypes::vectors::{
BooleanVector, Float64Vector, Helper, Int64Vector, NullVector, StringVector, VectorRef,
};
use rustpython_vm::builtins::{PyBaseExceptionRef, PyBool, PyFloat, PyInt, PyList, PyStr};
use rustpython_vm::{PyObjectRef, PyPayload, PyRef, VirtualMachine};
use snafu::{Backtrace, GenerateImplicitData, OptionExt, ResultExt};
@@ -54,26 +56,26 @@ pub fn py_vec_obj_to_array(
obj: &PyObjectRef,
vm: &VirtualMachine,
col_len: usize,
) -> Result<ArrayRef, error::Error> {
) -> Result<VectorRef, error::Error> {
// It's ugly, but we can't find a better way right now.
if is_instance::<PyVector>(obj, vm) {
let pyv = obj.payload::<PyVector>().with_context(|| {
ret_other_error_with(format!("can't cast obj {:?} to PyVector", obj))
})?;
Ok(pyv.to_arrow_array())
Ok(pyv.as_vector_ref())
} else if is_instance::<PyInt>(obj, vm) {
let val = obj
.to_owned()
.try_into_value::<i64>(vm)
.map_err(|e| format_py_error(e, vm))?;
let ret = PrimitiveArray::from_vec(vec![val; col_len]);
let ret = Int64Vector::from_iterator(std::iter::repeat(val).take(col_len));
Ok(Arc::new(ret) as _)
} else if is_instance::<PyFloat>(obj, vm) {
let val = obj
.to_owned()
.try_into_value::<f64>(vm)
.map_err(|e| format_py_error(e, vm))?;
let ret = PrimitiveArray::from_vec(vec![val; col_len]);
let ret = Float64Vector::from_iterator(std::iter::repeat(val).take(col_len));
Ok(Arc::new(ret) as _)
} else if is_instance::<PyBool>(obj, vm) {
let val = obj
@@ -81,7 +83,7 @@ pub fn py_vec_obj_to_array(
.try_into_value::<bool>(vm)
.map_err(|e| format_py_error(e, vm))?;
let ret = BooleanArray::from_iter(std::iter::repeat(Some(val)).take(col_len));
let ret = BooleanVector::from_iterator(std::iter::repeat(val).take(col_len));
Ok(Arc::new(ret) as _)
} else if is_instance::<PyStr>(obj, vm) {
let val = obj
@@ -89,7 +91,7 @@ pub fn py_vec_obj_to_array(
.try_into_value::<String>(vm)
.map_err(|e| format_py_error(e, vm))?;
let ret = Utf8Array::<i32>::from_iter(std::iter::repeat(Some(val)).take(col_len));
let ret = StringVector::from_iterator(std::iter::repeat(val.as_str()).take(col_len));
Ok(Arc::new(ret) as _)
} else if is_instance::<PyList>(obj, vm) {
let columnar_value =
@@ -101,9 +103,9 @@ pub fn py_vec_obj_to_array(
let array = ScalarValue::iter_to_array(scalars.into_iter())
.context(error::DataFusionSnafu)?;
Ok(array)
Helper::try_into_vector(array).context(error::TypeCastSnafu)
}
None => Ok(Arc::new(NullArray::new(DataType::Null, 0))),
None => Ok(Arc::new(NullVector::new(0))),
},
_ => unreachable!(),
}
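
The broadcast pattern used by all the scalar branches above, written out once for `i64` (a sketch; `broadcast_i64` is hypothetical):

```rust
use std::sync::Arc;
use datatypes::vectors::{Int64Vector, VectorRef};

fn broadcast_i64(val: i64, col_len: usize) -> VectorRef {
    // Repeat the constant col_len times so the column matches the
    // input batch's row count.
    Arc::new(Int64Vector::from_iterator(
        std::iter::repeat(val).take(col_len),
    ))
}
```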

View File

@@ -19,17 +19,17 @@ use std::sync::Arc;
use common_time::date::Date;
use common_time::datetime::DateTime;
use common_time::timestamp::Timestamp;
use datatypes::arrow::array::{Array, ArrayRef, BooleanArray, PrimitiveArray};
use datatypes::arrow::array::{
Array, ArrayRef, BooleanArray, Float64Array, Int64Array, UInt64Array,
};
use datatypes::arrow::compute;
use datatypes::arrow::compute::cast::{self, CastOptions};
use datatypes::arrow::compute::{arithmetics, comparison};
use datatypes::arrow::datatypes::DataType;
use datatypes::arrow::scalar::{PrimitiveScalar, Scalar};
use datatypes::data_type::ConcreteDataType;
use datatypes::arrow::compute::kernels::{arithmetic, boolean, comparison};
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::arrow::error::Result as ArrowResult;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::Value;
use datatypes::value::OrderedFloat;
use datatypes::vectors::{Helper, NullVector, VectorBuilder, VectorRef};
use datatypes::{arrow, value};
use datatypes::value::{self, OrderedFloat};
use datatypes::vectors::{Helper, NullVector, VectorRef};
use rustpython_vm::builtins::{PyBaseExceptionRef, PyBool, PyBytes, PyFloat, PyInt, PyNone, PyStr};
use rustpython_vm::function::{Either, OptionalArg, PyComparisonValue};
use rustpython_vm::protocol::{PyMappingMethods, PySequenceMethods};
@@ -55,120 +55,71 @@ impl From<VectorRef> for PyVector {
fn emit_cast_error(
vm: &VirtualMachine,
src_ty: &DataType,
dst_ty: &DataType,
src_ty: &ArrowDataType,
dst_ty: &ArrowDataType,
) -> PyBaseExceptionRef {
vm.new_type_error(format!(
"Can't cast source operand of type {:?} into target type of {:?}",
src_ty, dst_ty
))
}
fn arrow2_rsub_scalar(
arr: &dyn Array,
val: &dyn Scalar,
_vm: &VirtualMachine,
) -> PyResult<Box<dyn Array>> {
// b - a => a * (-1) + b
let neg = arithmetics::mul_scalar(arr, &PrimitiveScalar::new(DataType::Int64, Some(-1i64)));
Ok(arithmetics::add_scalar(neg.as_ref(), val))
/// Performs `val - arr`.
fn arrow_rsub(arr: &dyn Array, val: &dyn Array, vm: &VirtualMachine) -> PyResult<ArrayRef> {
arithmetic::subtract_dyn(val, arr).map_err(|e| vm.new_type_error(format!("rsub error: {}", e)))
}
fn arrow2_rtruediv_scalar(
arr: &dyn Array,
val: &dyn Scalar,
vm: &VirtualMachine,
) -> PyResult<Box<dyn Array>> {
// val / arr => one_arr / arr * val (this is simpler to write)
let one_arr: Box<dyn Array> = if is_float(arr.data_type()) {
Box::new(PrimitiveArray::from_values(vec![1f64; arr.len()]))
} else if is_integer(arr.data_type()) {
Box::new(PrimitiveArray::from_values(vec![1i64; arr.len()]))
} else {
return Err(vm.new_not_implemented_error(format!(
"truediv of {:?} Scalar with {:?} Array is not supported",
val.data_type(),
arr.data_type()
)));
};
let tmp = arithmetics::mul_scalar(one_arr.as_ref(), val);
Ok(arithmetics::div(tmp.as_ref(), arr))
/// Performs `val / arr`
fn arrow_rtruediv(arr: &dyn Array, val: &dyn Array, vm: &VirtualMachine) -> PyResult<ArrayRef> {
arithmetic::divide_dyn(val, arr)
.map_err(|e| vm.new_type_error(format!("rtruediv error: {}", e)))
}
fn arrow2_rfloordiv_scalar(
arr: &dyn Array,
val: &dyn Scalar,
vm: &VirtualMachine,
) -> PyResult<Box<dyn Array>> {
// val // arr => one_arr // arr * val (this is simpler to write)
let one_arr: Box<dyn Array> = if is_float(arr.data_type()) {
Box::new(PrimitiveArray::from_values(vec![1f64; arr.len()]))
} else if is_integer(arr.data_type()) {
Box::new(PrimitiveArray::from_values(vec![1i64; arr.len()]))
} else {
return Err(vm.new_not_implemented_error(format!(
"truediv of {:?} Scalar with {:?} Array is not supported",
val.data_type(),
arr.data_type()
)));
};
let tmp = arithmetics::mul_scalar(one_arr.as_ref(), val);
Ok(arrow::compute::cast::cast(
arithmetics::div(tmp.as_ref(), arr).as_ref(),
&DataType::Int64,
cast::CastOptions {
wrapped: false,
partial: true,
},
)
.unwrap())
/// Performs `val / arr`, but cast to i64.
fn arrow_rfloordiv(arr: &dyn Array, val: &dyn Array, vm: &VirtualMachine) -> PyResult<ArrayRef> {
let array = arithmetic::divide_dyn(val, arr)
.map_err(|e| vm.new_type_error(format!("rtruediv divide error: {}", e)))?;
compute::cast(&array, &ArrowDataType::Int64)
.map_err(|e| vm.new_type_error(format!("rtruediv cast error: {}", e)))
}
fn wrap_result<F>(
f: F,
) -> impl Fn(&dyn Array, &dyn Scalar, &VirtualMachine) -> PyResult<Box<dyn Array>>
fn wrap_result<F>(f: F) -> impl Fn(&dyn Array, &dyn Array, &VirtualMachine) -> PyResult<ArrayRef>
where
F: Fn(&dyn Array, &dyn Scalar) -> Box<dyn Array>,
F: Fn(&dyn Array, &dyn Array) -> ArrowResult<ArrayRef>,
{
move |left, right, _vm| Ok(f(left, right))
move |left, right, vm| {
f(left, right).map_err(|e| vm.new_type_error(format!("arithmetic error {}", e)))
}
}
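
A vm-free sketch of what `wrap_result` does: adapt a fallible arrow kernel into a closure with a uniform error type (here a plain `String` instead of the `PyResult` used above). Usage would look like `let add = wrap(arithmetic::add_dyn);`.

```rust
use datatypes::arrow::array::{Array, ArrayRef};
use datatypes::arrow::error::Result as ArrowResult;

fn wrap<F>(f: F) -> impl Fn(&dyn Array, &dyn Array) -> Result<ArrayRef, String>
where
    F: Fn(&dyn Array, &dyn Array) -> ArrowResult<ArrayRef>,
{
    move |l, r| f(l, r).map_err(|e| format!("arithmetic error {e}"))
}
```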
fn is_float(datatype: &DataType) -> bool {
fn is_float(datatype: &ArrowDataType) -> bool {
matches!(
datatype,
DataType::Float16 | DataType::Float32 | DataType::Float64
ArrowDataType::Float16 | ArrowDataType::Float32 | ArrowDataType::Float64
)
}
fn is_integer(datatype: &DataType) -> bool {
is_signed(datatype) || is_unsigned(datatype)
}
fn is_signed(datatype: &DataType) -> bool {
fn is_signed(datatype: &ArrowDataType) -> bool {
matches!(
datatype,
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64
ArrowDataType::Int8 | ArrowDataType::Int16 | ArrowDataType::Int32 | ArrowDataType::Int64
)
}
fn is_unsigned(datatype: &DataType) -> bool {
fn is_unsigned(datatype: &ArrowDataType) -> bool {
matches!(
datatype,
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64
ArrowDataType::UInt8
| ArrowDataType::UInt16
| ArrowDataType::UInt32
| ArrowDataType::UInt64
)
}
fn cast(array: ArrayRef, target_type: &DataType, vm: &VirtualMachine) -> PyResult<Box<dyn Array>> {
cast::cast(
array.as_ref(),
target_type,
CastOptions {
wrapped: true,
partial: true,
},
)
.map_err(|e| vm.new_type_error(e.to_string()))
fn cast(array: ArrayRef, target_type: &ArrowDataType, vm: &VirtualMachine) -> PyResult<ArrayRef> {
compute::cast(&array, target_type).map_err(|e| vm.new_type_error(e.to_string()))
}
fn from_debug_error(err: impl std::fmt::Debug, vm: &VirtualMachine) -> PyBaseExceptionRef {
vm.new_runtime_error(format!("Runtime Error: {err:#?}"))
}
@@ -194,7 +145,7 @@ impl PyVector {
}
let datatype = get_concrete_type(&elements[0], vm)?;
let mut buf = VectorBuilder::with_capacity(datatype.clone(), elements.len());
let mut buf = datatype.create_mutable_vector(elements.len());
for obj in elements.drain(..) {
let val = if let Some(v) =
@@ -207,11 +158,12 @@ impl PyVector {
obj, datatype
)));
};
buf.push(&val);
// Safety: `pyobj_try_to_typed_val()` has checked the data type.
buf.push_value_ref(val.as_value_ref()).unwrap();
}
Ok(PyVector {
vector: buf.finish(),
vector: buf.to_vector(),
})
} else {
Ok(PyVector::default())
@@ -232,23 +184,26 @@ impl PyVector {
fn scalar_arith_op<F>(
&self,
other: PyObjectRef,
target_type: Option<DataType>,
target_type: Option<ArrowDataType>,
op: F,
vm: &VirtualMachine,
) -> PyResult<PyVector>
where
F: Fn(&dyn Array, &dyn Scalar, &VirtualMachine) -> PyResult<Box<dyn Array>>,
F: Fn(&dyn Array, &dyn Array, &VirtualMachine) -> PyResult<ArrayRef>,
{
// the right operand only support PyInt or PyFloat,
let (right, right_type) = {
if is_instance::<PyInt>(&other, vm) {
other
.try_into_value::<i64>(vm)
.map(|v| (value::Value::Int64(v), DataType::Int64))?
.map(|v| (value::Value::Int64(v), ArrowDataType::Int64))?
} else if is_instance::<PyFloat>(&other, vm) {
other
.try_into_value::<f64>(vm)
.map(|v| (value::Value::Float64(OrderedFloat(v)), DataType::Float64))?
other.try_into_value::<f64>(vm).map(|v| {
(
value::Value::Float64(OrderedFloat(v)),
ArrowDataType::Float64,
)
})?
} else {
return Err(vm.new_type_error(format!(
"Can't cast right operand into Scalar of Int or Float, actual: {}",
@@ -264,45 +219,38 @@ impl PyVector {
// TODO(discord9): find a better way to cast between signed and unsigned types
let target_type = target_type.unwrap_or_else(|| {
if is_signed(left_type) && is_signed(right_type) {
DataType::Int64
ArrowDataType::Int64
} else if is_unsigned(left_type) && is_unsigned(right_type) {
DataType::UInt64
ArrowDataType::UInt64
} else {
DataType::Float64
ArrowDataType::Float64
}
});
let left = cast(left, &target_type, vm)?;
let right: Box<dyn Scalar> = if is_float(&target_type) {
let left_len = left.len();
// Convert `right` to an array of `target_type`.
let right: Box<dyn Array> = if is_float(&target_type) {
match right {
value::Value::Int64(v) => {
Box::new(PrimitiveScalar::new(target_type, Some(v as f64)))
}
value::Value::UInt64(v) => {
Box::new(PrimitiveScalar::new(target_type, Some(v as f64)))
}
value::Value::Int64(v) => Box::new(Float64Array::from_value(v as f64, left_len)),
value::Value::UInt64(v) => Box::new(Float64Array::from_value(v as f64, left_len)),
value::Value::Float64(v) => {
Box::new(PrimitiveScalar::new(target_type, Some(f64::from(v))))
Box::new(Float64Array::from_value(f64::from(v), left_len))
}
_ => unreachable!(),
}
} else if is_signed(&target_type) {
match right {
value::Value::Int64(v) => Box::new(PrimitiveScalar::new(target_type, Some(v))),
value::Value::UInt64(v) => {
Box::new(PrimitiveScalar::new(target_type, Some(v as i64)))
}
value::Value::Float64(v) => {
Box::new(PrimitiveScalar::new(DataType::Float64, Some(v.0 as i64)))
}
value::Value::Int64(v) => Box::new(Int64Array::from_value(v, left_len)),
value::Value::UInt64(v) => Box::new(Int64Array::from_value(v as i64, left_len)),
value::Value::Float64(v) => Box::new(Int64Array::from_value(v.0 as i64, left_len)),
_ => unreachable!(),
}
} else if is_unsigned(&target_type) {
match right {
value::Value::Int64(v) => Box::new(PrimitiveScalar::new(target_type, Some(v))),
value::Value::UInt64(v) => Box::new(PrimitiveScalar::new(target_type, Some(v))),
value::Value::Float64(v) => {
Box::new(PrimitiveScalar::new(target_type, Some(f64::from(v))))
}
value::Value::Int64(v) => Box::new(UInt64Array::from_value(v as u64, left_len)),
value::Value::UInt64(v) => Box::new(UInt64Array::from_value(v, left_len)),
value::Value::Float64(v) => Box::new(UInt64Array::from_value(v.0 as u64, left_len)),
_ => unreachable!(),
}
} else {
@@ -311,7 +259,7 @@ impl PyVector {
let result = op(left.as_ref(), right.as_ref(), vm)?;
Ok(Helper::try_into_vector(&*result)
Ok(Helper::try_into_vector(result.clone())
.map_err(|e| {
vm.new_type_error(format!(
"Can't cast result into vector, result: {:?}, err: {:?}",
@@ -324,12 +272,12 @@ impl PyVector {
fn arith_op<F>(
&self,
other: PyObjectRef,
target_type: Option<DataType>,
target_type: Option<ArrowDataType>,
op: F,
vm: &VirtualMachine,
) -> PyResult<PyVector>
where
F: Fn(&dyn Array, &dyn Array) -> Box<dyn Array>,
F: Fn(&dyn Array, &dyn Array) -> ArrowResult<ArrayRef>,
{
let right = other.downcast_ref::<PyVector>().ok_or_else(|| {
vm.new_type_error(format!(
@@ -345,20 +293,21 @@ impl PyVector {
let target_type = target_type.unwrap_or_else(|| {
if is_signed(left_type) && is_signed(right_type) {
DataType::Int64
ArrowDataType::Int64
} else if is_unsigned(left_type) && is_unsigned(right_type) {
DataType::UInt64
ArrowDataType::UInt64
} else {
DataType::Float64
ArrowDataType::Float64
}
});
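// Cast both operands to the common type before invoking the kernel.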
let left = cast(left, &target_type, vm)?;
let right = cast(right, &target_type, vm)?;
let result = op(left.as_ref(), right.as_ref());
let result = op(left.as_ref(), right.as_ref())
.map_err(|e| vm.new_type_error(format!("Can't compute op, error: {}", e)))?;
Ok(Helper::try_into_vector(&*result)
Ok(Helper::try_into_vector(result.clone())
.map_err(|e| {
vm.new_type_error(format!(
"Can't cast result into vector, result: {:?}, err: {:?}",
@@ -372,27 +321,27 @@ impl PyVector {
#[pymethod(magic)]
fn add(&self, other: PyObjectRef, vm: &VirtualMachine) -> PyResult<PyVector> {
if is_pyobj_scalar(&other, vm) {
self.scalar_arith_op(other, None, wrap_result(arithmetics::add_scalar), vm)
self.scalar_arith_op(other, None, wrap_result(arithmetic::add_dyn), vm)
} else {
self.arith_op(other, None, arithmetics::add, vm)
self.arith_op(other, None, arithmetic::add_dyn, vm)
}
}
#[pymethod(magic)]
fn sub(&self, other: PyObjectRef, vm: &VirtualMachine) -> PyResult<PyVector> {
if is_pyobj_scalar(&other, vm) {
self.scalar_arith_op(other, None, wrap_result(arithmetics::sub_scalar), vm)
self.scalar_arith_op(other, None, wrap_result(arithmetic::subtract_dyn), vm)
} else {
self.arith_op(other, None, arithmetics::sub, vm)
self.arith_op(other, None, arithmetic::subtract_dyn, vm)
}
}
#[pymethod(magic)]
fn rsub(&self, other: PyObjectRef, vm: &VirtualMachine) -> PyResult<PyVector> {
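// `__rsub__` computes `other - self`, so the operands are flipped relative to `sub`.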
if is_pyobj_scalar(&other, vm) {
self.scalar_arith_op(other, None, arrow2_rsub_scalar, vm)
self.scalar_arith_op(other, None, arrow_rsub, vm)
} else {
self.arith_op(other, None, |a, b| arithmetics::sub(b, a), vm)
self.arith_op(other, None, |a, b| arithmetic::subtract_dyn(b, a), vm)
}
}
@@ -400,9 +349,9 @@ impl PyVector {
#[pymethod(magic)]
fn mul(&self, other: PyObjectRef, vm: &VirtualMachine) -> PyResult<PyVector> {
if is_pyobj_scalar(&other, vm) {
self.scalar_arith_op(other, None, wrap_result(arithmetics::mul_scalar), vm)
self.scalar_arith_op(other, None, wrap_result(arithmetic::multiply_dyn), vm)
} else {
self.arith_op(other, None, arithmetics::mul, vm)
self.arith_op(other, None, arithmetic::multiply_dyn, vm)
}
}
@@ -411,24 +360,29 @@ impl PyVector {
if is_pyobj_scalar(&other, vm) {
self.scalar_arith_op(
other,
Some(DataType::Float64),
wrap_result(arithmetics::div_scalar),
Some(ArrowDataType::Float64),
wrap_result(arithmetic::divide_dyn),
vm,
)
} else {
self.arith_op(other, Some(DataType::Float64), arithmetics::div, vm)
self.arith_op(
other,
Some(ArrowDataType::Float64),
arithmetic::divide_dyn,
vm,
)
}
}
#[pymethod(magic)]
fn rtruediv(&self, other: PyObjectRef, vm: &VirtualMachine) -> PyResult<PyVector> {
if is_pyobj_scalar(&other, vm) {
self.scalar_arith_op(other, Some(DataType::Float64), arrow2_rtruediv_scalar, vm)
self.scalar_arith_op(other, Some(ArrowDataType::Float64), arrow_rtruediv, vm)
} else {
self.arith_op(
other,
Some(DataType::Float64),
|a, b| arithmetics::div(b, a),
Some(ArrowDataType::Float64),
|a, b| arithmetic::divide_dyn(b, a),
vm,
)
}
@@ -439,12 +393,17 @@ impl PyVector {
if is_pyobj_scalar(&other, vm) {
self.scalar_arith_op(
other,
Some(DataType::Int64),
wrap_result(arithmetics::div_scalar),
Some(ArrowDataType::Int64),
wrap_result(arithmetic::divide_dyn),
vm,
)
} else {
self.arith_op(other, Some(DataType::Int64), arithmetics::div, vm)
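// Note: integer `divide_dyn` truncates toward zero, which can differ from
// Python's floor division for negative operands.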
self.arith_op(
other,
Some(ArrowDataType::Int64),
arithmetic::divide_dyn,
vm,
)
}
}
@@ -452,12 +411,12 @@ impl PyVector {
fn rfloordiv(&self, other: PyObjectRef, vm: &VirtualMachine) -> PyResult<PyVector> {
if is_pyobj_scalar(&other, vm) {
// FIXME: DataType conversion problem; should target_type be inferred?
self.scalar_arith_op(other, Some(DataType::Int64), arrow2_rfloordiv_scalar, vm)
self.scalar_arith_op(other, Some(ArrowDataType::Int64), arrow_rfloordiv, vm)
} else {
self.arith_op(
other,
Some(DataType::Int64),
|a, b| arithmetics::div(b, a),
Some(ArrowDataType::Int64),
|a, b| arithmetic::divide_dyn(b, a),
vm,
)
}
@@ -533,9 +492,9 @@ impl PyVector {
.as_any()
.downcast_ref::<BooleanArray>()
.ok_or_else(|| vm.new_type_error(format!("Can't cast {left:#?} as a Boolean Array")))?;
let res = compute::boolean::and(left, right).map_err(|err| from_debug_error(err, vm))?;
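// Element-wise logical AND; a null in either input yields a null output.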
let res = boolean::and(left, right).map_err(|err| from_debug_error(err, vm))?;
let res = Arc::new(res) as ArrayRef;
let ret = Helper::try_into_vector(&*res).map_err(|err| from_debug_error(err, vm))?;
let ret = Helper::try_into_vector(res.clone()).map_err(|err| from_debug_error(err, vm))?;
Ok(ret.into())
}
@@ -551,9 +510,9 @@ impl PyVector {
.as_any()
.downcast_ref::<BooleanArray>()
.ok_or_else(|| vm.new_type_error(format!("Can't cast {left:#?} as a Boolean Array")))?;
let res = compute::boolean::or(left, right).map_err(|err| from_debug_error(err, vm))?;
let res = boolean::or(left, right).map_err(|err| from_debug_error(err, vm))?;
let res = Arc::new(res) as ArrayRef;
let ret = Helper::try_into_vector(&*res).map_err(|err| from_debug_error(err, vm))?;
let ret = Helper::try_into_vector(res.clone()).map_err(|err| from_debug_error(err, vm))?;
Ok(ret.into())
}
@@ -565,9 +524,9 @@ impl PyVector {
.as_any()
.downcast_ref::<BooleanArray>()
.ok_or_else(|| vm.new_type_error(format!("Can't cast {left:#?} as a Boolean Array")))?;
let res = compute::boolean::not(left);
let res = boolean::not(left).map_err(|err| from_debug_error(err, vm))?;
let res = Arc::new(res) as ArrayRef;
let ret = Helper::try_into_vector(&*res).map_err(|err| from_debug_error(err, vm))?;
let ret = Helper::try_into_vector(res.clone()).map_err(|err| from_debug_error(err, vm))?;
Ok(ret.into())
}
@@ -580,15 +539,15 @@ impl PyVector {
#[pymethod(name = "filter")]
fn filter(&self, other: PyVectorRef, vm: &VirtualMachine) -> PyResult<PyVector> {
let left = self.to_arrow_array();
let right: ArrayRef = other.to_arrow_array();
let right = other.to_arrow_array();
let filter = right.as_any().downcast_ref::<BooleanArray>();
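// arrow's filter kernel keeps only the rows where the boolean mask is true.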
match filter {
Some(filter) => {
let res = compute::filter::filter(left.as_ref(), filter);
let res = compute::filter(left.as_ref(), filter);
let res =
res.map_err(|err| vm.new_runtime_error(format!("Arrow Error: {err:#?}")))?;
let ret = Helper::try_into_vector(&*res).map_err(|e| {
let ret = Helper::try_into_vector(res.clone()).map_err(|e| {
vm.new_type_error(format!(
"Can't cast result into vector, result: {:?}, err: {:?}",
res, e
@@ -618,14 +577,10 @@ impl PyVector {
.ok_or_else(|| {
vm.new_type_error(format!("Can't cast {seq:#?} as a Boolean Array"))
})?;
// let left = self.to_arrow_array();
let res = compute::filter::filter(self.to_arrow_array().as_ref(), mask)
let res = compute::filter(self.to_arrow_array().as_ref(), mask)
.map_err(|err| vm.new_runtime_error(format!("Arrow Error: {err:#?}")))?;
let ret = Helper::try_into_vector(&*res).map_err(|e| {
vm.new_type_error(format!(
"Can't cast result into vector, result: {:?}, err: {:?}",
res, e
))
let ret = Helper::try_into_vector(res.clone()).map_err(|e| {
vm.new_type_error(format!("Can't cast result into vector, err: {:?}", e))
})?;
Ok(Self::from(ret).into_pyobject(vm))
} else {
@@ -654,9 +609,9 @@ impl PyVector {
let (mut range, step, slice_len) = slice.adjust_indices(self.len());
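// `adjust_indices` resolves negative indices and clamps the slice to the
// vector's length, following Python slice semantics.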
let vector = self.as_vector_ref();
let mut buf = VectorBuilder::with_capacity(vector.data_type(), slice_len);
let mut buf = vector.data_type().create_mutable_vector(slice_len);
if slice_len == 0 {
let v: PyVector = buf.finish().into();
let v: PyVector = buf.to_vector().into();
Ok(v.into_pyobject(vm))
} else if step == 1 {
let v: PyVector = vector.slice(range.next().unwrap_or(0), slice_len).into();
@@ -664,15 +619,17 @@ impl PyVector {
} else if step.is_negative() {
// Negative steps require special treatment
for i in range.rev().step_by(step.unsigned_abs()) {
buf.push(&vector.get(i))
// Safety: This mutable vector is created from the vector's data type.
buf.push_value_ref(vector.get_ref(i)).unwrap();
}
let v: PyVector = buf.finish().into();
let v: PyVector = buf.to_vector().into();
Ok(v.into_pyobject(vm))
} else {
for i in range.step_by(step.unsigned_abs()) {
buf.push(&vector.get(i))
// Safety: This mutable vector is created from the vector's data type.
buf.push_value_ref(vector.get_ref(i)).unwrap();
}
let v: PyVector = buf.finish().into();
let v: PyVector = buf.to_vector().into();
Ok(v.into_pyobject(vm))
}
}
@@ -693,19 +650,19 @@ impl PyVector {
/// Get the corresponding arrow comparison function for the given PyComparisonOp.
///
/// TODO(discord9): implement a scalar version of this function
fn get_arrow_op(op: PyComparisonOp) -> impl Fn(&dyn Array, &dyn Array) -> Box<dyn Array> {
fn get_arrow_op(op: PyComparisonOp) -> impl Fn(&dyn Array, &dyn Array) -> ArrowResult<ArrayRef> {
let op_bool_arr = match op {
PyComparisonOp::Eq => comparison::eq,
PyComparisonOp::Ne => comparison::neq,
PyComparisonOp::Gt => comparison::gt,
PyComparisonOp::Lt => comparison::lt,
PyComparisonOp::Ge => comparison::gt_eq,
PyComparisonOp::Le => comparison::lt_eq,
PyComparisonOp::Eq => comparison::eq_dyn,
PyComparisonOp::Ne => comparison::neq_dyn,
PyComparisonOp::Gt => comparison::gt_dyn,
PyComparisonOp::Lt => comparison::lt_dyn,
PyComparisonOp::Ge => comparison::gt_eq_dyn,
PyComparisonOp::Le => comparison::lt_eq_dyn,
};
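// Wrap the kernel so its concrete BooleanArray result is returned as a
// type-erased ArrayRef.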
move |a: &dyn Array, b: &dyn Array| -> Box<dyn Array> {
let ret = op_bool_arr(a, b);
Box::new(ret) as _
move |a: &dyn Array, b: &dyn Array| -> ArrowResult<ArrayRef> {
let array = op_bool_arr(a, b)?;
Ok(Arc::new(array))
}
}
@@ -714,19 +671,20 @@ fn get_arrow_op(op: PyComparisonOp) -> impl Fn(&dyn Array, &dyn Array) -> Box<dy
/// TODO(discord9): implement a scalar version of this function
fn get_arrow_scalar_op(
op: PyComparisonOp,
) -> impl Fn(&dyn Array, &dyn Scalar, &VirtualMachine) -> PyResult<Box<dyn Array>> {
) -> impl Fn(&dyn Array, &dyn Array, &VirtualMachine) -> PyResult<ArrayRef> {
let op_bool_arr = match op {
PyComparisonOp::Eq => comparison::eq_scalar,
PyComparisonOp::Ne => comparison::neq_scalar,
PyComparisonOp::Gt => comparison::gt_scalar,
PyComparisonOp::Lt => comparison::lt_scalar,
PyComparisonOp::Ge => comparison::gt_eq_scalar,
PyComparisonOp::Le => comparison::lt_eq_scalar,
PyComparisonOp::Eq => comparison::eq_dyn,
PyComparisonOp::Ne => comparison::neq_dyn,
PyComparisonOp::Gt => comparison::gt_dyn,
PyComparisonOp::Lt => comparison::lt_dyn,
PyComparisonOp::Ge => comparison::gt_eq_dyn,
PyComparisonOp::Le => comparison::lt_eq_dyn,
};
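// The scalar operand has already been broadcast to a full-length array by
// `scalar_arith_op`, so the same dyn comparison kernels apply.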
move |a: &dyn Array, b: &dyn Scalar, _vm| -> PyResult<Box<dyn Array>> {
let ret = op_bool_arr(a, b);
Ok(Box::new(ret) as _)
move |a: &dyn Array, b: &dyn Array, vm| -> PyResult<ArrayRef> {
let array =
op_bool_arr(a, b).map_err(|e| vm.new_type_error(format!("scalar op error: {}", e)))?;
Ok(Arc::new(array))
}
}

View File

@@ -21,12 +21,10 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, SCRIPTS_
use common_query::Output;
use common_recordbatch::util as record_util;
use common_telemetry::logging;
use common_time::timestamp::Timestamp;
use common_time::util;
use datatypes::arrow::array::Utf8Array;
use datatypes::prelude::{ConcreteDataType, ScalarVector};
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
use datatypes::vectors::{StringVector, TimestampVector, VectorRef};
use datatypes::vectors::{StringVector, TimestampMillisecondVector, Vector, VectorRef};
use query::QueryEngineRef;
use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};
@@ -104,21 +102,16 @@ impl ScriptsTable {
// The timestamp in the key part is intentionally set to 0
columns_values.insert(
"timestamp".to_string(),
Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond(
0,
)])) as _,
Arc::new(TimestampMillisecondVector::from_slice(&[0])) as _,
);
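// Capture one timestamp so `gmt_created` and `gmt_modified` agree on insert.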
let now = util::current_time_millis();
columns_values.insert(
"gmt_created".to_string(),
Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond(
util::current_time_millis(),
)])) as _,
Arc::new(TimestampMillisecondVector::from_slice(&[now])) as _,
);
columns_values.insert(
"gmt_modified".to_string(),
Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond(
util::current_time_millis(),
)])) as _,
Arc::new(TimestampMillisecondVector::from_slice(&[now])) as _,
);
let table = self
@@ -173,23 +166,21 @@ impl ScriptsTable {
ensure!(!records.is_empty(), ScriptNotFoundSnafu { name });
assert_eq!(records.len(), 1);
assert_eq!(records[0].df_recordbatch.num_columns(), 1);
assert_eq!(records[0].num_columns(), 1);
let record = &records[0].df_recordbatch;
let script_column = record
.column(0)
let script_column = records[0].column(0);
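// The script source is stored as a string column; downcast the vector to read the value.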
let script_column = script_column
.as_any()
.downcast_ref::<Utf8Array<i32>>()
.context(CastTypeSnafu {
.downcast_ref::<StringVector>()
.with_context(|| CastTypeSnafu {
msg: format!(
"can't downcast {:?} array into utf8 array",
record.column(0).data_type()
"can't downcast {:?} array into string vector",
script_column.data_type()
),
})?;
assert_eq!(script_column.len(), 1);
Ok(script_column.value(0).to_string())
Ok(script_column.get_data(0).unwrap().to_string())
}
#[inline]