Files
greptimedb/component/script/python/example/calc_rv.py
discord9 20dcaa6897 feat: `interval` & None value for `prev` & `next` (#252)
* test: for builtin functions

* test: expect fail for `datetime()`

* feat: add `interval()` fn(WIP)

* feat: `interval()` fn in builtin(UNTEST)

* refactor: move `py_vec_obj_to_array` to util.rs

* style: fmt

* test: simple `interval()` cases

* test: `interval()` with `last()`&`first()`

* doc: `ts` param of `interval()`

* log: common_telemetry for logging in script crate

* doc: corresponding test fn for each .ron file

* feat: change to`mpsc` for schedule_job

* test: schedule_job

* dep: rm rustpython dep in common-function

* refactor: mv `schedule_job` into `Script` trait

* test: change to use `interval` to sample datapoint

* feat: add gen_none_array for generate None Array

* feat: impl Missing value for `prev`&`next`

* test: `sum(prev(values))`

* doc: add comment for why not support Float16 in `prev()`

* feat: add `interval` in py side mock module

* style: cargo fmt

* refactor: according to comments

* refactor: extract `apply_interval_function`

* style: cargo fmt

* refactor: remove `schedule()`

* style: cargo fmt
2022-09-14 10:48:27 +08:00

72 lines
2.4 KiB
Python

import sys
# work around the annoying "relative import beyond top-level package" error
sys.path.insert(0, "../")
from greptime import mock_tester, coprocessor, greptime as gt_builtin
from greptime.greptime import interval, vector, log, prev, sqrt, datetime
import greptime.greptime as greptime
import json
import numpy as np
def data_sample(k_lines, symbol, density=5 * 30 * 86400):
    """
    Resample the close prices found in a k-line response.

    Only the close data is returned, for simplicity for now.

    Args:
        k_lines: Decoded response mapping. Its "ret_msg" entry must equal
            "OK"; its "result" entry is iterated as rows, each carrying a
            "close" value convertible to float.
        symbol: Currently unused; kept so existing callers keep working.
        density: Second argument forwarded to `interval`
            (presumably a duration in seconds -- TODO confirm).

    Returns:
        Whatever `interval(close, density, "prev")` returns.

    Raises:
        ValueError: If "ret_msg" is missing or not "OK", or if "result"
            is None.
    """
    # `.get` so that a response without "ret_msg" is reported as a bad
    # response instead of surfacing as an unrelated KeyError.
    ret_msg = k_lines.get("ret_msg")
    if ret_msg != "OK":
        raise ValueError(
            "Expect a `OK`ed message, got ret_msg={!r}".format(ret_msg)
        )
    rows = k_lines["result"]
    if rows is None:
        raise ValueError("Expect a `OK`ed message, got `result` of None")
    close = [float(row["close"]) for row in rows]
    return interval(close, density, "prev")
def as_table(kline: list):
    """
    Convert row-oriented k-line records into a column-oriented table.

    Args:
        kline: Rows, each indexable by the column names "symbol", "period",
            "open_time", "open", "high", "low" and "close".

    Returns:
        A dict mapping each column name to `vector(values, type_name)`,
        where `values` holds that column's entry from every row after it
        has been passed through the column's converter (str, int or float).
    """
    # (column name, converter applied to every cell, type name for `vector`)
    columns = [
        ("symbol", str, "str"),
        ("period", str, "str"),
        ("open_time", int, "int"),
        ("open", float, "float"),
        ("high", float, "float"),
        ("low", float, "float"),
        ("close", float, "float"),
    ]
    return {
        name: vector([convert(row[name]) for row in kline], type_name)
        for name, convert, type_name in columns
    }
@coprocessor(
    args=["open_time", "close"],
    returns=["rv_7d", "rv_15d", "rv_30d", "rv_60d", "rv_90d", "rv_180d"]
)
def calc_rvs(open_time, close):
    """
    Compute one `rv` value for each of six look-back windows.

    Every window ends at the last entry of `open_time` and spans 7, 15, 30,
    60, 90 or 180 days (as produced by `datetime("7d")` and so on). "rv"
    presumably stands for realized volatility -- TODO confirm.

    Args:
        open_time: Column of open times; indexed with `[-1]` and compared
            element-wise against a time point.
        close: Column of close values, filtered with the same mask as
            `open_time`.

    Returns:
        A 6-tuple of single-element vectors, in the order
        rv_7d, rv_15d, rv_30d, rv_60d, rv_90d, rv_180d.
    """
    from greptime import vector, log, prev, sqrt, datetime, pow, sum, last
    import greptime as g

    def calc_rv(prices, stamps, end_time, window):
        # Keep only rows strictly inside (end_time - window, end_time).
        before_end = stamps < end_time
        after_start = stamps > end_time - window
        in_window = before_end & after_start
        prices = prices[in_window]
        stamps = stamps[in_window]
        # Resample the prices on a 10m step, taking the last value of each
        # step (per `g.interval` with `last` as the aggregation callback).
        prices = g.interval(
            stamps, prices, datetime("10m"), lambda chunk: last(chunk)
        )

        elapsed = stamps[-1] - stamps[0]
        avg_time_interval = elapsed / (len(stamps) - 1)
        log_ratio = log(prices / prev(prices))
        variance = sum(pow(log_ratio, 2) / (len(log_ratio) - 1))
        return sqrt(variance / avg_time_interval)

    # Open question: how should environment variables reach this code?
    # Maybe by accessing the scope, serializing it, and sending it to the
    # remote side.
    timepoint = open_time[-1]

    def rv_for(span):
        # Wrap the scalar result in a one-element vector for the given span.
        return vector([calc_rv(close, open_time, timepoint, datetime(span))])

    rv_7d = rv_for("7d")
    rv_15d = rv_for("15d")
    rv_30d = rv_for("30d")
    rv_60d = rv_for("60d")
    rv_90d = rv_for("90d")
    rv_180d = rv_for("180d")
    return rv_7d, rv_15d, rv_30d, rv_60d, rv_90d, rv_180d