mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-06 21:32:58 +00:00
feat: improve python coprocessor parser (#260)
* feat: supports DateTime, Date and Timestamp column type to be returned by py scripts * feat: improve coprocessor compiler, make it work better * fix: comments * fix: typo * Update src/script/src/python/vector.rs Co-authored-by: LFC <bayinamine@gmail.com> * Update src/script/src/python/coprocessor.rs Co-authored-by: LFC <bayinamine@gmail.com> Co-authored-by: LFC <bayinamine@gmail.com>
This commit is contained in:
@@ -1,3 +1,4 @@
|
|||||||
|
pub mod compile;
|
||||||
pub mod parse;
|
pub mod parse;
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
@@ -14,12 +15,6 @@ use datatypes::schema::Schema;
|
|||||||
use datatypes::vectors::Helper;
|
use datatypes::vectors::Helper;
|
||||||
use datatypes::vectors::{BooleanVector, Vector, VectorRef};
|
use datatypes::vectors::{BooleanVector, Vector, VectorRef};
|
||||||
use rustpython_bytecode::CodeObject;
|
use rustpython_bytecode::CodeObject;
|
||||||
use rustpython_compiler_core::compile;
|
|
||||||
use rustpython_parser::{
|
|
||||||
ast,
|
|
||||||
ast::{Located, Location},
|
|
||||||
parser,
|
|
||||||
};
|
|
||||||
use rustpython_vm as vm;
|
use rustpython_vm as vm;
|
||||||
use rustpython_vm::{class::PyClassImpl, AsObject};
|
use rustpython_vm::{class::PyClassImpl, AsObject};
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -29,12 +24,10 @@ use vm::builtins::{PyBaseExceptionRef, PyTuple};
|
|||||||
use vm::scope::Scope;
|
use vm::scope::Scope;
|
||||||
use vm::{Interpreter, PyObjectRef, VirtualMachine};
|
use vm::{Interpreter, PyObjectRef, VirtualMachine};
|
||||||
|
|
||||||
use crate::fail_parse_error;
|
|
||||||
use crate::python::builtins::greptime_builtin;
|
use crate::python::builtins::greptime_builtin;
|
||||||
use crate::python::coprocessor::parse::{ret_parse_error, DecoratorArgs};
|
use crate::python::coprocessor::parse::DecoratorArgs;
|
||||||
use crate::python::error::{
|
use crate::python::error::{
|
||||||
ensure, ret_other_error_with, ArrowSnafu, CoprParseSnafu, OtherSnafu, PyCompileSnafu,
|
ensure, ret_other_error_with, ArrowSnafu, OtherSnafu, Result, TypeCastSnafu,
|
||||||
PyParseSnafu, Result, TypeCastSnafu,
|
|
||||||
};
|
};
|
||||||
use crate::python::utils::{format_py_error, py_vec_obj_to_array};
|
use crate::python::utils::{format_py_error, py_vec_obj_to_array};
|
||||||
use crate::python::{utils::is_instance, PyVector};
|
use crate::python::{utils::is_instance, PyVector};
|
||||||
@@ -50,7 +43,7 @@ pub struct AnnotationInfo {
|
|||||||
pub type CoprocessorRef = Arc<Coprocessor>;
|
pub type CoprocessorRef = Arc<Coprocessor>;
|
||||||
|
|
||||||
#[cfg_attr(test, derive(Deserialize))]
|
#[cfg_attr(test, derive(Deserialize))]
|
||||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Coprocessor {
|
pub struct Coprocessor {
|
||||||
pub name: String,
|
pub name: String,
|
||||||
pub deco_args: DecoratorArgs,
|
pub deco_args: DecoratorArgs,
|
||||||
@@ -61,141 +54,26 @@ pub struct Coprocessor {
|
|||||||
/// store its corresponding script, also skip serde when in `cfg(test)` to reduce work in compare
|
/// store its corresponding script, also skip serde when in `cfg(test)` to reduce work in compare
|
||||||
#[cfg_attr(test, serde(skip))]
|
#[cfg_attr(test, serde(skip))]
|
||||||
pub script: String,
|
pub script: String,
|
||||||
|
// We must use option here, because we use `serde` to deserialize coprocessor
|
||||||
|
// from ron file and `Deserialize` requires Coprocessor implementing `Default` trait,
|
||||||
|
// but CodeObject doesn't.
|
||||||
|
#[cfg_attr(test, serde(skip))]
|
||||||
|
pub code_obj: Option<CodeObject>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl PartialEq for Coprocessor {
|
||||||
|
fn eq(&self, other: &Self) -> bool {
|
||||||
|
self.name == other.name
|
||||||
|
&& self.deco_args == other.deco_args
|
||||||
|
&& self.arg_types == other.arg_types
|
||||||
|
&& self.return_types == other.return_types
|
||||||
|
&& self.script == other.script
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Eq for Coprocessor {}
|
||||||
|
|
||||||
impl Coprocessor {
|
impl Coprocessor {
|
||||||
/// generate a call to the coprocessor function
|
|
||||||
/// with arguments given in decorator's `args` list
|
|
||||||
/// also set in location in source code to `loc`
|
|
||||||
fn gen_call(&self, loc: &Location) -> ast::Stmt<()> {
|
|
||||||
let mut loc = loc.to_owned();
|
|
||||||
// adding a line to avoid confusing if any error occurs when calling the function
|
|
||||||
// then the pretty print will point to the last line in code
|
|
||||||
// instead of point to any of existing code written by user.
|
|
||||||
loc.newline();
|
|
||||||
let args: Vec<Located<ast::ExprKind>> = self
|
|
||||||
.deco_args
|
|
||||||
.arg_names
|
|
||||||
.iter()
|
|
||||||
.map(|v| {
|
|
||||||
let node = ast::ExprKind::Name {
|
|
||||||
id: v.to_owned(),
|
|
||||||
ctx: ast::ExprContext::Load,
|
|
||||||
};
|
|
||||||
create_located(node, loc)
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
let func = ast::ExprKind::Call {
|
|
||||||
func: Box::new(create_located(
|
|
||||||
ast::ExprKind::Name {
|
|
||||||
id: self.name.to_owned(),
|
|
||||||
ctx: ast::ExprContext::Load,
|
|
||||||
},
|
|
||||||
loc,
|
|
||||||
)),
|
|
||||||
args,
|
|
||||||
keywords: Vec::new(),
|
|
||||||
};
|
|
||||||
let stmt = ast::StmtKind::Expr {
|
|
||||||
value: Box::new(create_located(func, loc)),
|
|
||||||
};
|
|
||||||
create_located(stmt, loc)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// check if `Mod` is of one line of statement
|
|
||||||
fn check_before_compile(top: &ast::Mod) -> Result<()> {
|
|
||||||
if let ast::Mod::Interactive { body: code } = top {
|
|
||||||
ensure!(
|
|
||||||
code.len() == 1,
|
|
||||||
CoprParseSnafu {
|
|
||||||
reason: format!(
|
|
||||||
"Expect only one statement in script, found {} statement",
|
|
||||||
code.len()
|
|
||||||
),
|
|
||||||
loc: code.first().map(|s| s.location)
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
if let ast::StmtKind::FunctionDef {
|
|
||||||
name: _,
|
|
||||||
args: _,
|
|
||||||
body: _,
|
|
||||||
decorator_list: _,
|
|
||||||
returns: _,
|
|
||||||
type_comment: __main__,
|
|
||||||
} = &code[0].node
|
|
||||||
{
|
|
||||||
} else {
|
|
||||||
return fail_parse_error!(
|
|
||||||
format!("Expect the one and only statement in script as a function def, but instead found: {:?}", code[0].node),
|
|
||||||
Some(code[0].location)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return fail_parse_error!(
|
|
||||||
format!("Expect statement in script, found: {:?}", top),
|
|
||||||
None,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// stripe the decorator(`@xxxx`) and type annotation(for type checker is done in rust function), add one line in the ast for call function with given parameter, and compiler into `CodeObject`
|
|
||||||
///
|
|
||||||
/// The rationale is that rustpython's vm is not very efficient according to [offical benchmark](https://rustpython.github.io/benchmarks),
|
|
||||||
/// So we should avoid running too much Python Bytecode, hence in this function we delete `@` decorator(instead of actually write a decorator in python)
|
|
||||||
/// And add a function call in the end and also
|
|
||||||
/// strip type annotation
|
|
||||||
fn strip_append_and_compile(&self) -> Result<CodeObject> {
|
|
||||||
let script = &self.script;
|
|
||||||
// note that it's important to use `parser::Mode::Interactive` so the ast can be compile to return a result instead of return None in eval mode
|
|
||||||
let mut top = parser::parse(script, parser::Mode::Interactive).context(PyParseSnafu)?;
|
|
||||||
Self::check_before_compile(&top)?;
|
|
||||||
// erase decorator
|
|
||||||
if let ast::Mod::Interactive { body } = &mut top {
|
|
||||||
let code = body;
|
|
||||||
if let ast::StmtKind::FunctionDef {
|
|
||||||
name: _,
|
|
||||||
args,
|
|
||||||
body: _,
|
|
||||||
decorator_list,
|
|
||||||
returns,
|
|
||||||
type_comment: __main__,
|
|
||||||
} = &mut code[0].node
|
|
||||||
{
|
|
||||||
*decorator_list = Vec::new();
|
|
||||||
// strip type annotation
|
|
||||||
// def a(b: int, c:int) -> int
|
|
||||||
// will became
|
|
||||||
// def a(b, c)
|
|
||||||
*returns = None;
|
|
||||||
for arg in &mut args.args {
|
|
||||||
arg.node.annotation = None;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// already done in check function
|
|
||||||
unreachable!()
|
|
||||||
}
|
|
||||||
let loc = code[0].location;
|
|
||||||
|
|
||||||
// This manually construct ast has no corrsponding code
|
|
||||||
// in the script, so just give it a location that don't exist in orginal script
|
|
||||||
// (which doesn't matter because Location usually only used in pretty print errors)
|
|
||||||
code.push(self.gen_call(&loc));
|
|
||||||
} else {
|
|
||||||
// already done in check function
|
|
||||||
unreachable!()
|
|
||||||
}
|
|
||||||
// use `compile::Mode::BlockExpr` so it return the result of statement
|
|
||||||
compile::compile_top(
|
|
||||||
&top,
|
|
||||||
"<embedded>".to_owned(),
|
|
||||||
compile::Mode::BlockExpr,
|
|
||||||
compile::CompileOpts { optimize: 0 },
|
|
||||||
)
|
|
||||||
.context(PyCompileSnafu)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// generate [`Schema`] according to return names, types,
|
/// generate [`Schema`] according to return names, types,
|
||||||
/// if no annotation
|
/// if no annotation
|
||||||
/// the datatypes of the actual columns is used directly
|
/// the datatypes of the actual columns is used directly
|
||||||
@@ -286,10 +164,6 @@ impl Coprocessor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn create_located<T>(node: T, loc: Location) -> Located<T> {
|
|
||||||
Located::new(loc, node)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// cast a `dyn Array` of type unsigned/int/float into a `dyn Vector`
|
/// cast a `dyn Array` of type unsigned/int/float into a `dyn Vector`
|
||||||
fn try_into_vector<T: datatypes::types::Primitive>(arg: Arc<dyn Array>) -> Result<Arc<dyn Vector>> {
|
fn try_into_vector<T: datatypes::types::Primitive>(arg: Arc<dyn Array>) -> Result<Arc<dyn Vector>> {
|
||||||
// wrap try_into_vector in here to convert `datatypes::error::Error` to `python::error::Error`
|
// wrap try_into_vector in here to convert `datatypes::error::Error` to `python::error::Error`
|
||||||
@@ -483,7 +357,7 @@ pub fn exec_coprocessor(script: &str, rb: &DfRecordBatch) -> Result<RecordBatch>
|
|||||||
// 1. parse the script and check if it's only a function with `@coprocessor` decorator, and get `args` and `returns`,
|
// 1. parse the script and check if it's only a function with `@coprocessor` decorator, and get `args` and `returns`,
|
||||||
// 2. also check for exist of `args` in `rb`, if not found, return error
|
// 2. also check for exist of `args` in `rb`, if not found, return error
|
||||||
// TODO(discord9): cache the result of parse_copr
|
// TODO(discord9): cache the result of parse_copr
|
||||||
let copr = parse::parse_copr(script)?;
|
let copr = parse::parse_and_compile_copr(script)?;
|
||||||
exec_parsed(&copr, rb)
|
exec_parsed(&copr, rb)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -499,8 +373,8 @@ pub(crate) fn exec_with_cached_vm(
|
|||||||
let scope = vm.new_scope_with_builtins();
|
let scope = vm.new_scope_with_builtins();
|
||||||
set_items_in_scope(&scope, vm, &copr.deco_args.arg_names, args)?;
|
set_items_in_scope(&scope, vm, &copr.deco_args.arg_names, args)?;
|
||||||
|
|
||||||
let code_obj = copr.strip_append_and_compile()?;
|
// It's safe to unwrap code_object, it's already compiled before.
|
||||||
let code_obj = vm.ctx.new_code(code_obj);
|
let code_obj = vm.ctx.new_code(copr.code_obj.clone().unwrap());
|
||||||
let ret = vm
|
let ret = vm
|
||||||
.run_code_obj(code_obj, scope)
|
.run_code_obj(code_obj, scope)
|
||||||
.map_err(|e| format_py_error(e, vm))?;
|
.map_err(|e| format_py_error(e, vm))?;
|
||||||
@@ -567,3 +441,35 @@ pub fn exec_copr_print(
|
|||||||
crate::python::error::pretty_print_error_in_src(script, &e, ln_offset, filename)
|
crate::python::error::pretty_print_error_in_src(script, &e, ln_offset, filename)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use crate::python::coprocessor::parse::parse_and_compile_copr;

    /// A script with a plain helper function next to the `@copr` function
    /// must parse, expose the decorator metadata and carry a compiled code object.
    #[test]
    fn test_parse_copr() {
        let source = r#"
def add(a, b):
    return a + b

@copr(args=["a", "b", "c"], returns = ["r"], sql="select number as a,number as b,number as c from numbers limit 100")
def test(a, b, c):
    import greptime as g
    return add(a, b) / g.sqrt(c)
"#;

        let parsed = parse_and_compile_copr(source).unwrap();

        // The coprocessor is the decorated function, not the helper.
        assert_eq!(parsed.name, "test");

        // Decorator arguments.
        assert_eq!(
            parsed.deco_args.sql.as_deref(),
            Some("select number as a,number as b,number as c from numbers limit 100")
        );
        assert_eq!(parsed.deco_args.ret_names, vec!["r"]);
        assert_eq!(parsed.deco_args.arg_names, vec!["a", "b", "c"]);

        // No annotations were written, so every slot is `None`.
        assert_eq!(parsed.arg_types, vec![None, None, None]);
        assert_eq!(parsed.return_types, vec![None]);

        // The original script is stored verbatim and compilation already happened.
        assert_eq!(parsed.script, source);
        assert!(parsed.code_obj.is_some());
    }
}
|
||||||
|
|||||||
121
src/script/src/python/coprocessor/compile.rs
Normal file
121
src/script/src/python/coprocessor/compile.rs
Normal file
@@ -0,0 +1,121 @@
|
|||||||
|
//! compile script to code object
|
||||||
|
|
||||||
|
use rustpython_bytecode::CodeObject;
|
||||||
|
use rustpython_compiler_core::compile as python_compile;
|
||||||
|
use rustpython_parser::{
|
||||||
|
ast,
|
||||||
|
ast::{Located, Location},
|
||||||
|
parser,
|
||||||
|
};
|
||||||
|
use snafu::ResultExt;
|
||||||
|
|
||||||
|
use crate::fail_parse_error;
|
||||||
|
use crate::python::coprocessor::parse::{ret_parse_error, DecoratorArgs};
|
||||||
|
use crate::python::error::{PyCompileSnafu, PyParseSnafu, Result};
|
||||||
|
|
||||||
|
fn create_located<T>(node: T, loc: Location) -> Located<T> {
|
||||||
|
Located::new(loc, node)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// generate a call to the coprocessor function
|
||||||
|
/// with arguments given in decorator's `args` list
|
||||||
|
/// also set in location in source code to `loc`
|
||||||
|
fn gen_call(name: &str, deco_args: &DecoratorArgs, loc: &Location) -> ast::Stmt<()> {
|
||||||
|
let mut loc = loc.to_owned();
|
||||||
|
// adding a line to avoid confusing if any error occurs when calling the function
|
||||||
|
// then the pretty print will point to the last line in code
|
||||||
|
// instead of point to any of existing code written by user.
|
||||||
|
loc.newline();
|
||||||
|
let args: Vec<Located<ast::ExprKind>> = deco_args
|
||||||
|
.arg_names
|
||||||
|
.iter()
|
||||||
|
.map(|v| {
|
||||||
|
let node = ast::ExprKind::Name {
|
||||||
|
id: v.to_owned(),
|
||||||
|
ctx: ast::ExprContext::Load,
|
||||||
|
};
|
||||||
|
create_located(node, loc)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
let func = ast::ExprKind::Call {
|
||||||
|
func: Box::new(create_located(
|
||||||
|
ast::ExprKind::Name {
|
||||||
|
id: name.to_string(),
|
||||||
|
ctx: ast::ExprContext::Load,
|
||||||
|
},
|
||||||
|
loc,
|
||||||
|
)),
|
||||||
|
args,
|
||||||
|
keywords: Vec::new(),
|
||||||
|
};
|
||||||
|
let stmt = ast::StmtKind::Expr {
|
||||||
|
value: Box::new(create_located(func, loc)),
|
||||||
|
};
|
||||||
|
create_located(stmt, loc)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// stripe the decorator(`@xxxx`) and type annotation(for type checker is done in rust function), add one line in the ast for call function with given parameter, and compiler into `CodeObject`
|
||||||
|
///
|
||||||
|
/// The rationale is that rustpython's vm is not very efficient according to [offical benchmark](https://rustpython.github.io/benchmarks),
|
||||||
|
/// So we should avoid running too much Python Bytecode, hence in this function we delete `@` decorator(instead of actually write a decorator in python)
|
||||||
|
/// And add a function call in the end and also
|
||||||
|
/// strip type annotation
|
||||||
|
pub fn compile_script(name: &str, deco_args: &DecoratorArgs, script: &str) -> Result<CodeObject> {
|
||||||
|
// note that it's important to use `parser::Mode::Interactive` so the ast can be compile to return a result instead of return None in eval mode
|
||||||
|
let mut top = parser::parse(script, parser::Mode::Interactive).context(PyParseSnafu)?;
|
||||||
|
// erase decorator
|
||||||
|
if let ast::Mod::Interactive { body } = &mut top {
|
||||||
|
let stmts = body;
|
||||||
|
let mut loc = None;
|
||||||
|
for stmt in stmts.iter_mut() {
|
||||||
|
if let ast::StmtKind::FunctionDef {
|
||||||
|
name: _,
|
||||||
|
args,
|
||||||
|
body: _,
|
||||||
|
decorator_list,
|
||||||
|
returns,
|
||||||
|
type_comment: __main__,
|
||||||
|
} = &mut stmt.node
|
||||||
|
{
|
||||||
|
*decorator_list = Vec::new();
|
||||||
|
// strip type annotation
|
||||||
|
// def a(b: int, c:int) -> int
|
||||||
|
// will became
|
||||||
|
// def a(b, c)
|
||||||
|
*returns = None;
|
||||||
|
for arg in &mut args.args {
|
||||||
|
arg.node.annotation = None;
|
||||||
|
}
|
||||||
|
} else if matches!(
|
||||||
|
stmt.node,
|
||||||
|
ast::StmtKind::Import { .. } | ast::StmtKind::ImportFrom { .. }
|
||||||
|
) {
|
||||||
|
// import statements are allowed.
|
||||||
|
} else {
|
||||||
|
// already checked in parser
|
||||||
|
unreachable!()
|
||||||
|
}
|
||||||
|
loc = Some(stmt.location);
|
||||||
|
|
||||||
|
// This manually construct ast has no corrsponding code
|
||||||
|
// in the script, so just give it a location that don't exist in orginal script
|
||||||
|
// (which doesn't matter because Location usually only used in pretty print errors)
|
||||||
|
}
|
||||||
|
// Append statement which calling coprocessor function.
|
||||||
|
// It's safe to unwrap loc, it is always exists.
|
||||||
|
stmts.push(gen_call(name, deco_args, &loc.unwrap()));
|
||||||
|
} else {
|
||||||
|
return fail_parse_error!(
|
||||||
|
format!("Expect statement in script, found: {:?}", top),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
// use `compile::Mode::BlockExpr` so it return the result of statement
|
||||||
|
python_compile::compile_top(
|
||||||
|
&top,
|
||||||
|
"<embedded>".to_owned(),
|
||||||
|
python_compile::Mode::BlockExpr,
|
||||||
|
python_compile::CompileOpts { optimize: 0 },
|
||||||
|
)
|
||||||
|
.context(PyCompileSnafu)
|
||||||
|
}
|
||||||
@@ -8,8 +8,9 @@ use rustpython_parser::{
|
|||||||
};
|
};
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use snafu::ResultExt;
|
use snafu::{OptionExt, ResultExt};
|
||||||
|
|
||||||
|
use crate::python::coprocessor::compile;
|
||||||
use crate::python::coprocessor::AnnotationInfo;
|
use crate::python::coprocessor::AnnotationInfo;
|
||||||
use crate::python::coprocessor::Coprocessor;
|
use crate::python::coprocessor::Coprocessor;
|
||||||
use crate::python::error::{ensure, CoprParseSnafu, PyParseSnafu, Result};
|
use crate::python::error::{ensure, CoprParseSnafu, PyParseSnafu, Result};
|
||||||
@@ -415,107 +416,104 @@ fn get_return_annotations(rets: &ast::Expr<()>) -> Result<Vec<Option<AnnotationI
|
|||||||
Ok(return_types)
|
Ok(return_types)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// check if the list of statements contain only one statement and
|
|
||||||
/// that statement is a function call with one decorator
|
|
||||||
fn check_copr(stmts: &Vec<ast::Stmt<()>>) -> Result<()> {
|
|
||||||
ensure!(
|
|
||||||
stmts.len() == 1,
|
|
||||||
CoprParseSnafu {
|
|
||||||
reason:
|
|
||||||
"Expect one and only one python function with `@coprocessor` or `@cpor` decorator"
|
|
||||||
.to_string(),
|
|
||||||
loc: stmts.first().map(|s| s.location)
|
|
||||||
}
|
|
||||||
);
|
|
||||||
if let ast::StmtKind::FunctionDef {
|
|
||||||
name: _,
|
|
||||||
args: _,
|
|
||||||
body: _,
|
|
||||||
decorator_list,
|
|
||||||
returns: _,
|
|
||||||
type_comment: _,
|
|
||||||
} = &stmts[0].node
|
|
||||||
{
|
|
||||||
ensure!(
|
|
||||||
decorator_list.len() == 1,
|
|
||||||
CoprParseSnafu {
|
|
||||||
reason: "Expect one decorator",
|
|
||||||
loc: decorator_list.first().map(|s| s.location)
|
|
||||||
}
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
return fail_parse_error!(
|
|
||||||
format!(
|
|
||||||
"Expect a function definition, found a \n{:#?}",
|
|
||||||
&stmts[0].node
|
|
||||||
),
|
|
||||||
Some(stmts[0].location),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// parse script and return `Coprocessor` struct with info extract from ast
|
/// parse script and return `Coprocessor` struct with info extract from ast
|
||||||
pub fn parse_copr(script: &str) -> Result<Coprocessor> {
|
pub fn parse_and_compile_copr(script: &str) -> Result<Coprocessor> {
|
||||||
let python_ast = parser::parse_program(script).context(PyParseSnafu)?;
|
let python_ast = parser::parse_program(script).context(PyParseSnafu)?;
|
||||||
check_copr(&python_ast)?;
|
|
||||||
if let ast::StmtKind::FunctionDef {
|
|
||||||
name,
|
|
||||||
args: fn_args,
|
|
||||||
body: _,
|
|
||||||
decorator_list,
|
|
||||||
returns,
|
|
||||||
type_comment: _,
|
|
||||||
} = &python_ast[0].node
|
|
||||||
{
|
|
||||||
let decorator = &decorator_list[0];
|
|
||||||
let deco_args = parse_decorator(decorator)?;
|
|
||||||
|
|
||||||
// get arg types from type annotation
|
let mut coprocessor = None;
|
||||||
let arg_types = get_arg_annotations(fn_args)?;
|
|
||||||
|
|
||||||
// get return types from type annotation
|
for stmt in python_ast {
|
||||||
let return_types = if let Some(rets) = returns {
|
if let ast::StmtKind::FunctionDef {
|
||||||
get_return_annotations(rets)?
|
name,
|
||||||
|
args: fn_args,
|
||||||
|
body: _,
|
||||||
|
decorator_list,
|
||||||
|
returns,
|
||||||
|
type_comment: _,
|
||||||
|
} = &stmt.node
|
||||||
|
{
|
||||||
|
if !decorator_list.is_empty() {
|
||||||
|
ensure!(coprocessor.is_none(),
|
||||||
|
CoprParseSnafu {
|
||||||
|
reason: "Expect one and only one python function with `@coprocessor` or `@cpor` decorator",
|
||||||
|
loc: stmt.location,
|
||||||
|
}
|
||||||
|
);
|
||||||
|
ensure!(
|
||||||
|
decorator_list.len() == 1,
|
||||||
|
CoprParseSnafu {
|
||||||
|
reason: "Expect one decorator",
|
||||||
|
loc: decorator_list.first().map(|s| s.location)
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let decorator = &decorator_list[0];
|
||||||
|
let deco_args = parse_decorator(decorator)?;
|
||||||
|
|
||||||
|
// get arg types from type annotation
|
||||||
|
let arg_types = get_arg_annotations(fn_args)?;
|
||||||
|
|
||||||
|
// get return types from type annotation
|
||||||
|
let return_types = if let Some(rets) = returns {
|
||||||
|
get_return_annotations(rets)?
|
||||||
|
} else {
|
||||||
|
// if no anntation at all, set it to all None
|
||||||
|
std::iter::repeat(None)
|
||||||
|
.take(deco_args.ret_names.len())
|
||||||
|
.collect()
|
||||||
|
};
|
||||||
|
|
||||||
|
// make sure both arguments&returns in fucntion
|
||||||
|
// and in decorator have same length
|
||||||
|
ensure!(
|
||||||
|
deco_args.arg_names.len() == arg_types.len(),
|
||||||
|
CoprParseSnafu {
|
||||||
|
reason: format!(
|
||||||
|
"args number in decorator({}) and function({}) doesn't match",
|
||||||
|
deco_args.arg_names.len(),
|
||||||
|
arg_types.len()
|
||||||
|
),
|
||||||
|
loc: None
|
||||||
|
}
|
||||||
|
);
|
||||||
|
ensure!(
|
||||||
|
deco_args.ret_names.len() == return_types.len(),
|
||||||
|
CoprParseSnafu {
|
||||||
|
reason: format!(
|
||||||
|
"returns number in decorator( {} ) and function annotation( {} ) doesn't match",
|
||||||
|
deco_args.ret_names.len(),
|
||||||
|
return_types.len()
|
||||||
|
),
|
||||||
|
loc: None
|
||||||
|
}
|
||||||
|
);
|
||||||
|
coprocessor = Some(Coprocessor {
|
||||||
|
code_obj: Some(compile::compile_script(name, &deco_args, script)?),
|
||||||
|
name: name.to_string(),
|
||||||
|
deco_args,
|
||||||
|
arg_types,
|
||||||
|
return_types,
|
||||||
|
script: script.to_owned(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} else if matches!(
|
||||||
|
stmt.node,
|
||||||
|
ast::StmtKind::Import { .. } | ast::StmtKind::ImportFrom { .. }
|
||||||
|
) {
|
||||||
|
// import statements are allowed.
|
||||||
} else {
|
} else {
|
||||||
// if no anntation at all, set it to all None
|
return fail_parse_error!(
|
||||||
std::iter::repeat(None)
|
format!(
|
||||||
.take(deco_args.ret_names.len())
|
"Expect a function definition, but found a \n{:#?}",
|
||||||
.collect()
|
&stmt.node
|
||||||
};
|
|
||||||
|
|
||||||
// make sure both arguments&returns in fucntion
|
|
||||||
// and in decorator have same length
|
|
||||||
ensure!(
|
|
||||||
deco_args.arg_names.len() == arg_types.len(),
|
|
||||||
CoprParseSnafu {
|
|
||||||
reason: format!(
|
|
||||||
"args number in decorator({}) and function({}) doesn't match",
|
|
||||||
deco_args.arg_names.len(),
|
|
||||||
arg_types.len()
|
|
||||||
),
|
),
|
||||||
loc: None
|
Some(stmt.location),
|
||||||
}
|
);
|
||||||
);
|
}
|
||||||
ensure!(
|
|
||||||
deco_args.ret_names.len() == return_types.len(),
|
|
||||||
CoprParseSnafu {
|
|
||||||
reason: format!(
|
|
||||||
"returns number in decorator( {} ) and function annotation( {} ) doesn't match",
|
|
||||||
deco_args.ret_names.len(),
|
|
||||||
return_types.len()
|
|
||||||
),
|
|
||||||
loc: None
|
|
||||||
}
|
|
||||||
);
|
|
||||||
Ok(Coprocessor {
|
|
||||||
name: name.to_string(),
|
|
||||||
deco_args,
|
|
||||||
arg_types,
|
|
||||||
return_types,
|
|
||||||
script: script.to_owned(),
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
unreachable!()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
coprocessor.context(CoprParseSnafu {
|
||||||
|
reason: "Coprocessor not found in script",
|
||||||
|
loc: None,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ use snafu::{ensure, ResultExt};
|
|||||||
use sql::statements::statement::Statement;
|
use sql::statements::statement::Statement;
|
||||||
|
|
||||||
use crate::engine::{CompileContext, EvalContext, Script, ScriptEngine};
|
use crate::engine::{CompileContext, EvalContext, Script, ScriptEngine};
|
||||||
use crate::python::coprocessor::{exec_parsed, parse::parse_copr};
|
use crate::python::coprocessor::{exec_parsed, parse};
|
||||||
use crate::python::{
|
use crate::python::{
|
||||||
coprocessor::CoprocessorRef,
|
coprocessor::CoprocessorRef,
|
||||||
error::{self, Result},
|
error::{self, Result},
|
||||||
@@ -122,7 +122,7 @@ impl ScriptEngine for PyEngine {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn compile(&self, script: &str, _ctx: CompileContext) -> Result<PyScript> {
|
async fn compile(&self, script: &str, _ctx: CompileContext) -> Result<PyScript> {
|
||||||
let copr = Arc::new(parse_copr(script)?);
|
let copr = Arc::new(parse::parse_and_compile_copr(script)?);
|
||||||
|
|
||||||
Ok(PyScript {
|
Ok(PyScript {
|
||||||
copr,
|
copr,
|
||||||
@@ -165,10 +165,13 @@ mod tests {
|
|||||||
let script_engine = PyEngine::new(query_engine.clone());
|
let script_engine = PyEngine::new(query_engine.clone());
|
||||||
|
|
||||||
let script = r#"
|
let script = r#"
|
||||||
|
import greptime as g
|
||||||
|
def add(a, b):
|
||||||
|
return a + b;
|
||||||
|
|
||||||
@copr(args=["a", "b", "c"], returns = ["r"], sql="select number as a,number as b,number as c from numbers limit 100")
|
@copr(args=["a", "b", "c"], returns = ["r"], sql="select number as a,number as b,number as c from numbers limit 100")
|
||||||
def test(a, b, c):
|
def test(a, b, c):
|
||||||
import greptime as g
|
return add(a, b) / g.sqrt(c)
|
||||||
return (a + b) / g.sqrt(c)
|
|
||||||
"#;
|
"#;
|
||||||
let script = script_engine
|
let script = script_engine
|
||||||
.compile(script, CompileContext::default())
|
.compile(script, CompileContext::default())
|
||||||
@@ -196,9 +199,10 @@ def test(a, b, c):
|
|||||||
|
|
||||||
// test list comprehension
|
// test list comprehension
|
||||||
let script = r#"
|
let script = r#"
|
||||||
|
import greptime as gt
|
||||||
|
|
||||||
@copr(args=["number"], returns = ["r"], sql="select number from numbers limit 100")
|
@copr(args=["number"], returns = ["r"], sql="select number from numbers limit 100")
|
||||||
def test(a):
|
def test(a):
|
||||||
import greptime as gt
|
|
||||||
return gt.vector([x for x in a if x % 2 == 0])
|
return gt.vector([x for x in a if x % 2 == 0])
|
||||||
"#;
|
"#;
|
||||||
let script = script_engine
|
let script = script_engine
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ use super::error::{get_error_reason_loc, visualize_loc};
|
|||||||
use crate::python::coprocessor::AnnotationInfo;
|
use crate::python::coprocessor::AnnotationInfo;
|
||||||
use crate::python::error::pretty_print_error_in_src;
|
use crate::python::error::pretty_print_error_in_src;
|
||||||
use crate::python::{
|
use crate::python::{
|
||||||
coprocessor, coprocessor::parse::parse_copr, coprocessor::Coprocessor, error::Error,
|
coprocessor, coprocessor::parse::parse_and_compile_copr, coprocessor::Coprocessor, error::Error,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Deserialize, Debug)]
|
#[derive(Deserialize, Debug)]
|
||||||
@@ -31,7 +31,7 @@ struct TestCase {
|
|||||||
#[derive(Deserialize, Debug)]
|
#[derive(Deserialize, Debug)]
|
||||||
enum Predicate {
|
enum Predicate {
|
||||||
ParseIsOk {
|
ParseIsOk {
|
||||||
result: Coprocessor,
|
result: Box<Coprocessor>,
|
||||||
},
|
},
|
||||||
ParseIsErr {
|
ParseIsErr {
|
||||||
/// used to check if after serialize [`Error`] into a String, that string contains `reason`
|
/// used to check if after serialize [`Error`] into a String, that string contains `reason`
|
||||||
@@ -81,13 +81,13 @@ fn run_ron_testcases() {
|
|||||||
print!(".ron test {}", testcase.name);
|
print!(".ron test {}", testcase.name);
|
||||||
match testcase.predicate {
|
match testcase.predicate {
|
||||||
Predicate::ParseIsOk { result } => {
|
Predicate::ParseIsOk { result } => {
|
||||||
let copr = parse_copr(&testcase.code);
|
let copr = parse_and_compile_copr(&testcase.code);
|
||||||
let mut copr = copr.unwrap();
|
let mut copr = copr.unwrap();
|
||||||
copr.script = "".into();
|
copr.script = "".into();
|
||||||
assert_eq!(copr, result);
|
assert_eq!(copr, *result);
|
||||||
}
|
}
|
||||||
Predicate::ParseIsErr { reason } => {
|
Predicate::ParseIsErr { reason } => {
|
||||||
let copr = parse_copr(&testcase.code);
|
let copr = parse_and_compile_copr(&testcase.code);
|
||||||
if copr.is_ok() {
|
if copr.is_ok() {
|
||||||
eprintln!("Expect to be err, found{copr:#?}");
|
eprintln!("Expect to be err, found{copr:#?}");
|
||||||
panic!()
|
panic!()
|
||||||
@@ -180,7 +180,7 @@ def a(cpu, mem: vector[f64])->(vector[f64|None], vector[f64], vector[_], vector[
|
|||||||
return cpu + mem, cpu - mem, cpu * mem, cpu / mem
|
return cpu + mem, cpu - mem, cpu * mem, cpu / mem
|
||||||
"#;
|
"#;
|
||||||
let pyast = parser::parse(python_source, parser::Mode::Interactive).unwrap();
|
let pyast = parser::parse(python_source, parser::Mode::Interactive).unwrap();
|
||||||
let copr = parse_copr(python_source);
|
let copr = parse_and_compile_copr(python_source);
|
||||||
dbg!(copr);
|
dbg!(copr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -5,9 +5,15 @@
|
|||||||
(
|
(
|
||||||
name: "correct_parse",
|
name: "correct_parse",
|
||||||
code: r#"
|
code: r#"
|
||||||
|
import greptime as gt
|
||||||
|
from greptime import pow
|
||||||
|
def add(a, b):
|
||||||
|
return a + b
|
||||||
|
def sub(a, b):
|
||||||
|
return a - b
|
||||||
@copr(args=["cpu", "mem"], returns=["perf", "what", "how", "why"])
|
@copr(args=["cpu", "mem"], returns=["perf", "what", "how", "why"])
|
||||||
def a(cpu: vector[f32], mem: vector[f64])->(vector[f64], vector[f64|None], vector[_], vector[_ | None]):
|
def a(cpu: vector[f32], mem: vector[f64])->(vector[f64], vector[f64|None], vector[_], vector[_ | None]):
|
||||||
return cpu + mem, cpu - mem, cpu * mem, cpu / mem
|
return add(cpu, mem), sub(cpu, mem), cpu * mem, cpu / mem
|
||||||
"#,
|
"#,
|
||||||
predicate: ParseIsOk(
|
predicate: ParseIsOk(
|
||||||
result: (
|
result: (
|
||||||
@@ -54,7 +60,21 @@ def a(cpu: vector[f32], mem: vector[f64])->(vector[f64], vector[f64|None], vecto
|
|||||||
return cpu + mem, cpu - mem, cpu * mem, cpu / mem
|
return cpu + mem, cpu - mem, cpu * mem, cpu / mem
|
||||||
"#,
|
"#,
|
||||||
predicate: ParseIsErr(
|
predicate: ParseIsErr(
|
||||||
reason: "Expect one decorator"
|
reason: "Coprocessor not found in script"
|
||||||
|
)
|
||||||
|
),
|
||||||
|
(
|
||||||
|
name: "too_many_decorators",
|
||||||
|
code: r#"
|
||||||
|
@copr(args=["a"], returns=["r"])
|
||||||
|
def test1(a):
|
||||||
|
return a;
|
||||||
|
@copr(args=["a"], returns=["r"])
|
||||||
|
def test2(a):
|
||||||
|
return a;
|
||||||
|
"#,
|
||||||
|
predicate: ParseIsErr(
|
||||||
|
reason: "Expect one and only one python function with `@coprocessor` or `@cpor` decorator"
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
@@ -174,7 +194,7 @@ def a(cpu: vector[f64], mem: vector[f64])->(vector[None|None], vector[into(f64)]
|
|||||||
"#,
|
"#,
|
||||||
predicate: ParseIsErr(
|
predicate: ParseIsErr(
|
||||||
reason:
|
reason:
|
||||||
"Expect one and only one python function with `@coprocessor` or `@cpor` decorator"
|
"Expect a function definition, but found a"
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
@@ -407,7 +427,7 @@ def a(cpu: vector[f32], mem: vector[f64])->(vector[f64], vector[f64]):
|
|||||||
42
|
42
|
||||||
"#,
|
"#,
|
||||||
predicate: ParseIsErr(
|
predicate: ParseIsErr(
|
||||||
reason: "Expect a function definition, found a"
|
reason: "Expect a function definition, but found a"
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
]
|
]
|
||||||
@@ -1,6 +1,9 @@
|
|||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use common_time::date::Date;
|
||||||
|
use common_time::datetime::DateTime;
|
||||||
|
use common_time::timestamp::Timestamp;
|
||||||
use datatypes::arrow;
|
use datatypes::arrow;
|
||||||
use datatypes::arrow::array::BooleanArray;
|
use datatypes::arrow::array::BooleanArray;
|
||||||
use datatypes::arrow::compute;
|
use datatypes::arrow::compute;
|
||||||
@@ -853,7 +856,32 @@ pub fn pyobj_try_to_typed_val(
|
|||||||
ConcreteDataType::List(_) => unreachable!(),
|
ConcreteDataType::List(_) => unreachable!(),
|
||||||
ConcreteDataType::Date(_)
|
ConcreteDataType::Date(_)
|
||||||
| ConcreteDataType::DateTime(_)
|
| ConcreteDataType::DateTime(_)
|
||||||
| ConcreteDataType::Timestamp(_) => todo!(),
|
| ConcreteDataType::Timestamp(_) => {
|
||||||
|
if is_instance::<PyInt>(&obj, vm) {
|
||||||
|
match dtype {
|
||||||
|
ConcreteDataType::Date(_) => obj
|
||||||
|
.try_into_value::<i32>(vm)
|
||||||
|
.ok()
|
||||||
|
.map(Date::new)
|
||||||
|
.map(value::Value::Date),
|
||||||
|
ConcreteDataType::DateTime(_) => obj
|
||||||
|
.try_into_value::<i64>(vm)
|
||||||
|
.ok()
|
||||||
|
.map(DateTime::new)
|
||||||
|
.map(value::Value::DateTime),
|
||||||
|
ConcreteDataType::Timestamp(_) => {
|
||||||
|
// FIXME(dennis): we always assume the timestamp unit is milliseconds, which is incorrect if the user defines a timestamp column with a different unit.
|
||||||
|
obj.try_into_value::<i64>(vm)
|
||||||
|
.ok()
|
||||||
|
.map(Timestamp::from_millis)
|
||||||
|
.map(value::Value::Timestamp)
|
||||||
|
}
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if is_instance::<PyNone>(&obj, vm) {
|
} else if is_instance::<PyNone>(&obj, vm) {
|
||||||
// if Untyped then by default return types with highest precision
|
// if Untyped then by default return types with highest precision
|
||||||
@@ -906,9 +934,10 @@ pub fn val_to_pyobj(val: value::Value, vm: &VirtualMachine) -> PyObjectRef {
|
|||||||
value::Value::String(s) => vm.ctx.new_str(s.as_utf8()).into(),
|
value::Value::String(s) => vm.ctx.new_str(s.as_utf8()).into(),
|
||||||
// is this copy necessary?
|
// is this copy necessary?
|
||||||
value::Value::Binary(b) => vm.ctx.new_bytes(b.deref().to_vec()).into(),
|
value::Value::Binary(b) => vm.ctx.new_bytes(b.deref().to_vec()).into(),
|
||||||
// is `Date` and `DateTime` supported yet? For now just ad hoc into PyInt
|
// TODO(dennis): are `Date` and `DateTime` supported yet? For now they are converted ad hoc into PyInt, but it would be better to cast them into Python date/datetime objects.
|
||||||
value::Value::Date(v) => vm.ctx.new_int(v.val()).into(),
|
value::Value::Date(v) => vm.ctx.new_int(v.val()).into(),
|
||||||
value::Value::DateTime(v) => vm.ctx.new_int(v.val()).into(),
|
value::Value::DateTime(v) => vm.ctx.new_int(v.val()).into(),
|
||||||
|
// FIXME(dennis): the timestamp unit is lost here
|
||||||
Value::Timestamp(v) => vm.ctx.new_int(v.value()).into(),
|
Value::Timestamp(v) => vm.ctx.new_int(v.value()).into(),
|
||||||
value::Value::List(_) => unreachable!(),
|
value::Value::List(_) => unreachable!(),
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user