mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 12:52:57 +00:00
feat: script engine and python impl (#219)
* feat: improve try_into_vector function * Impl python mod and PyVector to execute script * add AsSeq(BUT not IMPL) * add&test pythonic_index, add into_py_obj(UNTEST) * add into_datatypes_value(UNTEST) * inplace setitem_by_index unsupport * still struggle with testing AsSeq * actually pyimpl AsSeq&AsMap * add slice for PyVector * improve visualibility for testing * adjust for clippy * add assert for test_execute_script * add type anno in test * feat: basic support for PyVector's operator with scalar (#64) * feat: memory size of vector (#53) * feat: improve try_into_vector function * feat: impl memory_size function for vectors * fix: forgot memory_size assertion in null vector test * feat: use LargeUtf8 instead of utf8 for string, and rename LargeBianryArray to BinaryArray * feat: memory_size only calculates heap size * feat: impl bytes_allocated for memtable (#55) * add init and constr * rename type cast and add test * fix bug in pyobj_to_val * add default cast when no type specifed * add basic add/sub/mul for array and scalar(value) * cargo clippy * comment out some println * stricter clippy * style: cargo fmt * fix: string&bool support in val2pyobj & back * style: remove println in test * style: rm println in test mod in python.rs * refactor: use wrap_index instead of pythonic_index * refactor: right op in scalar_arith_op * fix: stronger type& better test * style: remove println * fix: scalar sign/unsigned cast * feat: improve try_into_vector function * Impl python mod and PyVector to execute script * add AsSeq(BUT not IMPL) * add&test pythonic_index, add into_py_obj(UNTEST) * add into_datatypes_value(UNTEST) * inplace setitem_by_index unsupport * still struggle with testing AsSeq * actually pyimpl AsSeq&AsMap * add slice for PyVector * improve visualibility for testing * adjust for clippy * add assert for test_execute_script * add type anno in test * add init and constr * rename type cast and add test * fix bug in pyobj_to_val * add default cast when no type 
specifed * add basic add/sub/mul for array and scalar(value) * cargo clippy * comment out some println * stricter clippy * style: cargo fmt * fix: string&bool support in val2pyobj & back * style: remove println in test * style: rm println in test mod in python.rs * refactor: use wrap_index instead of pythonic_index * refactor: right op in scalar_arith_op * fix: stronger type& better test * style: remove println * fix: scalar sign/unsigned cast * style: remove instead of comment out * style: remove more comment out * feat: support scalar div vector * style: cargo fmt * style: typo * refactor: rename to correct var name * refactor: directly use arrow2::array * refactor: mv rsub&rdiv's op into a function * test: add python expr test * test: add test for PyList * refactor: tweak order of arithmetics in rtruediv * style: remove some `use` * refactor: move `is_instance` to mod * refactor: move fn to mod& move `use` to head * style: cargo fmt * fix: correct signed/unsigned cast * refactor: wrap err msg in another fn * style: cargo fmt * style: remove ok_or_else for readability * feat: add coprocessor fn(not yet impl) * refactor: change back to wrapped_at * fix: update Cargo.lock * fix: update rustc version * Update Rust Toolchain to nightly-2022-07-14 * feat: derive Eq when possible * style: use `from` to avoid `needless_borrow` lint Co-authored-by: dennis zhuang <killme2008@gmail.com> * feat: python coprocessor with type annotation (#96) * feat: add coprocessor fn Signed-off-by: discord9 <zglzy29yzdk@gmail.com> * feat: cast args into PyVector * feat: uncomplete coprocessor * feat: erase decorator in python ast * feat: strip decorator in ast * fix: change parse to `Interactive` * style: format Cargo.toml * feat: make coprocessor actually work * feat: move coprocessor fn out of test mod * feat: add error handling * style: add some comment * feat: rm type annotation * feat: add type annotation support * style: move compile method to vm closure * feat: annotation for 
nullable * feat: type coercion cast in annotation * feat: actually cast(NOT TESTED) * fix: allow single into(type) * refactor: extract parse_type from parser * style: cargo fmt * feat: change to Expr to preserve location info * feat: add CoprParse to deal parse check error * style: add type anno doc for coprocessor * test: add some test * feat: add underscore as any type in annotation * test: add parse& runtime testcases * style: rm dbg! remnant * style: cargo fmt * feat: add more error prompt info * style: cargo fmt * style: add doc tests' missing `use` * fix: doc test for coprocessor * style: cargo fmt * fix: add missing `use` for `cargo test --doc` * refactor: according to reviews * refactor: more tweaks according to reviews * refactor: merge match arm * refactor: move into different files(UNCOMPLELTE) * refactor: split parse_copr into more function * refactor: split `exec_coprocessor` to more fn * style: cargo fmt * feat: print Py Exceptions in String * feat: error handling conform standards * test: fix test_coprocessor * feat: remove `into` in python * test: remove all `into` in python test * style: update comment * refactor: move strip compile fn to impl Copr * refactor: move `gen_schema` to impl copr * refactor: move `check_cast_type` to impl copr * refactor: if let to match * style: cargo fmt * refactor: better parse of keyword arg list * style: cargo fmt * refactor: some error handling(UNCOMPLETE) * refactor: error handling to general Error type * refactor: rm some Vec::new() * test: modify all tests to ok * style: reorder item * refactor: fetch using iter * style: cargo fmt * style: fmt macro by hand * refactor: rename InnerError to Error * test: use ron to write test * test: add test for exec_copr * refactor: add parse_bin_op * feat: add check_anno * refactor: add some checker function * refactor: exec_copr into smaller func * style: add some comment * refactor: add check for bin_op * refactor: rm useless Result * style: add pretty print for error with 
location * feat: more info for pretty print * refactor: mv pretty print to error.rs * refactor: rm execute_script * feat: add pretty print * feat: add constant column support * test: add test for constant column * feat: add pretty print exec fn * style: cargo fmt * feat: add macro to chain call `.fail()` * style: update doc for constant columns * style: add lint to allow print in test fn * style: cargo fmt * docs: update some comment * fix: ignore doctest for now * refactor: check_bin_op * refactor: parse_in_op, check ret anno fn * refactor: rm check_decorator * doc: loc add newline explain * style: cargo fmt * refactor: use Helper::try_into_vec in try_into_vec * style: cargo fmt * test: add ret anno test * style: cargo fmt * test: add name for .ron tests for better debug * test: print emoji in test * style: rm some comment out line * style: rename `into` to `try_into` fn * style: cargo fmt * refactor: rm unuse serialize derive * fix: pretty print out of bound fix * fix: rm some space in pretty print * style: cargo fmt * test: not even a python fn def * style: cargo fmt * fix: pretty print off by one space * fix: allow `eprint` in clippy lint * fix: compile error after rebase develop * feat: port 35 functions from DataFusion to Python Coprocessor (#137) * refactor: `cargo clippy` * feat: create a module * style: cargo fmt * feat: bind `pow()` function(UNTEST) * test: add test for udf mod * style: allow part eq not eq for gen code * style: allow print in test lint * feat: use PyObjectRef to handle more types * feat: add cargo feature for udf modules * style: rename feature to udf-builtins * refactor: move away from mod.rs * feat: add all_to_f64 cast fn * feat: add bind_math_fn macro * feat: add all simple math UDF * feat: add `random(len)` math fn * feat: port `avg()` from datafusion * refactor: add `eval_aggr_fn` * feat: add bind_aggr_fn macro * doc: add comment for args of macro * feat: add all UDAF from datafusion * refactor: extract test to separate file * 
style: cargo fmt * test: add incomplete test * test: add .ron test fn * feat: support scalar::list * doc: add comments * style: rename VagueFloat/Int to LenFloat/IntVec * test: for all fn(expect approx_median) * test: better print * doc: add comment for FloatWithError * refactor: move test.rs out of builtins/ * style: cargo fmt * doc: add comment for .ron file * doc: update some comments * test: EPS=1e-12 for float eq * test: use f64::EPSILON instead * test: change to 2*EPS * test: cache interpreter for fast testing * doc: remove a TODO which is done * test: refacto to_py_obj fn * fix: pow fn * doc: add a TODO for type_.rs * test: use new_int/float in test serde * test: for str case * style: cargo fmt * feat: cast PyList to ScalarValue::List * test: cast scalar to py obj and back * feat: cast to PyList * test: cast from PyList * test: nested PyVector unsupported * doc: remove unrunable doctest * test: replace PartialEq with impl just_as_expect * doc: add name for discord9's TODO * refactor: cahnge to vm.ctx.new_** instead * doc: complete a TODO * refactor: is_instance and other minor problem * refactor: remove type_::is_instance * style: cargo fmt * feat: rename to `greptime_builtin` * fix: error handling for PyList datatype * style: fix clippy warning * test: for PyList * feat: Python Coprocessor MVP (#180) * feat: add get_arrow_op * feat: add comparsion op(UNTESTED) * doc: explain why no rich compare * refactor: py_str2str&parse_keywords * feat: add DecoratorArgs * refactor: parse_keywords ret Deco Args * style: remove unused * doc: add todo * style: remove some unused fn * doc: add comment for copr's field * feat: add copr_engine module * refactor: move to `script` crate * style: clean up cargo.toml * feat: add query engine for copr engine * refactor: deco args into separate struct * test: update corrsponding test * feat: async coprocessor engine * refactor: add `exec_parsed` fn * feat: sync version of coprocessor(UNTEST) * refactor: remove useless lifetime * 
feat: new type for async stream record batch * merge: from PR#137 add py builtins * toolchain: update rustc to nightly-08-16 * feat: add `exec_with_cached_vm` fn(Can't compile) * toolchain: revert to 07-14 * fix: `exec_with_cached_vm` * fix: allow vector[_] in params * style: cargo fmt * doc: update comment on `_`&`_|None` * fix: allow import&ignore type anno is ok * feat: allow ignore return types * refsctor: remove unused py files in functions/ * style: fmt&clippy * refactor: python modules (#186) * refactor: move common/script to script * fix: clippy warnings and refactor python modules * refactor: remove modules mod rename tests mod * feat: adds Script and ScriptEngine trait, then impl PyScript/PyScriptEngine * refactor: remove pub use some functions in script * refactor: python error mod * refactor: coprocessor and vector * feat: adds engine test and greptime.vector function to create vector from iterable * fix: adds a blank line to cargo file end * fix: compile error after rebase develop * feat: script endpoint for http server (#206) * feat: impl /scripts API for http server * feat: adds http api version * test: add test for scripts handler and endpoint * feat: python side mock module and more builtin functions (#209) * feat: add python side module(for both mock and real upload script) * style: add *.pyc to gitignore * feat: move copr decorator(in .py) to greptime.py * doc: update comment for `datetime`&`mock_tester`&gitignore * feat: `filter()` a array with bool array(UNTESTED) * feat: `prev()`ious elem in array ret as new array(UNTEST) * feat: `datetime()` parse date time string and ret integer(UNTEST) * fix: add missing return&fmt * fix: allow f32 cast to PyFloat * fix: `datetime()`'s last token now parsed * test: `calc_rvs` now can run with builtin module * feat: allow rich compare which ret bool array * feat: logic and(`&`) for bool array * style: cargo fmt * feat: index PyVector by bool array * feat: alias `ln` as `log` in builtin modules * feat: logic 
or(`|`)¬( `~`) for bool array * feat: add `post` for @copr in py side mod * feat: change datetime return to i64 * feat: py side mod `post` script to given address * fix: add `engine` field in `post` in py side mod * refactor: use `ConstantVector` in `pow()` builtin * fix: prev ret err for zero array * doc: rm comment out code * test: incomplete pyside mod test case * git: ignore all __pycache__ * style: fmt&clippy * refactor: split py side module into exmaple&gptime * feat: init_table in py using `v1/sql` api * feat: calc_rvs now run both locally and remote * doc: add doc for how to run it * fix: comment out start server code in test * fix: clippy warnings * fix: http test url * fix: some CR problems * fix: some CR problems * refactor: script executor for instance * refactor: remove engine param in execute_script * chore: Remove unnecessary allow attributes Co-authored-by: Dennis Zhuang <killme2008@gmail.com> Co-authored-by: Discord9 <discord9@163.com> Co-authored-by: discord9 <zglzy29yzdk@gmail.com> Co-authored-by: discord9 <55937128+discord9@users.noreply.github.com>
4
component/script/python/greptime/__init__.py
Normal file
@@ -0,0 +1,4 @@
from .greptime import coprocessor, copr
from .greptime import vector, log, prev, sqrt, pow, datetime, sum
from .mock import mock_tester
from .cfg import set_conn_addr, get_conn_addr
11
component/script/python/greptime/cfg.py
Normal file
@@ -0,0 +1,11 @@
GREPTIME_DB_CONN_ADDRESS = "localhost:3000"
"""Global variable holding the address used to connect to the database"""

def set_conn_addr(addr: str):
    """Set the database address to the given `addr`"""
    global GREPTIME_DB_CONN_ADDRESS
    GREPTIME_DB_CONN_ADDRESS = addr

def get_conn_addr()->str:
    global GREPTIME_DB_CONN_ADDRESS
    return GREPTIME_DB_CONN_ADDRESS
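A minimal sketch of how the configured address might be consumed, assuming the URL pattern that the `coprocessor` wrapper in `greptime.py` builds (`http://<addr>/v1/scripts`). `scripts_url` is a hypothetical helper for illustration and is not part of this commit.

```python
# Module-level address, mirroring cfg.py in this commit.
GREPTIME_DB_CONN_ADDRESS = "localhost:3000"


def set_conn_addr(addr: str) -> None:
    """Point later requests at `addr` (host:port, no scheme)."""
    global GREPTIME_DB_CONN_ADDRESS
    GREPTIME_DB_CONN_ADDRESS = addr


def get_conn_addr() -> str:
    return GREPTIME_DB_CONN_ADDRESS


def scripts_url() -> str:
    """Hypothetical helper: build the scripts endpoint from the current address."""
    return "http://{}/v1/scripts".format(get_conn_addr())


set_conn_addr("127.0.0.1:4000")
print(scripts_url())
```

Because the address is read each time a URL is built, changing it with `set_conn_addr` affects every later call.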
215
component/script/python/greptime/greptime.py
Normal file
@@ -0,0 +1,215 @@
"""
Note that this is a mock library: when not connected to a database,
it can only run on mock data, using mock functions backed by numpy
"""
import functools
import numpy as np
import json
from urllib import request
import inspect
import requests

from .cfg import set_conn_addr, get_conn_addr

log = np.log
sum = np.nansum
sqrt = np.sqrt
pow = np.power
nan = np.nan


class TimeStamp(str):
    """
    TODO: impl date time
    """
    pass


class i32(int):
    """
    For Python Coprocessor Type Annotation ONLY
    A signed 32-bit integer.
    """

    def __repr__(self) -> str:
        return "i32"


class i64(int):
    """
    For Python Coprocessor Type Annotation ONLY
    A signed 64-bit integer.
    """

    def __repr__(self) -> str:
        return "i64"


class f32(float):
    """
    For Python Coprocessor Type Annotation ONLY
    A 32-bit floating point number.
    """

    def __repr__(self) -> str:
        return "f32"


class f64(float):
    """
    For Python Coprocessor Type Annotation ONLY
    A 64-bit floating point number.
    """

    def __repr__(self) -> str:
        return "f64"


class vector(np.ndarray):
    """
    A compact vector whose elements all share the same data type.
    """
    _datatype: str | None = None

    def __new__(
        cls,
        lst,
        dtype=None
    ) -> ...:
        self = np.asarray(lst).view(cls)
        self._datatype = dtype
        return self

    def __str__(self) -> str:
        return "vector({}, \"{}\")".format(super().__str__(), self.datatype())

    def datatype(self):
        return self._datatype

    def filter(self, lst_bool):
        return self[lst_bool]


def prev(lst):
    ret = np.zeros(len(lst))
    ret[1:] = lst[0:-1]
    ret[0] = nan
    return ret


def query(sql: str):
    pass


def interval(arr: list, duration: int, fill, step: None | int = None, explicitOffset=False):
    """
    Note that this is a mock function with the same functionality as the actual Python Coprocessor
    `arr` is a vector of integral or temporal type.

    `duration` is the length of the sliding window

    `step` is the distance the sliding window moves at each step

    `fill` indicates how to fill missing values:
    - "prev": use the previous value
    - "post": use the next value
    - "linear": linear interpolation; if a type cannot be interpolated, fall back to "prev"
    - "null": use null
    - "none": do not interpolate
    """
    if step is None:
        step = duration
    tot_len = int(np.ceil(len(arr) / step))
    slices = np.full((tot_len, int(duration)), nan)  # short trailing windows stay nan-padded
    for idx, start in enumerate(range(0, len(arr), step)):
        window = arr[start:(start + duration)]
        slices[idx, :len(window)] = window
    return slices


def factor(unit: str) -> int:
    if unit == "d":
        return 24 * 60 * 60
    elif unit == "h":
        return 60 * 60
    elif unit == "m":
        return 60
    elif unit == "s":
        return 1
    else:
        raise Exception("Only d, h, m, s are supported, found {}".format(unit))


def datetime(input_time: str) -> int:
    """
    Supported units: `d`(day) `h`(hour) `m`(minute) `s`(second)

    Supported formats:
    `12s` `7d` `12d2h7m`
    """

    prev = 0
    cur = 0
    state = "Num"
    parse_res = []
    for idx, ch in enumerate(input_time):
        if ch.isdigit():
            cur = idx

            if state != "Num":
                parse_res.append((state, input_time[prev:cur], (prev, cur)))
                prev = idx
                state = "Num"
        else:
            cur = idx
            if state != "Symbol":
                parse_res.append((state, input_time[prev:cur], (prev, cur)))
                prev = idx
                state = "Symbol"
    parse_res.append((state, input_time[prev:cur+1], (prev, cur+1)))

    cur_idx = 0
    res_time = 0
    while cur_idx < len(parse_res):
        pair = parse_res[cur_idx]
        if pair[0] == "Num":
            val = int(pair[1])
            nxt = parse_res[cur_idx+1]
            res_time += val * factor(nxt[1])
            cur_idx += 2
        else:
            raise Exception("Two symbols in a row are impossible")

    return res_time


def coprocessor(args=None, returns=None, sql=None):
    """
    The actual coprocessor: it connects to the database and posts the source of
    whatever function is decorated with `@coprocessor(args=[...], returns=[...], sql=...)`
    """
    def decorator_copr(func):
        @functools.wraps(func)
        def wrapper_do_actual(*args, **kwargs):
            if len(args)!=0 or len(kwargs)!=0:
                raise Exception("Expected a call with no arguments (all args are supplied by the coprocessor itself)")
            source = inspect.getsource(func)
            url = "http://{}/v1/scripts".format(get_conn_addr())
            print("Posting to {}".format(url))
            data = {
                "script": source,
                "engine": None,
            }

            res = requests.post(
                url,
                headers={"Content-Type": "application/json"},
                json=data
            )
            return res
        return wrapper_do_actual
    return decorator_copr


# make an alias for short
copr = coprocessor
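The windowing described in the `interval` docstring can be sketched on its own. This is an illustration under assumptions, not the database's implementation: `sliding_windows` is a hypothetical name, short trailing windows are padded with NaN, and the `fill` modes are not modelled.

```python
import numpy as np


def sliding_windows(values, duration, step=None):
    """Hypothetical helper: cut `values` into windows of length `duration`.

    A new window starts every `step` elements (defaults to `duration`, i.e.
    non-overlapping windows). Windows that run past the end are NaN-padded.
    """
    if step is None:
        step = duration
    values = np.asarray(values, dtype=float)
    starts = range(0, len(values), step)
    windows = np.full((len(starts), duration), np.nan)
    for row, start in enumerate(starts):
        chunk = values[start:start + duration]
        windows[row, :len(chunk)] = chunk
    return windows


print(sliding_windows([1, 2, 3, 4, 5], duration=2))
```

With five values and `duration=2` this yields three rows; the last row holds `5` followed by one NaN.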
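The duration strings accepted by the mock `datetime()` (`12s`, `7d`, `12d2h7m`) could also be parsed with a regular expression. The following is a sketch under assumptions: `parse_duration` and `UNIT_SECONDS` are hypothetical names, and malformed input raises `ValueError` rather than the generic `Exception` used in the commit.

```python
import re

# Seconds per unit, matching the factors used by `factor()` in the commit.
UNIT_SECONDS = {"d": 24 * 60 * 60, "h": 60 * 60, "m": 60, "s": 1}

# One token is a run of digits followed by a run of letters, e.g. "12d".
_TOKEN = re.compile(r"(\d+)([a-zA-Z]+)")


def parse_duration(text: str) -> int:
    """Hypothetical helper: convert e.g. "12d2h7m" into a number of seconds."""
    pos = 0
    total = 0
    for match in _TOKEN.finditer(text):
        if match.start() != pos:
            raise ValueError("unexpected characters at offset {}".format(pos))
        number, unit = match.groups()
        if unit not in UNIT_SECONDS:
            raise ValueError("only d, h, m, s are supported, found {!r}".format(unit))
        total += int(number) * UNIT_SECONDS[unit]
        pos = match.end()
    if pos == 0 or pos != len(text):
        raise ValueError("malformed duration {!r}".format(text))
    return total


print(parse_duration("12d2h7m"))  # 12*86400 + 2*3600 + 7*60 = 1044420
```

Tracking `pos` makes the parser reject gaps between tokens and trailing input such as a bare number without a unit.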
82
component/script/python/greptime/mock.py
Normal file
@@ -0,0 +1,82 @@
"""
Note that this is a mock library: when not connected to a database,
it can only run on mock data, with functions backed by numpy
"""
from typing import Any
import numpy as np
from .greptime import i32,i64,f32,f64, vector, interval, query, prev, datetime, log, sum, sqrt, pow, nan, copr, coprocessor

import inspect
import functools
import ast



def mock_tester(
    func,
    env:dict,
    table=None
):
    """
    Mock tester helper function.
    It replaces `@coprocessor` with `@mock_copr` and adds a keyword `env=env`,
    like `@mock_copr(args=...,returns=...,env=env)`
    """
    code = inspect.getsource(func)
    tree = ast.parse(code)
    tree = HackyReplaceDecorator("env").visit(tree)
    new_func = tree.body[0]
    fn_name = new_func.name

    code_obj = compile(tree, "<embedded>", "exec")
    # explicit namespace, so the new definition is reliably visible after exec
    namespace = {**globals(), "env": env}
    exec(code_obj, namespace)
    ret = namespace[fn_name]()
    return ret

def mock_copr(args, returns, sql=None, env:None|dict=None):
    """
    This should not be used directly by user
    """
    def decorator_copr(func):
        @functools.wraps(func)
        def wrapper_do_actual(*fn_args, **fn_kwargs):

            real_args = [env[name] for name in args]
            ret = func(*real_args)
            return ret

        return wrapper_do_actual
    return decorator_copr

class HackyReplaceDecorator(ast.NodeTransformer):
    """
    This class accepts the name of an `env` dict from which args are extracted,
    and puts `env` into the parameter list of the `mock_copr` decorator, i.e.:

    a `@copr(args=["a", "b"], returns=["c"])` with a call like mock_tester(abc, env={"a":2, "b":3})

    will be transformed into `@mock_copr(args=["a", "b"], returns=["c"], env={"a":2, "b":3})`
    """
    def __init__(self, env: str) -> None:
        # only used to add the `env` keyword
        self.env = env

    def visit_FunctionDef(self, node: ast.FunctionDef) -> Any:
        new_node = node
        decorator_list = new_node.decorator_list
        if len(decorator_list)!=1:
            return node

        deco = decorator_list[0]
        if deco.func.id!="coprocessor" and deco.func.id !="copr":
            raise Exception("Expect a @copr or @coprocessor, found {}.".format(deco.func.id))
        deco.func = ast.Name(id="mock_copr", ctx=ast.Load())
        new_kw = ast.keyword(arg="env", value=ast.Name(id=self.env, ctx=ast.Load()))
        deco.keywords.append(new_kw)

        # Tie up loose ends in the AST.
        ast.copy_location(new_node, node)
        ast.fix_missing_locations(new_node)
        self.generic_visit(node)
        return new_node