diff --git a/src/script/src/python/engine.rs b/src/script/src/python/engine.rs index 8634647c7a..b6f6519929 100644 --- a/src/script/src/python/engine.rs +++ b/src/script/src/python/engine.rs @@ -379,7 +379,8 @@ import greptime as gt @copr(args=["number"], returns = ["number"], sql = "select * from numbers") def test(number) -> vector[u32]: - return query.sql("select * from numbers")[0][0] + from greptime import query + return query().sql("select * from numbers")[0][0] "#; let script = script_engine .compile(script, CompileContext::default()) @@ -438,7 +439,8 @@ from greptime import col @copr(args=["number"], returns = ["number"], sql = "select * from numbers") def test(number) -> vector[u32]: - return dataframe.filter(col("number")==col("number")).collect()[0][0] + from greptime import dataframe + return dataframe().filter(col("number")==col("number")).collect()[0][0] "#; let script = script_engine .compile(script, CompileContext::default()) diff --git a/src/script/src/python/ffi_types/copr.rs b/src/script/src/python/ffi_types/copr.rs index 55824cbb78..c18303027b 100644 --- a/src/script/src/python/ffi_types/copr.rs +++ b/src/script/src/python/ffi_types/copr.rs @@ -140,7 +140,7 @@ impl Coprocessor { cols.len() == names.len() && names.len() == anno.len(), OtherSnafu { reason: format!( - "Unmatched length for cols({}), names({}) and anno({})", + "Unmatched length for cols({}), names({}) and annotation({})", cols.len(), names.len(), anno.len() @@ -335,7 +335,7 @@ pub fn exec_coprocessor(script: &str, rb: &Option) -> Result Vec { vec![ CoprTestCase { script: r#" +from greptime import vector +@copr(returns=["value"]) +def boolean_array() -> vector[f64]: + v = vector([1.0, 2.0, 3.0]) + # This returns a vector([2.0]) + return v[(v > 1) & (v < 3)] +"# + .to_string(), + expect: Some(ronish!("value": vector!(Float64Vector, [2.0f64]))), + }, + #[cfg(feature = "pyo3_backend")] + CoprTestCase { + script: r#" +@copr(returns=["value"], backend="pyo3") +def boolean_array() -> vector[f64]: + from greptime import vector + from greptime import query, dataframe + + try: + print("query()=", query()) + except KeyError as e: + print("query()=", e) + try: + print("dataframe()=", dataframe()) + except KeyError as e: + print("dataframe()=", e) + + v = vector([1.0, 2.0, 3.0]) + # This returns a vector([2.0]) + return v[(v > 1) & (v < 3)] +"# + .to_string(), + expect: Some(ronish!("value": vector!(Float64Vector, [2.0f64]))), + }, + #[cfg(feature = "pyo3_backend")] + CoprTestCase { + script: r#" +@copr(returns=["value"], backend="pyo3") +def boolean_array() -> vector[f64]: + from greptime import vector + v = vector([1.0, 2.0, 3.0]) + # This returns a vector([2.0]) + return v[(v > 1) & (v < 3)] +"# + .to_string(), + expect: Some(ronish!("value": vector!(Float64Vector, [2.0f64]))), + }, + CoprTestCase { + script: r#" @copr(args=["number", "number"], returns=["value"], sql="select number from numbers limit 5", backend="rspy") @@ -107,9 +156,9 @@ def answer() -> vector[i64]: script: r#" @copr(args=[], returns = ["number"], sql = "select * from numbers", backend="rspy") def answer() -> vector[i64]: - from greptime import vector, col, lit + from greptime import vector, col, lit, dataframe expr_0 = (col("number")0) - ret = dataframe.select([col("number")]).filter(expr_0).collect()[0][0] + ret = dataframe().select([col("number")]).filter(expr_0).collect()[0][0] return ret "# .to_string(), @@ -120,10 +169,10 @@ def answer() -> vector[i64]: script: r#" @copr(args=[], returns = ["number"], sql = "select * from numbers", backend="pyo3") def answer() -> vector[i64]: - from greptime import vector, col, lit + from greptime import vector, col, lit, dataframe # Bitwise Operator pred comparison operator expr_0 = (col("number")0) - ret = dataframe.select([col("number")]).filter(expr_0).collect()[0][0] + ret = dataframe().select([col("number")]).filter(expr_0).collect()[0][0] return ret "# .to_string(), diff --git a/src/script/src/python/ffi_types/vector/tests.rs b/src/script/src/python/ffi_types/vector/tests.rs index 0a08c6e421..8ee4f6f7cc 100644 --- a/src/script/src/python/ffi_types/vector/tests.rs +++ b/src/script/src/python/ffi_types/vector/tests.rs @@ -133,7 +133,7 @@ fn get_test_cases() -> Vec { } #[cfg(feature = "pyo3_backend")] fn eval_pyo3(testcase: TestCase, locals: HashMap) { - init_cpython_interpreter(); + init_cpython_interpreter().unwrap(); Python::with_gil(|py| { let locals = { let locals_dict = PyDict::new(py); diff --git a/src/script/src/python/pyo3/builtins.rs b/src/script/src/python/pyo3/builtins.rs index e0921ba84e..809fbfe43c 100644 --- a/src/script/src/python/pyo3/builtins.rs +++ b/src/script/src/python/pyo3/builtins.rs @@ -20,10 +20,13 @@ use datafusion::physical_plan::expressions; use datafusion_expr::ColumnarValue; use datafusion_physical_expr::{math_expressions, AggregateExpr}; use datatypes::vectors::VectorRef; -use pyo3::exceptions::PyValueError; +use pyo3::exceptions::{PyKeyError, PyValueError}; use pyo3::prelude::*; +use pyo3::types::PyDict; +use super::dataframe_impl::PyDataFrame; use super::utils::scalar_value_to_py_any; +use crate::python::ffi_types::copr::PyQueryEngine; use crate::python::ffi_types::utils::all_to_f64; use crate::python::ffi_types::PyVector; use crate::python::pyo3::dataframe_impl::{col, lit}; @@ -58,9 +61,12 @@ macro_rules! batch_import { #[pyo3(name = "greptime")] pub(crate) fn greptime_builtins(_py: Python<'_>, m: &PyModule) -> PyResult<()> { m.add_class::()?; + use self::query_engine; batch_import!( m, [ + dataframe, + query_engine, lit, col, pow, @@ -112,6 +118,34 @@ pub(crate) fn greptime_builtins(_py: Python<'_>, m: &PyModule) -> PyResult<()> { Ok(()) } +fn get_globals(py: Python) -> PyResult<&PyDict> { + // TODO(discord9): check if this is sound(in python) + let py_main = PyModule::import(py, "__main__")?; + let globals = py_main.dict(); + Ok(globals) +} + +#[pyfunction] +fn dataframe(py: Python) -> PyResult { + let globals = get_globals(py)?; + let df = globals + .get_item("__dataframe__") + .ok_or(PyKeyError::new_err("No __dataframe__ variable is found"))? + .extract::()?; + Ok(df) +} + +#[pyfunction] +#[pyo3(name = "query")] +fn query_engine(py: Python) -> PyResult { + let globals = get_globals(py)?; + let query = globals + .get_item("__query__") + .ok_or(PyKeyError::new_err("No __query__ variable is found"))? + .extract::()?; + Ok(query) +} + fn eval_func(py: Python<'_>, name: &str, v: &[&PyObject]) -> PyResult { let v = to_array_of_py_vec(py, v)?; py.allow_threads(|| { diff --git a/src/script/src/python/pyo3/copr_impl.rs b/src/script/src/python/pyo3/copr_impl.rs index 456c8f4eaf..0ed7ccb6e2 100644 --- a/src/script/src/python/pyo3/copr_impl.rs +++ b/src/script/src/python/pyo3/copr_impl.rs @@ -24,7 +24,6 @@ use snafu::{ensure, Backtrace, GenerateImplicitData, ResultExt}; use crate::python::error::{self, NewRecordBatchSnafu, OtherSnafu, Result}; use crate::python::ffi_types::copr::PyQueryEngine; use crate::python::ffi_types::{check_args_anno_real_type, select_from_rb, Coprocessor, PyVector}; -use crate::python::pyo3::builtins::greptime_builtins; use crate::python::pyo3::dataframe_impl::PyDataFrame; use crate::python::pyo3::utils::{init_cpython_interpreter, pyo3_obj_try_to_typed_val}; @@ -57,6 +56,7 @@ impl PyQueryEngine { } // TODO: put this into greptime module } + /// Execute a `Coprocessor` with given `RecordBatch` pub(crate) fn pyo3_exec_parsed( copr: &Coprocessor, @@ -73,7 +73,7 @@ pub(crate) fn pyo3_exec_parsed( Vec::new() }; // Just in case cpython is not inited - init_cpython_interpreter(); + init_cpython_interpreter().unwrap(); Python::with_gil(|py| -> Result<_> { let mut cols = (|| -> PyResult<_> { let dummy_decorator = " @@ -86,7 +86,6 @@ def copr(*dummy, **kwdummy): return func return inner coprocessor = copr -from greptime import vector "; let gen_call = format!("\n_return_from_coprocessor = {}(*_args_for_coprocessor, **_kwargs_for_coprocessor)", copr.name); let script = format!("{}{}{}", dummy_decorator, copr.script, gen_call); @@ -109,14 +108,11 @@ from greptime import vector let globals = py_main.dict(); let locals = PyDict::new(py); - let greptime = PyModule::new(py, "greptime")?; - greptime_builtins(py, greptime)?; - locals.set_item("greptime", greptime)?; if let Some(engine) = &copr.query_engine { let query_engine = PyQueryEngine::from_weakref(engine.clone()); let query_engine = PyCell::new(py, query_engine)?; - globals.set_item("query", query_engine)?; + globals.set_item("__query__", query_engine)?; } // TODO(discord9): find out why `dataframe` is not in scope @@ -129,12 +125,12 @@ from greptime import vector ) )?; let dataframe = PyCell::new(py, dataframe)?; - globals.set_item("dataframe", dataframe)?; + globals.set_item("__dataframe__", dataframe)?; } - locals.set_item("_args_for_coprocessor", args)?; locals.set_item("_kwargs_for_coprocessor", kwargs)?; + // `greptime` is already import when init interpreter, so no need to set in here // TODO(discord9): find a better way to set `dataframe` and `query` in scope/ or set it into module(latter might be impossible and not idomatic even in python) // set `dataframe` and `query` in scope/ or set it into module @@ -219,10 +215,10 @@ mod copr_test { @copr(args=["cpu", "mem"], returns=["ref"], backend="pyo3") def a(cpu, mem, **kwargs): import greptime as gt - from greptime import vector, log2, sum, pow, col, lit + from greptime import vector, log2, sum, pow, col, lit, dataframe for k, v in kwargs.items(): print("%s == %s" % (k, v)) - print(dataframe.select([col("cpu")= 0.75) "#; let cpu_array = Float32Vector::from_slice([0.9f32, 0.8, 0.7, 0.3]); diff --git a/src/script/src/python/pyo3/dataframe_impl.rs b/src/script/src/python/pyo3/dataframe_impl.rs index d07faf06db..07bee23df8 100644 --- a/src/script/src/python/pyo3/dataframe_impl.rs +++ b/src/script/src/python/pyo3/dataframe_impl.rs @@ -26,6 +26,8 @@ use crate::python::ffi_types::PyVector; use crate::python::pyo3::utils::pyo3_obj_try_to_typed_scalar_value; use crate::python::utils::block_on_async; type PyExprRef = Py; + +#[derive(Debug, Clone)] #[pyclass] pub(crate) struct PyDataFrame { inner: DfDataFrame, @@ -47,6 +49,9 @@ impl PyDataFrame { #[pymethods] impl PyDataFrame { + fn __call__(&self) -> PyResult { + Ok(self.clone()) + } fn select_columns(&self, columns: Vec) -> PyResult { Ok(self .inner @@ -252,6 +257,9 @@ pub(crate) fn col(name: String) -> PyExpr { #[pymethods] impl PyExpr { + fn __call__(&self) -> PyResult { + Ok(self.clone()) + } fn __richcmp__(&self, py: Python<'_>, other: PyObject, op: CompareOp) -> PyResult { let other = other.extract::(py).or_else(|_| lit(py, other))?; let op = match op { diff --git a/src/script/src/python/pyo3/utils.rs b/src/script/src/python/pyo3/utils.rs index 7b700bd066..cdb272193e 100644 --- a/src/script/src/python/pyo3/utils.rs +++ b/src/script/src/python/pyo3/utils.rs @@ -36,7 +36,9 @@ static START_PYO3: Lazy> = Lazy::new(|| Mutex::new(false)); pub(crate) fn to_py_err(err: impl ToString) -> PyErr { PyArrowException::new_err(err.to_string()) } -pub(crate) fn init_cpython_interpreter() { + +/// init cpython interpreter with `greptime` builtins, if already inited, do nothing +pub(crate) fn init_cpython_interpreter() -> PyResult<()> { let mut start = START_PYO3.lock().unwrap(); if !*start { pyo3::append_to_inittab!(greptime_builtins); @@ -44,6 +46,7 @@ pub(crate) fn init_cpython_interpreter() { *start = true; info!("Started CPython Interpreter"); } + Ok(()) } pub fn val_to_py_any(py: Python<'_>, val: Value) -> PyResult { diff --git a/src/script/src/python/pyo3/vector_impl.rs b/src/script/src/python/pyo3/vector_impl.rs index cd9c69010a..7c6a778fd1 100644 --- a/src/script/src/python/pyo3/vector_impl.rs +++ b/src/script/src/python/pyo3/vector_impl.rs @@ -21,11 +21,12 @@ use datatypes::arrow::array::{Array, ArrayRef}; use datatypes::arrow::datatypes::DataType as ArrowDataType; use datatypes::prelude::{ConcreteDataType, DataType}; use datatypes::vectors::Helper; -use pyo3::exceptions::PyValueError; +use pyo3::exceptions::{PyIndexError, PyRuntimeError, PyValueError}; use pyo3::prelude::*; use pyo3::pyclass::CompareOp; use pyo3::types::{PyBool, PyFloat, PyInt, PyList, PyString, PyType}; +use super::utils::val_to_py_any; use crate::python::ffi_types::vector::{arrow_rtruediv, wrap_bool_result, wrap_result, PyVector}; use crate::python::pyo3::utils::{pyo3_obj_try_to_typed_val, to_py_err}; @@ -278,6 +279,45 @@ impl PyVector { let v = Helper::try_into_vector(array).map_err(to_py_err)?; Ok(v.into()) } + + /// PyO3's Magic Method for slicing and indexing + fn __getitem__(&self, py: Python, needle: PyObject) -> PyResult { + if let Ok(needle) = needle.extract::(py) { + let mask = needle.to_arrow_array(); + let mask = mask + .as_any() + .downcast_ref::() + .ok_or_else(|| { + PyValueError::new_err( + "A Boolean Array is requested for slicing, found {mask:?}", + ) + })?; + let result = compute::filter(&self.to_arrow_array(), mask) + .map_err(|err| PyRuntimeError::new_err(format!("Arrow Error: {err:#?}")))?; + let ret = Helper::try_into_vector(result.clone()).map_err(|e| { + PyRuntimeError::new_err(format!("Can't cast result into vector, err: {e:?}")) + })?; + let ret = Self::from(ret).into_py(py); + Ok(ret) + } else if let Ok(index) = needle.extract::(py) { + // deal with negative index + let len = self.len() as isize; + let index = if index < 0 { len + index } else { index }; + if index < 0 || index >= len { + return Err(PyIndexError::new_err(format!( + "Index out of bound, index: {index}, len: {len}", + index = index, + len = len + ))); + } + let val = self.as_vector_ref().get(index as usize); + val_to_py_any(py, val) + } else { + Err(PyValueError::new_err( + "{needle:?} is neither a Vector nor a int, can't use for slicing or indexing", + )) + } + } } #[cfg(test)] @@ -317,7 +357,7 @@ mod test { } #[test] fn test_py_vector_api() { - init_cpython_interpreter(); + init_cpython_interpreter().unwrap(); Python::with_gil(|py| { let module = PyModule::new(py, "gt").unwrap(); module.add_class::().unwrap(); diff --git a/src/script/src/python/rspython/builtins.rs b/src/script/src/python/rspython/builtins.rs index bc09b9f2d1..def55fd5df 100644 --- a/src/script/src/python/rspython/builtins.rs +++ b/src/script/src/python/rspython/builtins.rs @@ -306,13 +306,38 @@ pub(crate) mod greptime_builtin { all_to_f64, eval_aggr_fn, from_df_err, try_into_columnar_value, try_into_py_obj, type_cast_error, }; + use crate::python::ffi_types::copr::PyQueryEngine; use crate::python::ffi_types::vector::val_to_pyobj; use crate::python::ffi_types::PyVector; - use crate::python::rspython::dataframe_impl::data_frame::{PyExpr, PyExprRef}; + use crate::python::rspython::dataframe_impl::data_frame::{PyDataFrame, PyExpr, PyExprRef}; use crate::python::rspython::utils::{ is_instance, py_obj_to_value, py_obj_to_vec, PyVectorRef, }; + /// get `__dataframe__` from globals and return it + /// TODO(discord9): this is a terrible hack, we should find a better way to get `__dataframe__` + #[pyfunction] + fn dataframe(vm: &VirtualMachine) -> PyResult { + let df = vm.current_globals().get_item("__dataframe__", vm)?; + let df = df + .payload::() + .ok_or_else(|| vm.new_type_error(format!("object {:?} is not a DataFrame", df)))?; + let df = df.clone(); + Ok(df) + } + + /// get `__query__` from globals and return it + /// TODO(discord9): this is a terrible hack, we should find a better way to get `__dataframe__` + #[pyfunction] + fn query(vm: &VirtualMachine) -> PyResult { + let query_engine = vm.current_globals().get_item("__query__", vm)?; + let query_engine = query_engine.payload::().ok_or_else(|| { + vm.new_type_error(format!("object {:?} is not a QueryEngine", query_engine)) + })?; + let query_engine = query_engine.clone(); + Ok(query_engine) + } + #[pyfunction] fn vector(args: OptionalArg, vm: &VirtualMachine) -> PyResult { PyVector::new(args, vm) diff --git a/src/script/src/python/rspython/copr_impl.rs b/src/script/src/python/rspython/copr_impl.rs index 8c02c8e027..fdfbc8fcca 100644 --- a/src/script/src/python/rspython/copr_impl.rs +++ b/src/script/src/python/rspython/copr_impl.rs @@ -81,12 +81,13 @@ fn set_items_in_scope( fn set_query_engine_in_scope( scope: &Scope, vm: &VirtualMachine, + name: &str, query_engine: PyQueryEngine, ) -> Result<()> { scope .locals .as_object() - .set_item("query", query_engine.to_pyobject(vm), vm) + .set_item(name, query_engine.to_pyobject(vm), vm) .map_err(|e| format_py_error(e, vm)) } @@ -101,7 +102,7 @@ pub(crate) fn exec_with_cached_vm( // set arguments with given name and values let scope = vm.new_scope_with_builtins(); if let Some(rb) = rb { - set_dataframe_in_scope(&scope, vm, "dataframe", rb)?; + set_dataframe_in_scope(&scope, vm, "__dataframe__", rb)?; } if let Some(arg_names) = &copr.deco_args.arg_names { @@ -113,7 +114,7 @@ pub(crate) fn exec_with_cached_vm( let query_engine = PyQueryEngine::from_weakref(engine.clone()); // put a object named with query of class PyQueryEngine in scope - set_query_engine_in_scope(&scope, vm, query_engine)?; + set_query_engine_in_scope(&scope, vm, "__query__", query_engine)?; } if let Some(kwarg) = &copr.kwarg { diff --git a/src/script/src/python/rspython/dataframe_impl.rs b/src/script/src/python/rspython/dataframe_impl.rs index 3564c2cd2f..0884a9ad16 100644 --- a/src/script/src/python/rspython/dataframe_impl.rs +++ b/src/script/src/python/rspython/dataframe_impl.rs @@ -38,7 +38,7 @@ pub(crate) mod data_frame { use crate::python::rspython::builtins::greptime_builtin::lit; use crate::python::utils::block_on_async; #[rspyclass(module = "data_frame", name = "DataFrame")] - #[derive(PyPayload, Debug)] + #[derive(PyPayload, Debug, Clone)] pub struct PyDataFrame { pub inner: DfDataFrame, } diff --git a/src/script/src/python/rspython/test.rs b/src/script/src/python/rspython/test.rs index cc819cbd57..381c98b482 100644 --- a/src/script/src/python/rspython/test.rs +++ b/src/script/src/python/rspython/test.rs @@ -287,7 +287,7 @@ def a(cpu, mem): abc = vector([v[0] > v[1] for v in zip(cpu, mem)]) fed = cpu.filter(abc) ref = log2(fed/prev(fed)) - return (0.5 < cpu) & ~( cpu >= 0.75) + return cpu[(cpu > 0.5) & ~( cpu >= 0.75)] "#; let cpu_array = Float32Vector::from_slice([0.9f32, 0.8, 0.7, 0.3]); let mem_array = Float64Vector::from_slice([0.1f64, 0.2, 0.3, 0.4]); diff --git a/src/script/src/python/rspython/testcases.ron b/src/script/src/python/rspython/testcases.ron index 74a8c42d73..56995e196a 100644 --- a/src/script/src/python/rspython/testcases.ron +++ b/src/script/src/python/rspython/testcases.ron @@ -559,11 +559,11 @@ def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], // constant column(int) name: "test_data_frame", code: r#" -from greptime import col +from greptime import col, dataframe @copr(args=["cpu", "mem"], returns=["perf", "what"]) def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], vector[f32]): - ret = dataframe.select([col("cpu"), col("mem")]).collect()[0] + ret = dataframe().select([col("cpu"), col("mem")]).collect()[0] return ret[0], ret[1] "#, predicate: ExecIsOk( @@ -593,11 +593,11 @@ def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], // constant column(int) name: "test_data_frame", code: r#" -from greptime import col +from greptime import col, dataframe @copr(args=["cpu", "mem"], returns=["perf", "what"]) def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], vector[f32]): - ret = dataframe.filter(col("cpu")>col("mem")).collect()[0] + ret = dataframe().filter(col("cpu")>col("mem")).collect()[0] return ret[0], ret[1] "#, predicate: ExecIsOk(