diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index 47c86831dc..6b24a9c5a9 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -79,11 +79,21 @@ impl RecordBatch { self.df_record_batch } + #[inline] + pub fn columns(&self) -> &[VectorRef] { + &self.columns + } + #[inline] pub fn column(&self, idx: usize) -> &VectorRef { &self.columns[idx] } + pub fn column_by_name(&self, name: &str) -> Option<&VectorRef> { + let idx = self.schema.column_index_by_name(name)?; + Some(&self.columns[idx]) + } + #[inline] pub fn num_columns(&self) -> usize { self.columns.len() @@ -179,18 +189,23 @@ mod tests { ])); let schema = Arc::new(Schema::try_from(arrow_schema).unwrap()); - let v = Arc::new(UInt32Vector::from_slice(&[1, 2, 3])); - let columns: Vec<VectorRef> = vec![v.clone(), v.clone()]; + let c1 = Arc::new(UInt32Vector::from_slice(&[1, 2, 3])); + let c2 = Arc::new(UInt32Vector::from_slice(&[4, 5, 6])); + let columns: Vec<VectorRef> = vec![c1, c2]; - let batch = RecordBatch::new(schema.clone(), columns).unwrap(); + let batch = RecordBatch::new(schema.clone(), columns.clone()).unwrap(); assert_eq!(3, batch.num_rows()); - for i in 0..batch.num_columns() { + assert_eq!(&columns, batch.columns()); + for (i, expect) in columns.iter().enumerate().take(batch.num_columns()) { let column = batch.column(i); - let actual = column.as_any().downcast_ref::<UInt32Vector>().unwrap(); - assert_eq!(&*v, actual); + assert_eq!(expect, column); } assert_eq!(schema, batch.schema); + assert_eq!(columns[0], *batch.column_by_name("c1").unwrap()); + assert_eq!(columns[1], *batch.column_by_name("c2").unwrap()); + assert!(batch.column_by_name("c3").is_none()); + let converted = RecordBatch::try_from_df_record_batch(schema, batch.df_record_batch().clone()).unwrap(); assert_eq!(batch, converted); diff --git a/src/script/Cargo.toml b/src/script/Cargo.toml index ed99a7b778..046aa0dd64 100644 --- a/src/script/Cargo.toml +++ b/src/script/Cargo.toml @@ -8,6 +8,7 @@ license = "Apache-2.0" default = ["python"] python = [ "dep:datafusion", + "dep:datafusion-common", "dep:datafusion-expr", "dep:datafusion-physical-expr", "dep:rustpython-vm", diff --git a/src/script/src/python/builtins/mod.rs b/src/script/src/python/builtins/mod.rs index 5bee6e5577..ef515db94d 100644 --- a/src/script/src/python/builtins/mod.rs +++ b/src/script/src/python/builtins/mod.rs @@ -21,10 +21,9 @@ mod test; use datafusion_common::{DataFusionError, ScalarValue}; use datafusion_expr::ColumnarValue as DFColValue; use datafusion_physical_expr::AggregateExpr; -use datatypes::arrow; use datatypes::arrow::array::ArrayRef; -use datatypes::arrow::compute::cast::CastOptions; -use datatypes::arrow::datatypes::DataType; +use datatypes::arrow::compute; +use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field}; use datatypes::vectors::Helper as HelperVec; use rustpython_vm::builtins::{PyBaseExceptionRef, PyBool, PyFloat, PyInt, PyList, PyStr}; use rustpython_vm::{pymodule, AsObject, PyObjectRef, PyPayload, PyResult, VirtualMachine}; @@ -37,7 +36,7 @@ fn type_cast_error(name: &str, ty: &str, vm: &VirtualMachine) -> PyBaseException vm.new_type_error(format!("Can't cast operand of type `{name}` into `{ty}`.")) } -fn collect_diff_types_string(values: &[ScalarValue], ty: &DataType) -> String { +fn collect_diff_types_string(values: &[ScalarValue], ty: &ArrowDataType) -> String { values .iter() .enumerate() @@ -56,6 +55,10 @@ fn collect_diff_types_string(values: &[ScalarValue], ty: &DataType) -> String {
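An aside on the cast API this file migrates to: arrow2's compute::cast::cast took an explicit CastOptions { wrapped, partial }, while arrow-rs exposes compute::cast with default options and reports failures as ArrowError. A minimal sketch of the new call shape, assuming the arrow crate that datatypes re-exports (illustrative only, not part of the patch):

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::compute;
use arrow::datatypes::DataType;
use arrow::error::Result as ArrowResult;

// Cast any castable array to Float64; failures surface as ArrowError
// rather than arrow2's option-driven lossy-cast behavior.
fn cast_to_f64(arr: &ArrayRef) -> ArrowResult<ArrayRef> {
    compute::cast(arr, &DataType::Float64)
}

fn main() -> ArrowResult<()> {
    let arr: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let floats = cast_to_f64(&arr)?;
    assert_eq!(floats.data_type(), &DataType::Float64);
    Ok(())
}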
.unwrap_or_else(|| "Nothing".to_string()) } +fn new_item_field(data_type: ArrowDataType) -> Field { + Field::new("item", data_type, false) +} + /// try to turn a Python Object into a PyVector or a scalar that can be use for calculate /// /// supported scalar are(leftside is python data type, right side is rust type): @@ -109,7 +112,7 @@ pub fn try_into_columnar_value(obj: PyObjectRef, vm: &VirtualMachine) -> PyResul // TODO(dennis): empty list, we set type as null. return Ok(DFColValue::Scalar(ScalarValue::List( None, - Box::new(DataType::Null), + Box::new(new_item_field(ArrowDataType::Null)), ))); } @@ -121,8 +124,8 @@ pub fn try_into_columnar_value(obj: PyObjectRef, vm: &VirtualMachine) -> PyResul ))); } Ok(DFColValue::Scalar(ScalarValue::List( - Some(Box::new(ret)), - Box::new(ty), + Some(ret), + Box::new(new_item_field(ty)), ))) } else { Err(vm.new_type_error(format!( @@ -184,22 +187,14 @@ fn scalar_val_try_into_py_obj(val: ScalarValue, vm: &VirtualMachine) -> PyResult fn all_to_f64(col: DFColValue, vm: &VirtualMachine) -> PyResult { match col { DFColValue::Array(arr) => { - let res = arrow::compute::cast::cast( - arr.as_ref(), - &DataType::Float64, - CastOptions { - wrapped: true, - partial: true, - }, - ) - .map_err(|err| { + let res = compute::cast(&arr, &ArrowDataType::Float64).map_err(|err| { vm.new_type_error(format!( "Arrow Type Cast Fail(from {:#?} to {:#?}): {err:#?}", arr.data_type(), - DataType::Float64 + ArrowDataType::Float64 )) })?; - Ok(DFColValue::Array(res.into())) + Ok(DFColValue::Array(res)) } DFColValue::Scalar(val) => { let val_in_f64 = match val { @@ -210,7 +205,7 @@ fn all_to_f64(col: DFColValue, vm: &VirtualMachine) -> PyResult { return Err(vm.new_type_error(format!( "Can't cast type {:#?} to {:#?}", val.get_datatype(), - DataType::Float64 + ArrowDataType::Float64 ))) } }; @@ -284,17 +279,16 @@ pub(crate) mod greptime_builtin { // P.S.: not extract to file because not-inlined proc macro attribute is *unstable* use std::sync::Arc; + use arrow::compute::kernels::{aggregate, boolean, comparison}; use common_function::scalars::function::FunctionContext; use common_function::scalars::math::PowFunction; use common_function::scalars::{Function, FunctionRef, FUNCTION_REGISTRY}; - use datafusion::arrow::compute::comparison::{gt_eq_scalar, lt_eq_scalar}; - use datafusion::arrow::datatypes::DataType; - use datafusion::arrow::error::ArrowError; - use datafusion::arrow::scalar::{PrimitiveScalar, Scalar}; + use datafusion::arrow::datatypes::DataType as ArrowDataType; use datafusion::physical_plan::expressions; use datafusion_expr::ColumnarValue as DFColValue; use datafusion_physical_expr::math_expressions; - use datatypes::arrow::array::{ArrayRef, NullArray}; + use datatypes::arrow::array::{ArrayRef, Int64Array, NullArray}; + use datatypes::arrow::error::ArrowError; use datatypes::arrow::{self, compute}; use datatypes::vectors::{ConstantVector, Float64Vector, Helper, Int64Vector, VectorRef}; use paste::paste; @@ -387,11 +381,6 @@ pub(crate) mod greptime_builtin { eval_func("clip", &[v0, v1, v2], vm) } - #[pyfunction] - fn median(v: PyVectorRef, vm: &VirtualMachine) -> PyResult { - eval_aggr_func("median", &[v], vm) - } - #[pyfunction] fn diff(v: PyVectorRef, vm: &VirtualMachine) -> PyResult { eval_aggr_func("diff", &[v], vm) @@ -553,7 +542,7 @@ pub(crate) mod greptime_builtin { fn random(len: usize, vm: &VirtualMachine) -> PyResult { // This is in a proc macro so using full path to avoid strange things // more info at: 
https://doc.rust-lang.org/reference/procedural-macros.html#procedural-macro-hygiene - let arg = NullArray::new(arrow::datatypes::DataType::Null, len); + let arg = NullArray::new(len); let args = &[DFColValue::Array(std::sync::Arc::new(arg) as _)]; let res = math_expressions::random(args).map_err(|err| from_df_err(err, vm))?; let ret = try_into_py_obj(res, vm)?; @@ -572,6 +561,17 @@ pub(crate) mod greptime_builtin { ); } + #[pyfunction] + fn median(values: PyVectorRef, vm: &VirtualMachine) -> PyResult { + bind_aggr_fn!( + Median, + vm, + &[values.to_arrow_array()], + values.to_arrow_array().data_type(), + expr0 + ); + } + /// Not implement in datafusion /// TODO(discord9): use greptime's own impl instead /* @@ -808,12 +808,16 @@ pub(crate) mod greptime_builtin { Ok(res.into()) } - fn gen_none_array(data_type: DataType, len: usize, vm: &VirtualMachine) -> PyResult { + fn gen_none_array( + data_type: ArrowDataType, + len: usize, + vm: &VirtualMachine, + ) -> PyResult { macro_rules! match_none_array { ($VAR:ident, $LEN: ident, [$($TY:ident),*]) => { paste!{ match $VAR{ - $(DataType::$TY => Arc::new(arrow::array::[<$TY Array>]::from(vec![None;$LEN])), )* + $(ArrowDataType::$TY => Arc::new(arrow::array::[<$TY Array>]::from(vec![None;$LEN])), )* _ => return Err(vm.new_type_error(format!("gen_none_array() does not support {:?}", data_type))) } } @@ -829,10 +833,10 @@ pub(crate) mod greptime_builtin { #[pyfunction] fn prev(cur: PyVectorRef, vm: &VirtualMachine) -> PyResult { - let cur: ArrayRef = cur.to_arrow_array(); + let cur = cur.to_arrow_array(); if cur.len() == 0 { let ret = cur.slice(0, 0); - let ret = Helper::try_into_vector(&*ret).map_err(|e| { + let ret = Helper::try_into_vector(ret.clone()).map_err(|e| { vm.new_type_error(format!( "Can't cast result into vector, result: {:?}, err: {:?}", ret, e @@ -842,10 +846,10 @@ pub(crate) mod greptime_builtin { } let cur = cur.slice(0, cur.len() - 1); // except the last one that is let fill = gen_none_array(cur.data_type().to_owned(), 1, vm)?; - let ret = compute::concatenate::concatenate(&[&*fill, &*cur]).map_err(|err| { + let ret = compute::concat(&[&*fill, &*cur]).map_err(|err| { vm.new_runtime_error(format!("Can't concat array[0] with array[0:-1]!{err:#?}")) })?; - let ret = Helper::try_into_vector(&*ret).map_err(|e| { + let ret = Helper::try_into_vector(ret.clone()).map_err(|e| { vm.new_type_error(format!( "Can't cast result into vector, result: {:?}, err: {:?}", ret, e @@ -856,10 +860,10 @@ pub(crate) mod greptime_builtin { #[pyfunction] fn next(cur: PyVectorRef, vm: &VirtualMachine) -> PyResult { - let cur: ArrayRef = cur.to_arrow_array(); + let cur = cur.to_arrow_array(); if cur.len() == 0 { let ret = cur.slice(0, 0); - let ret = Helper::try_into_vector(&*ret).map_err(|e| { + let ret = Helper::try_into_vector(ret.clone()).map_err(|e| { vm.new_type_error(format!( "Can't cast result into vector, result: {:?}, err: {:?}", ret, e @@ -869,10 +873,10 @@ pub(crate) mod greptime_builtin { } let cur = cur.slice(1, cur.len() - 1); // except the last one that is let fill = gen_none_array(cur.data_type().to_owned(), 1, vm)?; - let ret = compute::concatenate::concatenate(&[&*cur, &*fill]).map_err(|err| { + let ret = compute::concat(&[&*cur, &*fill]).map_err(|err| { vm.new_runtime_error(format!("Can't concat array[0] with array[0:-1]!{err:#?}")) })?; - let ret = Helper::try_into_vector(&*ret).map_err(|e| { + let ret = Helper::try_into_vector(ret.clone()).map_err(|e| { vm.new_type_error(format!( "Can't cast result into vector, result: {:?}, err: {:?}", 
ret, e @@ -881,55 +885,24 @@ pub(crate) mod greptime_builtin { Ok(ret.into()) } - fn try_scalar_to_value(scalar: &dyn Scalar, vm: &VirtualMachine) -> PyResult<i64> { - let ty_error = |s: String| vm.new_type_error(s); - scalar - .as_any() - .downcast_ref::<PrimitiveScalar<i64>>() - .ok_or_else(|| { - ty_error(format!( - "expect scalar to be i64, found{:?}", - scalar.data_type() - )) - })? - .value() - .ok_or_else(|| ty_error("All element is Null in a time series array".to_string())) - } - /// generate interval time point fn gen_inteveral( - oldest: &dyn Scalar, - newest: &dyn Scalar, + oldest: i64, + newest: i64, duration: i64, vm: &VirtualMachine, - ) -> PyResult<Vec<PrimitiveScalar<i64>>> { - use datatypes::arrow::datatypes::DataType; - match (oldest.data_type(), newest.data_type()) { - (DataType::Int64, DataType::Int64) => (), - _ => { - return Err(vm.new_type_error(format!( - "Expect int64, found {:?} and {:?}", - oldest.data_type(), - newest.data_type() - ))); - } - } - - let oldest = try_scalar_to_value(oldest, vm)?; - let newest = try_scalar_to_value(newest, vm)?; + ) -> PyResult<Vec<i64>> { if oldest > newest { return Err(vm.new_value_error(format!("{oldest} is greater than {newest}"))); } - let ret = if duration > 0 { - (oldest..=newest) + if duration > 0 { + let ret = (oldest..=newest) .step_by(duration as usize) - .map(|v| PrimitiveScalar::new(DataType::Int64, Some(v))) - .collect::<Vec<_>>() + .collect::<Vec<_>>(); + Ok(ret) } else { - return Err(vm.new_value_error(format!("duration: {duration} is not positive number."))); - }; - - Ok(ret) + Err(vm.new_value_error(format!("duration: {duration} is not a positive number."))) + } } /// `func`: exec on sliding window slice of given `arr`, expect it to always return a PyVector of one element @@ -952,12 +925,19 @@ pub(crate) mod greptime_builtin { let arrow_error = |err: ArrowError| vm.new_runtime_error(format!("Arrow Error: {err:#?}")); let datatype_error = |err: datatypes::Error| vm.new_runtime_error(format!("DataType Errors!: {err:#?}")); - let ts: ArrayRef = ts.to_arrow_array(); - let arr: ArrayRef = arr.to_arrow_array(); + let ts_array_ref: ArrayRef = ts.to_arrow_array(); + let ts = ts_array_ref + .as_any() + .downcast_ref::<Int64Array>() + .ok_or_else(|| { + vm.new_type_error(format!("ts must be int64, found: {:?}", ts_array_ref)) + })?; let slices = { - let oldest = compute::aggregate::min(&*ts).map_err(arrow_error)?; - let newest = compute::aggregate::max(&*ts).map_err(arrow_error)?; - gen_inteveral(&*oldest, &*newest, duration, vm)? + let oldest = aggregate::min(ts) + .ok_or_else(|| vm.new_runtime_error("ts must have a min value".to_string()))?; + let newest = aggregate::max(ts) + .ok_or_else(|| vm.new_runtime_error("ts must have a max value".to_string()))?; + gen_inteveral(oldest, newest, duration, vm)? }; let windows = { it }) .map(|(first, second)| { - compute::boolean::and(&gt_eq_scalar(&*ts, first), &lt_eq_scalar(&*ts, second)) - .map_err(arrow_error) + let left = comparison::gt_eq_scalar(ts, *first).map_err(arrow_error)?; + let right = comparison::lt_eq_scalar(ts, *second).map_err(arrow_error)?; + boolean::and(&left, &right).map_err(arrow_error) }) .map(|mask| match mask { - Ok(mask) => compute::filter::filter(&*arr, &mask).map_err(arrow_error), + Ok(mask) => { + let arrow_arr = arr.to_arrow_array(); + compute::filter(&arrow_arr, &mask).map_err(arrow_error) + } Err(e) => Err(e), }) .collect::<Result<Vec<_>, _>>()? @@ -1013,16 +997,17 @@ pub(crate) mod greptime_builtin { .map(apply_interval_function) .collect::<Result<Vec<_>, _>>()?; - // 3.
get returen vector and concat them - let ret = fn_results - .into_iter() - .try_reduce(|acc, x| { - compute::concatenate::concatenate(&[acc.as_ref(), x.as_ref()]).map(Arc::from) - }) - .map_err(arrow_error)? - .unwrap_or_else(|| Arc::from(arr.slice(0, 0))); + // 3. get returned vector and concat them + let result_arrays: Vec<_> = fn_results + .iter() + .map(|vector| vector.to_arrow_array()) + .collect(); + let result_dyn_arrays: Vec<_> = result_arrays.iter().map(|v| v.as_ref()).collect(); + let concat_array = compute::concat(&result_dyn_arrays).map_err(arrow_error)?; + let vector = Helper::try_into_vector(concat_array).map_err(datatype_error)?; + // 4. return result vector - Ok(Helper::try_into_vector(ret).map_err(datatype_error)?.into()) + Ok(PyVector::from(vector)) } /// return first element in a `PyVector` in sliced new `PyVector`, if vector's length is zero, return a zero sized slice instead @@ -1033,7 +1018,7 @@ pub(crate) mod greptime_builtin { 0 => arr.slice(0, 0), _ => arr.slice(0, 1), }; - let ret = Helper::try_into_vector(&*ret).map_err(|e| { + let ret = Helper::try_into_vector(ret.clone()).map_err(|e| { vm.new_type_error(format!( "Can't cast result into vector, result: {:?}, err: {:?}", ret, e @@ -1050,7 +1035,7 @@ pub(crate) mod greptime_builtin { 0 => arr.slice(0, 0), _ => arr.slice(arr.len() - 1, 1), }; - let ret = Helper::try_into_vector(&*ret).map_err(|e| { + let ret = Helper::try_into_vector(ret.clone()).map_err(|e| { vm.new_type_error(format!( "Can't cast result into vector, result: {:?}, err: {:?}", ret, e diff --git a/src/script/src/python/builtins/test.rs b/src/script/src/python/builtins/test.rs index 8fdeb9ad94..52994f2b54 100644 --- a/src/script/src/python/builtins/test.rs +++ b/src/script/src/python/builtins/test.rs @@ -18,10 +18,10 @@ use std::io::Read; use std::path::Path; use std::sync::Arc; -use datatypes::arrow::array::{Float64Array, Int64Array, PrimitiveArray}; -use datatypes::arrow::compute::cast::CastOptions; -use datatypes::arrow::datatypes::DataType; -use datatypes::vectors::VectorRef; +use datatypes::arrow::array::{Float64Array, Int64Array}; +use datatypes::arrow::compute; +use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field}; +use datatypes::vectors::{Float64Vector, Int64Vector, VectorRef}; use ron::from_str as from_ron_string; use rustpython_vm::builtins::{PyFloat, PyInt, PyList}; use rustpython_vm::class::PyClassImpl; @@ -68,17 +68,17 @@ fn convert_scalar_to_py_obj_and_back() { panic!("Convert errors, expect 1") } let col = DFColValue::Scalar(ScalarValue::List( - Some(Box::new(vec![ + Some(vec![ ScalarValue::Int64(Some(1)), ScalarValue::Int64(Some(2)), - ])), - Box::new(DataType::Int64), + ]), + Box::new(Field::new("item", ArrowDataType::Int64, false)), )); let to = try_into_py_obj(col, vm).unwrap(); let back = try_into_columnar_value(to, vm).unwrap(); - if let DFColValue::Scalar(ScalarValue::List(Some(list), ty)) = back { + if let DFColValue::Scalar(ScalarValue::List(Some(list), field)) = back { assert_eq!(list.len(), 2); - assert_eq!(ty.as_ref(), &DataType::Int64); + assert_eq!(*field.data_type(), ArrowDataType::Int64); } let list: Vec = vec![vm.ctx.new_int(1).into(), vm.ctx.new_int(2).into()]; let nested_list: Vec = @@ -92,12 +92,10 @@ fn convert_scalar_to_py_obj_and_back() { )); } - let list: PyVector = PyVector::from( - HelperVec::try_into_vector( - Arc::new(PrimitiveArray::from_slice([0.1f64, 0.2, 0.3, 0.4])) as ArrayRef, - ) - .unwrap(), - ); + let list: PyVector = + PyVector::from( + Arc::new(Float64Vector::from_slice([0.1f64, 
0.2, 0.3, 0.4])) as VectorRef + ); let nested_list: Vec = vec![list.into_pyobject(vm), vm.ctx.new_int(3).into()]; let list_obj = vm.ctx.new_list(nested_list).into(); let expect_err = try_into_columnar_value(list_obj, vm); @@ -115,7 +113,7 @@ struct TestCase { #[derive(Debug, Serialize, Deserialize)] struct Var { value: PyValue, - ty: DataType, + ty: ArrowDataType, } /// for floating number comparison @@ -189,25 +187,25 @@ impl PyValue { } } -fn is_float(ty: &DataType) -> bool { +fn is_float(ty: &ArrowDataType) -> bool { matches!( ty, - DataType::Float16 | DataType::Float32 | DataType::Float64 + ArrowDataType::Float16 | ArrowDataType::Float32 | ArrowDataType::Float64 ) } /// unsigned included -fn is_int(ty: &DataType) -> bool { +fn is_int(ty: &ArrowDataType) -> bool { matches!( ty, - DataType::UInt8 - | DataType::UInt16 - | DataType::UInt32 - | DataType::UInt64 - | DataType::Int8 - | DataType::Int16 - | DataType::Int32 - | DataType::Int64 + ArrowDataType::UInt8 + | ArrowDataType::UInt16 + | ArrowDataType::UInt32 + | ArrowDataType::UInt64 + | ArrowDataType::Int8 + | ArrowDataType::Int16 + | ArrowDataType::Int32 + | ArrowDataType::Int64 ) } @@ -217,7 +215,7 @@ impl PyValue { PyValue::FloatVec(v) => { Arc::new(datatypes::vectors::Float64Vector::from_vec(v.clone())) } - PyValue::IntVec(v) => Arc::new(datatypes::vectors::Int64Vector::from_vec(v.clone())), + PyValue::IntVec(v) => Arc::new(Int64Vector::from_vec(v.clone())), PyValue::Int(v) => return Ok(vm.ctx.new_int(*v).into()), PyValue::Float(v) => return Ok(vm.ctx.new_float(*v).into()), Self::Bool(v) => return Ok(vm.ctx.new_bool(*v).into()), @@ -234,16 +232,9 @@ impl PyValue { let res = res.to_arrow_array(); let ty = res.data_type(); if is_float(ty) { - let vec_f64 = arrow::compute::cast::cast( - res.as_ref(), - &DataType::Float64, - CastOptions { - wrapped: true, - partial: true, - }, - ) - .map_err(|err| format!("{err:#?}"))?; - assert_eq!(vec_f64.data_type(), &DataType::Float64); + let vec_f64 = compute::cast(&res, &ArrowDataType::Float64) + .map_err(|err| format!("{err:#?}"))?; + assert_eq!(vec_f64.data_type(), &ArrowDataType::Float64); let vec_f64 = vec_f64 .as_any() .downcast_ref::() @@ -251,13 +242,6 @@ impl PyValue { let ret = vec_f64 .into_iter() .map(|v| v.map(|inner| inner.to_owned())) - /* .enumerate() - .map(|(idx, v)| { - v.ok_or(format!( - "No null element expected, found one in {idx} position" - )) - .map(|v| v.to_owned()) - })*/ .collect::>(); if ret.iter().all(|x| x.is_some()) { Ok(Self::FloatVec( @@ -267,16 +251,9 @@ impl PyValue { Ok(Self::FloatVecWithNull(ret)) } } else if is_int(ty) { - let vec_int = arrow::compute::cast::cast( - res.as_ref(), - &DataType::Int64, - CastOptions { - wrapped: true, - partial: true, - }, - ) - .map_err(|err| format!("{err:#?}"))?; - assert_eq!(vec_int.data_type(), &DataType::Int64); + let vec_int = compute::cast(&res, &ArrowDataType::Int64) + .map_err(|err| format!("{err:#?}"))?; + assert_eq!(vec_int.data_type(), &ArrowDataType::Int64); let vec_i64 = vec_int .as_any() .downcast_ref::() @@ -293,7 +270,7 @@ impl PyValue { .collect::>()?; Ok(Self::IntVec(ret)) } else { - Err(format!("unspupported DataType:{ty:#?}")) + Err(format!("unspupported ArrowDataType:{ty:#?}")) } } else if is_instance::(obj, vm) { let res = obj diff --git a/src/script/src/python/coprocessor.rs b/src/script/src/python/coprocessor.rs index 3bc5c39f2a..3dcc348562 100644 --- a/src/script/src/python/coprocessor.rs +++ b/src/script/src/python/coprocessor.rs @@ -16,19 +16,18 @@ pub mod compile; pub mod parse; use 
std::cell::RefCell; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::result::Result as StdResult; use std::sync::Arc; use common_recordbatch::RecordBatch; use common_telemetry::info; -use datafusion_common::record_batch::RecordBatch as DfRecordBatch; -use datatypes::arrow; -use datatypes::arrow::array::{Array, ArrayRef}; -use datatypes::arrow::compute::cast::CastOptions; -use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; -use datatypes::schema::Schema; -use datatypes::vectors::{BooleanVector, Helper, StringVector, Vector, VectorRef}; +use datatypes::arrow::array::Array; +use datatypes::arrow::compute; +use datatypes::arrow::datatypes::DataType as ArrowDataType; +use datatypes::data_type::{ConcreteDataType, DataType}; +use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::vectors::{Helper, VectorRef}; use rustpython_compiler_core::CodeObject; use rustpython_vm as vm; use rustpython_vm::class::PyClassImpl; @@ -43,7 +42,8 @@ use vm::{Interpreter, PyObjectRef, VirtualMachine}; use crate::python::builtins::greptime_builtin; use crate::python::coprocessor::parse::DecoratorArgs; use crate::python::error::{ - ensure, ret_other_error_with, ArrowSnafu, OtherSnafu, Result, TypeCastSnafu, + ensure, ret_other_error_with, ArrowSnafu, NewRecordBatchSnafu, OtherSnafu, Result, + TypeCastSnafu, }; use crate::python::utils::{format_py_error, is_instance, py_vec_obj_to_array}; use crate::python::PyVector; @@ -54,7 +54,8 @@ thread_local!(static INTERPRETER: RefCell>> = RefCell::n #[derive(Debug, Clone, PartialEq, Eq)] pub struct AnnotationInfo { /// if None, use types inferred by PyVector - pub datatype: Option, + // TODO(yingwen): We should use our data type. i.e. ConcreteDataType. + pub datatype: Option, pub is_nullable: bool, } @@ -95,7 +96,7 @@ impl Coprocessor { /// generate [`Schema`] according to return names, types, /// if no annotation /// the datatypes of the actual columns is used directly - fn gen_schema(&self, cols: &[ArrayRef]) -> Result> { + fn gen_schema(&self, cols: &[VectorRef]) -> Result { let names = &self.deco_args.ret_names; let anno = &self.return_types; ensure!( @@ -109,35 +110,38 @@ impl Coprocessor { ) } ); - Ok(Arc::new(ArrowSchema::from( - names - .iter() - .enumerate() - .map(|(idx, name)| { - let real_ty = cols[idx].data_type().to_owned(); - let AnnotationInfo { - datatype: ty, - is_nullable, - } = anno[idx].to_owned().unwrap_or_else(|| { - // default to be not nullable and use DataType inferred by PyVector itself - AnnotationInfo { - datatype: Some(real_ty.to_owned()), - is_nullable: false, - } - }); - Field::new( - name, - // if type is like `_` or `_ | None` - ty.unwrap_or(real_ty), - is_nullable, - ) - }) - .collect::>(), - ))) + + let column_schemas = names + .iter() + .enumerate() + .map(|(idx, name)| { + let real_ty = cols[idx].data_type(); + let AnnotationInfo { + datatype: ty, + is_nullable, + } = anno[idx].to_owned().unwrap_or_else(|| { + // default to be not nullable and use DataType inferred by PyVector itself + AnnotationInfo { + datatype: Some(real_ty.as_arrow_type()), + is_nullable: false, + } + }); + let column_type = match ty { + Some(arrow_type) => { + ConcreteDataType::try_from(&arrow_type).context(TypeCastSnafu)? 
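For context, gen_schema now assembles GreptimeDB's own Schema from ColumnSchema entries instead of an arrow ArrowSchema of Fields. A minimal sketch of the shape it produces, using only APIs that appear elsewhere in this patch (column names borrowed from the doc example below; illustrative only):

use std::sync::Arc;

use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};

// Two return columns: a non-nullable f64 and a nullable f64.
fn sample_return_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![
        ColumnSchema::new("perf", ConcreteDataType::float64_datatype(), false),
        ColumnSchema::new("what", ConcreteDataType::float64_datatype(), true),
    ]))
}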
+ } + // if type is like `_` or `_ | None` + None => real_ty, + }; + Ok(ColumnSchema::new(name, column_type, is_nullable)) + }) + .collect::>>()?; + + Ok(Arc::new(Schema::new(column_schemas))) } /// check if real types and annotation types(if have) is the same, if not try cast columns to annotated type - fn check_and_cast_type(&self, cols: &mut [ArrayRef]) -> Result<()> { + fn check_and_cast_type(&self, cols: &mut [VectorRef]) -> Result<()> { let return_types = &self.return_types; // allow ignore Return Type Annotation if return_types.is_empty() { @@ -161,21 +165,10 @@ impl Coprocessor { { let real_ty = col.data_type(); let anno_ty = datatype; - if real_ty != anno_ty { - { - // This`CastOption` allow for overflowly cast and int to float loosely cast etc.., - // check its doc for more information - *col = arrow::compute::cast::cast( - col.as_ref(), - anno_ty, - CastOptions { - wrapped: true, - partial: true, - }, - ) - .context(ArrowSnafu)? - .into(); - } + if real_ty.as_arrow_type() != *anno_ty { + let array = col.to_arrow_array(); + let array = compute::cast(&array, anno_ty).context(ArrowSnafu)?; + *col = Helper::try_into_vector(array).context(TypeCastSnafu)?; } } } @@ -183,47 +176,6 @@ impl Coprocessor { } } -/// cast a `dyn Array` of type unsigned/int/float into a `dyn Vector` -fn try_into_vector(arg: Arc) -> Result> { - // wrap try_into_vector in here to convert `datatypes::error::Error` to `python::error::Error` - Helper::try_into_vector(arg).context(TypeCastSnafu) -} - -/// convert a `Vec` into a `Vec` only when they are of supported types -/// PyVector now only support unsigned&int8/16/32/64, float32/64 and bool when doing meanful arithmetics operation -fn try_into_py_vector(fetch_args: Vec) -> Result> { - let mut args: Vec = Vec::with_capacity(fetch_args.len()); - for (idx, arg) in fetch_args.into_iter().enumerate() { - let v: VectorRef = match arg.data_type() { - DataType::Float32 => try_into_vector::(arg)?, - DataType::Float64 => try_into_vector::(arg)?, - DataType::UInt8 => try_into_vector::(arg)?, - DataType::UInt16 => try_into_vector::(arg)?, - DataType::UInt32 => try_into_vector::(arg)?, - DataType::UInt64 => try_into_vector::(arg)?, - DataType::Int8 => try_into_vector::(arg)?, - DataType::Int16 => try_into_vector::(arg)?, - DataType::Int32 => try_into_vector::(arg)?, - DataType::Int64 => try_into_vector::(arg)?, - DataType::Utf8 => { - Arc::new(StringVector::try_from_arrow_array(arg).context(TypeCastSnafu)?) as _ - } - DataType::Boolean => { - Arc::new(BooleanVector::try_from_arrow_array(arg).context(TypeCastSnafu)?) 
as _ - } - _ => { - return ret_other_error_with(format!( - "Unsupported data type at column {idx}: {:?} for coprocessor", - arg.data_type() - )) - .fail() - } - }; - args.push(PyVector::from(v)); - } - Ok(args) -} - /// convert a tuple of `PyVector` or one `PyVector`(wrapped in a Python Object Ref[`PyObjectRef`]) /// to a `Vec` /// by default, a constant(int/float/bool) gives the a constant array of same length with input args @@ -231,7 +183,7 @@ fn try_into_columns( obj: &PyObjectRef, vm: &VirtualMachine, col_len: usize, -) -> Result> { +) -> Result> { if is_instance::(obj, vm) { let tuple = obj.payload::().with_context(|| { ret_other_error_with(format!("can't cast obj {:?} to PyTuple)", obj)) @@ -239,7 +191,7 @@ fn try_into_columns( let cols = tuple .iter() .map(|obj| py_vec_obj_to_array(obj, vm, col_len)) - .collect::>>()?; + .collect::>>()?; Ok(cols) } else { let col = py_vec_obj_to_array(obj, vm, col_len)?; @@ -249,27 +201,16 @@ fn try_into_columns( /// select columns according to `fetch_names` from `rb` /// and cast them into a Vec of PyVector -fn select_from_rb(rb: &DfRecordBatch, fetch_names: &[String]) -> Result> { - let field_map: HashMap<&String, usize> = rb - .schema() - .fields +fn select_from_rb(rb: &RecordBatch, fetch_names: &[String]) -> Result> { + fetch_names .iter() - .enumerate() - .map(|(idx, field)| (&field.name, idx)) - .collect(); - let fetch_idx: Vec = fetch_names - .iter() - .map(|field| { - field_map.get(field).copied().context(OtherSnafu { - reason: format!("Can't found field name {field}"), - }) + .map(|name| { + let vector = rb.column_by_name(name).with_context(|| OtherSnafu { + reason: format!("Can't find field name {}", name), + })?; + Ok(PyVector::from(vector.clone())) }) - .collect::>>()?; - let fetch_args: Vec> = fetch_idx - .into_iter() - .map(|idx| rb.column(idx).clone()) - .collect(); - try_into_py_vector(fetch_args) + .collect() } /// match between arguments' real type and annotation types @@ -277,12 +218,12 @@ fn select_from_rb(rb: &DfRecordBatch, fetch_names: &[String]) -> Result Result<()> { for (idx, arg) in args.iter().enumerate() { let anno_ty = copr.arg_types[idx].to_owned(); let real_ty = arg.to_arrow_array().data_type().to_owned(); - let is_nullable: bool = rb.schema().fields[idx].is_nullable; + let is_nullable: bool = rb.schema.column_schemas()[idx].is_nullable(); ensure!( anno_ty .to_owned() @@ -323,31 +264,32 @@ fn set_items_in_scope( /// The coprocessor function accept a python script and a Record Batch: /// ## What it does -/// 1. it take a python script and a [`DfRecordBatch`], extract columns and annotation info according to `args` given in decorator in python script +/// 1. it take a python script and a [`RecordBatch`], extract columns and annotation info according to `args` given in decorator in python script /// 2. execute python code and return a vector or a tuple of vector, -/// 3. the returning vector(s) is assembled into a new [`DfRecordBatch`] according to `returns` in python decorator and return to caller +/// 3. 
the returning vector(s) is assembled into a new [`RecordBatch`] according to `returns` in python decorator and return to caller /// /// # Example /// /// ```ignore /// use std::sync::Arc; -/// use datafusion_common::record_batch::RecordBatch as DfRecordBatch; -/// use datatypes::arrow::array::PrimitiveArray; -/// use datatypes::arrow::datatypes::{DataType, Field, Schema}; +/// use common_recordbatch::RecordBatch; +/// use datatypes::prelude::*; +/// use datatypes::schema::{ColumnSchema, Schema}; +/// use datatypes::vectors::{Float32Vector, Float64Vector}; /// use common_function::scalars::python::exec_coprocessor; /// let python_source = r#" /// @copr(args=["cpu", "mem"], returns=["perf", "what"]) /// def a(cpu, mem): /// return cpu + mem, cpu - mem /// "#; -/// let cpu_array = PrimitiveArray::from_slice([0.9f32, 0.8, 0.7, 0.6]); -/// let mem_array = PrimitiveArray::from_slice([0.1f64, 0.2, 0.3, 0.4]); -/// let schema = Arc::new(Schema::from(vec![ -/// Field::new("cpu", DataType::Float32, false), -/// Field::new("mem", DataType::Float64, false), +/// let cpu_array = Float32Vector::from_slice([0.9f32, 0.8, 0.7, 0.6]); +/// let mem_array = Float64Vector::from_slice([0.1f64, 0.2, 0.3, 0.4]); +/// let schema = Arc::new(Schema::new(vec![ +/// ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), false), +/// ColumnSchema::new("mem", ConcreteDataType::float64_datatype(), false), /// ])); /// let rb = -/// DfRecordBatch::try_new(schema, vec![Arc::new(cpu_array), Arc::new(mem_array)]).unwrap(); +/// RecordBatch::new(schema, vec![Arc::new(cpu_array), Arc::new(mem_array)]).unwrap(); /// let ret = exec_coprocessor(python_source, &rb).unwrap(); /// assert_eq!(ret.column(0).len(), 4); /// ``` @@ -357,7 +299,7 @@ fn set_items_in_scope( /// /// Currently support types are `u8`, `u16`, `u32`, `u64`, `i8`, `i16`, `i32`, `i64` and `f16`, `f32`, `f64` /// -/// use `f64 | None` to mark if returning column is nullable like in [`DfRecordBatch`]'s schema's [`Field`]'s is_nullable +/// use `f64 | None` to mark if returning column is nullable like in [`RecordBatch`]'s schema's [`ColumnSchema`]'s is_nullable /// /// you can also use single underscore `_` to let coprocessor infer what type it is, so `_` and `_ | None` are both valid in type annotation. /// Note: using `_` means not nullable column, using `_ | None` means nullable column @@ -373,7 +315,7 @@ fn set_items_in_scope( /// You can return constant in python code like `return 1, 1.0, True` /// which create a constant array(with same value)(currently support int, float and bool) as column on return #[cfg(test)] -pub fn exec_coprocessor(script: &str, rb: &DfRecordBatch) -> Result { +pub fn exec_coprocessor(script: &str, rb: &RecordBatch) -> Result { // 1. parse the script and check if it's only a function with `@coprocessor` decorator, and get `args` and `returns`, // 2. also check for exist of `args` in `rb`, if not found, return error // TODO(discord9): cache the result of parse_copr @@ -383,7 +325,7 @@ pub fn exec_coprocessor(script: &str, rb: &DfRecordBatch) -> Result pub(crate) fn exec_with_cached_vm( copr: &Coprocessor, - rb: &DfRecordBatch, + rb: &RecordBatch, args: Vec, vm: &Arc, ) -> Result { @@ -401,7 +343,7 @@ pub(crate) fn exec_with_cached_vm( // 5. 
get returns as either a PyVector or a PyTuple, and naming schema them according to `returns` let col_len = rb.num_rows(); - let mut cols: Vec = try_into_columns(&ret, vm, col_len)?; + let mut cols = try_into_columns(&ret, vm, col_len)?; ensure!( cols.len() == copr.deco_args.ret_names.len(), OtherSnafu { @@ -417,11 +359,7 @@ pub(crate) fn exec_with_cached_vm( copr.check_and_cast_type(&mut cols)?; // 6. return a assembled DfRecordBatch let schema = copr.gen_schema(&cols)?; - let res_rb = DfRecordBatch::try_new(schema.clone(), cols).context(ArrowSnafu)?; - Ok(RecordBatch { - schema: Arc::new(Schema::try_from(schema).context(TypeCastSnafu)?), - df_recordbatch: res_rb, - }) + RecordBatch::new(schema, cols).context(NewRecordBatchSnafu) }) } @@ -459,7 +397,7 @@ pub(crate) fn init_interpreter() -> Arc { } /// using a parsed `Coprocessor` struct as input to execute python code -pub(crate) fn exec_parsed(copr: &Coprocessor, rb: &DfRecordBatch) -> Result { +pub(crate) fn exec_parsed(copr: &Coprocessor, rb: &RecordBatch) -> Result { // 3. get args from `rb`, and cast them into PyVector let args: Vec = select_from_rb(rb, &copr.deco_args.arg_names)?; check_args_anno_real_type(&args, copr, rb)?; @@ -477,7 +415,7 @@ pub(crate) fn exec_parsed(copr: &Coprocessor, rb: &DfRecordBatch) -> Result StdResult { diff --git a/src/script/src/python/engine.rs b/src/script/src/python/engine.rs index 7ad5390f7b..848bf71d8b 100644 --- a/src/script/src/python/engine.rs +++ b/src/script/src/python/engine.rs @@ -59,7 +59,7 @@ impl Stream for CoprStream { match Pin::new(&mut self.stream).poll_next(cx) { Poll::Pending => Poll::Pending, Poll::Ready(Some(Ok(recordbatch))) => { - let batch = exec_parsed(&self.copr, &recordbatch.df_recordbatch) + let batch = exec_parsed(&self.copr, &recordbatch) .map_err(BoxedError::new) .context(ExternalSnafu)?; @@ -149,8 +149,8 @@ mod tests { use catalog::{CatalogList, CatalogProvider, SchemaProvider}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_recordbatch::util; - use datafusion_common::field_util::{FieldExt, SchemaExt}; - use datatypes::arrow::array::{Float64Array, Int64Array}; + use datatypes::prelude::ScalarVector; + use datatypes::vectors::{Float64Vector, Int64Vector}; use query::QueryEngineFactory; use table::table::numbers::NumbersTable; @@ -177,6 +177,7 @@ mod tests { let script_engine = PyEngine::new(query_engine.clone()); + // To avoid divide by zero, the script divides `add(a, b)` by `g.sqrt(c + 1)` instead of `g.sqrt(c)` let script = r#" import greptime as g def add(a, b): @@ -184,7 +185,7 @@ def add(a, b): @copr(args=["a", "b", "c"], returns = ["r"], sql="select number as a,number as b,number as c from numbers limit 100") def test(a, b, c): - return add(a, b) / g.sqrt(c) + return add(a, b) / g.sqrt(c + 1) "#; let script = script_engine .compile(script, CompileContext::default()) @@ -197,15 +198,18 @@ def test(a, b, c): assert_eq!(1, numbers.len()); let number = &numbers[0]; - assert_eq!(number.df_recordbatch.num_columns(), 1); - assert_eq!("r", number.schema.arrow_schema().field(0).name()); + assert_eq!(number.num_columns(), 1); + assert_eq!("r", number.schema.column_schemas()[0].name); - let columns = number.df_recordbatch.columns(); - assert_eq!(1, columns.len()); - assert_eq!(100, columns[0].len()); - let rows = columns[0].as_any().downcast_ref::().unwrap(); - assert!(rows.value(0).is_nan()); - assert_eq!((99f64 + 99f64) / 99f64.sqrt(), rows.value(99)) + assert_eq!(1, number.num_columns()); + assert_eq!(100, number.column(0).len()); + let 
rows = number + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(0f64, rows.get_data(0).unwrap()); + assert_eq!((99f64 + 99f64) / 100f64.sqrt(), rows.get_data(99).unwrap()) } _ => unreachable!(), } @@ -229,15 +233,18 @@ def test(a): assert_eq!(1, numbers.len()); let number = &numbers[0]; - assert_eq!(number.df_recordbatch.num_columns(), 1); - assert_eq!("r", number.schema.arrow_schema().field(0).name()); + assert_eq!(number.num_columns(), 1); + assert_eq!("r", number.schema.column_schemas()[0].name); - let columns = number.df_recordbatch.columns(); - assert_eq!(1, columns.len()); - assert_eq!(50, columns[0].len()); - let rows = columns[0].as_any().downcast_ref::().unwrap(); - assert_eq!(0, rows.value(0)); - assert_eq!(98, rows.value(49)) + assert_eq!(1, number.num_columns()); + assert_eq!(50, number.column(0).len()); + let rows = number + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(0, rows.get_data(0).unwrap()); + assert_eq!(98, rows.get_data(49).unwrap()) } _ => unreachable!(), } diff --git a/src/script/src/python/error.rs b/src/script/src/python/error.rs index 9a77984149..6e20e86db0 100644 --- a/src/script/src/python/error.rs +++ b/src/script/src/python/error.rs @@ -105,6 +105,12 @@ pub enum Error { #[snafu(backtrace)] source: common_recordbatch::error::Error, }, + + #[snafu(display("Failed to create record batch, source: {}", source))] + NewRecordBatch { + #[snafu(backtrace)] + source: common_recordbatch::error::Error, + }, } impl From for Error { @@ -121,7 +127,9 @@ impl ErrorExt for Error { | Error::PyRuntime { .. } | Error::Other { .. } => StatusCode::Internal, - Error::RecordBatch { source } => source.status_code(), + Error::RecordBatch { source } | Error::NewRecordBatch { source } => { + source.status_code() + } Error::DatabaseQuery { source } => source.status_code(), Error::TypeCast { source } => source.status_code(), diff --git a/src/script/src/python/test.rs b/src/script/src/python/test.rs index 5790ce281c..2f01c5c402 100644 --- a/src/script/src/python/test.rs +++ b/src/script/src/python/test.rs @@ -20,10 +20,12 @@ use std::io::prelude::*; use std::path::Path; use std::sync::Arc; +use common_recordbatch::RecordBatch; use console::style; -use datafusion_common::record_batch::RecordBatch as DfRecordBatch; -use datatypes::arrow::array::PrimitiveArray; -use datatypes::arrow::datatypes::{DataType, Field, Schema}; +use datatypes::arrow::datatypes::DataType as ArrowDataType; +use datatypes::data_type::{ConcreteDataType, DataType}; +use datatypes::schema::{ColumnSchema, Schema}; +use datatypes::vectors::{Float32Vector, Float64Vector, Int64Vector, VectorRef}; use ron::from_str as from_ron_string; use rustpython_parser::parser; use serde::{Deserialize, Serialize}; @@ -62,19 +64,26 @@ enum Predicate { #[derive(Serialize, Deserialize, Debug)] struct ColumnInfo { - pub ty: DataType, + pub ty: ArrowDataType, pub len: usize, } -fn create_sample_recordbatch() -> DfRecordBatch { - let cpu_array = PrimitiveArray::from_slice([0.9f32, 0.8, 0.7, 0.6]); - let mem_array = PrimitiveArray::from_slice([0.1f64, 0.2, 0.3, 0.4]); - let schema = Arc::new(Schema::from(vec![ - Field::new("cpu", DataType::Float32, false), - Field::new("mem", DataType::Float64, false), +fn create_sample_recordbatch() -> RecordBatch { + let cpu_array = Float32Vector::from_slice([0.9f32, 0.8, 0.7, 0.6]); + let mem_array = Float64Vector::from_slice([0.1f64, 0.2, 0.3, 0.4]); + let schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), false), 
+ ColumnSchema::new("mem", ConcreteDataType::float64_datatype(), false), ])); - DfRecordBatch::try_new(schema, vec![Arc::new(cpu_array), Arc::new(mem_array)]).unwrap() + RecordBatch::new( + schema, + [ + Arc::new(cpu_array) as VectorRef, + Arc::new(mem_array) as VectorRef, + ], + ) + .unwrap() } /// test cases which read from a .ron file, deser, @@ -102,8 +111,7 @@ fn run_ron_testcases() { Predicate::ParseIsErr { reason } => { let copr = parse_and_compile_copr(&testcase.code); if copr.is_ok() { - eprintln!("Expect to be err, found{copr:#?}"); - panic!() + panic!("Expect to be err, found{copr:#?}"); } let res = &copr.unwrap_err(); println!( @@ -113,24 +121,18 @@ fn run_ron_testcases() { let (res, _) = get_error_reason_loc(res); if !res.contains(&reason) { eprintln!("{}", testcase.code); - eprintln!("Parse Error, expect \"{reason}\" in \"{res}\", but not found."); - panic!() + panic!("Parse Error, expect \"{reason}\" in \"{res}\", but not found."); } } Predicate::ExecIsOk { fields, columns } => { let rb = create_sample_recordbatch(); - let res = coprocessor::exec_coprocessor(&testcase.code, &rb); - if res.is_err() { - dbg!(&res); - } - assert!(res.is_ok()); - let res = res.unwrap(); + let res = coprocessor::exec_coprocessor(&testcase.code, &rb).unwrap(); fields .iter() - .zip(&res.schema.arrow_schema().fields) + .zip(res.schema.column_schemas()) .map(|(anno, real)| { - if !(anno.datatype.clone().unwrap() == real.data_type - && anno.is_nullable == real.is_nullable) + if !(anno.datatype.as_ref().unwrap() == &real.data_type.as_arrow_type() + && anno.is_nullable == real.is_nullable()) { eprintln!("fields expect to be {anno:#?}, found to be {real:#?}."); panic!() @@ -139,17 +141,18 @@ fn run_ron_testcases() { .count(); columns .iter() - .zip(res.df_recordbatch.columns()) - .map(|(anno, real)| { - if !(&anno.ty == real.data_type() && anno.len == real.len()) { - eprintln!( + .enumerate() + .map(|(i, anno)| { + let real = res.column(i); + if !(anno.ty == real.data_type().as_arrow_type() && anno.len == real.len()) + { + panic!( "Unmatch type or length!Expect [{:#?}; {}], found [{:#?}; {}]", anno.ty, anno.len, real.data_type(), real.len() ); - panic!() } }) .count(); @@ -166,17 +169,15 @@ fn run_ron_testcases() { ); let (reason, _) = get_error_reason_loc(&res); if !reason.contains(&part_reason) { - eprintln!( + panic!( "{}\nExecute error, expect \"{reason}\" in \"{res}\", but not found.", testcase.code, reason = style(reason).green(), res = style(res).red() ); - panic!() } } else { - eprintln!("{:#?}\nExpect Err(...), found Ok(...)", res); - panic!(); + panic!("{:#?}\nExpect Err(...), found Ok(...)", res); } } } @@ -235,7 +236,7 @@ def calc_rvs(open_time, close): rv_180d = vector([calc_rv(close, open_time, timepoint, datetime("180d"))]) return rv_7d, rv_15d, rv_30d, rv_60d, rv_90d, rv_180d "#; - let close_array = PrimitiveArray::from_slice([ + let close_array = Float32Vector::from_slice([ 10106.79f32, 10106.09, 10108.73, @@ -248,17 +249,20 @@ def calc_rvs(open_time, close): 10117.08, 10120.43, ]); - let open_time_array = PrimitiveArray::from_slice([ + let open_time_array = Int64Vector::from_slice([ 300i64, 900i64, 1200i64, 1800i64, 2400i64, 3000i64, 3600i64, 4200i64, 4800i64, 5400i64, 6000i64, ]); - let schema = Arc::new(Schema::from(vec![ - Field::new("close", DataType::Float32, false), - Field::new("open_time", DataType::Int64, false), + let schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("close", ConcreteDataType::float32_datatype(), false), + ColumnSchema::new("open_time", 
ConcreteDataType::int64_datatype(), false), ])); - let rb = DfRecordBatch::try_new( + let rb = RecordBatch::new( schema, - vec![Arc::new(close_array), Arc::new(open_time_array)], + [ + Arc::new(close_array) as VectorRef, + Arc::new(open_time_array) as VectorRef, + ], ) .unwrap(); let ret = coprocessor::exec_coprocessor(python_source, &rb); @@ -297,14 +301,20 @@ def a(cpu, mem): ref = log2(fed/prev(fed)) return (0.5 < cpu) & ~( cpu >= 0.75) "#; - let cpu_array = PrimitiveArray::from_slice([0.9f32, 0.8, 0.7, 0.3]); - let mem_array = PrimitiveArray::from_slice([0.1f64, 0.2, 0.3, 0.4]); - let schema = Arc::new(Schema::from(vec![ - Field::new("cpu", DataType::Float32, false), - Field::new("mem", DataType::Float64, false), + let cpu_array = Float32Vector::from_slice([0.9f32, 0.8, 0.7, 0.3]); + let mem_array = Float64Vector::from_slice([0.1f64, 0.2, 0.3, 0.4]); + let schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), false), + ColumnSchema::new("mem", ConcreteDataType::float64_datatype(), false), ])); - let rb = - DfRecordBatch::try_new(schema, vec![Arc::new(cpu_array), Arc::new(mem_array)]).unwrap(); + let rb = RecordBatch::new( + schema, + [ + Arc::new(cpu_array) as VectorRef, + Arc::new(mem_array) as VectorRef, + ], + ) + .unwrap(); let ret = coprocessor::exec_coprocessor(python_source, &rb); if let Err(Error::PyParse { backtrace: _, diff --git a/src/script/src/python/utils.rs b/src/script/src/python/utils.rs index fcc0bf3956..8f078c163c 100644 --- a/src/script/src/python/utils.rs +++ b/src/script/src/python/utils.rs @@ -14,10 +14,12 @@ use std::sync::Arc; -use datafusion::arrow::array::{ArrayRef, BooleanArray, NullArray, PrimitiveArray, Utf8Array}; use datafusion_common::ScalarValue; use datafusion_expr::ColumnarValue as DFColValue; -use datatypes::arrow::datatypes::DataType; +use datatypes::prelude::ScalarVector; +use datatypes::vectors::{ + BooleanVector, Float64Vector, Helper, Int64Vector, NullVector, StringVector, VectorRef, +}; use rustpython_vm::builtins::{PyBaseExceptionRef, PyBool, PyFloat, PyInt, PyList, PyStr}; use rustpython_vm::{PyObjectRef, PyPayload, PyRef, VirtualMachine}; use snafu::{Backtrace, GenerateImplicitData, OptionExt, ResultExt}; @@ -54,26 +56,26 @@ pub fn py_vec_obj_to_array( obj: &PyObjectRef, vm: &VirtualMachine, col_len: usize, -) -> Result { +) -> Result { // It's ugly, but we can't find a better way right now. 
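The branches below broadcast a Python scalar into a constant column by repeating the value col_len times. The pattern in isolation, built from the same vector APIs this patch imports (a sketch, not part of the patch):

use std::sync::Arc;

use datatypes::prelude::ScalarVector;
use datatypes::vectors::{Float64Vector, Int64Vector, VectorRef};

// Repeat one scalar `col_len` times so it lines up with the other columns.
fn broadcast_i64(val: i64, col_len: usize) -> VectorRef {
    Arc::new(Int64Vector::from_iterator(
        std::iter::repeat(val).take(col_len),
    ))
}

fn broadcast_f64(val: f64, col_len: usize) -> VectorRef {
    Arc::new(Float64Vector::from_iterator(
        std::iter::repeat(val).take(col_len),
    ))
}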
if is_instance::(obj, vm) { let pyv = obj.payload::().with_context(|| { ret_other_error_with(format!("can't cast obj {:?} to PyVector", obj)) })?; - Ok(pyv.to_arrow_array()) + Ok(pyv.as_vector_ref()) } else if is_instance::(obj, vm) { let val = obj .to_owned() .try_into_value::(vm) .map_err(|e| format_py_error(e, vm))?; - let ret = PrimitiveArray::from_vec(vec![val; col_len]); + let ret = Int64Vector::from_iterator(std::iter::repeat(val).take(col_len)); Ok(Arc::new(ret) as _) } else if is_instance::(obj, vm) { let val = obj .to_owned() .try_into_value::(vm) .map_err(|e| format_py_error(e, vm))?; - let ret = PrimitiveArray::from_vec(vec![val; col_len]); + let ret = Float64Vector::from_iterator(std::iter::repeat(val).take(col_len)); Ok(Arc::new(ret) as _) } else if is_instance::(obj, vm) { let val = obj @@ -81,7 +83,7 @@ pub fn py_vec_obj_to_array( .try_into_value::(vm) .map_err(|e| format_py_error(e, vm))?; - let ret = BooleanArray::from_iter(std::iter::repeat(Some(val)).take(col_len)); + let ret = BooleanVector::from_iterator(std::iter::repeat(val).take(col_len)); Ok(Arc::new(ret) as _) } else if is_instance::(obj, vm) { let val = obj @@ -89,7 +91,7 @@ pub fn py_vec_obj_to_array( .try_into_value::(vm) .map_err(|e| format_py_error(e, vm))?; - let ret = Utf8Array::::from_iter(std::iter::repeat(Some(val)).take(col_len)); + let ret = StringVector::from_iterator(std::iter::repeat(val.as_str()).take(col_len)); Ok(Arc::new(ret) as _) } else if is_instance::(obj, vm) { let columnar_value = @@ -101,9 +103,9 @@ pub fn py_vec_obj_to_array( let array = ScalarValue::iter_to_array(scalars.into_iter()) .context(error::DataFusionSnafu)?; - Ok(array) + Helper::try_into_vector(array).context(error::TypeCastSnafu) } - None => Ok(Arc::new(NullArray::new(DataType::Null, 0))), + None => Ok(Arc::new(NullVector::new(0))), }, _ => unreachable!(), } diff --git a/src/script/src/python/vector.rs b/src/script/src/python/vector.rs index 91bfd1a6ed..35fa41ba52 100644 --- a/src/script/src/python/vector.rs +++ b/src/script/src/python/vector.rs @@ -19,17 +19,17 @@ use std::sync::Arc; use common_time::date::Date; use common_time::datetime::DateTime; use common_time::timestamp::Timestamp; -use datatypes::arrow::array::{Array, ArrayRef, BooleanArray, PrimitiveArray}; +use datatypes::arrow::array::{ + Array, ArrayRef, BooleanArray, Float64Array, Int64Array, UInt64Array, +}; use datatypes::arrow::compute; -use datatypes::arrow::compute::cast::{self, CastOptions}; -use datatypes::arrow::compute::{arithmetics, comparison}; -use datatypes::arrow::datatypes::DataType; -use datatypes::arrow::scalar::{PrimitiveScalar, Scalar}; -use datatypes::data_type::ConcreteDataType; +use datatypes::arrow::compute::kernels::{arithmetic, boolean, comparison}; +use datatypes::arrow::datatypes::DataType as ArrowDataType; +use datatypes::arrow::error::Result as ArrowResult; +use datatypes::data_type::{ConcreteDataType, DataType}; use datatypes::prelude::Value; -use datatypes::value::OrderedFloat; -use datatypes::vectors::{Helper, NullVector, VectorBuilder, VectorRef}; -use datatypes::{arrow, value}; +use datatypes::value::{self, OrderedFloat}; +use datatypes::vectors::{Helper, NullVector, VectorRef}; use rustpython_vm::builtins::{PyBaseExceptionRef, PyBool, PyBytes, PyFloat, PyInt, PyNone, PyStr}; use rustpython_vm::function::{Either, OptionalArg, PyComparisonValue}; use rustpython_vm::protocol::{PyMappingMethods, PySequenceMethods}; @@ -55,120 +55,71 @@ impl From for PyVector { fn emit_cast_error( vm: &VirtualMachine, - src_ty: &DataType, - dst_ty: 
&DataType, + src_ty: &ArrowDataType, + dst_ty: &ArrowDataType, ) -> PyBaseExceptionRef { vm.new_type_error(format!( "Can't cast source operand of type {:?} into target type of {:?}", src_ty, dst_ty )) } -fn arrow2_rsub_scalar( - arr: &dyn Array, - val: &dyn Scalar, - _vm: &VirtualMachine, -) -> PyResult> { - // b - a => a * (-1) + b - let neg = arithmetics::mul_scalar(arr, &PrimitiveScalar::new(DataType::Int64, Some(-1i64))); - Ok(arithmetics::add_scalar(neg.as_ref(), val)) + +/// Performs `val - arr`. +fn arrow_rsub(arr: &dyn Array, val: &dyn Array, vm: &VirtualMachine) -> PyResult { + arithmetic::subtract_dyn(val, arr).map_err(|e| vm.new_type_error(format!("rsub error: {}", e))) } -fn arrow2_rtruediv_scalar( - arr: &dyn Array, - val: &dyn Scalar, - vm: &VirtualMachine, -) -> PyResult> { - // val / arr => one_arr / arr * val (this is simpler to write) - let one_arr: Box = if is_float(arr.data_type()) { - Box::new(PrimitiveArray::from_values(vec![1f64; arr.len()])) - } else if is_integer(arr.data_type()) { - Box::new(PrimitiveArray::from_values(vec![1i64; arr.len()])) - } else { - return Err(vm.new_not_implemented_error(format!( - "truediv of {:?} Scalar with {:?} Array is not supported", - val.data_type(), - arr.data_type() - ))); - }; - let tmp = arithmetics::mul_scalar(one_arr.as_ref(), val); - Ok(arithmetics::div(tmp.as_ref(), arr)) +/// Performs `val / arr` +fn arrow_rtruediv(arr: &dyn Array, val: &dyn Array, vm: &VirtualMachine) -> PyResult { + arithmetic::divide_dyn(val, arr) + .map_err(|e| vm.new_type_error(format!("rtruediv error: {}", e))) } -fn arrow2_rfloordiv_scalar( - arr: &dyn Array, - val: &dyn Scalar, - vm: &VirtualMachine, -) -> PyResult> { - // val // arr => one_arr // arr * val (this is simpler to write) - let one_arr: Box = if is_float(arr.data_type()) { - Box::new(PrimitiveArray::from_values(vec![1f64; arr.len()])) - } else if is_integer(arr.data_type()) { - Box::new(PrimitiveArray::from_values(vec![1i64; arr.len()])) - } else { - return Err(vm.new_not_implemented_error(format!( - "truediv of {:?} Scalar with {:?} Array is not supported", - val.data_type(), - arr.data_type() - ))); - }; - let tmp = arithmetics::mul_scalar(one_arr.as_ref(), val); - - Ok(arrow::compute::cast::cast( - arithmetics::div(tmp.as_ref(), arr).as_ref(), - &DataType::Int64, - cast::CastOptions { - wrapped: false, - partial: true, - }, - ) - .unwrap()) +/// Performs `val / arr`, but cast to i64. 
+fn arrow_rfloordiv(arr: &dyn Array, val: &dyn Array, vm: &VirtualMachine) -> PyResult { + let array = arithmetic::divide_dyn(val, arr) + .map_err(|e| vm.new_type_error(format!("rtruediv divide error: {}", e)))?; + compute::cast(&array, &ArrowDataType::Int64) + .map_err(|e| vm.new_type_error(format!("rtruediv cast error: {}", e))) } -fn wrap_result( - f: F, -) -> impl Fn(&dyn Array, &dyn Scalar, &VirtualMachine) -> PyResult> +fn wrap_result(f: F) -> impl Fn(&dyn Array, &dyn Array, &VirtualMachine) -> PyResult where - F: Fn(&dyn Array, &dyn Scalar) -> Box, + F: Fn(&dyn Array, &dyn Array) -> ArrowResult, { - move |left, right, _vm| Ok(f(left, right)) + move |left, right, vm| { + f(left, right).map_err(|e| vm.new_type_error(format!("arithmetic error {}", e))) + } } -fn is_float(datatype: &DataType) -> bool { +fn is_float(datatype: &ArrowDataType) -> bool { matches!( datatype, - DataType::Float16 | DataType::Float32 | DataType::Float64 + ArrowDataType::Float16 | ArrowDataType::Float32 | ArrowDataType::Float64 ) } -fn is_integer(datatype: &DataType) -> bool { - is_signed(datatype) || is_unsigned(datatype) -} - -fn is_signed(datatype: &DataType) -> bool { +fn is_signed(datatype: &ArrowDataType) -> bool { matches!( datatype, - DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 + ArrowDataType::Int8 | ArrowDataType::Int16 | ArrowDataType::Int32 | ArrowDataType::Int64 ) } -fn is_unsigned(datatype: &DataType) -> bool { +fn is_unsigned(datatype: &ArrowDataType) -> bool { matches!( datatype, - DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 + ArrowDataType::UInt8 + | ArrowDataType::UInt16 + | ArrowDataType::UInt32 + | ArrowDataType::UInt64 ) } -fn cast(array: ArrayRef, target_type: &DataType, vm: &VirtualMachine) -> PyResult> { - cast::cast( - array.as_ref(), - target_type, - CastOptions { - wrapped: true, - partial: true, - }, - ) - .map_err(|e| vm.new_type_error(e.to_string())) +fn cast(array: ArrayRef, target_type: &ArrowDataType, vm: &VirtualMachine) -> PyResult { + compute::cast(&array, target_type).map_err(|e| vm.new_type_error(e.to_string())) } + fn from_debug_error(err: impl std::fmt::Debug, vm: &VirtualMachine) -> PyBaseExceptionRef { vm.new_runtime_error(format!("Runtime Error: {err:#?}")) } @@ -194,7 +145,7 @@ impl PyVector { } let datatype = get_concrete_type(&elements[0], vm)?; - let mut buf = VectorBuilder::with_capacity(datatype.clone(), elements.len()); + let mut buf = datatype.create_mutable_vector(elements.len()); for obj in elements.drain(..) { let val = if let Some(v) = @@ -207,11 +158,12 @@ impl PyVector { obj, datatype ))); }; - buf.push(&val); + // Safety: `pyobj_try_to_typed_val()` has checked the data type. + buf.push_value_ref(val.as_value_ref()).unwrap(); } Ok(PyVector { - vector: buf.finish(), + vector: buf.to_vector(), }) } else { Ok(PyVector::default()) @@ -232,23 +184,26 @@ impl PyVector { fn scalar_arith_op( &self, other: PyObjectRef, - target_type: Option, + target_type: Option, op: F, vm: &VirtualMachine, ) -> PyResult where - F: Fn(&dyn Array, &dyn Scalar, &VirtualMachine) -> PyResult>, + F: Fn(&dyn Array, &dyn Array, &VirtualMachine) -> PyResult, { // the right operand only support PyInt or PyFloat, let (right, right_type) = { if is_instance::(&other, vm) { other .try_into_value::(vm) - .map(|v| (value::Value::Int64(v), DataType::Int64))? + .map(|v| (value::Value::Int64(v), ArrowDataType::Int64))? 
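scalar_arith_op, above and continuing below, now materializes the scalar operand as a constant array so one set of *_dyn kernels serves both vector-vector and vector-scalar arithmetic, replacing arrow2's dedicated *_scalar kernels. The pattern in isolation, assuming the arrow crate that datatypes re-exports (illustrative only):

use std::sync::Arc;

use arrow::array::{ArrayRef, Float64Array};
use arrow::compute::kernels::arithmetic;
use arrow::error::Result as ArrowResult;

// `val - arr`: turn the scalar into a constant array of matching length,
// then reuse the same dyn kernel used for vector-vector arithmetic.
fn rsub_scalar(arr: &ArrayRef, val: f64) -> ArrowResult<ArrayRef> {
    let scalar: ArrayRef = Arc::new(Float64Array::from_value(val, arr.len()));
    arithmetic::subtract_dyn(&scalar, arr)
}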
} else if is_instance::(&other, vm) { - other - .try_into_value::(vm) - .map(|v| (value::Value::Float64(OrderedFloat(v)), DataType::Float64))? + other.try_into_value::(vm).map(|v| { + ( + value::Value::Float64(OrderedFloat(v)), + ArrowDataType::Float64, + ) + })? } else { return Err(vm.new_type_error(format!( "Can't cast right operand into Scalar of Int or Float, actual: {}", @@ -264,45 +219,38 @@ impl PyVector { // TODO(discord9): found better way to cast between signed and unsigned type let target_type = target_type.unwrap_or_else(|| { if is_signed(left_type) && is_signed(right_type) { - DataType::Int64 + ArrowDataType::Int64 } else if is_unsigned(left_type) && is_unsigned(right_type) { - DataType::UInt64 + ArrowDataType::UInt64 } else { - DataType::Float64 + ArrowDataType::Float64 } }); let left = cast(left, &target_type, vm)?; - let right: Box = if is_float(&target_type) { + let left_len = left.len(); + + // Convert `right` to an array of `target_type`. + let right: Box = if is_float(&target_type) { match right { - value::Value::Int64(v) => { - Box::new(PrimitiveScalar::new(target_type, Some(v as f64))) - } - value::Value::UInt64(v) => { - Box::new(PrimitiveScalar::new(target_type, Some(v as f64))) - } + value::Value::Int64(v) => Box::new(Float64Array::from_value(v as f64, left_len)), + value::Value::UInt64(v) => Box::new(Float64Array::from_value(v as f64, left_len)), value::Value::Float64(v) => { - Box::new(PrimitiveScalar::new(target_type, Some(f64::from(v)))) + Box::new(Float64Array::from_value(f64::from(v), left_len)) } _ => unreachable!(), } } else if is_signed(&target_type) { match right { - value::Value::Int64(v) => Box::new(PrimitiveScalar::new(target_type, Some(v))), - value::Value::UInt64(v) => { - Box::new(PrimitiveScalar::new(target_type, Some(v as i64))) - } - value::Value::Float64(v) => { - Box::new(PrimitiveScalar::new(DataType::Float64, Some(v.0 as i64))) - } + value::Value::Int64(v) => Box::new(Int64Array::from_value(v, left_len)), + value::Value::UInt64(v) => Box::new(Int64Array::from_value(v as i64, left_len)), + value::Value::Float64(v) => Box::new(Int64Array::from_value(v.0 as i64, left_len)), _ => unreachable!(), } } else if is_unsigned(&target_type) { match right { - value::Value::Int64(v) => Box::new(PrimitiveScalar::new(target_type, Some(v))), - value::Value::UInt64(v) => Box::new(PrimitiveScalar::new(target_type, Some(v))), - value::Value::Float64(v) => { - Box::new(PrimitiveScalar::new(target_type, Some(f64::from(v)))) - } + value::Value::Int64(v) => Box::new(UInt64Array::from_value(v as u64, left_len)), + value::Value::UInt64(v) => Box::new(UInt64Array::from_value(v, left_len)), + value::Value::Float64(v) => Box::new(UInt64Array::from_value(v.0 as u64, left_len)), _ => unreachable!(), } } else { @@ -311,7 +259,7 @@ impl PyVector { let result = op(left.as_ref(), right.as_ref(), vm)?; - Ok(Helper::try_into_vector(&*result) + Ok(Helper::try_into_vector(result.clone()) .map_err(|e| { vm.new_type_error(format!( "Can't cast result into vector, result: {:?}, err: {:?}", @@ -324,12 +272,12 @@ impl PyVector { fn arith_op( &self, other: PyObjectRef, - target_type: Option, + target_type: Option, op: F, vm: &VirtualMachine, ) -> PyResult where - F: Fn(&dyn Array, &dyn Array) -> Box, + F: Fn(&dyn Array, &dyn Array) -> ArrowResult, { let right = other.downcast_ref::().ok_or_else(|| { vm.new_type_error(format!( @@ -345,20 +293,21 @@ impl PyVector { let target_type = target_type.unwrap_or_else(|| { if is_signed(left_type) && is_signed(right_type) { - DataType::Int64 + 
         let target_type = target_type.unwrap_or_else(|| {
             if is_signed(left_type) && is_signed(right_type) {
-                DataType::Int64
+                ArrowDataType::Int64
             } else if is_unsigned(left_type) && is_unsigned(right_type) {
-                DataType::UInt64
+                ArrowDataType::UInt64
             } else {
-                DataType::Float64
+                ArrowDataType::Float64
             }
         });
 
         let left = cast(left, &target_type, vm)?;
         let right = cast(right, &target_type, vm)?;
 
-        let result = op(left.as_ref(), right.as_ref());
+        let result = op(left.as_ref(), right.as_ref())
+            .map_err(|e| vm.new_type_error(format!("Can't compute op, error: {}", e)))?;
 
-        Ok(Helper::try_into_vector(&*result)
+        Ok(Helper::try_into_vector(result.clone())
             .map_err(|e| {
                 vm.new_type_error(format!(
                     "Can't cast result into vector, result: {:?}, err: {:?}",
@@ -372,27 +321,27 @@ impl PyVector {
     #[pymethod(magic)]
     fn add(&self, other: PyObjectRef, vm: &VirtualMachine) -> PyResult<PyVector> {
         if is_pyobj_scalar(&other, vm) {
-            self.scalar_arith_op(other, None, wrap_result(arithmetics::add_scalar), vm)
+            self.scalar_arith_op(other, None, wrap_result(arithmetic::add_dyn), vm)
         } else {
-            self.arith_op(other, None, arithmetics::add, vm)
+            self.arith_op(other, None, arithmetic::add_dyn, vm)
         }
     }
 
     #[pymethod(magic)]
     fn sub(&self, other: PyObjectRef, vm: &VirtualMachine) -> PyResult<PyVector> {
         if is_pyobj_scalar(&other, vm) {
-            self.scalar_arith_op(other, None, wrap_result(arithmetics::sub_scalar), vm)
+            self.scalar_arith_op(other, None, wrap_result(arithmetic::subtract_dyn), vm)
         } else {
-            self.arith_op(other, None, arithmetics::sub, vm)
+            self.arith_op(other, None, arithmetic::subtract_dyn, vm)
         }
     }
 
     #[pymethod(magic)]
     fn rsub(&self, other: PyObjectRef, vm: &VirtualMachine) -> PyResult<PyVector> {
         if is_pyobj_scalar(&other, vm) {
-            self.scalar_arith_op(other, None, arrow2_rsub_scalar, vm)
+            self.scalar_arith_op(other, None, arrow_rsub, vm)
         } else {
-            self.arith_op(other, None, |a, b| arithmetics::sub(b, a), vm)
+            self.arith_op(other, None, |a, b| arithmetic::subtract_dyn(b, a), vm)
         }
     }
 
@@ -400,9 +349,9 @@ impl PyVector {
     #[pymethod(magic)]
     fn mul(&self, other: PyObjectRef, vm: &VirtualMachine) -> PyResult<PyVector> {
         if is_pyobj_scalar(&other, vm) {
-            self.scalar_arith_op(other, None, wrap_result(arithmetics::mul_scalar), vm)
+            self.scalar_arith_op(other, None, wrap_result(arithmetic::multiply_dyn), vm)
         } else {
-            self.arith_op(other, None, arithmetics::mul, vm)
+            self.arith_op(other, None, arithmetic::multiply_dyn, vm)
         }
     }
 
@@ -411,24 +360,29 @@ impl PyVector {
         if is_pyobj_scalar(&other, vm) {
             self.scalar_arith_op(
                 other,
-                Some(DataType::Float64),
-                wrap_result(arithmetics::div_scalar),
+                Some(ArrowDataType::Float64),
+                wrap_result(arithmetic::divide_dyn),
                 vm,
             )
         } else {
-            self.arith_op(other, Some(DataType::Float64), arithmetics::div, vm)
+            self.arith_op(
+                other,
+                Some(ArrowDataType::Float64),
+                arithmetic::divide_dyn,
+                vm,
+            )
         }
     }
 
     #[pymethod(magic)]
     fn rtruediv(&self, other: PyObjectRef, vm: &VirtualMachine) -> PyResult<PyVector> {
         if is_pyobj_scalar(&other, vm) {
-            self.scalar_arith_op(other, Some(DataType::Float64), arrow2_rtruediv_scalar, vm)
+            self.scalar_arith_op(other, Some(ArrowDataType::Float64), arrow_rtruediv, vm)
         } else {
             self.arith_op(
                 other,
-                Some(DataType::Float64),
-                |a, b| arithmetics::div(b, a),
+                Some(ArrowDataType::Float64),
+                |a, b| arithmetic::divide_dyn(b, a),
                 vm,
             )
         }
     }
 
@@ -439,12 +393,17 @@ impl PyVector {
         if is_pyobj_scalar(&other, vm) {
             self.scalar_arith_op(
                 other,
-                Some(DataType::Int64),
-                wrap_result(arithmetics::div_scalar),
+                Some(ArrowDataType::Int64),
+                wrap_result(arithmetic::divide_dyn),
                 vm,
             )
         } else {
-            self.arith_op(other, Some(DataType::Int64), arithmetics::div, vm)
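+            // Cast both operands to Int64 first so `divide_dyn` performs
+            // integer division.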
+            self.arith_op(
+                other,
+                Some(ArrowDataType::Int64),
+                arithmetic::divide_dyn,
+                vm,
+            )
         }
     }
 
@@ -452,12 +411,12 @@ impl PyVector {
     fn rfloordiv(&self, other: PyObjectRef, vm: &VirtualMachine) -> PyResult<PyVector> {
         if is_pyobj_scalar(&other, vm) {
             // FIXME: DataType convert problem, target_type should be inferred?
-            self.scalar_arith_op(other, Some(DataType::Int64), arrow2_rfloordiv_scalar, vm)
+            self.scalar_arith_op(other, Some(ArrowDataType::Int64), arrow_rfloordiv, vm)
         } else {
             self.arith_op(
                 other,
-                Some(DataType::Int64),
-                |a, b| arithmetics::div(b, a),
+                Some(ArrowDataType::Int64),
+                |a, b| arithmetic::divide_dyn(b, a),
                 vm,
             )
         }
     }
 
@@ -533,9 +492,9 @@ impl PyVector {
            .as_any()
            .downcast_ref::<BooleanArray>()
            .ok_or_else(|| vm.new_type_error(format!("Can't cast {left:#?} as a Boolean Array")))?;
-        let res = compute::boolean::and(left, right).map_err(|err| from_debug_error(err, vm))?;
+        let res = boolean::and(left, right).map_err(|err| from_debug_error(err, vm))?;
         let res = Arc::new(res) as ArrayRef;
-        let ret = Helper::try_into_vector(&*res).map_err(|err| from_debug_error(err, vm))?;
+        let ret = Helper::try_into_vector(res.clone()).map_err(|err| from_debug_error(err, vm))?;
         Ok(ret.into())
     }
 
@@ -551,9 +510,9 @@ impl PyVector {
            .as_any()
            .downcast_ref::<BooleanArray>()
            .ok_or_else(|| vm.new_type_error(format!("Can't cast {left:#?} as a Boolean Array")))?;
-        let res = compute::boolean::or(left, right).map_err(|err| from_debug_error(err, vm))?;
+        let res = boolean::or(left, right).map_err(|err| from_debug_error(err, vm))?;
         let res = Arc::new(res) as ArrayRef;
-        let ret = Helper::try_into_vector(&*res).map_err(|err| from_debug_error(err, vm))?;
+        let ret = Helper::try_into_vector(res.clone()).map_err(|err| from_debug_error(err, vm))?;
         Ok(ret.into())
     }
 
@@ -565,9 +524,9 @@ impl PyVector {
            .as_any()
            .downcast_ref::<BooleanArray>()
            .ok_or_else(|| vm.new_type_error(format!("Can't cast {left:#?} as a Boolean Array")))?;
-        let res = compute::boolean::not(left);
+        let res = boolean::not(left).map_err(|err| from_debug_error(err, vm))?;
         let res = Arc::new(res) as ArrayRef;
-        let ret = Helper::try_into_vector(&*res).map_err(|err| from_debug_error(err, vm))?;
+        let ret = Helper::try_into_vector(res.clone()).map_err(|err| from_debug_error(err, vm))?;
         Ok(ret.into())
     }
 
@@ -580,15 +539,15 @@ impl PyVector {
     #[pymethod(name = "filter")]
     fn filter(&self, other: PyVectorRef, vm: &VirtualMachine) -> PyResult<PyVector> {
         let left = self.to_arrow_array();
-        let right: ArrayRef = other.to_arrow_array();
+        let right = other.to_arrow_array();
 
         let filter = right.as_any().downcast_ref::<BooleanArray>();
         match filter {
             Some(filter) => {
-                let res = compute::filter::filter(left.as_ref(), filter);
+                let res = compute::filter(left.as_ref(), filter);
                 let res =
                     res.map_err(|err| vm.new_runtime_error(format!("Arrow Error: {err:#?}")))?;
-                let ret = Helper::try_into_vector(&*res).map_err(|e| {
+                let ret = Helper::try_into_vector(res.clone()).map_err(|e| {
                     vm.new_type_error(format!(
                         "Can't cast result into vector, result: {:?}, err: {:?}",
                         res, e
@@ -618,14 +577,10 @@ impl PyVector {
                 .ok_or_else(|| {
                     vm.new_type_error(format!("Can't cast {seq:#?} as a Boolean Array"))
                 })?;
-            // let left = self.to_arrow_array();
-            let res = compute::filter::filter(self.to_arrow_array().as_ref(), mask)
+            let res = compute::filter(self.to_arrow_array().as_ref(), mask)
                 .map_err(|err| vm.new_runtime_error(format!("Arrow Error: {err:#?}")))?;
-            let ret = Helper::try_into_vector(&*res).map_err(|e| {
-                vm.new_type_error(format!(
-                    "Can't cast result into vector, result: {:?}, err: {:?}",
-                    res, e
-                ))
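+            // Note: cloning an `ArrayRef` is cheap; it only bumps an `Arc`
+            // refcount rather than copying the underlying buffers.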
vm.new_type_error(format!("Can't cast result into vector, err: {:?}", e)) })?; Ok(Self::from(ret).into_pyobject(vm)) } else { @@ -654,9 +609,9 @@ impl PyVector { let (mut range, step, slice_len) = slice.adjust_indices(self.len()); let vector = self.as_vector_ref(); - let mut buf = VectorBuilder::with_capacity(vector.data_type(), slice_len); + let mut buf = vector.data_type().create_mutable_vector(slice_len); if slice_len == 0 { - let v: PyVector = buf.finish().into(); + let v: PyVector = buf.to_vector().into(); Ok(v.into_pyobject(vm)) } else if step == 1 { let v: PyVector = vector.slice(range.next().unwrap_or(0), slice_len).into(); @@ -664,15 +619,17 @@ impl PyVector { } else if step.is_negative() { // Negative step require special treatment for i in range.rev().step_by(step.unsigned_abs()) { - buf.push(&vector.get(i)) + // Safety: This mutable vector is created from the vector's data type. + buf.push_value_ref(vector.get_ref(i)).unwrap(); } - let v: PyVector = buf.finish().into(); + let v: PyVector = buf.to_vector().into(); Ok(v.into_pyobject(vm)) } else { for i in range.step_by(step.unsigned_abs()) { - buf.push(&vector.get(i)) + // Safety: This mutable vector is created from the vector's data type. + buf.push_value_ref(vector.get_ref(i)).unwrap(); } - let v: PyVector = buf.finish().into(); + let v: PyVector = buf.to_vector().into(); Ok(v.into_pyobject(vm)) } } @@ -693,19 +650,19 @@ impl PyVector { /// get corresponding arrow op function according to given PyComaprsionOp /// /// TODO(discord9): impl scalar version function -fn get_arrow_op(op: PyComparisonOp) -> impl Fn(&dyn Array, &dyn Array) -> Box { +fn get_arrow_op(op: PyComparisonOp) -> impl Fn(&dyn Array, &dyn Array) -> ArrowResult { let op_bool_arr = match op { - PyComparisonOp::Eq => comparison::eq, - PyComparisonOp::Ne => comparison::neq, - PyComparisonOp::Gt => comparison::gt, - PyComparisonOp::Lt => comparison::lt, - PyComparisonOp::Ge => comparison::gt_eq, - PyComparisonOp::Le => comparison::lt_eq, + PyComparisonOp::Eq => comparison::eq_dyn, + PyComparisonOp::Ne => comparison::neq_dyn, + PyComparisonOp::Gt => comparison::gt_dyn, + PyComparisonOp::Lt => comparison::lt_dyn, + PyComparisonOp::Ge => comparison::gt_eq_dyn, + PyComparisonOp::Le => comparison::lt_eq_dyn, }; - move |a: &dyn Array, b: &dyn Array| -> Box { - let ret = op_bool_arr(a, b); - Box::new(ret) as _ + move |a: &dyn Array, b: &dyn Array| -> ArrowResult { + let array = op_bool_arr(a, b)?; + Ok(Arc::new(array)) } } @@ -714,19 +671,20 @@ fn get_arrow_op(op: PyComparisonOp) -> impl Fn(&dyn Array, &dyn Array) -> Box impl Fn(&dyn Array, &dyn Scalar, &VirtualMachine) -> PyResult> { +) -> impl Fn(&dyn Array, &dyn Array, &VirtualMachine) -> PyResult { let op_bool_arr = match op { - PyComparisonOp::Eq => comparison::eq_scalar, - PyComparisonOp::Ne => comparison::neq_scalar, - PyComparisonOp::Gt => comparison::gt_scalar, - PyComparisonOp::Lt => comparison::lt_scalar, - PyComparisonOp::Ge => comparison::gt_eq_scalar, - PyComparisonOp::Le => comparison::lt_eq_scalar, + PyComparisonOp::Eq => comparison::eq_dyn, + PyComparisonOp::Ne => comparison::neq_dyn, + PyComparisonOp::Gt => comparison::gt_dyn, + PyComparisonOp::Lt => comparison::lt_dyn, + PyComparisonOp::Ge => comparison::gt_eq_dyn, + PyComparisonOp::Le => comparison::lt_eq_dyn, }; - move |a: &dyn Array, b: &dyn Scalar, _vm| -> PyResult> { - let ret = op_bool_arr(a, b); - Ok(Box::new(ret) as _) + move |a: &dyn Array, b: &dyn Array, vm| -> PyResult { + let array = + op_bool_arr(a, b).map_err(|e| 
vm.new_type_error(format!("scalar op error: {}", e)))?; + Ok(Arc::new(array)) } } diff --git a/src/script/src/table.rs b/src/script/src/table.rs index e4e40ec916..7c1570d8d1 100644 --- a/src/script/src/table.rs +++ b/src/script/src/table.rs @@ -21,12 +21,10 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, SCRIPTS_ use common_query::Output; use common_recordbatch::util as record_util; use common_telemetry::logging; -use common_time::timestamp::Timestamp; use common_time::util; -use datatypes::arrow::array::Utf8Array; use datatypes::prelude::{ConcreteDataType, ScalarVector}; use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder}; -use datatypes::vectors::{StringVector, TimestampVector, VectorRef}; +use datatypes::vectors::{StringVector, TimestampMillisecondVector, Vector, VectorRef}; use query::QueryEngineRef; use session::context::QueryContext; use snafu::{ensure, OptionExt, ResultExt}; @@ -104,21 +102,16 @@ impl ScriptsTable { // Timestamp in key part is intentionally left to 0 columns_values.insert( "timestamp".to_string(), - Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond( - 0, - )])) as _, + Arc::new(TimestampMillisecondVector::from_slice(&[0])) as _, ); + let now = util::current_time_millis(); columns_values.insert( "gmt_created".to_string(), - Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond( - util::current_time_millis(), - )])) as _, + Arc::new(TimestampMillisecondVector::from_slice(&[now])) as _, ); columns_values.insert( "gmt_modified".to_string(), - Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond( - util::current_time_millis(), - )])) as _, + Arc::new(TimestampMillisecondVector::from_slice(&[now])) as _, ); let table = self @@ -173,23 +166,21 @@ impl ScriptsTable { ensure!(!records.is_empty(), ScriptNotFoundSnafu { name }); assert_eq!(records.len(), 1); - assert_eq!(records[0].df_recordbatch.num_columns(), 1); + assert_eq!(records[0].num_columns(), 1); - let record = &records[0].df_recordbatch; - - let script_column = record - .column(0) + let script_column = records[0].column(0); + let script_column = script_column .as_any() - .downcast_ref::>() - .context(CastTypeSnafu { + .downcast_ref::() + .with_context(|| CastTypeSnafu { msg: format!( - "can't downcast {:?} array into utf8 array", - record.column(0).data_type() + "can't downcast {:?} array into string vector", + script_column.data_type() ), })?; assert_eq!(script_column.len(), 1); - Ok(script_column.value(0).to_string()) + Ok(script_column.get_data(0).unwrap().to_string()) } #[inline]