fix(flow): correctness bugs (#4018)

* fix: optional args of tumble

* fix(WIP): choose

* feat: rename default ts to GREPTIME_TIMESTAMP

* fix: default timestamp name

* fix: reorder write requests

* fix: expire state

* fix: test of tumble

* fix: send buf clear

* fix: ts<start time correct window

* fix: window_start when ts<start_time

* mend

* fix: range begin>range end

* refactor: per reviews

* feat!: ts placeholder rename to __ts_placeholder

* refactor: better condition

* tests(WIP): func sig choose

* tests(WIP): tumble func

* feat: make run_available optional blocking

* tests: tumble transform

* chore: clippy

* fix?: lagged missing data

* fix: flow source break on empty chnl
This commit is contained in:
discord9
2024-05-30 11:49:11 +08:00
committed by GitHub
parent 6e9a9dc333
commit 7de336f087
16 changed files with 784 additions and 103 deletions

View File

@@ -35,12 +35,12 @@ use itertools::Itertools;
use query::{QueryEngine, QueryEngineFactory};
use serde::{Deserialize, Serialize};
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{ConcreteDataType, RegionId};
use table::metadata::TableId;
use tokio::sync::{oneshot, watch, Mutex, RwLock};
use crate::adapter::error::{ExternalSnafu, TableNotFoundSnafu, UnexpectedSnafu};
use crate::adapter::error::{ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu};
pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::table_source::TableSource;
use crate::adapter::util::column_schemas_to_proto;
@@ -66,6 +66,11 @@ use error::Error;
pub const PER_REQ_MAX_ROW_CNT: usize = 8192;
// TODO: replace this with `GREPTIME_TIMESTAMP` before v0.9
pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";
pub const UPDATE_AT_TS_COL: &str = "update_at";
// TODO: refactor common types for flow to a separate module
/// FlowId is a unique identifier for a flow task
pub type FlowId = u64;
@@ -279,10 +284,16 @@ impl FlownodeManager {
.map(|i| meta.schema.column_schemas[i].name.clone())
.collect_vec();
let schema = meta.schema.column_schemas;
let is_auto_create = schema
.last()
.map(|s| s.name == "__ts_placeholder")
.unwrap_or(false);
// check if the last column is the auto created timestamp column, hence the table is auto created from
// flow's plan type
let is_auto_create = {
let correct_name = schema
.last()
.map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL)
.unwrap_or(false);
let correct_time_index = meta.schema.timestamp_index == Some(schema.len() - 1);
correct_name && correct_time_index
};
(primary_keys, schema, is_auto_create)
} else {
// TODO(discord9): consider removing the buggy auto create by schema
@@ -302,6 +313,7 @@ impl FlownodeManager {
.clone();
// TODO(discord9): use default key from schema
let primary_keys = schema
.typ()
.keys
.first()
.map(|v| {
@@ -312,24 +324,31 @@ impl FlownodeManager {
})
.unwrap_or_default();
let update_at = ColumnSchema::new(
"update_at",
UPDATE_AT_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(),
true,
);
// TODO(discord9): bugged so we can't infer time index from flow plan, so we have to manually set one
let ts_col = ColumnSchema::new(
"__ts_placeholder",
AUTO_CREATED_PLACEHOLDER_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(),
true,
)
.with_time_index(true);
let wout_ts = schema
.typ()
.column_types
.clone()
.into_iter()
.enumerate()
.map(|(idx, typ)| {
ColumnSchema::new(format!("Col_{idx}"), typ.scalar_type, typ.nullable)
let name = schema
.names
.get(idx)
.cloned()
.unwrap_or(format!("Col_{}", idx));
ColumnSchema::new(name, typ.scalar_type, typ.nullable)
})
.collect_vec();
@@ -339,7 +358,7 @@ impl FlownodeManager {
(primary_keys, with_ts, true)
};
let schema_len = schema.len();
let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
debug!(
@@ -348,16 +367,7 @@ impl FlownodeManager {
table_name.join("."),
reqs
);
let now = SystemTime::now();
let now = now
.duration_since(SystemTime::UNIX_EPOCH)
.map(|s| s.as_millis() as repr::Timestamp)
.unwrap_or_else(|_| {
-(SystemTime::UNIX_EPOCH
.duration_since(now)
.unwrap()
.as_millis() as repr::Timestamp)
});
let now = self.tick_manager.tick();
for req in reqs {
match req {
DiffRequest::Insert(insert) => {
@@ -370,13 +380,23 @@ impl FlownodeManager {
))]);
// ts col, if auto create
if is_auto_create {
ensure!(
row.len() == schema_len - 1,
InternalSnafu {
reason: format!(
"Row len mismatch, expect {} got {}",
schema_len - 1,
row.len()
)
}
);
row.extend([Value::from(
common_time::Timestamp::new_millisecond(0),
)]);
}
row.into()
Ok(row.into())
})
.collect::<Vec<_>>();
.collect::<Result<Vec<_>, Error>>()?;
let table_name = table_name.last().unwrap().clone();
let req = RowInsertRequest {
table_name,
@@ -490,9 +510,12 @@ impl FlownodeManager {
debug!("Starting to run");
loop {
// TODO(discord9): only run when new inputs arrive or scheduled to
self.run_available().await.unwrap();
debug!("call run_available in run every second");
self.run_available(true).await.unwrap();
debug!("call send_writeback_requests in run every second");
// TODO(discord9): error handling
self.send_writeback_requests().await.unwrap();
debug!("call log_all_errors in run every second");
self.log_all_errors().await;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
@@ -501,29 +524,44 @@ impl FlownodeManager {
/// Run all available subgraph in the flow node
/// This will try to run all dataflow in this node
///
/// However this is not blocking and can sometimes return while actual computation is still running in worker thread
/// set `blocking` to true to wait until lock is acquired
/// and false to return immediately if lock is not acquired
/// TODO(discord9): add flag for subgraph that have input since last run
pub async fn run_available(&self) -> Result<(), Error> {
let now = self.tick_manager.tick();
pub async fn run_available(&self, blocking: bool) -> Result<(), Error> {
loop {
let now = self.tick_manager.tick();
for worker in self.worker_handles.iter() {
// TODO(discord9): consider how to handle error in individual worker
worker.lock().await.run_available(now).await.unwrap();
if blocking {
worker.lock().await.run_available(now).await?;
} else if let Ok(worker) = worker.try_lock() {
worker.run_available(now).await?;
} else {
return Ok(());
}
}
// first check how many inputs were sent
let send_cnt = match self.node_context.lock().await.flush_all_sender() {
Ok(cnt) => cnt,
let (flush_res, buf_len) = if blocking {
let mut ctx = self.node_context.lock().await;
(ctx.flush_all_sender(), ctx.get_send_buf_size())
} else {
match self.node_context.try_lock() {
Ok(mut ctx) => (ctx.flush_all_sender(), ctx.get_send_buf_size()),
Err(_) => return Ok(()),
}
};
match flush_res {
Ok(_) => (),
Err(err) => {
common_telemetry::error!("Flush send buf errors: {:?}", err);
break;
}
};
// if no inputs
if send_cnt == 0 {
// if no thing in send buf then break
if buf_len == 0 {
break;
} else {
debug!("FlownodeManager::run_available: send_cnt={}", send_cnt);
debug!("Send buf len = {}", buf_len);
}
}
@@ -543,6 +581,8 @@ impl FlownodeManager {
);
let table_id = region_id.table_id();
self.node_context.lock().await.send(table_id, rows)?;
// TODO(discord9): put it in a background task?
// self.run_available(false).await?;
Ok(())
}
}
@@ -653,21 +693,22 @@ impl FlownodeManager {
///
/// TODO(discord9): better way to do it, and not expose flow tick even to other flow to avoid
/// TSO coord mess
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct FlowTickManager {
/// The starting instant of the flow, used with `start_timestamp` to calculate the current timestamp
start: Instant,
}
impl std::fmt::Debug for FlowTickManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FlowTickManager").finish()
}
/// The timestamp when the flow started
start_timestamp: repr::Timestamp,
}
impl FlowTickManager {
/// Create a tick manager anchored at the current time.
///
/// `start` records a monotonic [`Instant`], while `start_timestamp` records the wall-clock
/// time in milliseconds relative to the UNIX epoch.
pub fn new() -> Self {
    let start = Instant::now();
    let start_timestamp = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as repr::Timestamp,
        // The system clock is set before the UNIX epoch: store a negative offset
        // instead of panicking.
        Err(before_epoch) => -(before_epoch.duration().as_millis() as repr::Timestamp),
    };
    FlowTickManager {
        start,
        start_timestamp,
    }
}
@@ -677,6 +718,6 @@ impl FlowTickManager {
pub fn tick(&self) -> repr::Timestamp {
let current = Instant::now();
let since_the_epoch = current - self.start;
since_the_epoch.as_millis() as repr::Timestamp
since_the_epoch.as_millis() as repr::Timestamp + self.start_timestamp
}
}

View File

@@ -14,13 +14,17 @@
//! impl `FlowNode` trait for FlowNodeManager so standalone can call them
use std::collections::HashMap;
use api::v1::flow::{flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse};
use api::v1::region::InsertRequests;
use common_error::ext::BoxedError;
use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu};
use common_meta::node_manager::Flownode;
use common_telemetry::debug;
use itertools::Itertools;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use crate::adapter::FlownodeManager;
use crate::repr::{self, DiffRow};
@@ -101,12 +105,57 @@ impl Flownode for FlownodeManager {
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse> {
for write_request in request.requests {
let region_id = write_request.region_id;
let rows_proto = write_request.rows.map(|r| r.rows).unwrap_or(vec![]);
let table_id = RegionId::from(region_id).table_id();
let (insert_schema, rows_proto) = write_request
.rows
.map(|r| (r.schema, r.rows))
.unwrap_or_default();
// TODO(discord9): reconsider time assignment mechanism
let now = self.tick_manager.tick();
let fetch_order = {
let ctx = self.node_context.lock().await;
let table_col_names = ctx
.table_repr
.get_by_table_id(&table_id)
.map(|r| r.1)
.and_then(|id| ctx.schema.get(&id))
.map(|desc| &desc.names)
.context(UnexpectedSnafu {
err_msg: format!("Table not found: {}", table_id),
})?;
let name_to_col = HashMap::<_, _>::from_iter(
insert_schema
.iter()
.enumerate()
.map(|(i, name)| (&name.column_name, i)),
);
let fetch_order: Vec<usize> = table_col_names
.iter()
.map(|names| {
name_to_col.get(names).copied().context(UnexpectedSnafu {
err_msg: format!("Column not found: {}", names),
})
})
.try_collect()?;
if !fetch_order.iter().enumerate().all(|(i, &v)| i == v) {
debug!("Reordering columns: {:?}", fetch_order)
}
fetch_order
};
let rows: Vec<DiffRow> = rows_proto
.into_iter()
.map(repr::Row::from)
.map(|r| {
let r = repr::Row::from(r);
let reordered = fetch_order
.iter()
.map(|&i| r.inner[i].clone())
.collect_vec();
repr::Row::new(reordered)
})
.map(|r| (r, now, 1))
.collect_vec();
self.handle_write_request(region_id.into(), rows)

View File

@@ -27,7 +27,7 @@ use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu};
use crate::adapter::{FlowId, TableName, TableSource};
use crate::expr::error::InternalSnafu;
use crate::expr::GlobalId;
use crate::repr::{DiffRow, RelationType, BROADCAST_CAP};
use crate::repr::{DiffRow, RelationDesc, RelationType, BROADCAST_CAP};
/// A context that holds the information of the dataflow
#[derive(Default, Debug)]
@@ -51,10 +51,8 @@ pub struct FlownodeContext {
mpsc::UnboundedReceiver<DiffRow>,
),
>,
/// store source in buffer for each source table, in case broadcast channel is full
pub send_buffer: BTreeMap<TableId, VecDeque<DiffRow>>,
/// the schema of the table, query from metasrv or inferred from TypedPlan
pub schema: HashMap<GlobalId, RelationType>,
pub schema: HashMap<GlobalId, RelationDesc>,
/// All the tables that have been registered in the worker
pub table_repr: IdToNameMap,
pub query_context: Option<Arc<QueryContext>>,
@@ -73,7 +71,8 @@ pub struct SourceSender {
impl Default for SourceSender {
fn default() -> Self {
Self {
sender: broadcast::Sender::new(BROADCAST_CAP),
// TODO(discord9): find a better way than increasing this to prevent lagging and hence missing input data
sender: broadcast::Sender::new(BROADCAST_CAP * 2),
send_buf: Default::default(),
}
}
@@ -109,6 +108,7 @@ impl SourceSender {
}
if row_cnt > 0 {
debug!("Send {} rows", row_cnt);
debug!("Remaining Send buf.len() = {}", self.send_buf.len());
}
Ok(row_cnt)
@@ -140,12 +140,19 @@ impl FlownodeContext {
}
/// Flush the buffer of every source sender.
///
/// Returns the sum of the counts reported by each sender's `try_send_all`, stopping at the
/// first sender that reports an error.
pub fn flush_all_sender(&mut self) -> Result<usize, Error> {
    let mut total_sent = 0;
    for sender in self.source_sender.values_mut() {
        total_sent += sender.try_send_all()?;
    }
    Ok(total_sent)
}
/// Return the total number of rows held in the send buffers of all source senders
pub fn get_send_buf_size(&self) -> usize {
    self.source_sender
        .values()
        .fold(0, |pending, sender| pending + sender.send_buf.len())
}
}
impl FlownodeContext {
@@ -226,7 +233,7 @@ impl FlownodeContext {
/// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function.
///
/// Returns an error if no table has been registered with the provided names
pub fn table(&self, name: &TableName) -> Result<(GlobalId, RelationType), Error> {
pub fn table(&self, name: &TableName) -> Result<(GlobalId, RelationDesc), Error> {
let id = self
.table_repr
.get_by_name(name)
@@ -297,7 +304,7 @@ impl FlownodeContext {
.get_by_name(table_name)
.map(|(_, gid)| gid)
.unwrap();
self.schema.insert(gid, schema);
self.schema.insert(gid, schema.into_unnamed());
Ok(())
}

View File

@@ -17,7 +17,6 @@
use common_error::ext::BoxedError;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameManager};
use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
@@ -25,7 +24,7 @@ use crate::adapter::error::{
Error, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu,
};
use crate::adapter::TableName;
use crate::repr::{self, ColumnType, RelationType};
use crate::repr::{self, ColumnType, RelationDesc, RelationType};
/// mapping of table name <-> table id should be query from tableinfo manager
pub struct TableSource {
@@ -107,7 +106,7 @@ impl TableSource {
pub async fn get_table_name_schema(
&self,
table_id: &TableId,
) -> Result<(TableName, RelationType), Error> {
) -> Result<(TableName, RelationDesc), Error> {
let table_info_value = self
.get_table_info_value(table_id)
.await?
@@ -123,14 +122,20 @@ impl TableSource {
];
let raw_schema = table_info_value.table_info.meta.schema;
let column_types = raw_schema
let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema
.column_schemas
.clone()
.into_iter()
.map(|col| ColumnType {
nullable: col.is_nullable(),
scalar_type: col.data_type,
.map(|col| {
(
ColumnType {
nullable: col.is_nullable(),
scalar_type: col.data_type,
},
col.name,
)
})
.collect_vec();
.unzip();
let key = table_info_value.table_info.meta.primary_key_indices;
let keys = vec![repr::Key::from(key)];
@@ -138,10 +143,13 @@ impl TableSource {
let time_index = raw_schema.timestamp_index;
Ok((
table_name,
RelationType {
column_types,
keys,
time_index,
RelationDesc {
typ: RelationType {
column_types,
keys,
time_index,
},
names: col_names,
},
))
}

View File

@@ -124,9 +124,13 @@ fn mfp_subgraph(
// 1. Read all updates that were emitted between the last time this arrangement had updates and the current time.
// 2. Output the updates.
// 3. Truncate all updates within that range.
let from = arrange.read().last_compaction_time().map(|n| n + 1);
let from = arrange.read().last_compaction_time();
let from = from.unwrap_or(repr::Timestamp::MIN);
let output_kv = arrange.read().get_updates_in_range(from..=now);
let range = (
std::ops::Bound::Excluded(from),
std::ops::Bound::Included(now),
);
let output_kv = arrange.read().get_updates_in_range(range);
// the output is expected to be key -> empty val
let output = output_kv
.into_iter()

View File

@@ -26,7 +26,7 @@ use crate::adapter::error::{Error, PlanSnafu};
use crate::compute::render::{Context, SubgraphArg};
use crate::compute::state::Scheduler;
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
use crate::expr::error::{DataTypeSnafu, InternalSnafu};
use crate::expr::error::{DataAlreadyExpiredSnafu, DataTypeSnafu, InternalSnafu};
use crate::expr::{AggregateExpr, EvalError, ScalarExpr};
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan};
use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row};
@@ -301,9 +301,13 @@ fn update_reduce_distinct_arrange(
// Deal with output:
// 1. Read all updates that were emitted between the last time this arrangement had updates and the current time.
let from = arrange.read().last_compaction_time().map(|n| n + 1);
let from = arrange.read().last_compaction_time();
let from = from.unwrap_or(repr::Timestamp::MIN);
let output_kv = arrange.read().get_updates_in_range(from..=now);
let range = (
std::ops::Bound::Excluded(from),
std::ops::Bound::Included(now),
);
let output_kv = arrange.read().get_updates_in_range(range);
// 2. Truncate all updates stored in arrangement within that range.
let run_compaction = || {
@@ -397,6 +401,29 @@ fn reduce_accum_subgraph(
// TODO(discord9): consider key-based lock
let mut arrange = arrange.write();
for (key, value_diffs) in key_to_vals {
if let Some(expire_man) = &arrange.get_expire_state() {
let mut is_expired = false;
err_collector.run(|| {
if let Some(expired) = expire_man.get_expire_duration(now, &key)? {
is_expired = true;
// expired data is ignored in computation, and a simple warning is logged
common_telemetry::warn!(
"Data already expired: {}",
DataAlreadyExpiredSnafu {
expired_by: expired,
}
.build()
);
Ok(())
} else {
Ok(())
}
});
if is_expired {
// errors already collected, we can just continue to next key
continue;
}
}
let col_diffs = {
let row_len = value_diffs[0].0.len();
let res = err_collector.run(|| get_col_diffs(value_diffs, row_len));

View File

@@ -20,12 +20,14 @@ use common_telemetry::{debug, info};
use hydroflow::scheduled::graph_ext::GraphExt;
use itertools::Itertools;
use snafu::OptionExt;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{broadcast, mpsc};
use crate::adapter::error::{Error, PlanSnafu};
use crate::compute::render::Context;
use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff};
use crate::expr::GlobalId;
use crate::expr::error::InternalSnafu;
use crate::expr::{EvalError, GlobalId};
use crate::repr::{DiffRow, Row, BROADCAST_CAP};
#[allow(clippy::mutable_key_type)]
@@ -55,18 +57,43 @@ impl<'referred, 'df> Context<'referred, 'df> {
.df
.add_subgraph_source("source", send_port, move |_ctx, send| {
let now = *now.borrow();
let arr = arrange_handler_inner.write().get_updates_in_range(..=now);
err_collector.run(|| arrange_handler_inner.write().compact_to(now));
// write lock to prevent unexpected mutation
let mut arranged = arrange_handler_inner.write();
let arr = arranged.get_updates_in_range(..=now);
err_collector.run(|| arranged.compact_to(now));
debug!("Call source");
let prev_avail = arr.into_iter().map(|((k, _), t, d)| (k, t, d));
let mut to_send = Vec::new();
let mut to_arrange = Vec::new();
// TODO(discord9): handling tokio broadcast error
while let Ok((r, t, d)) = src_recv.try_recv() {
if t <= now {
to_send.push((r, t, d));
} else {
to_arrange.push(((r, Row::empty()), t, d));
loop {
match src_recv.try_recv() {
Ok((r, t, d)) => {
if t <= now {
to_send.push((r, t, d));
} else {
to_arrange.push(((r, Row::empty()), t, d));
}
}
Err(TryRecvError::Empty) => {
break;
}
Err(TryRecvError::Lagged(lag_offset)) => {
common_telemetry::error!("Flow missing {} rows behind", lag_offset);
break;
}
Err(err) => {
err_collector.run(|| -> Result<(), EvalError> {
InternalSnafu {
reason: format!(
"Error receiving from broadcast channel: {}",
err
),
}
.fail()
});
}
}
}
let all = prev_avail.chain(to_send).collect_vec();
@@ -77,10 +104,10 @@ impl<'referred, 'df> Context<'referred, 'df> {
to_arrange.len()
);
}
err_collector.run(|| arrange_handler_inner.write().apply_updates(now, to_arrange));
err_collector.run(|| arranged.apply_updates(now, to_arrange));
send.give(all);
// always schedule source to run at next tick
inner_schd.schedule_at(now + 1);
// always schedule source to run at now so we can repeatedly run source if needed
inner_schd.schedule_at(now);
});
schd.set_cur_subgraph(sub);
let arranged = Arranged::new(arrange_handler);

View File

@@ -100,4 +100,11 @@ pub enum EvalError {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Incoming data already expired by {} ms", expired_by))]
DataAlreadyExpired {
expired_by: i64,
#[snafu(implicit)]
location: Location,
},
}

View File

@@ -76,6 +76,13 @@ impl UnmaterializableFunc {
}
}
/// Check whether `name`, after lowercasing, is one of the function names listed here
pub fn is_valid_func_name(name: &str) -> bool {
    const KNOWN_NAMES: [&str; 3] = ["now", "current_schema", "tumble"];
    let lowered = name.to_lowercase();
    KNOWN_NAMES.iter().any(|known| *known == lowered)
}
/// Create a UnmaterializableFunc from a string of the function name
pub fn from_str_args(name: &str, args: Vec<TypedExpr>) -> Result<Self, Error> {
match name.to_lowercase().as_str() {
@@ -183,6 +190,13 @@ impl UnaryFunc {
}
}
/// Check whether `name`, after lowercasing, is one of the function names listed here
pub fn is_valid_func_name(name: &str) -> bool {
    const KNOWN_NAMES: [&str; 6] = [
        "not",
        "is_null",
        "is_true",
        "is_false",
        "step_timestamp",
        "cast",
    ];
    let lowered = name.to_lowercase();
    KNOWN_NAMES.iter().any(|known| *known == lowered)
}
/// Create a UnaryFunc from a string of the function name and given argument type(optional)
pub fn from_str_and_type(
name: &str,
@@ -278,9 +292,9 @@ impl UnaryFunc {
start_time,
} => {
let ts = get_ts_as_millisecond(arg)?;
let start_time = start_time.map(|t| t.val()).unwrap_or(0);
let start_time = start_time.map(|t| t.val());
let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond
let window_start = start_time + (ts - start_time) / window_size * window_size;
let window_start = get_window_start(ts, window_size, start_time);
let ret = Timestamp::new_millisecond(window_start);
Ok(Value::from(ret))
@@ -290,9 +304,9 @@ impl UnaryFunc {
start_time,
} => {
let ts = get_ts_as_millisecond(arg)?;
let start_time = start_time.map(|t| t.val()).unwrap_or(0);
let start_time = start_time.map(|t| t.val());
let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond
let window_start = start_time + (ts - start_time) / window_size * window_size;
let window_start = get_window_start(ts, window_size, start_time);
let window_end = window_start + window_size;
let ret = Timestamp::new_millisecond(window_end);
@@ -302,6 +316,35 @@ impl UnaryFunc {
}
}
/// Compute the start of the tumble window that contains `ts`.
///
/// Windows are left-closed, right-open intervals of length `window_size`, aligned to
/// `start_time` (`0` when it is `None`). For a positive `window_size` the result is the largest
/// `start_time + k * window_size` that is `<= ts`, which also holds when `ts < start_time`.
///
/// # Panics
///
/// Panics if `window_size` is zero, because of the integer division.
fn get_window_start(
    ts: repr::Timestamp,
    window_size: repr::Duration,
    start_time: Option<repr::Timestamp>,
) -> repr::Timestamp {
    let start_time = start_time.unwrap_or(0);
    // `div_euclid` rounds towards negative infinity for a positive divisor, so timestamps
    // before `start_time` land in the window below them without a separate branch.
    start_time + (ts - start_time).div_euclid(window_size) * window_size
}
/// Windows are left-closed and right-open, both for timestamps after and before the start time.
#[test]
fn test_get_window_start() {
    // default start time (0)
    assert_eq!(get_window_start(1, 3, None), 0);
    assert_eq!(get_window_start(3, 3, None), 3);
    assert_eq!(get_window_start(0, 3, None), 0);
    assert_eq!(get_window_start(-1, 3, None), -3);
    assert_eq!(get_window_start(-3, 3, None), -3);
    assert_eq!(get_window_start(-4, 3, None), -6);
    // explicit start time, ts at or after it
    assert_eq!(get_window_start(2, 3, Some(2)), 2);
    assert_eq!(get_window_start(4, 3, Some(2)), 2);
    assert_eq!(get_window_start(5, 3, Some(2)), 5);
    // explicit start time, ts before it
    assert_eq!(get_window_start(1, 3, Some(2)), -1);
    assert_eq!(get_window_start(-1, 3, Some(2)), -1);
    assert_eq!(get_window_start(-2, 3, Some(2)), -4);
}
fn get_ts_as_millisecond(arg: Value) -> Result<repr::Timestamp, EvalError> {
let ts = if let Some(ts) = arg.as_timestamp() {
ts.convert_to(TimeUnit::Millisecond)
@@ -550,6 +593,27 @@ impl BinaryFunc {
Ok(ret)
}
/// Check whether `name`, after lowercasing, is one of the function names listed here
pub fn is_valid_func_name(name: &str) -> bool {
    const KNOWN_NAMES: [&str; 16] = [
        "eq",
        "equal",
        "not_eq",
        "not_equal",
        "lt",
        "lte",
        "gt",
        "gte",
        "add",
        "sub",
        "subtract",
        "mul",
        "multiply",
        "div",
        "divide",
        "mod",
    ];
    let lowered = name.to_lowercase();
    KNOWN_NAMES.iter().any(|known| *known == lowered)
}
/// choose the appropriate specialization based on the input types
/// return a specialization of the binary function and it's actual input and output type(so no null type present)
///
@@ -741,6 +805,10 @@ impl VariadicFunc {
}
}
/// Check whether `name`, after lowercasing, is either `and` or `or`
pub fn is_valid_func_name(name: &str) -> bool {
    let lowered = name.to_lowercase();
    lowered == "and" || lowered == "or"
}
/// Create a VariadicFunc from a string of the function name and given argument types(optional)
pub fn from_str_and_types(
name: &str,

View File

@@ -45,6 +45,8 @@ impl TypedExpr {
impl TypedExpr {
/// expand multi-value expression to multiple expressions with new indices
///
/// Currently this just means expanding `TumbleWindow` to `TumbleWindowFloor` and `TumbleWindowCeiling`
pub fn expand_multi_value(
input_typ: &RelationType,
exprs: &[TypedExpr],

View File

@@ -262,6 +262,19 @@ impl RelationType {
true
}
/// Wrap this relation type into a [`RelationDesc`] that carries the given column names
pub fn into_named(self, names: Vec<ColumnName>) -> RelationDesc {
    let typ = self;
    RelationDesc { typ, names }
}
/// Wrap this relation type into a [`RelationDesc`] with an empty list of column names
pub fn into_unnamed(self) -> RelationDesc {
    let names = Vec::new();
    RelationDesc { typ: self, names }
}
}
/// The type of a `Value`
@@ -325,8 +338,8 @@ fn return_true() -> bool {
/// Individual column names are optional.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct RelationDesc {
typ: RelationType,
names: Vec<ColumnName>,
pub typ: RelationType,
pub names: Vec<ColumnName>,
}
impl RelationDesc {

View File

@@ -211,7 +211,7 @@ mod test {
let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]);
tri_map.insert(Some(name.clone()), Some(1024), gid);
schemas.insert(gid, schema);
schemas.insert(gid, schema.into_unnamed());
}
{
@@ -225,7 +225,7 @@ mod test {
ColumnType::new(CDT::uint32_datatype(), false),
ColumnType::new(CDT::datetime_datatype(), false),
]);
schemas.insert(gid, schema);
schemas.insert(gid, schema.into_unnamed());
tri_map.insert(Some(name.clone()), Some(1025), gid);
}

View File

@@ -435,6 +435,236 @@ mod test {
use crate::repr::{self, ColumnType, RelationType};
use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait};
/// TODO(discord9): add more illegal sql tests
///
/// Check the flow plan generated for `avg(number)` grouped by both a one hour tumble window
/// over `ts` and the `number` column itself.
#[tokio::test]
async fn test_tumble_composite() {
let engine = create_test_query_engine();
let sql =
"SELECT number, avg(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour'), number";
let plan = sql_to_substrait(engine.clone(), sql).await;
let mut ctx = create_test_ctx();
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).unwrap();
// avg(number) is expected to be planned as two aggregates over input column 0: a sum and a count
let aggr_exprs = vec![
AggregateExpr {
func: AggregateFunc::SumUInt32,
expr: ScalarExpr::Column(0),
distinct: false,
},
AggregateExpr {
func: AggregateFunc::Count,
expr: ScalarExpr::Column(0),
distinct: false,
},
];
// avg = sum / count when count != 0, otherwise NULL
// (column 3 is the sum and column 4 is the count in the reduce output below)
let avg_expr = ScalarExpr::If {
cond: Box::new(ScalarExpr::Column(4).call_binary(
ScalarExpr::Literal(Value::from(0i64), CDT::int64_datatype()),
BinaryFunc::NotEq,
)),
then: Box::new(ScalarExpr::Column(3).call_binary(
ScalarExpr::Column(4).call_unary(UnaryFunc::Cast(CDT::uint64_datatype())),
BinaryFunc::DivUInt64,
)),
els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())),
};
let expected = TypedPlan {
// TODO(discord9): mfp indirectly ref to key columns
/*
.with_key(vec![1])
.with_time_index(Some(0)),*/
plan: Plan::Mfp {
input: Box::new(
Plan::Reduce {
input: Box::new(
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(1)),
}
.with_types(RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(ConcreteDataType::datetime_datatype(), false),
])),
),
key_val_plan: KeyValPlan {
// key: (window start, window end, number)
key_plan: MapFilterProject::new(2)
.map(vec![
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowFloor {
window_size: Interval::from_month_day_nano(
0,
0,
3_600_000_000_000,
),
start_time: None,
},
),
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowCeiling {
window_size: Interval::from_month_day_nano(
0,
0,
3_600_000_000_000,
),
start_time: None,
},
),
ScalarExpr::Column(0),
])
.unwrap()
.project(vec![2, 3, 4])
.unwrap()
.into_safe(),
// value: both input columns, unchanged
val_plan: MapFilterProject::new(2)
.project(vec![0, 1])
.unwrap()
.into_safe(),
},
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
full_aggrs: aggr_exprs.clone(),
simple_aggrs: vec![
AggrWithIndex::new(aggr_exprs[0].clone(), 0, 0),
AggrWithIndex::new(aggr_exprs[1].clone(), 0, 1),
],
distinct_aggrs: vec![],
}),
}
.with_types(
RelationType::new(vec![
// keys
ColumnType::new(CDT::datetime_datatype(), false), // window start(time index)
ColumnType::new(CDT::datetime_datatype(), false), // window end(pk)
ColumnType::new(CDT::uint32_datatype(), false), // number(pk)
// values
ColumnType::new(CDT::uint64_datatype(), true), // avg.sum(number)
ColumnType::new(CDT::int64_datatype(), true), // avg.count(number)
])
.with_key(vec![1, 2])
.with_time_index(Some(0)),
),
),
// NOTE(review): assumes mapped expressions are appended after the 5 input columns
// (indices 5..=9), which is what makes `project([6, 7, 8, 9])` match `typ` below
mfp: MapFilterProject::new(5)
.map(vec![
avg_expr,
ScalarExpr::Column(2), // number(pk)
ScalarExpr::Column(5), // avg(number), i.e. the `avg_expr` mapped just above
ScalarExpr::Column(0), // window start
ScalarExpr::Column(1), // window end
])
.unwrap()
.project(vec![6, 7, 8, 9])
.unwrap(),
},
typ: RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), false), // number
ColumnType::new(CDT::uint64_datatype(), true), // avg(number)
ColumnType::new(CDT::datetime_datatype(), false), // window start
ColumnType::new(CDT::datetime_datatype(), false), // window end
]),
};
assert_eq!(flow_plan, expected);
}
/// Check the flow plan generated when `tumble` is called with only the timestamp column and
/// the window size, i.e. without a start time (`start_time: None` in the expected plan).
#[tokio::test]
async fn test_tumble_parse_optional() {
let engine = create_test_query_engine();
let sql = "SELECT sum(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour')";
let plan = sql_to_substrait(engine.clone(), sql).await;
let mut ctx = create_test_ctx();
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).unwrap();
// sum(number) over input column 0
let aggr_expr = AggregateExpr {
func: AggregateFunc::SumUInt32,
expr: ScalarExpr::Column(0),
distinct: false,
};
let expected = TypedPlan {
typ: RelationType::new(vec![
ColumnType::new(CDT::uint64_datatype(), true), // sum(number)
ColumnType::new(CDT::datetime_datatype(), false), // window start
ColumnType::new(CDT::datetime_datatype(), false), // window end
]),
// TODO(discord9): mfp indirectly ref to key columns
/*
.with_key(vec![1])
.with_time_index(Some(0)),*/
plan: Plan::Mfp {
input: Box::new(
Plan::Reduce {
input: Box::new(
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(1)),
}
.with_types(RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(ConcreteDataType::datetime_datatype(), false),
])),
),
key_val_plan: KeyValPlan {
// key: (window start, window end)
key_plan: MapFilterProject::new(2)
.map(vec![
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowFloor {
window_size: Interval::from_month_day_nano(
0,
0,
3_600_000_000_000,
),
start_time: None,
},
),
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowCeiling {
window_size: Interval::from_month_day_nano(
0,
0,
3_600_000_000_000,
),
start_time: None,
},
),
])
.unwrap()
.project(vec![2, 3])
.unwrap()
.into_safe(),
// value: both input columns, unchanged
val_plan: MapFilterProject::new(2)
.project(vec![0, 1])
.unwrap()
.into_safe(),
},
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
full_aggrs: vec![aggr_expr.clone()],
simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
distinct_aggrs: vec![],
}),
}
.with_types(
RelationType::new(vec![
ColumnType::new(CDT::datetime_datatype(), false), // window start
ColumnType::new(CDT::datetime_datatype(), false), // window end
ColumnType::new(CDT::uint64_datatype(), true), //sum(number)
])
.with_key(vec![1])
.with_time_index(Some(0)),
),
),
// NOTE(review): assumes mapped expressions are appended after the 3 input columns
// (indices 3..=6), which is what makes `project([4, 5, 6])` match `typ` above
mfp: MapFilterProject::new(3)
.map(vec![
ScalarExpr::Column(2), // sum(number)
ScalarExpr::Column(3), // the mapped copy of sum(number) just above
ScalarExpr::Column(0), // window start
ScalarExpr::Column(1), // window end
])
.unwrap()
.project(vec![4, 5, 6])
.unwrap(),
},
};
assert_eq!(flow_plan, expected);
}
#[tokio::test]
async fn test_tumble_parse() {
let engine = create_test_query_engine();

View File

@@ -101,8 +101,7 @@ impl TypedExpr {
.unzip();
match arg_len {
// because variadic function can also have 1 arguments, we need to check if it's a variadic function first
1 if VariadicFunc::from_str_and_types(fn_name, &arg_types).is_err() => {
1 if UnaryFunc::is_valid_func_name(fn_name) => {
let func = UnaryFunc::from_str_and_type(fn_name, None)?;
let arg = arg_exprs[0].clone();
let ret_type = ColumnType::new_nullable(func.signature().output.clone());
@@ -124,8 +123,7 @@ impl TypedExpr {
Ok(TypedExpr::new(arg.call_unary(func), ret_type))
}
// because variadic function can also have 2 arguments, we need to check if it's a variadic function first
2 if VariadicFunc::from_str_and_types(fn_name, &arg_types).is_err() => {
2 if BinaryFunc::is_valid_func_name(fn_name) => {
let (func, signature) =
BinaryFunc::from_str_expr_and_type(fn_name, &arg_exprs, &arg_types[0..2])?;
@@ -167,7 +165,8 @@ impl TypedExpr {
Ok(TypedExpr::new(ret_expr, ret_type))
}
_var => {
if let Ok(func) = VariadicFunc::from_str_and_types(fn_name, &arg_types) {
if VariadicFunc::is_valid_func_name(fn_name) {
let func = VariadicFunc::from_str_and_types(fn_name, &arg_types)?;
let ret_type = ColumnType::new_nullable(func.signature().output.clone());
let mut expr = ScalarExpr::CallVariadic {
func,
@@ -175,9 +174,8 @@ impl TypedExpr {
};
expr.optimize();
Ok(TypedExpr::new(expr, ret_type))
} else if let Ok(func) =
UnmaterializableFunc::from_str_args(fn_name, arg_typed_exprs)
{
} else if UnmaterializableFunc::is_valid_func_name(fn_name) {
let func = UnmaterializableFunc::from_str_args(fn_name, arg_typed_exprs)?;
let ret_type = ColumnType::new_nullable(func.signature().output.clone());
Ok(TypedExpr::new(
ScalarExpr::CallUnmaterializable(func),
@@ -324,8 +322,12 @@ impl TypedExpr {
#[cfg(test)]
mod test {
use std::collections::HashMap;
use common_time::{DateTime, Interval};
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use pretty_assertions::assert_eq;
use super::*;
use crate::expr::{GlobalId, MapFilterProject};
@@ -510,4 +512,162 @@ mod test {
assert_eq!(flow_plan.unwrap(), expected);
}
/// Checks that `TypedExpr::from_substrait_scalar_func` resolves a function name to the
/// expected function kind for several argument counts:
///
/// 1. one argument named `is_null` -> `UnaryFunc::IsNull`
/// 2. two arguments named `add` -> `BinaryFunc::AddUInt32`
/// 3. three arguments named `tumble` -> `UnmaterializableFunc::TumbleWindow` with a start time
/// 4. two arguments named `tumble` -> `UnmaterializableFunc::TumbleWindow` without a start time
///
/// Every scenario registers its function name under anchor `0` and refers to it through
/// `function_reference: 0`.
#[test]
fn test_func_sig() {
/// Builds a function argument holding a non-nullable string literal made from `v.to_string()`.
fn lit(v: impl ToString) -> substrait_proto::proto::FunctionArgument {
use substrait_proto::proto::expression;
let expr = Expression {
rex_type: Some(expression::RexType::Literal(expression::Literal {
nullable: false,
type_variation_reference: 0,
literal_type: Some(expression::literal::LiteralType::String(v.to_string())),
})),
};
substrait_proto::proto::FunctionArgument {
arg_type: Some(substrait_proto::proto::function_argument::ArgType::Value(
expr,
)),
}
}
/// Builds a function argument holding a direct struct-field reference to field `i`
/// (no child segment, no root type).
fn col(i: usize) -> substrait_proto::proto::FunctionArgument {
use substrait_proto::proto::expression;
let expr = Expression {
rex_type: Some(expression::RexType::Selection(Box::new(
expression::FieldReference {
reference_type: Some(
expression::field_reference::ReferenceType::DirectReference(
expression::ReferenceSegment {
reference_type: Some(
expression::reference_segment::ReferenceType::StructField(
Box::new(expression::reference_segment::StructField {
field: i as i32,
child: None,
}),
),
),
},
),
),
root_type: None,
},
))),
};
substrait_proto::proto::FunctionArgument {
arg_type: Some(substrait_proto::proto::function_argument::ArgType::Value(
expr,
)),
}
}
// Scenario 1: a single column argument with the name `is_null`.
let f = substrait_proto::proto::expression::ScalarFunction {
function_reference: 0,
arguments: vec![col(0)],
options: vec![],
output_type: None,
..Default::default()
};
let input_schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]);
let extensions = FunctionExtensions {
anchor_to_name: HashMap::from([(0, "is_null".to_string())]),
};
let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, &extensions).unwrap();
// Expected: a unary `IsNull` call on column 0, typed as a nullable boolean.
assert_eq!(
res,
TypedExpr {
expr: ScalarExpr::Column(0).call_unary(UnaryFunc::IsNull),
typ: ColumnType {
scalar_type: CDT::boolean_datatype(),
nullable: true,
},
}
);
// Scenario 2: two uint32 column arguments with the name `add`.
let f = substrait_proto::proto::expression::ScalarFunction {
function_reference: 0,
arguments: vec![col(0), col(1)],
options: vec![],
output_type: None,
..Default::default()
};
let input_schema = RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), false),
ColumnType::new(CDT::uint32_datatype(), false),
]);
let extensions = FunctionExtensions {
anchor_to_name: HashMap::from([(0, "add".to_string())]),
};
let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, &extensions).unwrap();
// Expected: the uint32-specialised binary add over columns 0 and 1, typed as nullable uint32.
assert_eq!(
res,
TypedExpr {
expr: ScalarExpr::Column(0)
.call_binary(ScalarExpr::Column(1), BinaryFunc::AddUInt32,),
typ: ColumnType {
scalar_type: CDT::uint32_datatype(),
nullable: true,
},
}
);
// Scenario 3: `tumble` with a timestamp column, a window-size literal and a start-time literal.
let f = substrait_proto::proto::expression::ScalarFunction {
function_reference: 0,
arguments: vec![col(0), lit("1 second"), lit("2021-07-01 00:00:00")],
options: vec![],
output_type: None,
..Default::default()
};
let input_schema = RelationType::new(vec![
ColumnType::new(CDT::timestamp_nanosecond_datatype(), false),
ColumnType::new(CDT::string_datatype(), false),
]);
let extensions = FunctionExtensions {
anchor_to_name: HashMap::from([(0, "tumble".to_string())]),
};
let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, &extensions).unwrap();
// Expected: an unmaterializable `TumbleWindow` over column 0 with both literals parsed.
// NOTE(review): 1625097600000 is presumably the millisecond epoch value of
// "2021-07-01 00:00:00" in UTC -- confirm against `DateTime::new`.
assert_eq!(
res,
ScalarExpr::CallUnmaterializable(UnmaterializableFunc::TumbleWindow {
ts: Box::new(
ScalarExpr::Column(0)
.with_type(ColumnType::new(CDT::timestamp_nanosecond_datatype(), false))
),
window_size: Interval::from_month_day_nano(0, 0, 1_000_000_000),
start_time: Some(DateTime::new(1625097600000))
})
.with_type(ColumnType::new(CDT::timestamp_millisecond_datatype(), true)),
);
// Scenario 4: `tumble` again, this time without the start-time argument.
let f = substrait_proto::proto::expression::ScalarFunction {
function_reference: 0,
arguments: vec![col(0), lit("1 second")],
options: vec![],
output_type: None,
..Default::default()
};
let input_schema = RelationType::new(vec![
ColumnType::new(CDT::timestamp_nanosecond_datatype(), false),
ColumnType::new(CDT::string_datatype(), false),
]);
let extensions = FunctionExtensions {
anchor_to_name: HashMap::from([(0, "tumble".to_string())]),
};
let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, &extensions).unwrap();
// Expected: the same `TumbleWindow` as scenario 3 except that `start_time` is `None`.
assert_eq!(
res,
ScalarExpr::CallUnmaterializable(UnmaterializableFunc::TumbleWindow {
ts: Box::new(
ScalarExpr::Column(0)
.with_type(ColumnType::new(CDT::timestamp_nanosecond_datatype(), false))
),
window_size: Interval::from_month_day_nano(0, 0, 1_000_000_000),
start_time: None
})
.with_type(ColumnType::new(CDT::timestamp_millisecond_datatype(), true)),
)
}
}

View File

@@ -269,7 +269,7 @@ impl TypedPlan {
id: crate::expr::Id::Global(table.0),
};
let get_table = TypedPlan {
typ: table.1,
typ: table.1.typ().clone(),
plan: get_table,
};

View File

@@ -18,6 +18,7 @@ use std::collections::{BTreeMap, BTreeSet};
use std::ops::Bound;
use std::sync::Arc;
use common_telemetry::debug;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
@@ -86,7 +87,7 @@ impl KeyExpiryManager {
///
/// - If given key is expired by now (that is less than `now - expiry_duration`), return the amount of time it's expired.
/// - If it's not expired, return None
pub fn update_event_ts(
pub fn get_expire_duration_and_update_event_ts(
&mut self,
now: Timestamp,
row: &Row,
@@ -95,6 +96,33 @@ impl KeyExpiryManager {
return Ok(None);
};
self.event_ts_to_key
.entry(event_ts)
.or_default()
.insert(row.clone());
if let Some(expire_time) = self.compute_expiration_timestamp(now) {
if expire_time > event_ts {
// return how much time it's expired
return Ok(Some(expire_time - event_ts));
}
}
Ok(None)
}
/// Get the expire duration of a key, if it's expired by now.
///
/// Return None if the key is not expired
pub fn get_expire_duration(
&self,
now: Timestamp,
row: &Row,
) -> Result<Option<Duration>, EvalError> {
let Some(event_ts) = self.extract_event_ts(row)? else {
return Ok(None);
};
if let Some(expire_time) = self.compute_expiration_timestamp(now) {
if expire_time > event_ts {
// return how much time it's expired
@@ -102,10 +130,6 @@ impl KeyExpiryManager {
}
}
self.event_ts_to_key
.entry(event_ts)
.or_default()
.insert(row.clone());
Ok(None)
}
@@ -189,6 +213,10 @@ impl Arrangement {
}
}
/// Returns a shared reference to this arrangement's [`KeyExpiryManager`],
/// or `None` when no expire state is currently set.
pub fn get_expire_state(&self) -> Option<&KeyExpiryManager> {
self.expire_state.as_ref()
}
/// Stores `expire_state` as this arrangement's expire state, overwriting any
/// value that was set before.
pub fn set_expire_state(&mut self, expire_state: KeyExpiryManager) {
self.expire_state = Some(expire_state);
}
@@ -208,8 +236,12 @@ impl Arrangement {
for ((key, val), update_ts, diff) in updates {
// check if the key is expired
if let Some(s) = &mut self.expire_state {
if let Some(expired_by) = s.update_event_ts(now, &key)? {
if let Some(expired_by) = s.get_expire_duration_and_update_event_ts(now, &key)? {
max_expired_by = max_expired_by.max(Some(expired_by));
debug!(
"Expired key: {:?}, expired by: {:?} with time being now={}",
key, expired_by, now
);
continue;
}
}
@@ -335,7 +367,9 @@ impl Arrangement {
for (key, updates) in batch {
// check if the key is expired
if let Some(s) = &mut self.expire_state {
if let Some(expired_by) = s.update_event_ts(now, &key)? {
if let Some(expired_by) =
s.get_expire_duration_and_update_event_ts(now, &key)?
{
max_expired_by = max_expired_by.max(Some(expired_by));
continue;
}
@@ -540,6 +574,10 @@ impl ArrangeHandler {
/// Sets the `full_arrangement` flag of the value obtained through `self.write()` to `full`.
pub fn set_full_arrangement(&self, full: bool) {
self.write().full_arrangement = full;
}
/// Returns the current `full_arrangement` flag of the value obtained through `self.read()`.
pub fn is_full_arrangement(&self) -> bool {
self.read().full_arrangement
}
}
#[cfg(test)]