diff --git a/Cargo.lock b/Cargo.lock
index 2356c7e523..385899a55e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4165,6 +4165,7 @@ dependencies = [
  "bytes",
  "cache",
  "catalog",
+ "chrono",
  "client",
  "common-base",
  "common-catalog",
diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml
index 461e4382e5..b671d2faa1 100644
--- a/src/flow/Cargo.toml
+++ b/src/flow/Cargo.toml
@@ -16,6 +16,7 @@ async-trait.workspace = true
 bytes.workspace = true
 cache.workspace = true
 catalog.workspace = true
+chrono.workspace = true
 client.workspace = true
 common-base.workspace = true
 common-config.workspace = true
diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index 7d23c445f9..dc7df77037 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -64,7 +64,7 @@ pub(crate) mod refill;
 mod stat;
 #[cfg(test)]
 mod tests;
-mod util;
+pub(crate) mod util;
 mod worker;
 
 pub(crate) mod node_context;
diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs
index 1312b5e175..55131b0d1b 100644
--- a/src/flow/src/adapter/flownode_impl.rs
+++ b/src/flow/src/adapter/flownode_impl.rs
@@ -155,8 +155,11 @@ impl Flownode for FlowWorkerManager {
 
     #[allow(unreachable_code, unused)]
     async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse> {
-        return Ok(Default::default());
-
+        return self
+            .rule_engine
+            .handle_inserts(request)
+            .await
+            .map_err(to_meta_err(snafu::location!()));
         // using try_read to ensure two things:
         // 1. flush wouldn't happen until inserts before it is inserted
         // 2. inserts happening concurrently with flush wouldn't be block by flush
@@ -209,15 +212,15 @@ impl Flownode for FlowWorkerManager {
             .collect_vec();
         let table_col_names = table_schema.relation_desc.names;
         let table_col_names = table_col_names
-                .iter().enumerate()
-                .map(|(idx,name)| match name {
-                    Some(name) => Ok(name.clone()),
-                    None => InternalSnafu {
-                        reason: format!("Expect column {idx} of table id={table_id} to have name in table schema, found None"),
-                    }
-                    .fail().map_err(BoxedError::new).context(ExternalSnafu),
-                })
-                .collect::<Result<Vec<_>>>()?;
+            .iter().enumerate()
+            .map(|(idx,name)| match name {
+                Some(name) => Ok(name.clone()),
+                None => InternalSnafu {
+                    reason: format!("Expect column {idx} of table id={table_id} to have name in table schema, found None"),
+                }
+                .fail().map_err(BoxedError::new).context(ExternalSnafu),
+            })
+            .collect::<Result<Vec<_>>>()?;
         let name_to_col = HashMap::<_, _>::from_iter(
             insert_schema
                 .iter()
diff --git a/src/flow/src/adapter/util.rs b/src/flow/src/adapter/util.rs
index 61ffc8014d..e6c2f98570 100644
--- a/src/flow/src/adapter/util.rs
+++ b/src/flow/src/adapter/util.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+//! Some utility functions
+
 use std::sync::Arc;
 
 use api::helper::ColumnDataTypeWrapper;
diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs
index ec1c200f54..356e45fc67 100644
--- a/src/flow/src/error.rs
+++ b/src/flow/src/error.rs
@@ -54,6 +54,13 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Time error"))]
+    Time {
+        source: common_time::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("External error"))]
     External {
         source: BoxedError,
@@ -249,7 +256,9 @@ impl ErrorExt for Error {
             | Self::FlowNotFound { .. }
             | Self::ListFlows { .. } => StatusCode::TableNotFound,
             Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
-            Self::InvalidQuery { .. } | Self::CreateFlow { .. } => StatusCode::EngineExecuteQuery,
+            Self::InvalidQuery { .. } | Self::CreateFlow { .. } | Self::Time { .. } => {
+                StatusCode::EngineExecuteQuery
+            }
             Self::Unexpected { .. } => StatusCode::Unexpected,
             Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => {
                 StatusCode::Unsupported
diff --git a/src/flow/src/heartbeat.rs b/src/flow/src/heartbeat.rs
index ae98e015bc..9e771520ee 100644
--- a/src/flow/src/heartbeat.rs
+++ b/src/flow/src/heartbeat.rs
@@ -14,6 +14,7 @@
 
 //! Send heartbeat from flownode to metasrv
 
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 
 use api::v1::meta::{HeartbeatRequest, Peer};
@@ -24,7 +25,7 @@ use common_meta::heartbeat::handler::{
 use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
 use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
 use common_meta::key::flow::flow_state::FlowStat;
-use common_telemetry::{debug, error, info};
+use common_telemetry::{debug, error, info, warn};
 use greptime_proto::v1::meta::NodeInfo;
 use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
 use servers::addrs;
@@ -65,6 +66,7 @@ pub struct HeartbeatTask {
     report_interval: Duration,
     retry_interval: Duration,
     resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
+    running: Arc<AtomicBool>,
     query_stat_size: Option,
 }
 
@@ -87,6 +89,7 @@ impl HeartbeatTask {
             report_interval: heartbeat_opts.interval,
             retry_interval: heartbeat_opts.retry_interval,
             resp_handler_executor,
+            running: Arc::new(AtomicBool::new(false)),
             query_stat_size: None,
         }
     }
diff --git a/src/flow/src/recording_rules.rs b/src/flow/src/recording_rules.rs
index 51b9211e23..be2645fac6 100644
--- a/src/flow/src/recording_rules.rs
+++ b/src/flow/src/recording_rules.rs
@@ -17,11 +17,14 @@
 mod engine;
 mod frontend_client;
 
-use std::collections::HashSet;
+use std::collections::{BTreeSet, HashSet};
 use std::sync::Arc;
 
+use api::helper::pb_value_to_value_ref;
+use catalog::CatalogManagerRef;
 use common_error::ext::BoxedError;
 use common_recordbatch::DfRecordBatch;
+use common_telemetry::warn;
 use common_time::timestamp::TimeUnit;
 use common_time::Timestamp;
 use datafusion::error::Result as DfResult;
@@ -31,9 +34,10 @@
 use datafusion::prelude::SessionContext;
 use datafusion::sql::unparser::Unparser;
 use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
 use datafusion_common::{Column, DFSchema, TableReference};
-use datafusion_expr::LogicalPlan;
+use datafusion_expr::{ColumnarValue, LogicalPlan};
 use datafusion_physical_expr::PhysicalExprRef;
 use datatypes::prelude::{ConcreteDataType, DataType};
+use datatypes::scalars::ScalarVector;
 use datatypes::value::Value;
 use datatypes::vectors::{
@@ -41,15 +45,192 @@ use datatypes::vectors::{
     TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
 };
 pub use engine::RecordingRuleEngine;
 pub use frontend_client::FrontendClient;
+use itertools::Itertools;
 use query::parser::QueryLanguageParser;
 use query::QueryEngineRef;
 use session::context::QueryContextRef;
 use snafu::{ensure, OptionExt, ResultExt};
 
+use crate::adapter::util::from_proto_to_data_type;
 use crate::df_optimizer::apply_df_optimizer;
 use crate::error::{ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, UnexpectedSnafu};
+use crate::expr::error::DataTypeSnafu;
 use crate::Error;
 
+#[derive(Debug, Clone)]
+pub struct TimeWindowExpr {
+    expr: PhysicalExprRef,
+    column_name: String,
+}
+
+impl TimeWindowExpr {
+    pub fn new(expr: PhysicalExprRef, column_name: String) -> Self {
+        Self { expr, column_name }
+    }
+
+    pub fn from_expr(expr: &Expr, column_name: &str, df_schema: &DFSchema) -> Result<Self, Error> {
+        let phy_planner = DefaultPhysicalPlanner::default();
+
+        let phy_expr: PhysicalExprRef = phy_planner
+            .create_physical_expr(expr, df_schema, &SessionContext::new().state())
+            .with_context(|_e| DatafusionSnafu {
+                context: format!(
+                    "Failed to create physical expression from {expr:?} using {df_schema:?}"
+                ),
+            })?;
+        Ok(Self {
+            expr: phy_expr,
+            column_name: column_name.to_string(),
+        })
+    }
+
+    /// Find timestamps from rows using the time window expr
+    pub async fn handle_rows(
+        &self,
+        rows_list: Vec<api::v1::Rows>,
+    ) -> Result<BTreeSet<Timestamp>, Error> {
+        let mut time_windows = BTreeSet::new();
+
+        for rows in rows_list {
+            // pick the time index column and use it to eval on `self.expr`
+            let ts_col_index = rows
+                .schema
+                .iter()
+                .map(|col| col.column_name.clone())
+                .position(|name| name == self.column_name);
+            let Some(ts_col_index) = ts_col_index else {
+                warn!("can't find time index column in schema: {:?}", rows.schema);
+                continue;
+            };
+            let col_schema = &rows.schema[ts_col_index];
+            let cdt = from_proto_to_data_type(col_schema)?;
+
+            let column_values = rows
+                .rows
+                .iter()
+                .map(|row| &row.values[ts_col_index])
+                .collect_vec();
+
+            let mut vector = cdt.create_mutable_vector(column_values.len());
+            for value in column_values {
+                let value = pb_value_to_value_ref(value, &None);
+                vector.try_push_value_ref(value).context(DataTypeSnafu {
+                    msg: "Failed to convert rows to columns",
+                })?;
+            }
+            let vector = vector.to_vector();
+
+            let df_schema = create_df_schema_for_ts_column(&self.column_name, cdt)?;
+
+            let rb =
+                DfRecordBatch::try_new(df_schema.inner().clone(), vec![vector.to_arrow_array()])
+                    .with_context(|_e| ArrowSnafu {
+                        context: format!(
+                            "Failed to create record batch from {df_schema:?} and {vector:?}"
+                        ),
+                    })?;
+
+            let eval_res = self.expr.evaluate(&rb).with_context(|_| DatafusionSnafu {
+                context: format!(
+                    "Failed to evaluate physical expression {:?} on {rb:?}",
+                    self.expr
+                ),
+            })?;
+
+            let res = columnar_to_ts_vector(&eval_res)?;
+
+            for ts in res.into_iter().flatten() {
+                time_windows.insert(ts);
+            }
+        }
+
+        Ok(time_windows)
+    }
+}
+
+fn create_df_schema_for_ts_column(name: &str, cdt: ConcreteDataType) -> Result<DFSchema, Error> {
+    let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
+        name,
+        cdt.as_arrow_type(),
+        false,
+    )]));
+
+    let df_schema = DFSchema::from_field_specific_qualified_schema(
+        vec![Some(TableReference::bare("TimeIndexOnlyTable"))],
+        &arrow_schema,
+    )
+    .with_context(|_e| DatafusionSnafu {
+        context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
+    })?;
+
+    Ok(df_schema)
+}
+
+/// Convert `ColumnarValue` to `Vec<Option<Timestamp>>`
+fn columnar_to_ts_vector(columnar: &ColumnarValue) -> Result<Vec<Option<Timestamp>>, Error> {
+    let val = match columnar {
+        datafusion_expr::ColumnarValue::Array(array) => {
+            let ty = array.data_type();
+            let ty = ConcreteDataType::from_arrow_type(ty);
+            let time_unit = if let ConcreteDataType::Timestamp(ty) = ty {
+                ty.unit()
+            } else {
+                return UnexpectedSnafu {
+                    reason: format!("Non-timestamp type: {ty:?}"),
+                }
+                .fail();
+            };
+
+            match time_unit {
+                TimeUnit::Second => TimestampSecondVector::try_from_arrow_array(array.clone())
+                    .with_context(|_| DatatypesSnafu {
+                        extra: format!("Failed to create vector from arrow array {array:?}"),
+                    })?
+                    .iter_data()
+                    .map(|d| d.map(|d| d.0))
+                    .collect_vec(),
+                TimeUnit::Millisecond => {
+                    TimestampMillisecondVector::try_from_arrow_array(array.clone())
+                        .with_context(|_| DatatypesSnafu {
+                            extra: format!("Failed to create vector from arrow array {array:?}"),
+                        })?
+                        .iter_data()
+                        .map(|d| d.map(|d| d.0))
+                        .collect_vec()
+                }
+                TimeUnit::Microsecond => {
+                    TimestampMicrosecondVector::try_from_arrow_array(array.clone())
+                        .with_context(|_| DatatypesSnafu {
+                            extra: format!("Failed to create vector from arrow array {array:?}"),
+                        })?
+                        .iter_data()
+                        .map(|d| d.map(|d| d.0))
+                        .collect_vec()
+                }
+                TimeUnit::Nanosecond => {
+                    TimestampNanosecondVector::try_from_arrow_array(array.clone())
+                        .with_context(|_| DatatypesSnafu {
+                            extra: format!("Failed to create vector from arrow array {array:?}"),
+                        })?
+                        .iter_data()
+                        .map(|d| d.map(|d| d.0))
+                        .collect_vec()
+                }
+            }
+        }
+        datafusion_expr::ColumnarValue::Scalar(scalar) => {
+            let value = Value::try_from(scalar.clone()).with_context(|_| DatatypesSnafu {
+                extra: format!("Failed to convert scalar {scalar:?} to value"),
+            })?;
+            let ts = value.as_timestamp().context(UnexpectedSnafu {
+                reason: format!("Expect Timestamp, found {:?}", value),
+            })?;
+            vec![Some(ts)]
+        }
+    };
+    Ok(val)
+}
+
 /// Convert sql to datafusion logical plan
 pub async fn sql_to_df_plan(
@@ -74,25 +255,13 @@ pub async fn sql_to_df_plan(
     Ok(plan)
 }
 
-/// Find nearest lower bound for time `current` in given `plan` for the time window expr.
-/// i.e. for time window expr being `date_bin(INTERVAL '5 minutes', ts) as time_window` and `current="2021-07-01 00:01:01.000"`,
-/// return `Some("2021-07-01 00:00:00.000")`
-/// if `plan` doesn't contain a `TIME INDEX` column, return `None`
-///
-/// Time window expr is a expr that:
-/// 1. ref only to a time index column
-/// 2. is monotonic increasing
-/// 3. show up in GROUP BY clause
-///
-/// note this plan should only contain one TableScan
-pub async fn find_plan_time_window_bound(
+/// Return (the column name of the time index column, the time window expr, the expected time unit of the time index column, and the expr's schema for evaluating the time window)
+async fn find_time_window_expr(
     plan: &LogicalPlan,
-    current: Timestamp,
+    catalog_man: CatalogManagerRef,
     query_ctx: QueryContextRef,
-    engine: QueryEngineRef,
-) -> Result<(String, Option<Timestamp>, Option<Timestamp>), Error> {
+) -> Result<(String, Option<Expr>, TimeUnit, DFSchema), Error> {
     // TODO(discord9): find the expr that do time window
-    let catalog_man = engine.engine_state().catalog_manager();
     let mut table_name = None;
 
     // first find the table source in the logical plan
@@ -199,6 +368,31 @@ pub async fn find_plan_time_window_bound(
         .with_context(|_e| DatafusionSnafu {
             context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
         })?;
+    Ok((ts_col_name, time_window_expr, expected_time_unit, df_schema))
+}
+
+/// Find nearest lower bound for time `current` in given `plan` for the time window expr.
+/// i.e. for time window expr being `date_bin(INTERVAL '5 minutes', ts) as time_window` and `current="2021-07-01 00:01:01.000"`,
+/// return `Some("2021-07-01 00:00:00.000")`
+/// if `plan` doesn't contain a `TIME INDEX` column, return `None`
+///
+/// Time window expr is an expr that:
+/// 1. refers only to a time index column
+/// 2. is monotonic increasing
+/// 3. shows up in the GROUP BY clause
+///
+/// note: this plan should only contain one TableScan
+pub async fn find_plan_time_window_bound(
+    plan: &LogicalPlan,
+    current: Timestamp,
+    query_ctx: QueryContextRef,
+    engine: QueryEngineRef,
+) -> Result<(String, Option<Timestamp>, Option<Timestamp>), Error> {
+    // TODO(discord9): find the expr that do time window
+    let catalog_man = engine.engine_state().catalog_manager();
+
+    let (ts_col_name, time_window_expr, expected_time_unit, df_schema) =
+        find_time_window_expr(plan, catalog_man.clone(), query_ctx).await?;
 
     // cast current to ts_index's type
     let new_current = current
@@ -461,59 +655,14 @@ fn eval_ts_to_ts(
         context: format!("Failed to evaluate physical expression {phy:?} on {rb:?}"),
     })?;
 
-    let val = match eval_res {
-        datafusion_expr::ColumnarValue::Array(array) => {
-            let ty = array.data_type();
-            let ty = ConcreteDataType::from_arrow_type(ty);
-            let time_unit = if let ConcreteDataType::Timestamp(ty) = ty {
-                ty.unit()
-            } else {
-                return UnexpectedSnafu {
-                    reason: format!("Physical expression {phy:?} evaluated to non-timestamp type"),
-                }
-                .fail();
-            };
-
-            match time_unit {
-                TimeUnit::Second => TimestampSecondVector::try_from_arrow_array(array.clone())
-                    .with_context(|_| DatatypesSnafu {
-                        extra: format!("Failed to create vector from arrow array {array:?}"),
-                    })?
-                    .get(0),
-                TimeUnit::Millisecond => {
-                    TimestampMillisecondVector::try_from_arrow_array(array.clone())
-                        .with_context(|_| DatatypesSnafu {
-                            extra: format!("Failed to create vector from arrow array {array:?}"),
-                        })?
-                        .get(0)
-                }
-                TimeUnit::Microsecond => {
-                    TimestampMicrosecondVector::try_from_arrow_array(array.clone())
-                        .with_context(|_| DatatypesSnafu {
-                            extra: format!("Failed to create vector from arrow array {array:?}"),
-                        })?
-                        .get(0)
-                }
-                TimeUnit::Nanosecond => {
-                    TimestampNanosecondVector::try_from_arrow_array(array.clone())
-                        .with_context(|_| DatatypesSnafu {
-                            extra: format!("Failed to create vector from arrow array {array:?}"),
-                        })?
-                        .get(0)
-                }
-            }
-        }
-        datafusion_expr::ColumnarValue::Scalar(scalar) => Value::try_from(scalar.clone())
-            .with_context(|_| DatatypesSnafu {
-                extra: format!("Failed to convert scalar {scalar:?} to value"),
-            })?,
-    };
-
-    if let Value::Timestamp(ts) = val {
-        Ok(ts)
+    if let Some(Some(ts)) = columnar_to_ts_vector(&eval_res)?.first() {
+        Ok(*ts)
     } else {
         UnexpectedSnafu {
-            reason: format!("Expected timestamp in expression {phy:?} but got {val:?}"),
+            reason: format!(
+                "Expected timestamp in expression {phy:?} but got {:?}",
+                eval_res
+            ),
         }
         .fail()?
     }
diff --git a/src/flow/src/recording_rules/engine.rs b/src/flow/src/recording_rules/engine.rs
index e5a59862e9..48c7b9fe82 100644
--- a/src/flow/src/recording_rules/engine.rs
+++ b/src/flow/src/recording_rules/engine.rs
@@ -12,29 +12,40 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashMap, HashSet};
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
+use api::v1::flow::FlowResponse;
+use common_error::ext::BoxedError;
 use common_meta::ddl::create_flow::FlowType;
+use common_meta::key::flow::FlowMetadataManagerRef;
+use common_meta::key::table_info::TableInfoManager;
+use common_meta::key::TableMetadataManagerRef;
 use common_telemetry::tracing::warn;
 use common_telemetry::{debug, info};
 use common_time::Timestamp;
+use datafusion::sql::unparser::expr_to_sql;
 use datafusion_common::tree_node::TreeNode;
 use datatypes::value::Value;
 use query::QueryEngineRef;
 use session::context::QueryContextRef;
-use snafu::{ensure, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};
+use store_api::storage::RegionId;
+use table::metadata::TableId;
 use tokio::sync::oneshot::error::TryRecvError;
 use tokio::sync::{oneshot, RwLock};
 use tokio::time::Instant;
 
 use super::frontend_client::FrontendClient;
-use super::{df_plan_to_sql, AddFilterRewriter};
-use crate::adapter::{CreateFlowArgs, FlowId};
-use crate::error::{DatafusionSnafu, DatatypesSnafu, FlowAlreadyExistSnafu, UnexpectedSnafu};
+use super::{df_plan_to_sql, AddFilterRewriter, TimeWindowExpr};
+use crate::adapter::{CreateFlowArgs, FlowId, TableName};
+use crate::error::{
+    DatafusionSnafu, DatatypesSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu,
+    TimeSnafu, UnexpectedSnafu,
+};
 use crate::metrics::{METRIC_FLOW_RULE_ENGINE_QUERY_TIME, METRIC_FLOW_RULE_ENGINE_SLOW_QUERY};
-use crate::recording_rules::{find_plan_time_window_bound, sql_to_df_plan};
+use crate::recording_rules::{find_plan_time_window_bound, find_time_window_expr, sql_to_df_plan};
 use crate::Error;
 
 /// TODO(discord9): make those constants configurable
@@ -49,18 +60,74 @@
 pub struct RecordingRuleEngine {
     tasks: RwLock<BTreeMap<FlowId, RecordingRuleTask>>,
     shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
     frontend_client: Arc<FrontendClient>,
+    flow_metadata_manager: FlowMetadataManagerRef,
+    table_meta: TableMetadataManagerRef,
     engine: QueryEngineRef,
 }
 
 impl RecordingRuleEngine {
-    pub fn new(frontend_client: Arc<FrontendClient>, engine: QueryEngineRef) -> Self {
+    pub fn new(
+        frontend_client: Arc<FrontendClient>,
+        engine: QueryEngineRef,
+        flow_metadata_manager: FlowMetadataManagerRef,
+        table_meta: TableMetadataManagerRef,
+    ) -> Self {
         Self {
             tasks: Default::default(),
             shutdown_txs: Default::default(),
             frontend_client,
+            flow_metadata_manager,
+            table_meta,
             engine,
         }
     }
+
+    pub async fn handle_inserts(
+        &self,
+        request: api::v1::region::InsertRequests,
+    ) -> Result<FlowResponse, Error> {
+        let table_info_mgr = self.table_meta.table_info_manager();
+        let mut group_by_table_name: HashMap<TableName, Vec<api::v1::Rows>> = HashMap::new();
+        for r in request.requests {
+            let tid = RegionId::from(r.region_id).table_id();
+            let name = get_table_name(table_info_mgr, &tid).await?;
+            let entry = group_by_table_name.entry(name).or_default();
+            if let Some(rows) = r.rows {
+                entry.push(rows);
+            }
+        }
+
+        for (_flow_id, task) in self.tasks.read().await.iter() {
+            let src_table_names = &task.source_table_names;
+
+            for src_table_name in src_table_names {
+                if let Some(entry) = group_by_table_name.get(src_table_name) {
+                    let Some(expr) = &task.time_window_expr else {
+                        continue;
+                    };
+                    let involved_time_windows = expr.handle_rows(entry.clone()).await?;
+                    let mut state = task.state.write().await;
+                    state
+                        .dirty_time_windows
+                        .add_lower_bounds(involved_time_windows.into_iter());
+                }
+            }
+        }
+
+        Ok(Default::default())
+    }
+}
+
+async fn get_table_name(zelf: &TableInfoManager, table_id: &TableId) -> Result<TableName, Error> {
+    zelf.get(*table_id)
+        .await
+        .map_err(BoxedError::new)
+        .context(ExternalSnafu)?
+        .with_context(|| UnexpectedSnafu {
+            reason: format!("Table id = {:?}, couldn't find table name", table_id),
+        })
+        .map(|name| name.table_name())
+        .map(|name| [name.catalog_name, name.schema_name, name.table_name])
 }
 
 const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);
@@ -70,7 +137,7 @@ impl RecordingRuleEngine {
         let CreateFlowArgs {
             flow_id,
             sink_table_name,
-            source_table_ids: _,
+            source_table_ids,
             create_if_not_exists,
             or_replace,
             expire_after,
@@ -115,14 +182,46 @@ impl RecordingRuleEngine {
             }
             .fail()?
         };
+        let query_ctx = Arc::new(query_ctx);
+        let mut source_table_names = Vec::new();
+        for src_id in source_table_ids {
+            let table_name = self
+                .table_meta
+                .table_info_manager()
+                .get(src_id)
+                .await
+                .map_err(BoxedError::new)
+                .context(ExternalSnafu)?
+                .with_context(|| UnexpectedSnafu {
+                    reason: format!("Table id = {:?}, couldn't find table name", src_id),
+                })
+                .map(|name| name.table_name())
+                .map(|name| [name.catalog_name, name.schema_name, name.table_name])?;
+            source_table_names.push(table_name);
+        }
 
         let (tx, rx) = oneshot::channel();
+
+        let plan = sql_to_df_plan(query_ctx.clone(), self.engine.clone(), &sql, true).await?;
+        let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
+            &plan,
+            self.engine.engine_state().catalog_manager().clone(),
+            query_ctx.clone(),
+        )
+        .await?;
+
+        let phy_expr = time_window_expr
+            .map(|expr| TimeWindowExpr::from_expr(&expr, &column_name, &df_schema))
+            .transpose()?;
+
         let task = RecordingRuleTask::new(
             flow_id,
             &sql,
+            phy_expr,
             expire_after,
             sink_table_name,
-            Arc::new(query_ctx),
+            source_table_names,
+            query_ctx,
             rx,
         );
@@ -171,26 +270,33 @@ impl RecordingRuleEngine {
 pub struct RecordingRuleTask {
     flow_id: FlowId,
     query: String,
+    time_window_expr: Option<TimeWindowExpr>,
     /// in seconds
     expire_after: Option<i64>,
     sink_table_name: [String; 3],
+    source_table_names: HashSet<[String; 3]>,
     state: Arc<RwLock<RecordingRuleState>>,
 }
 
 impl RecordingRuleTask {
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         flow_id: FlowId,
         query: &str,
+        time_window_expr: Option<TimeWindowExpr>,
         expire_after: Option<i64>,
         sink_table_name: [String; 3],
+        source_table_names: Vec<[String; 3]>,
         query_ctx: QueryContextRef,
         shutdown_rx: oneshot::Receiver<()>,
     ) -> Self {
         Self {
             flow_id,
             query: query.to_string(),
+            time_window_expr,
             expire_after,
             sink_table_name,
+            source_table_names: source_table_names.into_iter().collect(),
             state: Arc::new(RwLock::new(RecordingRuleState::new(query_ctx, shutdown_rx))),
         }
     }
@@ -207,17 +313,20 @@ impl RecordingRuleTask {
         loop {
             // FIXME(discord9): test if need upper bound also works
-            let new_query = self
-                .gen_query_with_time_window(engine.clone(), false)
-                .await?;
+            let new_query = self.gen_query_with_time_window(engine.clone()).await?;
 
-            let insert_into = format!(
-                "INSERT INTO {}.{}.{} {}",
-                self.sink_table_name[0],
-                self.sink_table_name[1],
-                self.sink_table_name[2],
-                new_query
-            );
+            let insert_into = if let Some(new_query) = new_query {
+                format!(
+                    "INSERT INTO {}.{}.{} {}",
+                    self.sink_table_name[0],
+                    self.sink_table_name[1],
+                    self.sink_table_name[2],
+                    new_query
+                )
+            } else {
+                tokio::time::sleep(MIN_REFRESH_DURATION).await;
+                continue;
+            };
 
             if is_first {
                 is_first = false;
@@ -284,11 +393,11 @@ impl RecordingRuleTask {
         }
     }
 
+    /// Will merge and use the first ten time windows in the query
     async fn gen_query_with_time_window(
         &self,
         engine: QueryEngineRef,
-        need_upper_bound: bool,
-    ) -> Result<String, Error> {
+    ) -> Result<Option<String>, Error> {
         let query_ctx = self.state.read().await.query_ctx.clone();
         let start = SystemTime::now();
         let since_the_epoch = start
@@ -299,48 +408,55 @@ impl RecordingRuleTask {
             .map(|e| since_the_epoch.as_secs() - e as u64);
 
         let Some(low_bound) = low_bound else {
-            return Ok(self.query.clone());
+            return Ok(Some(self.query.clone()));
         };
 
         let low_bound = Timestamp::new_second(low_bound as i64);
         let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.query, true).await?;
 
+        // TODO(discord9): find more time window!
         let (col_name, lower, upper) =
             find_plan_time_window_bound(&plan, low_bound, query_ctx.clone(), engine.clone())
                 .await?;
 
         let new_sql = {
-            let to_df_literal = |value| -> Result<_, Error> {
-                let value = Value::from(value);
-                let value = value
-                    .try_to_scalar_value(&value.data_type())
-                    .with_context(|_| DatatypesSnafu {
-                        extra: format!("Failed to convert to scalar value: {}", value),
-                    })?;
-                Ok(value)
-            };
-            let lower = lower.map(to_df_literal).transpose()?;
-            let upper = upper.map(to_df_literal).transpose()?.and_then(|u| {
-                if need_upper_bound {
-                    Some(u)
-                } else {
-                    None
-                }
-            });
             let expr = {
-                use datafusion_expr::{col, lit};
                 match (lower, upper) {
-                    (Some(l), Some(u)) => col(&col_name)
-                        .gt_eq(lit(l))
-                        .and(col(&col_name).lt_eq(lit(u))),
-                    (Some(l), None) => col(&col_name).gt_eq(lit(l)),
-                    (None, Some(u)) => col(&col_name).lt(lit(u)),
-                    // no time window, direct return
-                    (None, None) => return Ok(self.query.clone()),
+                    (Some(l), Some(u)) => {
+                        let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
+                            reason: format!("Can't get window size from {u:?} - {l:?}"),
+                        })?;
+
+                        self.state
+                            .write()
+                            .await
+                            .dirty_time_windows
+                            .gen_filter_exprs(&col_name, lower, window_size)?
+                    }
+                    _ => UnexpectedSnafu {
+                        reason: format!("Can't get window size: lower={lower:?}, upper={upper:?}"),
+                    }
+                    .fail()?,
                 }
             };
 
+            debug!(
+                "Flow id={:?}, Generated filter expr: {:?}",
+                self.flow_id,
+                expr.as_ref()
+                    .map(|expr| expr_to_sql(expr).with_context(|_| DatafusionSnafu {
+                        context: format!("Failed to generate filter expr from {expr:?}"),
+                    }))
+                    .transpose()?
+                    .map(|s| s.to_string())
+            );
+
+            let Some(expr) = expr else {
+                // no new data, hence no need to update
+                return Ok(None);
+            };
+
             let mut add_filter = AddFilterRewriter::new(expr);
             // make a not optimized plan for clearer unparse
             let plan =
@@ -355,7 +471,7 @@ impl RecordingRuleTask {
             df_plan_to_sql(&plan)?
         };
 
-        Ok(new_sql)
+        Ok(Some(new_sql))
     }
 }
@@ -366,16 +482,150 @@ pub struct RecordingRuleState {
     last_update_time: Instant,
     /// last time query duration
     last_query_duration: Duration,
+    /// Dirty time windows that need to be updated
+    /// mapping of `start -> end`, non-overlapping
+    dirty_time_windows: DirtyTimeWindows,
     exec_state: ExecState,
     shutdown_rx: oneshot::Receiver<()>,
 }
 
+#[derive(Debug, Clone, Default)]
+pub struct DirtyTimeWindows {
+    windows: BTreeMap<Timestamp, Option<Timestamp>>,
+}
+
+fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
+    let value = Value::from(value);
+    let value = value
+        .try_to_scalar_value(&value.data_type())
+        .with_context(|_| DatatypesSnafu {
+            extra: format!("Failed to convert to scalar value: {}", value),
+        })?;
+    Ok(value)
+}
+
+impl DirtyTimeWindows {
+    /// Time window merge distance
+    const MERGE_DIST: i32 = 3;
+
+    /// Add lower bounds to the dirty time windows. Upper bounds are ignored.
+    ///
+    /// # Arguments
+    ///
+    /// * `lower_bounds` - An iterator of lower bounds to be added.
+    pub fn add_lower_bounds(&mut self, lower_bounds: impl Iterator<Item = Timestamp>) {
+        for lower_bound in lower_bounds {
+            let entry = self.windows.entry(lower_bound);
+            entry.or_insert(None);
+        }
+    }
+
+    /// Generate all filter expressions, consuming all time windows
+    pub fn gen_filter_exprs(
+        &mut self,
+        col_name: &str,
+        expire_lower_bound: Option<Timestamp>,
+        window_size: chrono::Duration,
+    ) -> Result<Option<datafusion_expr::Expr>, Error> {
+        debug!(
+            "expire_lower_bound: {:?}, window_size: {:?}",
+            expire_lower_bound.map(|t| t.to_iso8601_string()),
+            window_size
+        );
+        self.merge_dirty_time_windows(window_size)?;
+        let mut expr_lst = vec![];
+        for (start, end) in std::mem::take(&mut self.windows).into_iter() {
+            debug!(
+                "Time window start: {:?}, end: {:?}",
+                start.to_iso8601_string(),
+                end.map(|t| t.to_iso8601_string())
+            );
+            let start = if let Some(expire) = expire_lower_bound {
+                start.max(expire)
+            } else {
+                start
+            };
+
+            // filter out expired time window
+            if let Some(end) = end {
+                if end <= start {
+                    continue;
+                }
+            }
+
+            use datafusion_expr::{col, lit};
+            let lower = to_df_literal(start)?;
+            let upper = end.map(to_df_literal).transpose()?;
+            let expr = if let Some(upper) = upper {
+                col(col_name)
+                    .gt_eq(lit(lower))
+                    .and(col(col_name).lt(lit(upper)))
+            } else {
+                col(col_name).gt_eq(lit(lower))
+            };
+            expr_lst.push(expr);
+        }
+        let expr = expr_lst.into_iter().reduce(|a, b| a.and(b));
+        Ok(expr)
+    }
+
+    /// Merge time windows that overlap or are too close
+    pub fn merge_dirty_time_windows(&mut self, window_size: chrono::Duration) -> Result<(), Error> {
+        let mut new_windows = BTreeMap::new();
+
+        let mut prev_tw = None;
+        for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
+            let Some(prev_tw) = &mut prev_tw else {
+                prev_tw = Some((lower_bound, upper_bound));
+                continue;
+            };
+
+            let std_window_size = window_size.to_std().map_err(|e| {
+                InternalSnafu {
+                    reason: e.to_string(),
+                }
+                .build()
+            })?;
+
+            // if cur.lower - prev.upper <= window_size * 2, merge
+            let prev_upper = prev_tw
+                .1
+                .unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
+
+            let cur_upper = upper_bound.unwrap_or(
+                lower_bound
+                    .add_duration(std_window_size)
+                    .context(TimeSnafu)?,
+            );
+
+            if lower_bound
+                .sub(&prev_upper)
+                .map(|dist| dist <= window_size * Self::MERGE_DIST)
+                .unwrap_or(false)
+            {
+                prev_tw.1 = Some(cur_upper);
+            } else {
+                new_windows.insert(prev_tw.0, prev_tw.1);
+                *prev_tw = (lower_bound, Some(cur_upper));
+            }
+        }
+
+        if let Some(prev_tw) = prev_tw {
+            new_windows.insert(prev_tw.0, prev_tw.1);
+        }
+
+        self.windows = new_windows;
+
+        Ok(())
+    }
+}
+
 impl RecordingRuleState {
     pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
         Self {
             query_ctx,
             last_update_time: Instant::now(),
             last_query_duration: Duration::from_secs(0),
+            dirty_time_windows: Default::default(),
             exec_state: ExecState::Idle,
             shutdown_rx,
         }
diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs
index dcdc5643bd..005ddab475 100644
--- a/src/flow/src/server.rs
+++ b/src/flow/src/server.rs
@@ -453,8 +453,12 @@ impl FlownodeBuilder {
         let node_id = self.opts.node_id.map(|id| id as u32);
 
-        let rule_engine =
-            RecordingRuleEngine::new(self.frontend_client.clone(), query_engine.clone());
+        let rule_engine = RecordingRuleEngine::new(
+            self.frontend_client.clone(),
+            query_engine.clone(),
+            self.flow_metadata_manager.clone(),
+            table_meta.clone(),
+        );
         let mut man = FlowWorkerManager::new(node_id, query_engine, table_meta, rule_engine);
 
         for worker_id in 0..num_workers {
diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs
index 078cbe9c9f..49648d991d 100644
--- a/src/operator/src/insert.rs
+++ b/src/operator/src/insert.rs
@@ -336,11 +336,11 @@ impl Inserter {
         let InstantAndNormalInsertRequests {
             normal_requests,
-            instant_requests: _,
+            instant_requests,
         } = requests;
 
         // TODO(discord9): mirror some
-        /*
+        // Mirror requests for source table to flownode asynchronously
         let flow_mirror_task = FlowMirrorTask::new(
             &self.table_flownode_set_cache,
             normal_requests
                 .requests
                 .iter()
@@ -350,7 +350,7 @@ impl Inserter {
                 .chain(instant_requests.requests.iter()),
         )
         .await?;
-        flow_mirror_task.detach(self.node_manager.clone())?;*/
+        flow_mirror_task.detach(self.node_manager.clone())?;
 
         // Write requests to datanode and wait for response
         let write_tasks = self