feat: flow perf&fix df func call (#4347)

* feat: flow perf&fix df func call

feat: make source sender `send` non-blocking

feat: better control of flow worker freq

feat: support transform interval

fix: const folding df func args&tests

tests: update cast const fold

chore: adjust flow worker's freq

refactor: batch split

feat: adaptive run freq for flow worker & check for errors

chore: better debug log

* refactor: per review

* chore: per zc's review

* chore: per bot review

* chore: remove some completed `TODO`s

* docs: add comments for a test
This commit is contained in:
discord9
2024-07-15 17:20:04 +08:00
committed by GitHub
parent 04ac0c8da0
commit 2b912d93fb
14 changed files with 632 additions and 166 deletions

View File

@@ -18,7 +18,7 @@
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::{Instant, SystemTime};
use std::time::{Duration, Instant, SystemTime};
use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use common_config::Configurable;
@@ -51,7 +51,7 @@ use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::error::{ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu};
use crate::expr::GlobalId;
use crate::repr::{self, DiffRow, Row};
use crate::repr::{self, DiffRow, Row, BATCH_SIZE};
use crate::transform::sql_to_flow_plan;
mod flownode_impl;
@@ -67,7 +67,7 @@ mod table_source;
use crate::error::Error;
use crate::FrontendInvoker;
// TODO(discord9): replace this with `GREPTIME_TIMESTAMP` before v0.9
// `GREPTIME_TIMESTAMP` is not used here, so that tables created automatically by flow can be distinguished
pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";
pub const UPDATE_AT_TS_COL: &str = "update_at";
@@ -212,8 +212,6 @@ pub fn diff_row_to_request(rows: Vec<DiffRow>) -> Vec<DiffRequest> {
/// This impl block contains methods to send writeback requests to frontend
impl FlowWorkerManager {
/// TODO(discord9): merge all diff rows of the same type into one request
///
/// Return the number of requests it made
pub async fn send_writeback_requests(&self) -> Result<usize, Error> {
let all_reqs = self.generate_writeback_request().await;
@@ -464,7 +462,6 @@ impl FlowWorkerManager {
shutdown: Option<broadcast::Receiver<()>>,
) -> JoinHandle<()> {
info!("Starting flownode manager's background task");
// TODO(discord9): add heartbeat tasks here
common_runtime::spawn_bg(async move {
self.run(shutdown).await;
})
@@ -484,21 +481,31 @@ impl FlowWorkerManager {
}
}
async fn get_buf_size(&self) -> usize {
self.node_context.read().await.get_send_buf_size().await
}
/// Trigger dataflow running, and then send writeback request to the source sender
///
/// note that this method doesn't handle input mirror requests, as those should be handled by the grpc server
pub async fn run(&self, mut shutdown: Option<broadcast::Receiver<()>>) {
debug!("Starting to run");
let default_interval = Duration::from_secs(1);
let mut avg_spd = 0; // rows/sec
let mut since_last_run = tokio::time::Instant::now();
loop {
// TODO(discord9): only run when new inputs arrive or scheduled to
if let Err(err) = self.run_available(true).await {
let row_cnt = self.run_available(true).await.unwrap_or_else(|err| {
common_telemetry::error!(err;"Run available errors");
}
// TODO(discord9): error handling
0
});
if let Err(err) = self.send_writeback_requests().await {
common_telemetry::error!(err;"Send writeback request errors");
};
self.log_all_errors().await;
// determine if need to shutdown
match &shutdown.as_mut().map(|s| s.try_recv()) {
Some(Ok(())) => {
info!("Shutdown flow's main loop");
@@ -515,7 +522,25 @@ impl FlowWorkerManager {
}
None => (),
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// for now we want to batch rows until there are around `BATCH_SIZE` rows in the send buf
// before triggering a run of the flow's worker
// (the `.max(1)` below guards against division by zero)
let wait_for = since_last_run.elapsed();
let cur_spd = row_cnt * 1000 / wait_for.as_millis().max(1) as usize;
// rapid increase, slow decay
avg_spd = if cur_spd > avg_spd {
cur_spd
} else {
(9 * avg_spd + cur_spd) / 10
};
debug!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1); //in ms
let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);
debug!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
since_last_run = tokio::time::Instant::now();
tokio::time::sleep(new_wait).await;
}
// flow is now shut down; drop frontend_invoker early so a ref cycle (in standalone mode) can be prevented:
// FlowWorkerManager.frontend_invoker -> FrontendInvoker.inserter
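
The adaptive pacing in the loop above can be read as a small pure function. The following is an illustrative sketch only (the `next_wait` helper is hypothetical and not part of this diff) of how the average speed and the next wait are derived:

```rust
use std::time::Duration;

/// Sketch of the pacing logic: the average speed jumps up immediately when the
/// current speed is higher ("rapid increase") and otherwise decays with a 9:1
/// weighting ("slow decay"); the next wait aims to let roughly `batch_size`
/// rows accumulate and is capped at the one-second default interval.
fn next_wait(avg_spd: &mut usize, row_cnt: usize, elapsed: Duration, batch_size: usize) -> Duration {
    // rows per second since the last run; max(1) avoids division by zero
    let cur_spd = row_cnt * 1000 / elapsed.as_millis().max(1) as usize;
    *avg_spd = if cur_spd > *avg_spd {
        cur_spd // rapid increase
    } else {
        (9 * *avg_spd + cur_spd) / 10 // slow decay
    };
    // time needed to accumulate about `batch_size` rows at the average speed
    let wait_ms = batch_size * 1000 / (*avg_spd).max(1);
    Duration::from_millis(wait_ms as u64).min(Duration::from_secs(1))
}
```

For example, with `BATCH_SIZE` = 32767 (half of `BROADCAST_CAP`), processing 32768 rows in 500 ms gives a current speed of 65536 rows/s, so the next wait settles around 500 ms; once input goes idle the average decays and the wait drifts back toward the 1 s cap.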
@@ -528,8 +553,10 @@ impl FlowWorkerManager {
///
/// set `blocking` to true to wait until lock is acquired
/// and false to return immediately if lock is not acquired
/// return the number of rows sent to the worker
/// TODO(discord9): add flag for subgraph that have input since last run
pub async fn run_available(&self, blocking: bool) -> Result<(), Error> {
pub async fn run_available(&self, blocking: bool) -> Result<usize, Error> {
let mut row_cnt = 0;
loop {
let now = self.tick_manager.tick();
for worker in self.worker_handles.iter() {
@@ -539,35 +566,33 @@ impl FlowWorkerManager {
} else if let Ok(worker) = worker.try_lock() {
worker.run_available(now).await?;
} else {
return Ok(());
return Ok(row_cnt);
}
}
// first check how many inputs were sent
// check rows sent and rows remaining in the send buf
let (flush_res, buf_len) = if blocking {
let ctx = self.node_context.read().await;
(ctx.flush_all_sender().await, ctx.get_send_buf_size().await)
} else {
match self.node_context.try_read() {
Ok(ctx) => (ctx.flush_all_sender().await, ctx.get_send_buf_size().await),
Err(_) => return Ok(()),
Err(_) => return Ok(row_cnt),
}
};
match flush_res {
Ok(_) => (),
Ok(r) => row_cnt += r,
Err(err) => {
common_telemetry::error!("Flush send buf errors: {:?}", err);
break;
}
};
// if no thing in send buf then break
if buf_len == 0 {
// if not enough rows, break
if buf_len < BATCH_SIZE {
break;
} else {
debug!("Send buf len = {}", buf_len);
}
}
Ok(())
Ok(row_cnt)
}
/// send write request to related source sender
@@ -583,8 +608,6 @@ impl FlowWorkerManager {
);
let table_id = region_id.table_id();
self.node_context.read().await.send(table_id, rows).await?;
// TODO(discord9): put it in a background task?
// self.run_available(false).await?;
Ok(())
}
}

View File

@@ -14,7 +14,7 @@
//! Node context, prone to change with every incoming request
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;
use common_telemetry::debug;
@@ -65,54 +65,64 @@ pub struct FlownodeContext {
/// backpressure and adjust dataflow running duration to avoid blocking
#[derive(Debug)]
pub struct SourceSender {
// TODO(discord9): make it all Vec<DiffRow>?
sender: broadcast::Sender<DiffRow>,
send_buf: RwLock<VecDeque<DiffRow>>,
send_buf_tx: mpsc::UnboundedSender<Vec<DiffRow>>,
send_buf_rx: RwLock<mpsc::UnboundedReceiver<Vec<DiffRow>>>,
}
impl Default for SourceSender {
fn default() -> Self {
let (send_buf_tx, send_buf_rx) = mpsc::unbounded_channel();
Self {
// TODO(discord9): find a better way than increasing this to prevent lagging and hence missing input data
sender: broadcast::Sender::new(BROADCAST_CAP * 2),
send_buf: Default::default(),
send_buf_tx,
send_buf_rx: RwLock::new(send_buf_rx),
}
}
}
impl SourceSender {
/// max number of iterations to try to flush the send buf
const MAX_ITERATIONS: usize = 16;
pub fn get_receiver(&self) -> broadcast::Receiver<DiffRow> {
self.sender.subscribe()
}
/// send as many rows as possible from the send buf
/// until the send buf is empty or the broadcast channel is full
pub async fn try_send_all(&self) -> Result<usize, Error> {
pub async fn try_flush(&self) -> Result<usize, Error> {
let mut row_cnt = 0;
loop {
let mut send_buf = self.send_buf.write().await;
let mut iterations = 0;
while iterations < Self::MAX_ITERATIONS {
let mut send_buf = self.send_buf_rx.write().await;
// if the broadcast channel is full or the send buf is empty, there
// is nothing to do for now, just break
if self.sender.len() >= BROADCAST_CAP || send_buf.is_empty() {
break;
}
if let Some(row) = send_buf.pop_front() {
self.sender
.send(row)
.map_err(|err| {
InternalSnafu {
reason: format!("Failed to send row, error = {:?}", err),
}
.build()
})
.with_context(|_| EvalSnafu)?;
row_cnt += 1;
if let Some(rows) = send_buf.recv().await {
for row in rows {
self.sender
.send(row)
.map_err(|err| {
InternalSnafu {
reason: format!("Failed to send row, error = {:?}", err),
}
.build()
})
.with_context(|_| EvalSnafu)?;
row_cnt += 1;
}
}
iterations += 1;
}
if row_cnt > 0 {
debug!("Send {} rows", row_cnt);
debug!(
"Remaining Send buf.len() = {}",
self.send_buf.read().await.len()
self.send_buf_rx.read().await.len()
);
}
@@ -121,11 +131,14 @@ impl SourceSender {
/// return the number of rows actually sent (including what's in the buffer)
pub async fn send_rows(&self, rows: Vec<DiffRow>) -> Result<usize, Error> {
self.send_buf.write().await.extend(rows);
self.send_buf_tx.send(rows).map_err(|e| {
crate::error::InternalSnafu {
reason: format!("Failed to send row, error = {:?}", e),
}
.build()
})?;
let row_cnt = self.try_send_all().await?;
Ok(row_cnt)
Ok(0)
}
}
@@ -150,7 +163,7 @@ impl FlownodeContext {
pub async fn flush_all_sender(&self) -> Result<usize, Error> {
let mut sum = 0;
for sender in self.source_sender.values() {
sender.try_send_all().await.inspect(|x| sum += x)?;
sender.try_flush().await.inspect(|x| sum += x)?;
}
Ok(sum)
}
@@ -159,7 +172,7 @@ impl FlownodeContext {
pub async fn get_send_buf_size(&self) -> usize {
let mut sum = 0;
for sender in self.source_sender.values() {
sum += sender.send_buf.read().await.len();
sum += sender.send_buf_rx.read().await.len();
}
sum
}
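
The net effect of this file's change is that `send_rows` only enqueues a batch and returns immediately, while the bounded broadcast channel is filled lazily during flush. Below is a minimal, self-contained sketch of that pattern; it is illustrative only (the `BufferedSender` type, the `cap` field, and the synchronous `try_flush` here are hypothetical, not the real `SourceSender`):

```rust
use tokio::sync::{broadcast, mpsc};

/// Illustrative buffered sender: producers push whole batches into an unbounded
/// mpsc channel (non-blocking), and a later flush step drains those batches into
/// a bounded broadcast channel only while it has room, so backpressure lands on
/// the flush loop instead of the caller.
struct BufferedSender<T: Clone> {
    broadcast_tx: broadcast::Sender<T>,
    buf_tx: mpsc::UnboundedSender<Vec<T>>,
    buf_rx: mpsc::UnboundedReceiver<Vec<T>>,
    cap: usize,
}

impl<T: Clone> BufferedSender<T> {
    fn new(cap: usize) -> Self {
        let (buf_tx, buf_rx) = mpsc::unbounded_channel();
        Self {
            broadcast_tx: broadcast::Sender::new(cap),
            buf_tx,
            buf_rx,
            cap,
        }
    }

    /// Non-blocking: only enqueues the batch into the unbounded buffer.
    fn send_rows(&self, rows: Vec<T>) {
        // an unbounded sender never blocks; the error only occurs if the
        // receiver half was dropped
        let _ = self.buf_tx.send(rows);
    }

    /// Drain buffered batches into the broadcast channel while it has room;
    /// returns how many rows were forwarded.
    fn try_flush(&mut self) -> usize {
        let mut sent = 0;
        while self.broadcast_tx.len() < self.cap {
            match self.buf_rx.try_recv() {
                Ok(rows) => {
                    for row in rows {
                        // a send error just means there are no subscribers yet
                        let _ = self.broadcast_tx.send(row);
                        sent += 1;
                    }
                }
                // buffer empty: nothing more to flush for now
                Err(_) => break,
            }
        }
        sent
    }
}
```

With this shape the caller of `send_rows` (the gRPC write path) stays cheap, and `flush_all_sender` plus the worker loop decide when rows actually move onward, which is what lets the adaptive pacing above batch up to `BATCH_SIZE` rows before each run.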

View File

@@ -146,6 +146,16 @@ fn mfp_subgraph(
// find all updates that need to be sent from the arrangement
let output_kv = arrange.read().get_updates_in_range(range);
err_collector.run(|| {
snafu::ensure!(
mfp_plan.is_temporal() || output_kv.is_empty(),
crate::expr::error::InternalSnafu {
reason: "Output from future should be empty since temporal filter is not applied"
}
);
Ok(())
});
// the output is expected to be key -> empty val
let output = output_kv
.into_iter()

View File

@@ -187,6 +187,39 @@ fn split_row_to_key_val(
}
}
/// split rows into key and val by evaluating the key and val plans
fn batch_split_rows_to_key_val(
rows: impl IntoIterator<Item = DiffRow>,
key_val_plan: KeyValPlan,
err_collector: ErrCollector,
) -> impl IntoIterator<Item = KeyValDiffRow> {
let mut row_buf = Row::new(vec![]);
rows.into_iter().filter_map(
move |(mut row, sys_time, diff): DiffRow| -> Option<KeyValDiffRow> {
err_collector.run(|| {
let len = row.len();
if let Some(key) = key_val_plan
.key_plan
.evaluate_into(&mut row.inner, &mut row_buf)?
{
// reuse the row as buffer
row.inner.resize(len, Value::Null);
// val_plan is not supposed to carry any filter predicate,
let val = key_val_plan
.val_plan
.evaluate_into(&mut row.inner, &mut row_buf)?
.context(InternalSnafu {
reason: "val_plan should not contain any filter predicate",
})?;
Ok(Some(((key, val), sys_time, diff)))
} else {
Ok(None)
}
})?
},
)
}
/// reduce subgraph, reduce the input data into a single row
/// the output is the concatenation of key and val
fn reduce_subgraph(
@@ -204,13 +237,7 @@ fn reduce_subgraph(
send,
}: SubgraphArg,
) {
let mut row_buf = Row::empty();
let key_val = data.into_iter().filter_map(|(row, sys_time, diff)| {
// error is collected and then the row is skipped
err_collector
.run(|| split_row_to_key_val(row, sys_time, diff, key_val_plan, &mut row_buf))
.flatten()
});
let key_val = batch_split_rows_to_key_val(data, key_val_plan.clone(), err_collector.clone());
// from here for distinct reduce and accum reduce, things are drastically different
// for distinct reduce the arrange store the output,
// but for accum reduce the arrange store the accum state, and output is

View File

@@ -96,12 +96,8 @@ impl<'referred, 'df> Context<'referred, 'df> {
}
}
let all = prev_avail.chain(to_send).collect_vec();
if !all.is_empty() || !to_arrange.is_empty() {
debug!(
"Rendered Source All send: {} rows, not yet send: {} rows",
all.len(),
to_arrange.len()
);
if !to_arrange.is_empty() {
debug!("Source Operator buffered {} rows", to_arrange.len());
}
err_collector.run(|| arranged.apply_updates(now, to_arrange));
send.give(all);

View File

@@ -587,6 +587,10 @@ pub struct MfpPlan {
}
impl MfpPlan {
/// Indicates if the `MfpPlan` contains temporal predicates, i.e. it may have outputs that occur in the future.
pub fn is_temporal(&self) -> bool {
!self.lower_bounds.is_empty() || !self.upper_bounds.is_empty()
}
/// find `now` in `predicates` and put them into lower/upper temporal bounds for temporal filter to use
pub fn create_from(mut mfp: MapFilterProject) -> Result<Self, Error> {
let mut lower_bounds = Vec::new();

View File

@@ -56,6 +56,8 @@ pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);
/// TODO(discord9): add config for this, so cpu&mem usage can be balanced and configured by this
pub const BROADCAST_CAP: usize = 65535;
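/// Target number of rows to accumulate in a source's send buffer before the flow worker is triggered to run (see the pacing logic in `FlowWorkerManager::run`)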
pub const BATCH_SIZE: usize = BROADCAST_CAP / 2;
/// Convert a value that is or can be converted to Datetime to internal timestamp
///
/// supported types are: `Date`, `DateTime`, `TimeStamp`, `i64`

View File

@@ -16,6 +16,7 @@
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_telemetry::debug;
use datafusion_physical_expr::PhysicalExpr;
use datatypes::data_type::ConcreteDataType as CDT;
@@ -27,20 +28,23 @@ use substrait_proto::proto::function_argument::ArgType;
use substrait_proto::proto::Expression;
use crate::error::{
DatafusionSnafu, DatatypesSnafu, Error, EvalSnafu, InvalidQuerySnafu, NotImplementedSnafu,
PlanSnafu,
DatafusionSnafu, DatatypesSnafu, Error, EvalSnafu, ExternalSnafu, InvalidQuerySnafu,
NotImplementedSnafu, PlanSnafu, UnexpectedSnafu,
};
use crate::expr::{
BinaryFunc, DfScalarFunction, RawDfScalarFn, ScalarExpr, TypedExpr, UnaryFunc,
UnmaterializableFunc, VariadicFunc,
};
use crate::repr::{ColumnType, RelationDesc, RelationType};
use crate::transform::literal::{from_substrait_literal, from_substrait_type};
use crate::transform::literal::{
from_substrait_literal, from_substrait_type, to_substrait_literal,
};
use crate::transform::{substrait_proto, FunctionExtensions};
// TODO(discord9): found proper place for this
// TODO(discord9): refactor the plan-to-substrait conversion of the `arrow_cast` function and then remove this function
/// see `arrow_schema::datatype` for the type names
fn typename_to_cdt(name: &str) -> CDT {
match name {
fn typename_to_cdt(name: &str) -> Result<CDT, Error> {
let ret = match name {
"Int8" => CDT::int8_datatype(),
"Int16" => CDT::int16_datatype(),
"Int32" => CDT::int32_datatype(),
@@ -53,10 +57,22 @@ fn typename_to_cdt(name: &str) -> CDT {
"Float64" => CDT::float64_datatype(),
"Boolean" => CDT::boolean_datatype(),
"String" => CDT::string_datatype(),
"Date" => CDT::date_datatype(),
"Date" | "Date32" | "Date64" => CDT::date_datatype(),
"Timestamp" => CDT::timestamp_second_datatype(),
_ => CDT::null_datatype(),
}
"Timestamp(Second, None)" => CDT::timestamp_second_datatype(),
"Timestamp(Millisecond, None)" => CDT::timestamp_millisecond_datatype(),
"Timestamp(Microsecond, None)" => CDT::timestamp_microsecond_datatype(),
"Timestamp(Nanosecond, None)" => CDT::timestamp_nanosecond_datatype(),
"Time32(Second)" | "Time64(Second)" => CDT::time_second_datatype(),
"Time32(Millisecond)" | "Time64(Millisecond)" => CDT::time_millisecond_datatype(),
"Time32(Microsecond)" | "Time64(Microsecond)" => CDT::time_microsecond_datatype(),
"Time32(Nanosecond)" | "Time64(Nanosecond)" => CDT::time_nanosecond_datatype(),
_ => NotImplementedSnafu {
reason: format!("Unrecognized typename: {}", name),
}
.fail()?,
};
Ok(ret)
}
/// Convert [`ScalarFunction`] to corresponding Datafusion's [`PhysicalExpr`]
@@ -138,29 +154,72 @@ fn is_proto_literal(arg: &substrait_proto::proto::FunctionArgument) -> bool {
)
}
fn build_proto_lit(
lit: substrait_proto::proto::expression::Literal,
) -> substrait_proto::proto::FunctionArgument {
use substrait_proto::proto;
proto::FunctionArgument {
arg_type: Some(ArgType::Value(Expression {
rex_type: Some(proto::expression::RexType::Literal(lit)),
})),
}
}
/// rewrite ScalarFunction's arguments to Columns 0..n so nested exprs are still handled by us instead of datafusion
///
/// specifically, if an argument is a literal, the replacement will not happen
fn rewrite_scalar_function(f: &ScalarFunction) -> ScalarFunction {
fn rewrite_scalar_function(
f: &ScalarFunction,
arg_typed_exprs: &[TypedExpr],
) -> Result<ScalarFunction, Error> {
let mut f_rewrite = f.clone();
for (idx, raw_expr) in f_rewrite.arguments.iter_mut().enumerate() {
if !is_proto_literal(raw_expr) {
*raw_expr = proto_col(idx)
// only replace it with col(idx) if it is not a literal
// we try our best to determine whether it is a literal, i.e. for a function like `cast(<literal>)` we check
// in both worlds (the proto argument and the transformed expr) whether it results in a literal
match (
is_proto_literal(raw_expr),
arg_typed_exprs[idx].expr.is_literal(),
) {
(false, false) => *raw_expr = proto_col(idx),
(true, _) => (),
(false, true) => {
if let ScalarExpr::Literal(val, ty) = &arg_typed_exprs[idx].expr {
let df_val = val
.try_to_scalar_value(ty)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let lit_sub = to_substrait_literal(&df_val)?;
// put the const-folded literal back into datafusion form to simplify things
*raw_expr = build_proto_lit(lit_sub);
} else {
UnexpectedSnafu {
reason: format!(
"Expect value to be literal, but found {:?}",
arg_typed_exprs[idx].expr
),
}
.fail()?
}
}
}
}
f_rewrite
Ok(f_rewrite)
}
impl TypedExpr {
pub async fn from_substrait_to_datafusion_scalar_func(
f: &ScalarFunction,
arg_exprs_typed: Vec<TypedExpr>,
arg_typed_exprs: Vec<TypedExpr>,
extensions: &FunctionExtensions,
) -> Result<TypedExpr, Error> {
let (arg_exprs, arg_types): (Vec<_>, Vec<_>) =
arg_exprs_typed.into_iter().map(|e| (e.expr, e.typ)).unzip();
let (arg_exprs, arg_types): (Vec<_>, Vec<_>) = arg_typed_exprs
.clone()
.into_iter()
.map(|e| (e.expr, e.typ))
.unzip();
debug!("Before rewrite: {:?}", f);
let f_rewrite = rewrite_scalar_function(f);
let f_rewrite = rewrite_scalar_function(f, &arg_typed_exprs)?;
debug!("After rewrite: {:?}", f_rewrite);
let input_schema = RelationType::new(arg_types).into_unnamed();
let raw_fn =
@@ -240,12 +299,21 @@ impl TypedExpr {
.with_context(|| InvalidQuerySnafu {
reason: "array_cast's second argument must be a literal string",
})?;
let cast_to = typename_to_cdt(&cast_to);
let func = UnaryFunc::Cast(cast_to);
let cast_to = typename_to_cdt(&cast_to)?;
let func = UnaryFunc::Cast(cast_to.clone());
let arg = arg_exprs[0].clone();
let ret_type = ColumnType::new_nullable(func.signature().output.clone());
// constant folding here since some datafusion functions require constant args (e.g. `DATE_BIN`)
if arg.is_literal() {
let res = func.eval(&[], &arg).context(EvalSnafu)?;
Ok(TypedExpr::new(
ScalarExpr::Literal(res, cast_to.clone()),
ColumnType::new_nullable(cast_to),
))
} else {
let ret_type = ColumnType::new_nullable(func.signature().output.clone());
Ok(TypedExpr::new(arg.call_unary(func), ret_type))
Ok(TypedExpr::new(arg.call_unary(func), ret_type))
}
}
2 if BinaryFunc::is_valid_func_name(fn_name) => {
let (func, signature) =
@@ -602,28 +670,9 @@ mod test {
let expected = TypedPlan {
schema: RelationType::new(vec![ColumnType::new(CDT::int16_datatype(), true)])
.into_unnamed(),
plan: Plan::Mfp {
input: Box::new(
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(0)),
}
.with_types(
RelationType::new(vec![ColumnType::new(
ConcreteDataType::uint32_datatype(),
false,
)])
.into_named(vec![Some("number".to_string())]),
),
),
mfp: MapFilterProject::new(1)
.map(vec![ScalarExpr::Literal(
Value::Int64(1),
CDT::int64_datatype(),
)
.call_unary(UnaryFunc::Cast(CDT::int16_datatype()))])
.unwrap()
.project(vec![1])
.unwrap(),
plan: Plan::Constant {
// cast of literal is constant folded
rows: vec![(repr::Row::new(vec![Value::from(1i16)]), i64::MIN, 1)],
},
};
assert_eq!(flow_plan.unwrap(), expected);

View File

@@ -12,23 +12,93 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::array::TryFromSliceError;
use bytes::Bytes;
use common_decimal::Decimal128;
use common_time::{Date, Timestamp};
use datafusion_common::ScalarValue;
use datatypes::data_type::ConcreteDataType as CDT;
use datatypes::value::Value;
use num_traits::FromBytes;
use snafu::ensure;
use substrait::substrait_proto_df::proto::expression::literal::user_defined::Val;
use substrait::substrait_proto_df::proto::expression::literal::UserDefined;
use substrait::variation_const::{
DATE_32_TYPE_VARIATION_REF, DATE_64_TYPE_VARIATION_REF, DEFAULT_TYPE_VARIATION_REF,
INTERVAL_DAY_TIME_TYPE_REF, INTERVAL_DAY_TIME_TYPE_URL, INTERVAL_MONTH_DAY_NANO_TYPE_REF,
INTERVAL_MONTH_DAY_NANO_TYPE_URL, INTERVAL_YEAR_MONTH_TYPE_REF, INTERVAL_YEAR_MONTH_TYPE_URL,
TIMESTAMP_MICRO_TYPE_VARIATION_REF, TIMESTAMP_MILLI_TYPE_VARIATION_REF,
TIMESTAMP_NANO_TYPE_VARIATION_REF, TIMESTAMP_SECOND_TYPE_VARIATION_REF,
UNSIGNED_INTEGER_TYPE_VARIATION_REF,
};
use substrait_proto::proto::expression::literal::LiteralType;
use substrait_proto::proto::expression::Literal;
use substrait_proto::proto::r#type::Kind;
use substrait_proto::proto::r#type::{self, parameter, Kind, Parameter};
use substrait_proto::proto::Type;
use crate::error::{Error, NotImplementedSnafu, PlanSnafu};
use crate::error::{Error, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu};
use crate::transform::substrait_proto;
/// TODO(discord9): this is copied from datafusion-substrait since the original function is not public; replace it once it is exported
pub(crate) fn to_substrait_literal(value: &ScalarValue) -> Result<Literal, Error> {
if value.is_null() {
return not_impl_err!("Unsupported literal: {value:?}");
}
let (literal_type, type_variation_reference) = match value {
ScalarValue::Boolean(Some(b)) => (LiteralType::Boolean(*b), DEFAULT_TYPE_VARIATION_REF),
ScalarValue::Int8(Some(n)) => (LiteralType::I8(*n as i32), DEFAULT_TYPE_VARIATION_REF),
ScalarValue::UInt8(Some(n)) => (
LiteralType::I8(*n as i32),
UNSIGNED_INTEGER_TYPE_VARIATION_REF,
),
ScalarValue::Int16(Some(n)) => (LiteralType::I16(*n as i32), DEFAULT_TYPE_VARIATION_REF),
ScalarValue::UInt16(Some(n)) => (
LiteralType::I16(*n as i32),
UNSIGNED_INTEGER_TYPE_VARIATION_REF,
),
ScalarValue::Int32(Some(n)) => (LiteralType::I32(*n), DEFAULT_TYPE_VARIATION_REF),
ScalarValue::UInt32(Some(n)) => (
LiteralType::I32(*n as i32),
UNSIGNED_INTEGER_TYPE_VARIATION_REF,
),
ScalarValue::Int64(Some(n)) => (LiteralType::I64(*n), DEFAULT_TYPE_VARIATION_REF),
ScalarValue::UInt64(Some(n)) => (
LiteralType::I64(*n as i64),
UNSIGNED_INTEGER_TYPE_VARIATION_REF,
),
ScalarValue::Float32(Some(f)) => (LiteralType::Fp32(*f), DEFAULT_TYPE_VARIATION_REF),
ScalarValue::Float64(Some(f)) => (LiteralType::Fp64(*f), DEFAULT_TYPE_VARIATION_REF),
ScalarValue::TimestampSecond(Some(t), _) => (
LiteralType::Timestamp(*t),
TIMESTAMP_SECOND_TYPE_VARIATION_REF,
),
ScalarValue::TimestampMillisecond(Some(t), _) => (
LiteralType::Timestamp(*t),
TIMESTAMP_MILLI_TYPE_VARIATION_REF,
),
ScalarValue::TimestampMicrosecond(Some(t), _) => (
LiteralType::Timestamp(*t),
TIMESTAMP_MICRO_TYPE_VARIATION_REF,
),
ScalarValue::TimestampNanosecond(Some(t), _) => (
LiteralType::Timestamp(*t),
TIMESTAMP_NANO_TYPE_VARIATION_REF,
),
ScalarValue::Date32(Some(d)) => (LiteralType::Date(*d), DATE_32_TYPE_VARIATION_REF),
_ => (
not_impl_err!("Unsupported literal: {value:?}")?,
DEFAULT_TYPE_VARIATION_REF,
),
};
Ok(Literal {
nullable: false,
type_variation_reference,
literal_type: Some(literal_type),
})
}
/// Convert a Substrait literal into a Value and its ConcreteDataType (so that we know the type even if the value is null)
pub(crate) fn from_substrait_literal(lit: &Literal) -> Result<(Value, CDT), Error> {
let scalar_value = match &lit.literal_type {
@@ -105,11 +175,122 @@ pub(crate) fn from_substrait_literal(lit: &Literal) -> Result<(Value, CDT), Erro
)
}
Some(LiteralType::Null(ntype)) => (Value::Null, from_substrait_type(ntype)?),
_ => not_impl_err!("unsupported literal_type")?,
Some(LiteralType::IntervalDayToSecond(interval)) => {
let (days, seconds, microseconds) =
(interval.days, interval.seconds, interval.microseconds);
let millis = microseconds / 1000 + seconds * 1000;
let value_interval = common_time::Interval::from_day_time(days, millis);
(
Value::Interval(value_interval),
CDT::interval_day_time_datatype(),
)
}
Some(LiteralType::IntervalYearToMonth(interval)) => (
Value::Interval(common_time::Interval::from_year_month(
interval.years * 12 + interval.months,
)),
CDT::interval_year_month_datatype(),
),
Some(LiteralType::UserDefined(user_defined)) => {
from_substrait_user_defined_type(user_defined)?
}
_ => not_impl_err!("unsupported literal_type: {:?}", &lit.literal_type)?,
};
Ok(scalar_value)
}
fn from_bytes<T: FromBytes>(i: &Bytes) -> Result<T, Error>
where
for<'a> &'a <T as num_traits::FromBytes>::Bytes:
std::convert::TryFrom<&'a [u8], Error = TryFromSliceError>,
{
let (int_bytes, _rest) = i.split_at(std::mem::size_of::<T>());
let i = T::from_le_bytes(int_bytes.try_into().map_err(|e| {
UnexpectedSnafu {
reason: format!(
"Expect slice to be {} bytes, found {} bytes, error={:?}",
std::mem::size_of::<T>(),
int_bytes.len(),
e
),
}
.build()
})?);
Ok(i)
}
fn from_substrait_user_defined_type(user_defined: &UserDefined) -> Result<(Value, CDT), Error> {
if let UserDefined {
type_reference,
type_parameters: _,
val: Some(Val::Value(val)),
} = user_defined
{
// see https://github.com/apache/datafusion/blob/146b679aa19c7749cc73d0c27440419d6498142b/datafusion/substrait/src/logical_plan/producer.rs#L1957
// for interval type's transform to substrait
let ret = match *type_reference {
INTERVAL_YEAR_MONTH_TYPE_REF => {
ensure!(
val.type_url == INTERVAL_YEAR_MONTH_TYPE_URL,
UnexpectedSnafu {
reason: format!(
"Expect {}, found {} in type_url",
INTERVAL_YEAR_MONTH_TYPE_URL, val.type_url
)
}
);
let i: i32 = from_bytes(&val.value)?;
let value_interval = common_time::Interval::from_year_month(i);
(
Value::Interval(value_interval),
CDT::interval_year_month_datatype(),
)
}
INTERVAL_MONTH_DAY_NANO_TYPE_REF => {
ensure!(
val.type_url == INTERVAL_MONTH_DAY_NANO_TYPE_URL,
UnexpectedSnafu {
reason: format!(
"Expect {}, found {} in type_url",
INTERVAL_MONTH_DAY_NANO_TYPE_URL, val.type_url
)
}
);
let i: i128 = from_bytes(&val.value)?;
let (months, days, nsecs) = ((i >> 96) as i32, (i >> 64) as i32, i as i64);
let value_interval =
common_time::Interval::from_month_day_nano(months, days, nsecs);
(
Value::Interval(value_interval),
CDT::interval_month_day_nano_datatype(),
)
}
INTERVAL_DAY_TIME_TYPE_REF => {
ensure!(
val.type_url == INTERVAL_DAY_TIME_TYPE_URL,
UnexpectedSnafu {
reason: format!(
"Expect {}, found {} in type_url",
INTERVAL_DAY_TIME_TYPE_URL, val.type_url
)
}
);
let i: i64 = from_bytes(&val.value)?;
let (days, millis) = ((i >> 32) as i32, i as i32);
let value_interval = common_time::Interval::from_day_time(days, millis);
(
Value::Interval(value_interval),
CDT::interval_day_time_datatype(),
)
}
_ => return not_impl_err!("unsupported user defined type: {:?}", user_defined)?,
};
Ok(ret)
} else {
not_impl_err!("Expect val to be Some(...)")
}
}
/// convert a Substrait type into a ConcreteDataType
pub fn from_substrait_type(null_type: &substrait_proto::proto::Type) -> Result<CDT, Error> {
if let Some(kind) = &null_type.kind {
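
The user-defined interval decoding above relies on fixed little-endian bit layouts (per the linked producer comment, mirroring how datafusion's substrait producer packs interval values). The following standalone sketch is illustrative only (the `unpack_*` helpers are hypothetical, not part of this diff) and shows the layouts with a round-trip check:

```rust
/// `IntervalMonthDayNano` packs months into bits 96..128, days into bits 64..96
/// and nanoseconds into bits 0..64 of an i128; `IntervalDayTime` packs days into
/// the high 32 bits and milliseconds into the low 32 bits of an i64.
fn unpack_month_day_nano(i: i128) -> (i32, i32, i64) {
    ((i >> 96) as i32, (i >> 64) as i32, i as i64)
}

fn unpack_day_time(i: i64) -> (i32, i32) {
    ((i >> 32) as i32, i as i32)
}

#[test]
fn interval_unpacking() {
    // pack 1 month, 2 days, 3 ns and decode it again
    let packed: i128 = (1i128 << 96) | (2i128 << 64) | 3;
    assert_eq!(unpack_month_day_nano(packed), (1, 2, 3));

    // pack 4 days, 5 ms and decode it again
    let packed: i64 = (4i64 << 32) | 5;
    assert_eq!(unpack_day_time(packed), (4, 5));
}
```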

View File

@@ -358,7 +358,6 @@ impl Inserter {
// already know this is not source table
Some(None) => continue,
_ => {
// TODO(discord9): query metasrv for actual peer address
let peers = self
.table_flownode_set_cache
.get(table_id)

View File

@@ -59,6 +59,7 @@ DROP TABLE out_num_cnt;
Affected Rows: 0
-- test interpret interval
CREATE TABLE numbers_input (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
@@ -68,55 +69,35 @@ CREATE TABLE numbers_input (
Affected Rows: 0
CREATE FLOW test_numbers
SINK TO out_num_cnt
AS
SELECT date_trunc('second', ts), sum(number) FROM numbers_input GROUP BY date_trunc('second', ts);
create table out_num_cnt (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX);
Affected Rows: 0
INSERT INTO numbers_input
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
Affected Rows: 2
-- SQLNESS SLEEP 3s
SELECT col_0, col_1 FROM out_num_cnt;
+---------------------+-------+
| col_0 | col_1 |
+---------------------+-------+
| 2021-07-01T00:00:00 | 42 |
+---------------------+-------+
INSERT INTO numbers_input
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");
Affected Rows: 2
-- SQLNESS SLEEP 2s
SELECT col_0, col_1 FROM out_num_cnt;
+---------------------+-------+
| col_0 | col_1 |
+---------------------+-------+
| 2021-07-01T00:00:00 | 42 |
| 2021-07-01T00:00:01 | 47 |
+---------------------+-------+
DROP FLOW test_numbers;
CREATE FLOW filter_numbers SINK TO out_num_cnt AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input where number > 10;
Affected Rows: 0
DROP TABLE numbers_input;
SHOW CREATE FLOW filter_numbers;
+----------------+----------------------------------------------------------------------------------------------------------------------------------------+
| Flow | Create Flow |
+----------------+----------------------------------------------------------------------------------------------------------------------------------------+
| filter_numbers | CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers |
| | SINK TO out_num_cnt |
| | AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input WHERE number > 10 |
+----------------+----------------------------------------------------------------------------------------------------------------------------------------+
drop flow filter_numbers;
Affected Rows: 0
DROP TABLE out_num_cnt;
drop table out_num_cnt;
Affected Rows: 0
drop table numbers_input;
Affected Rows: 0

View File

@@ -30,34 +30,24 @@ DROP FLOW test_numbers;
DROP TABLE numbers_input;
DROP TABLE out_num_cnt;
-- test interpret interval
CREATE TABLE numbers_input (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
create table out_num_cnt (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX);
CREATE FLOW test_numbers
SINK TO out_num_cnt
AS
SELECT date_trunc('second', ts), sum(number) FROM numbers_input GROUP BY date_trunc('second', ts);
CREATE FLOW filter_numbers SINK TO out_num_cnt AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input where number > 10;
INSERT INTO numbers_input
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
SHOW CREATE FLOW filter_numbers;
-- SQLNESS SLEEP 3s
SELECT col_0, col_1 FROM out_num_cnt;
drop flow filter_numbers;
INSERT INTO numbers_input
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");
drop table out_num_cnt;
-- SQLNESS SLEEP 2s
SELECT col_0, col_1 FROM out_num_cnt;
DROP FLOW test_numbers;
DROP TABLE numbers_input;
DROP TABLE out_num_cnt;
drop table numbers_input;

View File

@@ -124,3 +124,127 @@ DROP TABLE out_num_cnt_df_func;
Affected Rows: 0
-- test date_bin
CREATE TABLE numbers_input (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
CREATE FLOW test_numbers
SINK TO out_num_cnt
AS
SELECT max(number) - min(number), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond) FROM numbers_input GROUP BY date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond);
Affected Rows: 0
INSERT INTO numbers_input
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
Affected Rows: 2
-- SQLNESS SLEEP 3s
SELECT col_0, col_1 FROM out_num_cnt;
+-------+---------------------+
| col_0 | col_1 |
+-------+---------------------+
| 2 | 2021-07-01T00:00:00 |
+-------+---------------------+
INSERT INTO numbers_input
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");
Affected Rows: 2
-- SQLNESS SLEEP 2s
SELECT col_0, col_1 FROM out_num_cnt;
+-------+---------------------+
| col_0 | col_1 |
+-------+---------------------+
| 2 | 2021-07-01T00:00:00 |
| 1 | 2021-07-01T00:00:01 |
+-------+---------------------+
DROP FLOW test_numbers;
Affected Rows: 0
DROP TABLE numbers_input;
Affected Rows: 0
DROP TABLE out_num_cnt;
Affected Rows: 0
-- test date_trunc
CREATE TABLE numbers_input (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
CREATE FLOW test_numbers
SINK TO out_num_cnt
AS
SELECT date_trunc('second', ts), sum(number) FROM numbers_input GROUP BY date_trunc('second', ts);
Affected Rows: 0
INSERT INTO numbers_input
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
Affected Rows: 2
-- SQLNESS SLEEP 3s
SELECT col_0, col_1 FROM out_num_cnt;
+---------------------+-------+
| col_0 | col_1 |
+---------------------+-------+
| 2021-07-01T00:00:00 | 42 |
+---------------------+-------+
INSERT INTO numbers_input
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");
Affected Rows: 2
-- SQLNESS SLEEP 2s
SELECT col_0, col_1 FROM out_num_cnt;
+---------------------+-------+
| col_0 | col_1 |
+---------------------+-------+
| 2021-07-01T00:00:00 | 42 |
| 2021-07-01T00:00:01 | 47 |
+---------------------+-------+
DROP FLOW test_numbers;
Affected Rows: 0
DROP TABLE numbers_input;
Affected Rows: 0
DROP TABLE out_num_cnt;
Affected Rows: 0

View File

@@ -65,3 +65,70 @@ SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
DROP FLOW test_numbers_df_func;
DROP TABLE numbers_input_df_func;
DROP TABLE out_num_cnt_df_func;
-- test date_bin
CREATE TABLE numbers_input (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
CREATE FLOW test_numbers
SINK TO out_num_cnt
AS
SELECT max(number) - min(number), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond) FROM numbers_input GROUP BY date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond);
INSERT INTO numbers_input
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
-- SQLNESS SLEEP 3s
SELECT col_0, col_1 FROM out_num_cnt;
INSERT INTO numbers_input
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");
-- SQLNESS SLEEP 2s
SELECT col_0, col_1 FROM out_num_cnt;
DROP FLOW test_numbers;
DROP TABLE numbers_input;
DROP TABLE out_num_cnt;
-- test date_trunc
CREATE TABLE numbers_input (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
CREATE FLOW test_numbers
SINK TO out_num_cnt
AS
SELECT date_trunc('second', ts), sum(number) FROM numbers_input GROUP BY date_trunc('second', ts);
INSERT INTO numbers_input
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
-- SQLNESS SLEEP 3s
SELECT col_0, col_1 FROM out_num_cnt;
INSERT INTO numbers_input
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");
-- SQLNESS SLEEP 2s
SELECT col_0, col_1 FROM out_num_cnt;
DROP FLOW test_numbers;
DROP TABLE numbers_input;
DROP TABLE out_num_cnt;