From dfa30123967e130e7cda390089d48e6b2c63af3e Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Thu, 15 Sep 2022 16:18:33 +0800 Subject: [PATCH] feat: improve python coprocesssor parser (#260) * feat: supports DateTime, Date and Timestamp column type to be returned by py scripts * feat: improve coprocessor compiler, make it work better * fix: comments * fix: typo * Update src/script/src/python/vector.rs Co-authored-by: LFC * Update src/script/src/python/coprocessor.rs Co-authored-by: LFC Co-authored-by: LFC --- src/script/src/python/coprocessor.rs | 206 +++++-------------- src/script/src/python/coprocessor/compile.rs | 121 +++++++++++ src/script/src/python/coprocessor/parse.rs | 192 +++++++++-------- src/script/src/python/engine.rs | 14 +- src/script/src/python/test.rs | 12 +- src/script/src/python/testcases.ron | 52 +++-- src/script/src/python/vector.rs | 33 ++- 7 files changed, 354 insertions(+), 276 deletions(-) create mode 100644 src/script/src/python/coprocessor/compile.rs diff --git a/src/script/src/python/coprocessor.rs b/src/script/src/python/coprocessor.rs index 9e15f1242f..6b1bb7d145 100644 --- a/src/script/src/python/coprocessor.rs +++ b/src/script/src/python/coprocessor.rs @@ -1,3 +1,4 @@ +pub mod compile; pub mod parse; use std::collections::HashMap; @@ -14,12 +15,6 @@ use datatypes::schema::Schema; use datatypes::vectors::Helper; use datatypes::vectors::{BooleanVector, Vector, VectorRef}; use rustpython_bytecode::CodeObject; -use rustpython_compiler_core::compile; -use rustpython_parser::{ - ast, - ast::{Located, Location}, - parser, -}; use rustpython_vm as vm; use rustpython_vm::{class::PyClassImpl, AsObject}; #[cfg(test)] @@ -29,12 +24,10 @@ use vm::builtins::{PyBaseExceptionRef, PyTuple}; use vm::scope::Scope; use vm::{Interpreter, PyObjectRef, VirtualMachine}; -use crate::fail_parse_error; use crate::python::builtins::greptime_builtin; -use crate::python::coprocessor::parse::{ret_parse_error, DecoratorArgs}; +use crate::python::coprocessor::parse::DecoratorArgs; use crate::python::error::{ - ensure, ret_other_error_with, ArrowSnafu, CoprParseSnafu, OtherSnafu, PyCompileSnafu, - PyParseSnafu, Result, TypeCastSnafu, + ensure, ret_other_error_with, ArrowSnafu, OtherSnafu, Result, TypeCastSnafu, }; use crate::python::utils::{format_py_error, py_vec_obj_to_array}; use crate::python::{utils::is_instance, PyVector}; @@ -50,7 +43,7 @@ pub struct AnnotationInfo { pub type CoprocessorRef = Arc; #[cfg_attr(test, derive(Deserialize))] -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, Clone)] pub struct Coprocessor { pub name: String, pub deco_args: DecoratorArgs, @@ -61,141 +54,26 @@ pub struct Coprocessor { /// store its corresponding script, also skip serde when in `cfg(test)` to reduce work in compare #[cfg_attr(test, serde(skip))] pub script: String, + // We must use option here, because we use `serde` to deserialize coprocessor + // from ron file and `Deserialize` requires Coprocessor implementing `Default` trait, + // but CodeObject doesn't. + #[cfg_attr(test, serde(skip))] + pub code_obj: Option, } +impl PartialEq for Coprocessor { + fn eq(&self, other: &Self) -> bool { + self.name == other.name + && self.deco_args == other.deco_args + && self.arg_types == other.arg_types + && self.return_types == other.return_types + && self.script == other.script + } +} + +impl Eq for Coprocessor {} + impl Coprocessor { - /// generate a call to the coprocessor function - /// with arguments given in decorator's `args` list - /// also set in location in source code to `loc` - fn gen_call(&self, loc: &Location) -> ast::Stmt<()> { - let mut loc = loc.to_owned(); - // adding a line to avoid confusing if any error occurs when calling the function - // then the pretty print will point to the last line in code - // instead of point to any of existing code written by user. - loc.newline(); - let args: Vec> = self - .deco_args - .arg_names - .iter() - .map(|v| { - let node = ast::ExprKind::Name { - id: v.to_owned(), - ctx: ast::ExprContext::Load, - }; - create_located(node, loc) - }) - .collect(); - let func = ast::ExprKind::Call { - func: Box::new(create_located( - ast::ExprKind::Name { - id: self.name.to_owned(), - ctx: ast::ExprContext::Load, - }, - loc, - )), - args, - keywords: Vec::new(), - }; - let stmt = ast::StmtKind::Expr { - value: Box::new(create_located(func, loc)), - }; - create_located(stmt, loc) - } - - /// check if `Mod` is of one line of statement - fn check_before_compile(top: &ast::Mod) -> Result<()> { - if let ast::Mod::Interactive { body: code } = top { - ensure!( - code.len() == 1, - CoprParseSnafu { - reason: format!( - "Expect only one statement in script, found {} statement", - code.len() - ), - loc: code.first().map(|s| s.location) - } - ); - - if let ast::StmtKind::FunctionDef { - name: _, - args: _, - body: _, - decorator_list: _, - returns: _, - type_comment: __main__, - } = &code[0].node - { - } else { - return fail_parse_error!( - format!("Expect the one and only statement in script as a function def, but instead found: {:?}", code[0].node), - Some(code[0].location) - ); - } - } else { - return fail_parse_error!( - format!("Expect statement in script, found: {:?}", top), - None, - ); - } - Ok(()) - } - - /// stripe the decorator(`@xxxx`) and type annotation(for type checker is done in rust function), add one line in the ast for call function with given parameter, and compiler into `CodeObject` - /// - /// The rationale is that rustpython's vm is not very efficient according to [offical benchmark](https://rustpython.github.io/benchmarks), - /// So we should avoid running too much Python Bytecode, hence in this function we delete `@` decorator(instead of actually write a decorator in python) - /// And add a function call in the end and also - /// strip type annotation - fn strip_append_and_compile(&self) -> Result { - let script = &self.script; - // note that it's important to use `parser::Mode::Interactive` so the ast can be compile to return a result instead of return None in eval mode - let mut top = parser::parse(script, parser::Mode::Interactive).context(PyParseSnafu)?; - Self::check_before_compile(&top)?; - // erase decorator - if let ast::Mod::Interactive { body } = &mut top { - let code = body; - if let ast::StmtKind::FunctionDef { - name: _, - args, - body: _, - decorator_list, - returns, - type_comment: __main__, - } = &mut code[0].node - { - *decorator_list = Vec::new(); - // strip type annotation - // def a(b: int, c:int) -> int - // will became - // def a(b, c) - *returns = None; - for arg in &mut args.args { - arg.node.annotation = None; - } - } else { - // already done in check function - unreachable!() - } - let loc = code[0].location; - - // This manually construct ast has no corrsponding code - // in the script, so just give it a location that don't exist in orginal script - // (which doesn't matter because Location usually only used in pretty print errors) - code.push(self.gen_call(&loc)); - } else { - // already done in check function - unreachable!() - } - // use `compile::Mode::BlockExpr` so it return the result of statement - compile::compile_top( - &top, - "".to_owned(), - compile::Mode::BlockExpr, - compile::CompileOpts { optimize: 0 }, - ) - .context(PyCompileSnafu) - } - /// generate [`Schema`] according to return names, types, /// if no annotation /// the datatypes of the actual columns is used directly @@ -286,10 +164,6 @@ impl Coprocessor { } } -fn create_located(node: T, loc: Location) -> Located { - Located::new(loc, node) -} - /// cast a `dyn Array` of type unsigned/int/float into a `dyn Vector` fn try_into_vector(arg: Arc) -> Result> { // wrap try_into_vector in here to convert `datatypes::error::Error` to `python::error::Error` @@ -483,7 +357,7 @@ pub fn exec_coprocessor(script: &str, rb: &DfRecordBatch) -> Result // 1. parse the script and check if it's only a function with `@coprocessor` decorator, and get `args` and `returns`, // 2. also check for exist of `args` in `rb`, if not found, return error // TODO(discord9): cache the result of parse_copr - let copr = parse::parse_copr(script)?; + let copr = parse::parse_and_compile_copr(script)?; exec_parsed(&copr, rb) } @@ -499,8 +373,8 @@ pub(crate) fn exec_with_cached_vm( let scope = vm.new_scope_with_builtins(); set_items_in_scope(&scope, vm, &copr.deco_args.arg_names, args)?; - let code_obj = copr.strip_append_and_compile()?; - let code_obj = vm.ctx.new_code(code_obj); + // It's safe to unwrap code_object, it's already compiled before. + let code_obj = vm.ctx.new_code(copr.code_obj.clone().unwrap()); let ret = vm .run_code_obj(code_obj, scope) .map_err(|e| format_py_error(e, vm))?; @@ -567,3 +441,35 @@ pub fn exec_copr_print( crate::python::error::pretty_print_error_in_src(script, &e, ln_offset, filename) }) } + +#[cfg(test)] +mod tests { + use crate::python::coprocessor::parse::parse_and_compile_copr; + + #[test] + fn test_parse_copr() { + let script = r#" +def add(a, b): + return a + b + +@copr(args=["a", "b", "c"], returns = ["r"], sql="select number as a,number as b,number as c from numbers limit 100") +def test(a, b, c): + import greptime as g + return add(a, b) / g.sqrt(c) +"#; + + let copr = parse_and_compile_copr(script).unwrap(); + assert_eq!(copr.name, "test"); + let deco_args = copr.deco_args.clone(); + assert_eq!( + deco_args.sql.unwrap(), + "select number as a,number as b,number as c from numbers limit 100" + ); + assert_eq!(deco_args.ret_names, vec!["r"]); + assert_eq!(deco_args.arg_names, vec!["a", "b", "c"]); + assert_eq!(copr.arg_types, vec![None, None, None]); + assert_eq!(copr.return_types, vec![None]); + assert_eq!(copr.script, script); + assert!(copr.code_obj.is_some()); + } +} diff --git a/src/script/src/python/coprocessor/compile.rs b/src/script/src/python/coprocessor/compile.rs new file mode 100644 index 0000000000..196684d711 --- /dev/null +++ b/src/script/src/python/coprocessor/compile.rs @@ -0,0 +1,121 @@ +//! compile script to code object + +use rustpython_bytecode::CodeObject; +use rustpython_compiler_core::compile as python_compile; +use rustpython_parser::{ + ast, + ast::{Located, Location}, + parser, +}; +use snafu::ResultExt; + +use crate::fail_parse_error; +use crate::python::coprocessor::parse::{ret_parse_error, DecoratorArgs}; +use crate::python::error::{PyCompileSnafu, PyParseSnafu, Result}; + +fn create_located(node: T, loc: Location) -> Located { + Located::new(loc, node) +} + +/// generate a call to the coprocessor function +/// with arguments given in decorator's `args` list +/// also set in location in source code to `loc` +fn gen_call(name: &str, deco_args: &DecoratorArgs, loc: &Location) -> ast::Stmt<()> { + let mut loc = loc.to_owned(); + // adding a line to avoid confusing if any error occurs when calling the function + // then the pretty print will point to the last line in code + // instead of point to any of existing code written by user. + loc.newline(); + let args: Vec> = deco_args + .arg_names + .iter() + .map(|v| { + let node = ast::ExprKind::Name { + id: v.to_owned(), + ctx: ast::ExprContext::Load, + }; + create_located(node, loc) + }) + .collect(); + let func = ast::ExprKind::Call { + func: Box::new(create_located( + ast::ExprKind::Name { + id: name.to_string(), + ctx: ast::ExprContext::Load, + }, + loc, + )), + args, + keywords: Vec::new(), + }; + let stmt = ast::StmtKind::Expr { + value: Box::new(create_located(func, loc)), + }; + create_located(stmt, loc) +} + +/// stripe the decorator(`@xxxx`) and type annotation(for type checker is done in rust function), add one line in the ast for call function with given parameter, and compiler into `CodeObject` +/// +/// The rationale is that rustpython's vm is not very efficient according to [offical benchmark](https://rustpython.github.io/benchmarks), +/// So we should avoid running too much Python Bytecode, hence in this function we delete `@` decorator(instead of actually write a decorator in python) +/// And add a function call in the end and also +/// strip type annotation +pub fn compile_script(name: &str, deco_args: &DecoratorArgs, script: &str) -> Result { + // note that it's important to use `parser::Mode::Interactive` so the ast can be compile to return a result instead of return None in eval mode + let mut top = parser::parse(script, parser::Mode::Interactive).context(PyParseSnafu)?; + // erase decorator + if let ast::Mod::Interactive { body } = &mut top { + let stmts = body; + let mut loc = None; + for stmt in stmts.iter_mut() { + if let ast::StmtKind::FunctionDef { + name: _, + args, + body: _, + decorator_list, + returns, + type_comment: __main__, + } = &mut stmt.node + { + *decorator_list = Vec::new(); + // strip type annotation + // def a(b: int, c:int) -> int + // will became + // def a(b, c) + *returns = None; + for arg in &mut args.args { + arg.node.annotation = None; + } + } else if matches!( + stmt.node, + ast::StmtKind::Import { .. } | ast::StmtKind::ImportFrom { .. } + ) { + // import statements are allowed. + } else { + // already checked in parser + unreachable!() + } + loc = Some(stmt.location); + + // This manually construct ast has no corrsponding code + // in the script, so just give it a location that don't exist in orginal script + // (which doesn't matter because Location usually only used in pretty print errors) + } + // Append statement which calling coprocessor function. + // It's safe to unwrap loc, it is always exists. + stmts.push(gen_call(name, deco_args, &loc.unwrap())); + } else { + return fail_parse_error!( + format!("Expect statement in script, found: {:?}", top), + None, + ); + } + // use `compile::Mode::BlockExpr` so it return the result of statement + python_compile::compile_top( + &top, + "".to_owned(), + python_compile::Mode::BlockExpr, + python_compile::CompileOpts { optimize: 0 }, + ) + .context(PyCompileSnafu) +} diff --git a/src/script/src/python/coprocessor/parse.rs b/src/script/src/python/coprocessor/parse.rs index 7dea6ae072..c3f4797218 100644 --- a/src/script/src/python/coprocessor/parse.rs +++ b/src/script/src/python/coprocessor/parse.rs @@ -8,8 +8,9 @@ use rustpython_parser::{ }; #[cfg(test)] use serde::Deserialize; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; +use crate::python::coprocessor::compile; use crate::python::coprocessor::AnnotationInfo; use crate::python::coprocessor::Coprocessor; use crate::python::error::{ensure, CoprParseSnafu, PyParseSnafu, Result}; @@ -415,107 +416,104 @@ fn get_return_annotations(rets: &ast::Expr<()>) -> Result>) -> Result<()> { - ensure!( - stmts.len() == 1, - CoprParseSnafu { - reason: - "Expect one and only one python function with `@coprocessor` or `@cpor` decorator" - .to_string(), - loc: stmts.first().map(|s| s.location) - } - ); - if let ast::StmtKind::FunctionDef { - name: _, - args: _, - body: _, - decorator_list, - returns: _, - type_comment: _, - } = &stmts[0].node - { - ensure!( - decorator_list.len() == 1, - CoprParseSnafu { - reason: "Expect one decorator", - loc: decorator_list.first().map(|s| s.location) - } - ); - } else { - return fail_parse_error!( - format!( - "Expect a function definition, found a \n{:#?}", - &stmts[0].node - ), - Some(stmts[0].location), - ); - } - Ok(()) -} - /// parse script and return `Coprocessor` struct with info extract from ast -pub fn parse_copr(script: &str) -> Result { +pub fn parse_and_compile_copr(script: &str) -> Result { let python_ast = parser::parse_program(script).context(PyParseSnafu)?; - check_copr(&python_ast)?; - if let ast::StmtKind::FunctionDef { - name, - args: fn_args, - body: _, - decorator_list, - returns, - type_comment: _, - } = &python_ast[0].node - { - let decorator = &decorator_list[0]; - let deco_args = parse_decorator(decorator)?; - // get arg types from type annotation - let arg_types = get_arg_annotations(fn_args)?; + let mut coprocessor = None; - // get return types from type annotation - let return_types = if let Some(rets) = returns { - get_return_annotations(rets)? + for stmt in python_ast { + if let ast::StmtKind::FunctionDef { + name, + args: fn_args, + body: _, + decorator_list, + returns, + type_comment: _, + } = &stmt.node + { + if !decorator_list.is_empty() { + ensure!(coprocessor.is_none(), + CoprParseSnafu { + reason: "Expect one and only one python function with `@coprocessor` or `@cpor` decorator", + loc: stmt.location, + } + ); + ensure!( + decorator_list.len() == 1, + CoprParseSnafu { + reason: "Expect one decorator", + loc: decorator_list.first().map(|s| s.location) + } + ); + + let decorator = &decorator_list[0]; + let deco_args = parse_decorator(decorator)?; + + // get arg types from type annotation + let arg_types = get_arg_annotations(fn_args)?; + + // get return types from type annotation + let return_types = if let Some(rets) = returns { + get_return_annotations(rets)? + } else { + // if no anntation at all, set it to all None + std::iter::repeat(None) + .take(deco_args.ret_names.len()) + .collect() + }; + + // make sure both arguments&returns in fucntion + // and in decorator have same length + ensure!( + deco_args.arg_names.len() == arg_types.len(), + CoprParseSnafu { + reason: format!( + "args number in decorator({}) and function({}) doesn't match", + deco_args.arg_names.len(), + arg_types.len() + ), + loc: None + } + ); + ensure!( + deco_args.ret_names.len() == return_types.len(), + CoprParseSnafu { + reason: format!( + "returns number in decorator( {} ) and function annotation( {} ) doesn't match", + deco_args.ret_names.len(), + return_types.len() + ), + loc: None + } + ); + coprocessor = Some(Coprocessor { + code_obj: Some(compile::compile_script(name, &deco_args, script)?), + name: name.to_string(), + deco_args, + arg_types, + return_types, + script: script.to_owned(), + }); + } + } else if matches!( + stmt.node, + ast::StmtKind::Import { .. } | ast::StmtKind::ImportFrom { .. } + ) { + // import statements are allowed. } else { - // if no anntation at all, set it to all None - std::iter::repeat(None) - .take(deco_args.ret_names.len()) - .collect() - }; - - // make sure both arguments&returns in fucntion - // and in decorator have same length - ensure!( - deco_args.arg_names.len() == arg_types.len(), - CoprParseSnafu { - reason: format!( - "args number in decorator({}) and function({}) doesn't match", - deco_args.arg_names.len(), - arg_types.len() + return fail_parse_error!( + format!( + "Expect a function definition, but found a \n{:#?}", + &stmt.node ), - loc: None - } - ); - ensure!( - deco_args.ret_names.len() == return_types.len(), - CoprParseSnafu { - reason: format!( - "returns number in decorator( {} ) and function annotation( {} ) doesn't match", - deco_args.ret_names.len(), - return_types.len() - ), - loc: None - } - ); - Ok(Coprocessor { - name: name.to_string(), - deco_args, - arg_types, - return_types, - script: script.to_owned(), - }) - } else { - unreachable!() + Some(stmt.location), + ); + } } + + coprocessor.context(CoprParseSnafu { + reason: "Coprocessor not found in script", + loc: None, + }) } diff --git a/src/script/src/python/engine.rs b/src/script/src/python/engine.rs index 0b762fa1b4..7eea7602db 100644 --- a/src/script/src/python/engine.rs +++ b/src/script/src/python/engine.rs @@ -18,7 +18,7 @@ use snafu::{ensure, ResultExt}; use sql::statements::statement::Statement; use crate::engine::{CompileContext, EvalContext, Script, ScriptEngine}; -use crate::python::coprocessor::{exec_parsed, parse::parse_copr}; +use crate::python::coprocessor::{exec_parsed, parse}; use crate::python::{ coprocessor::CoprocessorRef, error::{self, Result}, @@ -122,7 +122,7 @@ impl ScriptEngine for PyEngine { } async fn compile(&self, script: &str, _ctx: CompileContext) -> Result { - let copr = Arc::new(parse_copr(script)?); + let copr = Arc::new(parse::parse_and_compile_copr(script)?); Ok(PyScript { copr, @@ -165,10 +165,13 @@ mod tests { let script_engine = PyEngine::new(query_engine.clone()); let script = r#" +import greptime as g +def add(a, b): + return a + b; + @copr(args=["a", "b", "c"], returns = ["r"], sql="select number as a,number as b,number as c from numbers limit 100") def test(a, b, c): - import greptime as g - return (a + b) / g.sqrt(c) + return add(a, b) / g.sqrt(c) "#; let script = script_engine .compile(script, CompileContext::default()) @@ -196,9 +199,10 @@ def test(a, b, c): // test list comprehension let script = r#" +import greptime as gt + @copr(args=["number"], returns = ["r"], sql="select number from numbers limit 100") def test(a): - import greptime as gt return gt.vector([x for x in a if x % 2 == 0]) "#; let script = script_engine diff --git a/src/script/src/python/test.rs b/src/script/src/python/test.rs index f34520c7b2..b307d85683 100644 --- a/src/script/src/python/test.rs +++ b/src/script/src/python/test.rs @@ -18,7 +18,7 @@ use super::error::{get_error_reason_loc, visualize_loc}; use crate::python::coprocessor::AnnotationInfo; use crate::python::error::pretty_print_error_in_src; use crate::python::{ - coprocessor, coprocessor::parse::parse_copr, coprocessor::Coprocessor, error::Error, + coprocessor, coprocessor::parse::parse_and_compile_copr, coprocessor::Coprocessor, error::Error, }; #[derive(Deserialize, Debug)] @@ -31,7 +31,7 @@ struct TestCase { #[derive(Deserialize, Debug)] enum Predicate { ParseIsOk { - result: Coprocessor, + result: Box, }, ParseIsErr { /// used to check if after serialize [`Error`] into a String, that string contains `reason` @@ -81,13 +81,13 @@ fn run_ron_testcases() { print!(".ron test {}", testcase.name); match testcase.predicate { Predicate::ParseIsOk { result } => { - let copr = parse_copr(&testcase.code); + let copr = parse_and_compile_copr(&testcase.code); let mut copr = copr.unwrap(); copr.script = "".into(); - assert_eq!(copr, result); + assert_eq!(copr, *result); } Predicate::ParseIsErr { reason } => { - let copr = parse_copr(&testcase.code); + let copr = parse_and_compile_copr(&testcase.code); if copr.is_ok() { eprintln!("Expect to be err, found{copr:#?}"); panic!() @@ -180,7 +180,7 @@ def a(cpu, mem: vector[f64])->(vector[f64|None], vector[f64], vector[_], vector[ return cpu + mem, cpu - mem, cpu * mem, cpu / mem "#; let pyast = parser::parse(python_source, parser::Mode::Interactive).unwrap(); - let copr = parse_copr(python_source); + let copr = parse_and_compile_copr(python_source); dbg!(copr); } diff --git a/src/script/src/python/testcases.ron b/src/script/src/python/testcases.ron index a6974d3c8e..44301446a1 100644 --- a/src/script/src/python/testcases.ron +++ b/src/script/src/python/testcases.ron @@ -1,13 +1,19 @@ -// This is the file for python coprocessor's testcases, +// This is the file for python coprocessor's testcases, // including coprocessor parsing test and execute test // check src/script/python/test.rs::run_ron_testcases() for more information [ ( name: "correct_parse", code: r#" +import greptime as gt +from greptime import pow +def add(a, b): + return a + b +def sub(a, b): + return a - b @copr(args=["cpu", "mem"], returns=["perf", "what", "how", "why"]) def a(cpu: vector[f32], mem: vector[f64])->(vector[f64], vector[f64|None], vector[_], vector[_ | None]): - return cpu + mem, cpu - mem, cpu * mem, cpu / mem + return add(cpu, mem), sub(cpu, mem), cpu * mem, cpu / mem "#, predicate: ParseIsOk( result: ( @@ -54,7 +60,21 @@ def a(cpu: vector[f32], mem: vector[f64])->(vector[f64], vector[f64|None], vecto return cpu + mem, cpu - mem, cpu * mem, cpu / mem "#, predicate: ParseIsErr( - reason: "Expect one decorator" + reason: "Coprocessor not found in script" + ) + ), + ( + name: "too_many_decorators", + code: r#" +@copr(args=["a"], returns=["r"]) +def test1(a): + return a; +@copr(args=["a"], returns=["r"]) +def test2(a): + return a; +"#, + predicate: ParseIsErr( + reason: "Expect one and only one python function with `@coprocessor` or `@cpor` decorator" ) ), ( @@ -173,8 +193,8 @@ def a(cpu: vector[f64], mem: vector[f64])->(vector[None|None], vector[into(f64)] return cpu + mem, cpu - mem, cpu * mem, cpu / mem "#, predicate: ParseIsErr( - reason: - "Expect one and only one python function with `@coprocessor` or `@cpor` decorator" + reason: + "Expect a function definition, but found a" ) ), ( @@ -186,7 +206,7 @@ def a(cpu: vector[f64], mem: vector[f64])->(vector[None|None], vector[into(f64)] return cpu + mem, cpu - mem, cpu * mem, cpu / mem "#, predicate: ParseIsErr( - reason: + reason: "Expect decorator with name `copr` or `coprocessor`, found" ) ), @@ -198,7 +218,7 @@ def a(cpu: vector[f64], mem: vector[f64])->(vector[f64|None], vector[into(f64)], return cpu + mem, cpu - mem, cpu * mem, cpu / mem "#, predicate: ParseIsErr( - reason: + reason: " keyword argument, found " ) ), @@ -210,7 +230,7 @@ def a(cpu: vector[f64], mem: vector[f64])->(vector[f64|None], vector[into(f64)], return cpu + mem, cpu - mem, cpu * mem, cpu / mem "#, predicate: ParseIsErr( - reason: + reason: " keyword argument, found " ) ), @@ -219,7 +239,7 @@ def a(cpu: vector[f64], mem: vector[f64])->(vector[f64|None], vector[into(f64)], name: "correct_exec", code: r#" @copr(args=["cpu", "mem"], returns=["perf", "what"]) -def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], +def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], vector[f32]): return cpu + mem, cpu - mem "#, @@ -251,7 +271,7 @@ def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], name: "constant_float_col", code: r#" @copr(args=["cpu", "mem"], returns=["perf", "what"]) -def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], +def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], vector[f32]): return cpu + mem, 1.0 "#, @@ -283,7 +303,7 @@ def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], name: "constant_int_col", code: r#" @copr(args=["cpu", "mem"], returns=["perf", "what"]) -def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], +def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], vector[f32]): return cpu + mem, 1 "#, @@ -315,7 +335,7 @@ def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], name: "constant_bool_col", code: r#" @copr(args=["cpu", "mem"], returns=["perf", "what"]) -def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], +def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], vector[f32]): return cpu + mem, True "#, @@ -358,7 +378,7 @@ def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], vector[f64], vecto name: "div_by_zero", code: r#" @copr(args=["cpu", "mem"], returns=["perf", "what"]) -def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], +def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], vector[f32]): return cpu + mem, cpu - mem*(1/0) "#, @@ -370,7 +390,7 @@ def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], name: "unexpected_token", code: r#" @copr(args=["cpu", "mem"], returns=["perf", "what"]) -def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], +def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], vector[f32]): return cpu + mem, cpu - mem*** "#, @@ -407,7 +427,7 @@ def a(cpu: vector[f32], mem: vector[f64])->(vector[f64], vector[f64]): 42 "#, predicate: ParseIsErr( - reason: "Expect a function definition, found a" + reason: "Expect a function definition, but found a" ) ) -] \ No newline at end of file +] diff --git a/src/script/src/python/vector.rs b/src/script/src/python/vector.rs index ffa430acc8..6ad17572c6 100644 --- a/src/script/src/python/vector.rs +++ b/src/script/src/python/vector.rs @@ -1,6 +1,9 @@ use std::ops::Deref; use std::sync::Arc; +use common_time::date::Date; +use common_time::datetime::DateTime; +use common_time::timestamp::Timestamp; use datatypes::arrow; use datatypes::arrow::array::BooleanArray; use datatypes::arrow::compute; @@ -853,7 +856,32 @@ pub fn pyobj_try_to_typed_val( ConcreteDataType::List(_) => unreachable!(), ConcreteDataType::Date(_) | ConcreteDataType::DateTime(_) - | ConcreteDataType::Timestamp(_) => todo!(), + | ConcreteDataType::Timestamp(_) => { + if is_instance::(&obj, vm) { + match dtype { + ConcreteDataType::Date(_) => obj + .try_into_value::(vm) + .ok() + .map(Date::new) + .map(value::Value::Date), + ConcreteDataType::DateTime(_) => obj + .try_into_value::(vm) + .ok() + .map(DateTime::new) + .map(value::Value::DateTime), + ConcreteDataType::Timestamp(_) => { + // FIXME(dennis): we always consider the timestamp unit is millis, it's not correct if user define timestamp column with other units. + obj.try_into_value::(vm) + .ok() + .map(Timestamp::from_millis) + .map(value::Value::Timestamp) + } + _ => unreachable!(), + } + } else { + None + } + } } } else if is_instance::(&obj, vm) { // if Untyped then by default return types with highest precision @@ -906,9 +934,10 @@ pub fn val_to_pyobj(val: value::Value, vm: &VirtualMachine) -> PyObjectRef { value::Value::String(s) => vm.ctx.new_str(s.as_utf8()).into(), // is this copy necessary? value::Value::Binary(b) => vm.ctx.new_bytes(b.deref().to_vec()).into(), - // is `Date` and `DateTime` supported yet? For now just ad hoc into PyInt + // TODO(dennis):is `Date` and `DateTime` supported yet? For now just ad hoc into PyInt, but it's better to be cast into python Date, DateTime objects etc.. value::Value::Date(v) => vm.ctx.new_int(v.val()).into(), value::Value::DateTime(v) => vm.ctx.new_int(v.val()).into(), + // FIXME(dennis): lose the timestamp unit here Value::Timestamp(v) => vm.ctx.new_int(v.value()).into(), value::Value::List(_) => unreachable!(), }