diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index 3456a17489..125c185a5a 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -16,7 +16,6 @@ mod client; pub mod client_manager; -#[cfg(feature = "testing")] mod database; pub mod error; pub mod flow; @@ -34,7 +33,6 @@ pub use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; use snafu::OptionExt; pub use self::client::Client; -#[cfg(feature = "testing")] pub use self::database::Database; pub use self::error::{Error, Result}; use crate::error::{IllegalDatabaseResponseSnafu, ServerSnafu}; diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index 9280202471..94a7d6b54b 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -32,7 +32,7 @@ use common_meta::key::TableMetadataManager; use common_telemetry::info; use common_telemetry::logging::TracingOptions; use common_version::{short_version, version}; -use flow::{FlownodeBuilder, FlownodeInstance, FrontendInvoker}; +use flow::{FlownodeBuilder, FlownodeInstance, FrontendClient, FrontendInvoker}; use meta_client::{MetaClientOptions, MetaClientType}; use servers::Mode; use snafu::{OptionExt, ResultExt}; @@ -317,6 +317,8 @@ impl StartCommand { Arc::new(executor), ); + let frontend_client = FrontendClient::from_meta_client(meta_client.clone()); + let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone())); let flownode_builder = FlownodeBuilder::new( opts, @@ -324,6 +326,7 @@ impl StartCommand { table_metadata_manager, catalog_manager.clone(), flow_metadata_manager, + Arc::new(frontend_client), ) .with_heartbeat_task(heartbeat_task); diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 8faca3e78b..3acc150e18 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -54,7 +54,10 @@ use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, Sto use datanode::datanode::{Datanode, DatanodeBuilder}; use datanode::region_server::RegionServer; use file_engine::config::EngineConfig as FileEngineConfig; -use flow::{FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeOptions, FrontendInvoker}; +use flow::{ + FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeOptions, FrontendClient, + FrontendInvoker, +}; use frontend::frontend::FrontendOptions; use frontend::instance::builder::FrontendBuilder; use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager}; @@ -533,12 +536,16 @@ impl StartCommand { flow: opts.flow.clone(), ..Default::default() }; + + let fe_server_addr = fe_opts.grpc.bind_addr.clone(); + let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr); let flow_builder = FlownodeBuilder::new( flownode_options, plugins.clone(), table_metadata_manager.clone(), catalog_manager.clone(), flow_metadata_manager.clone(), + Arc::new(frontend_client), ); let flownode = Arc::new( flow_builder diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index db8a700059..0ce61b6fa0 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -343,6 +343,7 @@ pub enum FlowType { impl FlowType { pub const RECORDING_RULE: &str = "recording_rule"; pub const STREAMING: &str = "streaming"; + pub const FLOW_TYPE_KEY: &str = "flow_type"; } impl Default for FlowType { @@ -398,7 +399,8 @@ impl From<&CreateFlowData> for CreateRequest { }; let flow_type = value.flow_type.unwrap_or_default().to_string(); - 
req.flow_options.insert("flow_type".to_string(), flow_type); + req.flow_options + .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type); req } } @@ -430,7 +432,7 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa .collect::>(); let flow_type = value.flow_type.unwrap_or_default().to_string(); - options.insert("flow_type".to_string(), flow_type); + options.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type); let flow_info = FlowInfoValue { source_table_ids: value.source_table_ids.clone(), diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 47557752f9..7d23c445f9 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -49,12 +49,13 @@ pub(crate) use crate::adapter::node_context::FlownodeContext; use crate::adapter::refill::RefillTask; use crate::adapter::table_source::ManagedTableSource; use crate::adapter::util::relation_desc_to_column_schemas_with_fallback; -pub(crate) use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; +pub(crate) use crate::adapter::worker::{create_worker, WorkerHandle}; use crate::compute::ErrCollector; use crate::df_optimizer::sql_to_flow_plan; use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu}; use crate::expr::Batch; use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS}; +use crate::recording_rules::RecordingRuleEngine; use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE}; mod flownode_impl; @@ -171,6 +172,8 @@ pub struct FlowWorkerManager { flush_lock: RwLock<()>, /// receive a oneshot sender to send state size report state_report_handler: RwLock>, + /// engine for recording rule + rule_engine: RecordingRuleEngine, } /// Building FlownodeManager @@ -185,6 +188,7 @@ impl FlowWorkerManager { node_id: Option, query_engine: Arc, table_meta: TableMetadataManagerRef, + rule_engine: RecordingRuleEngine, ) -> Self { let srv_map = ManagedTableSource::new( table_meta.table_info_manager().clone(), @@ -207,6 +211,7 @@ impl FlowWorkerManager { node_id, flush_lock: RwLock::new(()), state_report_handler: RwLock::new(None), + rule_engine, } } @@ -215,25 +220,6 @@ impl FlowWorkerManager { self } - /// Create a flownode manager with one worker - pub fn new_with_workers<'s>( - node_id: Option, - query_engine: Arc, - table_meta: TableMetadataManagerRef, - num_workers: usize, - ) -> (Self, Vec>) { - let mut zelf = Self::new(node_id, query_engine, table_meta); - - let workers: Vec<_> = (0..num_workers) - .map(|_| { - let (handle, worker) = create_worker(); - zelf.add_worker_handle(handle); - worker - }) - .collect(); - (zelf, workers) - } - /// add a worker handler to manager, meaning this corresponding worker is under it's manage pub fn add_worker_handle(&mut self, handle: WorkerHandle) { self.worker_handles.push(handle); @@ -751,7 +737,11 @@ pub struct CreateFlowArgs { /// Create&Remove flow impl FlowWorkerManager { /// remove a flow by it's id + #[allow(unreachable_code)] pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> { + // TODO(discord9): reroute some back to streaming engine later + return self.rule_engine.remove_flow(flow_id).await; + for handle in self.worker_handles.iter() { if handle.contains_flow(flow_id).await? { handle.remove_flow(flow_id).await?; @@ -767,8 +757,10 @@ impl FlowWorkerManager { /// steps to create task: /// 1. parse query into typed plan(and optional parse expire_after expr) /// 2. 
render source/sink with output table id and used input table id - #[allow(clippy::too_many_arguments)] + #[allow(clippy::too_many_arguments, unreachable_code)] pub async fn create_flow(&self, args: CreateFlowArgs) -> Result, Error> { + // TODO(discord9): reroute some back to streaming engine later + return self.rule_engine.create_flow(args).await; let CreateFlowArgs { flow_id, sink_table_name, diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 7f4a4267cb..1312b5e175 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -153,7 +153,10 @@ impl Flownode for FlowWorkerManager { } } + #[allow(unreachable_code, unused)] async fn handle_inserts(&self, request: InsertRequests) -> Result { + return Ok(Default::default()); + // using try_read to ensure two things: // 1. flush wouldn't happen until inserts before it is inserted // 2. inserts happening concurrently with flush wouldn't be block by flush diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs index 703b47641c..ec1c200f54 100644 --- a/src/flow/src/error.rs +++ b/src/flow/src/error.rs @@ -16,6 +16,7 @@ use std::any::Any; +use arrow_schema::ArrowError; use common_error::ext::BoxedError; use common_error::{define_into_tonic_status, from_err_code_msg_to_header}; use common_macro::stack_trace_debug; @@ -156,6 +157,15 @@ pub enum Error { location: Location, }, + #[snafu(display("Arrow error: {raw:?} in context: {context}"))] + Arrow { + #[snafu(source)] + raw: ArrowError, + context: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Datafusion error: {raw:?} in context: {context}"))] Datafusion { #[snafu(source)] @@ -230,6 +240,7 @@ impl ErrorExt for Error { match self { Self::Eval { .. } | Self::JoinTask { .. } + | Self::Arrow { .. } | Self::Datafusion { .. } | Self::InsertIntoFlow { .. } => StatusCode::Internal, Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists, diff --git a/src/flow/src/expr/utils.rs b/src/flow/src/expr/utils.rs index 636d83b72d..5a7c41a8c8 100644 --- a/src/flow/src/expr/utils.rs +++ b/src/flow/src/expr/utils.rs @@ -238,6 +238,7 @@ mod test { for (sql, current, expected) in &testcases { let plan = sql_to_substrait(engine.clone(), sql).await; + let mut ctx = create_test_ctx(); let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan) .await diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs index a186e57d89..0987768c9a 100644 --- a/src/flow/src/lib.rs +++ b/src/flow/src/lib.rs @@ -33,6 +33,7 @@ mod expr; pub mod heartbeat; mod metrics; mod plan; +mod recording_rules; mod repr; mod server; mod transform; @@ -43,4 +44,5 @@ mod test_utils; pub use adapter::{FlowConfig, FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions}; pub use error::{Error, Result}; +pub use recording_rules::FrontendClient; pub use server::{FlownodeBuilder, FlownodeInstance, FlownodeServer, FrontendInvoker}; diff --git a/src/flow/src/metrics.rs b/src/flow/src/metrics.rs index f165bcadc6..b20cc9acb4 100644 --- a/src/flow/src/metrics.rs +++ b/src/flow/src/metrics.rs @@ -28,6 +28,12 @@ lazy_static! 
{ &["table_id"] ) .unwrap(); + pub static ref METRIC_FLOW_RULE_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!( + "greptime_flow_rule_engine_query_time", + "flow rule engine query time", + &["flow_id"] + ) + .unwrap(); pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge = register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap(); pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!( diff --git a/src/flow/src/recording_rules.rs b/src/flow/src/recording_rules.rs new file mode 100644 index 0000000000..943f9b80d0 --- /dev/null +++ b/src/flow/src/recording_rules.rs @@ -0,0 +1,719 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Run flow as recording rule which is time-window-aware normal query triggered every tick set by user + +mod engine; +mod frontend_client; + +use std::collections::HashSet; +use std::sync::Arc; + +use common_error::ext::BoxedError; +use common_recordbatch::DfRecordBatch; +use common_time::timestamp::TimeUnit; +use common_time::Timestamp; +use datafusion::error::Result as DfResult; +use datafusion::logical_expr::Expr; +use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; +use datafusion::prelude::SessionContext; +use datafusion::sql::unparser::Unparser; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter}; +use datafusion_common::{Column, DFSchema, TableReference}; +use datafusion_expr::LogicalPlan; +use datafusion_physical_expr::PhysicalExprRef; +use datatypes::prelude::{ConcreteDataType, DataType}; +use datatypes::value::Value; +use datatypes::vectors::{ + TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector, + TimestampSecondVector, Vector, +}; +pub use engine::RecordingRuleEngine; +pub use frontend_client::FrontendClient; +use query::parser::QueryLanguageParser; +use query::QueryEngineRef; +use session::context::QueryContextRef; +use snafu::{ensure, OptionExt, ResultExt}; + +use crate::df_optimizer::apply_df_optimizer; +use crate::error::{ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, UnexpectedSnafu}; +use crate::Error; + +/// Convert sql to datafusion logical plan +pub async fn sql_to_df_plan( + query_ctx: QueryContextRef, + engine: QueryEngineRef, + sql: &str, + optimize: bool, +) -> Result { + let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + let plan = engine + .planner() + .plan(&stmt, query_ctx) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + let plan = if optimize { + apply_df_optimizer(plan).await? + } else { + plan + }; + Ok(plan) +} + +/// Find nearest lower bound for time `current` in given `plan` for the time window expr. +/// i.e. 
for time window expr being `date_bin(INTERVAL '5 minutes', ts) as time_window` and `current="2021-07-01 00:01:01.000"`,
+/// return `Some("2021-07-01 00:00:00.000")`
+/// if `plan` doesn't contain a `TIME INDEX` column, return `None`
+///
+/// A time window expr is an expr that:
+/// 1. refers only to a time index column
+/// 2. is monotonically increasing
+/// 3. shows up in the GROUP BY clause
+///
+/// Note that this plan should only contain one TableScan
+pub async fn find_plan_time_window_bound(
+    plan: &LogicalPlan,
+    current: Timestamp,
+    query_ctx: QueryContextRef,
+    engine: QueryEngineRef,
+) -> Result<(String, Option<Timestamp>, Option<Timestamp>), Error> {
+    // TODO(discord9): find the expr that does the time window
+    let catalog_man = engine.engine_state().catalog_manager();
+
+    let mut table_name = None;
+    // first find the table source in the logical plan
+    plan.apply(|plan| {
+        let LogicalPlan::TableScan(table_scan) = plan else {
+            return Ok(TreeNodeRecursion::Continue);
+        };
+        table_name = Some(table_scan.table_name.clone());
+        Ok(TreeNodeRecursion::Stop)
+    })
+    .with_context(|_| DatafusionSnafu {
+        context: format!("Can't find table source in plan {plan:?}"),
+    })?;
+    let Some(table_name) = table_name else {
+        UnexpectedSnafu {
+            reason: format!("Can't find table source in plan {plan:?}"),
+        }
+        .fail()?
+    };
+
+    let current_schema = query_ctx.current_schema();
+
+    let catalog_name = table_name.catalog().unwrap_or(query_ctx.current_catalog());
+    let schema_name = table_name.schema().unwrap_or(&current_schema);
+    let table_name = table_name.table();
+
+    let Some(table_ref) = catalog_man
+        .table(catalog_name, schema_name, table_name, Some(&query_ctx))
+        .await
+        .map_err(BoxedError::new)
+        .context(ExternalSnafu)?
+    else {
+        UnexpectedSnafu {
+            reason: format!(
+                "Can't find table {table_name:?} in catalog {catalog_name:?}/{schema_name:?}"
+            ),
+        }
+        .fail()?
+ }; + + let schema = &table_ref.table_info().meta.schema; + + let ts_index = schema.timestamp_column().context(UnexpectedSnafu { + reason: format!("Can't find timestamp column in table {table_name:?}"), + })?; + + let ts_col_name = ts_index.name.clone(); + + let expected_time_unit = ts_index.data_type.as_timestamp().with_context(|| UnexpectedSnafu { + reason: format!( + "Expected timestamp column {ts_col_name:?} in table {table_name:?} to be timestamp, but got {ts_index:?}" + ), + })?.unit(); + + let ts_columns: HashSet<_> = HashSet::from_iter(vec![ + format!("{catalog_name}.{schema_name}.{table_name}.{ts_col_name}"), + format!("{schema_name}.{table_name}.{ts_col_name}"), + format!("{table_name}.{ts_col_name}"), + format!("{ts_col_name}"), + ]); + let ts_columns: HashSet<_> = ts_columns + .into_iter() + .map(Column::from_qualified_name) + .collect(); + + let ts_columns_ref: HashSet<&Column> = ts_columns.iter().collect(); + + // find the time window expr which refers to the time index column + let mut time_window_expr: Option = None; + let find_time_window_expr = |plan: &LogicalPlan| { + let LogicalPlan::Aggregate(aggregate) = plan else { + return Ok(TreeNodeRecursion::Continue); + }; + + for group_expr in &aggregate.group_expr { + let refs = group_expr.column_refs(); + if refs.len() != 1 { + continue; + } + let ref_col = refs.iter().next().unwrap(); + if ts_columns_ref.contains(ref_col) { + time_window_expr = Some(group_expr.clone()); + break; + } + } + + Ok(TreeNodeRecursion::Stop) + }; + plan.apply(find_time_window_expr) + .with_context(|_| DatafusionSnafu { + context: format!("Can't find time window expr in plan {plan:?}"), + })?; + + let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new( + ts_col_name.clone(), + ts_index.data_type.as_arrow_type(), + false, + )])); + + let df_schema = DFSchema::from_field_specific_qualified_schema( + vec![Some(TableReference::bare(table_name))], + &arrow_schema, + ) + .with_context(|_e| DatafusionSnafu { + context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"), + })?; + + // cast current to ts_index's type + let new_current = current + .convert_to(expected_time_unit) + .with_context(|| UnexpectedSnafu { + reason: format!("Failed to cast current timestamp {current:?} to {expected_time_unit}"), + })?; + + // if no time_window_expr is found, return None + if let Some(time_window_expr) = time_window_expr { + let lower_bound = + find_expr_time_window_lower_bound(&time_window_expr, &df_schema, new_current)?; + let upper_bound = + find_expr_time_window_upper_bound(&time_window_expr, &df_schema, new_current)?; + Ok((ts_col_name, lower_bound, upper_bound)) + } else { + Ok((ts_col_name, None, None)) + } +} + +/// Find the lower bound of time window in given `expr` and `current` timestamp. +/// +/// i.e. 
for `current="2021-07-01 00:01:01.000"` and `expr=date_bin(INTERVAL '5 minutes', ts) as time_window` and `ts_col=ts`,
+/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound
+/// of the current time window given the current timestamp
+///
+/// if this returns `None`, the time window has no lower bound
+fn find_expr_time_window_lower_bound(
+    expr: &Expr,
+    df_schema: &DFSchema,
+    current: Timestamp,
+) -> Result<Option<Timestamp>, Error> {
+    use std::cmp::Ordering;
+
+    let phy_planner = DefaultPhysicalPlanner::default();
+
+    let phy_expr: PhysicalExprRef = phy_planner
+        .create_physical_expr(expr, df_schema, &SessionContext::new().state())
+        .with_context(|_e| DatafusionSnafu {
+            context: format!(
+                "Failed to create physical expression from {expr:?} using {df_schema:?}"
+            ),
+        })?;
+
+    let cur_time_window = eval_ts_to_ts(&phy_expr, df_schema, current)?;
+    if cur_time_window == current {
+        return Ok(Some(current));
+    }
+
+    // search to find the lower bound
+    let mut offset: i64 = 1;
+    let lower_bound;
+    let mut upper_bound = Some(current);
+    // first do an exponential probe to find a range for the binary search
+    loop {
+        let Some(next_val) = current.value().checked_sub(offset) else {
+            // no lower bound
+            return Ok(None);
+        };
+
+        let prev_time_probe = common_time::Timestamp::new(next_val, current.unit());
+
+        let prev_time_window = eval_ts_to_ts(&phy_expr, df_schema, prev_time_probe)?;
+
+        match prev_time_window.cmp(&cur_time_window) {
+            Ordering::Less => {
+                lower_bound = Some(prev_time_probe);
+                break;
+            }
+            Ordering::Equal => {
+                upper_bound = Some(prev_time_probe);
+            }
+            Ordering::Greater => {
+                UnexpectedSnafu {
+                    reason: format!(
+                        "Unsupported time window expression, expected a monotonically increasing time window expression {expr:?}"
+                    ),
+                }
+                .fail()?
+            }
+        }
+
+        let Some(new_offset) = offset.checked_mul(2) else {
+            // no lower bound
+            return Ok(None);
+        };
+        offset = new_offset;
+    }
+
+    // binary search for the exact lower bound
+
+    ensure!(
+        lower_bound.map(|v| v.unit()) == upper_bound.map(|v| v.unit()),
+        UnexpectedSnafu {
+            reason: format!("unit mismatch for time window expression {expr:?}, found {lower_bound:?} and {upper_bound:?}"),
+        }
+    );
+
+    let output_unit = lower_bound
+        .context(UnexpectedSnafu {
+            reason: "should have lower bound",
+        })?
+        .unit();
+
+    let mut low = lower_bound
+        .context(UnexpectedSnafu {
+            reason: "should have lower bound",
+        })?
+        .value();
+    let mut high = upper_bound
+        .context(UnexpectedSnafu {
+            reason: "should have upper bound",
+        })?
+        .value();
+    while low < high {
+        let mid = (low + high) / 2;
+        let mid_probe = common_time::Timestamp::new(mid, output_unit);
+        let mid_time_window = eval_ts_to_ts(&phy_expr, df_schema, mid_probe)?;
+
+        match mid_time_window.cmp(&cur_time_window) {
+            Ordering::Less => low = mid + 1,
+            Ordering::Equal => high = mid,
+            Ordering::Greater => UnexpectedSnafu {
+                reason: format!("Binary search failed for time window expression {expr:?}"),
+            }
+            .fail()?,
+        }
+    }
+
+    let final_lower_bound_for_time_window = common_time::Timestamp::new(low, output_unit);
+
+    Ok(Some(final_lower_bound_for_time_window))
+}
+
+/// Find the upper bound of the time window in given `expr` and `current` timestamp
+fn find_expr_time_window_upper_bound(
+    expr: &Expr,
+    df_schema: &DFSchema,
+    current: Timestamp,
+) -> Result<Option<Timestamp>, Error> {
+    use std::cmp::Ordering;
+
+    let phy_planner = DefaultPhysicalPlanner::default();
+
+    let phy_expr: PhysicalExprRef = phy_planner
+        .create_physical_expr(expr, df_schema, &SessionContext::new().state())
+        .with_context(|_e| DatafusionSnafu {
+            context: format!(
+                "Failed to create physical expression from {expr:?} using {df_schema:?}"
+            ),
+        })?;
+
+    let cur_time_window = eval_ts_to_ts(&phy_expr, df_schema, current)?;
+
+    // search to find the upper bound
+    let mut offset: i64 = 1;
+    let mut lower_bound = Some(current);
+    let upper_bound;
+    // first do an exponential probe to find a range for the binary search
+    loop {
+        let Some(next_val) = current.value().checked_add(offset) else {
+            // no upper bound if overflow
+            return Ok(None);
+        };
+
+        let next_time_probe = common_time::Timestamp::new(next_val, current.unit());
+
+        let next_time_window = eval_ts_to_ts(&phy_expr, df_schema, next_time_probe)?;
+
+        match next_time_window.cmp(&cur_time_window) {
+            Ordering::Less => {
+                UnexpectedSnafu {
+                    reason: format!(
+                        "Unsupported time window expression, expected a monotonically increasing time window expression {expr:?}"
+                    ),
+                }
+                .fail()?
+            }
+            Ordering::Equal => {
+                lower_bound = Some(next_time_probe);
+            }
+            Ordering::Greater => {
+                upper_bound = Some(next_time_probe);
+                break
+            }
+        }
+
+        let Some(new_offset) = offset.checked_mul(2) else {
+            // no upper bound if overflow
+            return Ok(None);
+        };
+        offset = new_offset;
+    }
+
+    // binary search for the exact upper bound
+
+    ensure!(
+        lower_bound.map(|v| v.unit()) == upper_bound.map(|v| v.unit()),
+        UnexpectedSnafu {
+            reason: format!("unit mismatch for time window expression {expr:?}, found {lower_bound:?} and {upper_bound:?}"),
+        }
+    );
+
+    let output_unit = upper_bound
+        .context(UnexpectedSnafu {
+            reason: "should have upper bound",
+        })?
+        .unit();
+
+    let mut low = lower_bound
+        .context(UnexpectedSnafu {
+            reason: "should have lower bound",
+        })?
+        .value();
+    let mut high = upper_bound
+        .context(UnexpectedSnafu {
+            reason: "should have upper bound",
+        })?
+ .value(); + while low < high { + let mid = (low + high) / 2; + let mid_probe = common_time::Timestamp::new(mid, output_unit); + let mid_time_window = eval_ts_to_ts(&phy_expr, df_schema, mid_probe)?; + + match mid_time_window.cmp(&cur_time_window) { + Ordering::Less => UnexpectedSnafu { + reason: format!("Binary search failed for time window expression {expr:?}"), + } + .fail()?, + Ordering::Equal => low = mid + 1, + Ordering::Greater => high = mid, + } + } + + let final_upper_bound_for_time_window = common_time::Timestamp::new(high, output_unit); + + Ok(Some(final_upper_bound_for_time_window)) +} + +fn eval_ts_to_ts( + phy: &PhysicalExprRef, + df_schema: &DFSchema, + value: Timestamp, +) -> Result { + let ts_vector = match value.unit() { + TimeUnit::Second => TimestampSecondVector::from_vec(vec![value.value()]).to_arrow_array(), + TimeUnit::Millisecond => { + TimestampMillisecondVector::from_vec(vec![value.value()]).to_arrow_array() + } + TimeUnit::Microsecond => { + TimestampMicrosecondVector::from_vec(vec![value.value()]).to_arrow_array() + } + TimeUnit::Nanosecond => { + TimestampNanosecondVector::from_vec(vec![value.value()]).to_arrow_array() + } + }; + + let rb = DfRecordBatch::try_new(df_schema.inner().clone(), vec![ts_vector.clone()]) + .with_context(|_| ArrowSnafu { + context: format!("Failed to create record batch from {df_schema:?} and {ts_vector:?}"), + })?; + + let eval_res = phy.evaluate(&rb).with_context(|_| DatafusionSnafu { + context: format!("Failed to evaluate physical expression {phy:?} on {rb:?}"), + })?; + + let val = match eval_res { + datafusion_expr::ColumnarValue::Array(array) => { + let ty = array.data_type(); + let ty = ConcreteDataType::from_arrow_type(ty); + let time_unit = if let ConcreteDataType::Timestamp(ty) = ty { + ty.unit() + } else { + return UnexpectedSnafu { + reason: format!("Physical expression {phy:?} evaluated to non-timestamp type"), + } + .fail(); + }; + + match time_unit { + TimeUnit::Second => TimestampSecondVector::try_from_arrow_array(array.clone()) + .with_context(|_| DatatypesSnafu { + extra: format!("Failed to create vector from arrow array {array:?}"), + })? + .get(0), + TimeUnit::Millisecond => { + TimestampMillisecondVector::try_from_arrow_array(array.clone()) + .with_context(|_| DatatypesSnafu { + extra: format!("Failed to create vector from arrow array {array:?}"), + })? + .get(0) + } + TimeUnit::Microsecond => { + TimestampMicrosecondVector::try_from_arrow_array(array.clone()) + .with_context(|_| DatatypesSnafu { + extra: format!("Failed to create vector from arrow array {array:?}"), + })? + .get(0) + } + TimeUnit::Nanosecond => { + TimestampNanosecondVector::try_from_arrow_array(array.clone()) + .with_context(|_| DatatypesSnafu { + extra: format!("Failed to create vector from arrow array {array:?}"), + })? + .get(0) + } + } + } + datafusion_expr::ColumnarValue::Scalar(scalar) => Value::try_from(scalar.clone()) + .with_context(|_| DatatypesSnafu { + extra: format!("Failed to convert scalar {scalar:?} to value"), + })?, + }; + + if let Value::Timestamp(ts) = val { + Ok(ts) + } else { + UnexpectedSnafu { + reason: format!("Expected timestamp in expression {phy:?} but got {val:?}"), + } + .fail()? 
+ } +} + +// TODO(discord9): a method to found out the precise time window + +/// Find out the `Filter` Node corresponding to outermost `WHERE` and add a new filter expr to it +#[derive(Debug)] +pub struct AddFilterRewriter { + extra_filter: Expr, + is_rewritten: bool, +} + +impl AddFilterRewriter { + fn new(filter: Expr) -> Self { + Self { + extra_filter: filter, + is_rewritten: false, + } + } +} + +impl TreeNodeRewriter for AddFilterRewriter { + type Node = LogicalPlan; + fn f_down(&mut self, node: Self::Node) -> DfResult> { + if self.is_rewritten { + return Ok(Transformed::no(node)); + } + match node { + LogicalPlan::Filter(mut filter) if !filter.having => { + filter.predicate = filter.predicate.and(self.extra_filter.clone()); + self.is_rewritten = true; + Ok(Transformed::yes(LogicalPlan::Filter(filter))) + } + LogicalPlan::TableScan(_) => { + // add a new filter + let filter = + datafusion_expr::Filter::try_new(self.extra_filter.clone(), Arc::new(node))?; + self.is_rewritten = true; + Ok(Transformed::yes(LogicalPlan::Filter(filter))) + } + _ => Ok(Transformed::no(node)), + } + } +} + +fn df_plan_to_sql(plan: &LogicalPlan) -> Result { + let unparser = Unparser::default(); + let sql = unparser + .plan_to_sql(plan) + .with_context(|_e| DatafusionSnafu { + context: format!("Failed to unparse logical plan {plan:?}"), + })?; + Ok(sql.to_string()) +} + +#[cfg(test)] +mod test { + use datafusion_common::tree_node::TreeNode; + use pretty_assertions::assert_eq; + use session::context::QueryContext; + + use super::{sql_to_df_plan, *}; + use crate::recording_rules::{df_plan_to_sql, AddFilterRewriter}; + use crate::test_utils::create_test_query_engine; + + #[tokio::test] + async fn test_add_filter() { + let testcases = vec![ + ( + "SELECT number FROM numbers_with_ts GROUP BY number","SELECT numbers_with_ts.number FROM numbers_with_ts WHERE (number > 4) GROUP BY numbers_with_ts.number" + ), + ( + "SELECT number FROM numbers_with_ts WHERE number < 2 OR number >10", + "SELECT numbers_with_ts.number FROM numbers_with_ts WHERE (((numbers_with_ts.number < 2) OR (numbers_with_ts.number > 10)) AND (number > 4))" + ), + ( + "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window", + "SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE (number > 4) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)" + ) + ]; + use datafusion_expr::{col, lit}; + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + + for (before, after) in testcases { + let sql = before; + let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, false) + .await + .unwrap(); + + let mut add_filter = AddFilterRewriter::new(col("number").gt(lit(4u32))); + let plan = plan.rewrite(&mut add_filter).unwrap().data; + let new_sql = df_plan_to_sql(&plan).unwrap(); + assert_eq!(after, new_sql); + } + } + + #[tokio::test] + async fn test_plan_time_window_lower_bound() { + use datafusion_expr::{col, lit}; + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + + let testcases = [ + // no time index + ( + "SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;", + Timestamp::new(23, TimeUnit::Millisecond), + ("ts".to_string(), None, None), + "SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;" + ), + // time index + ( + "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;", + Timestamp::new(23, TimeUnit::Nanosecond), + ( + "ts".to_string(), + Some(Timestamp::new(0, 
TimeUnit::Millisecond)), + Some(Timestamp::new(300000, TimeUnit::Millisecond)), + ), + "SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)" + ), + // on spot + ( + "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;", + Timestamp::new(0, TimeUnit::Nanosecond), + ( + "ts".to_string(), + Some(Timestamp::new(0, TimeUnit::Millisecond)), + Some(Timestamp::new(300000, TimeUnit::Millisecond)), + ), + "SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)" + ), + // different time unit + ( + "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;", + Timestamp::new(23_000_000, TimeUnit::Nanosecond), + ( + "ts".to_string(), + Some(Timestamp::new(0, TimeUnit::Millisecond)), + Some(Timestamp::new(300000, TimeUnit::Millisecond)), + ), + "SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)" + ), + // time index with other fields + ( + "SELECT sum(number) as sum_up, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;", + Timestamp::new(23, TimeUnit::Millisecond), + ( + "ts".to_string(), + Some(Timestamp::new(0, TimeUnit::Millisecond)), + Some(Timestamp::new(300000, TimeUnit::Millisecond)), + ), + "SELECT sum(numbers_with_ts.number) AS sum_up, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)" + ), + // time index with other pks + ( + "SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number;", + Timestamp::new(23, TimeUnit::Millisecond), + ( + "ts".to_string(), + Some(Timestamp::new(0, TimeUnit::Millisecond)), + Some(Timestamp::new(300000, TimeUnit::Millisecond)), + ), + "SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number" + ), + ]; + + for (sql, current, expected, unparsed) in testcases { + let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, true) + .await + .unwrap(); + + let real = + find_plan_time_window_bound(&plan, current, ctx.clone(), query_engine.clone()) + .await + .unwrap(); + assert_eq!(real, expected); + + let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, false) + .await + .unwrap(); + let (col_name, lower, upper) = real; + let new_sql = if lower.is_some() { + let to_df_literal = |value| { + let value = Value::from(value); + + value.try_to_scalar_value(&value.data_type()).unwrap() + }; + let lower = to_df_literal(lower.unwrap()); + let upper = to_df_literal(upper.unwrap()); + let expr = col(&col_name) + .gt_eq(lit(lower)) + .and(col(&col_name).lt_eq(lit(upper))); + let mut add_filter = 
AddFilterRewriter::new(expr); + let plan = plan.rewrite(&mut add_filter).unwrap().data; + df_plan_to_sql(&plan).unwrap() + } else { + sql.to_string() + }; + assert_eq!(unparsed, new_sql); + } + } +} diff --git a/src/flow/src/recording_rules/engine.rs b/src/flow/src/recording_rules/engine.rs new file mode 100644 index 0000000000..61ad064021 --- /dev/null +++ b/src/flow/src/recording_rules/engine.rs @@ -0,0 +1,387 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; +use std::sync::Arc; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use common_meta::ddl::create_flow::FlowType; +use common_telemetry::info; +use common_telemetry::tracing::warn; +use common_time::Timestamp; +use datafusion_common::tree_node::TreeNode; +use datatypes::value::Value; +use query::QueryEngineRef; +use session::context::QueryContextRef; +use snafu::{ensure, OptionExt, ResultExt}; +use tokio::sync::oneshot::error::TryRecvError; +use tokio::sync::{oneshot, RwLock}; +use tokio::time::Instant; + +use super::frontend_client::FrontendClient; +use super::{df_plan_to_sql, AddFilterRewriter}; +use crate::adapter::{CreateFlowArgs, FlowId}; +use crate::error::{DatafusionSnafu, DatatypesSnafu, FlowNotFoundSnafu, UnexpectedSnafu}; +use crate::metrics::METRIC_FLOW_RULE_ENGINE_QUERY_TIME; +use crate::recording_rules::{find_plan_time_window_bound, sql_to_df_plan}; +use crate::Error; + +/// TODO(discord9): determine how to configure refresh rate +pub struct RecordingRuleEngine { + tasks: RwLock>, + shutdown_txs: RwLock>>, + frontend_client: Arc, + engine: QueryEngineRef, +} + +impl RecordingRuleEngine { + pub fn new(frontend_client: Arc, engine: QueryEngineRef) -> Self { + Self { + tasks: Default::default(), + shutdown_txs: Default::default(), + frontend_client, + engine, + } + } +} + +const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0); + +impl RecordingRuleEngine { + pub async fn create_flow(&self, args: CreateFlowArgs) -> Result, Error> { + let CreateFlowArgs { + flow_id, + sink_table_name, + source_table_ids: _, + create_if_not_exists: _, + or_replace: _, + expire_after, + comment: _, + sql, + flow_options, + query_ctx, + } = args; + + let flow_type = flow_options.get(FlowType::FLOW_TYPE_KEY); + + ensure!( + flow_type == Some(&FlowType::RecordingRule.to_string()) || flow_type.is_none(), + UnexpectedSnafu { + reason: format!("Flow type is not RecordingRule nor None, got {flow_type:?}") + } + ); + + let Some(query_ctx) = query_ctx else { + UnexpectedSnafu { + reason: "Query context is None".to_string(), + } + .fail()? 
+ }; + + let (tx, rx) = oneshot::channel(); + let task = RecordingRuleTask::new( + flow_id, + &sql, + expire_after, + sink_table_name, + Arc::new(query_ctx), + rx, + ); + + let task_inner = task.clone(); + let engine = self.engine.clone(); + let frontend = self.frontend_client.clone(); + + // TODO(discord9): also save handle & use time wheel or what for better + let _handle = common_runtime::spawn_global(async move { + match task_inner.start_executing(engine, frontend).await { + Ok(()) => info!("Flow {} shutdown", task_inner.flow_id), + Err(err) => common_telemetry::error!( + "Flow {} encounter unrecoverable error: {err:?}", + task_inner.flow_id + ), + } + }); + + // TODO(discord9): deal with replace logic + let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task); + drop(replaced_old_task_opt); + + self.shutdown_txs.write().await.insert(flow_id, tx); + + Ok(Some(flow_id)) + } + + pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> { + if self.tasks.write().await.remove(&flow_id).is_none() { + warn!("Flow {flow_id} not found in tasks") + } + let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else { + UnexpectedSnafu { + reason: format!("Can't found shutdown tx for flow {flow_id}"), + } + .fail()? + }; + if tx.send(()).is_err() { + warn!("Fail to shutdown flow {flow_id} due to receiver already dropped, maybe flow {flow_id} is already dropped?") + } + Ok(()) + } + + async fn gen_and_exec_query_for_flow( + &self, + flow_id: FlowId, + need_upper_bound: bool, + ) -> Result<(), Error> { + let (new_query, state_ref) = { + let tasks = self.tasks.read().await; + let task = tasks + .get(&flow_id) + .context(FlowNotFoundSnafu { id: flow_id })?; + let new_query = task + .gen_query_with_time_window(self.engine.clone(), need_upper_bound) + .await?; + task.state.write().await.exec_state = ExecState::Executing; + (new_query, task.state.clone()) + }; + + let instant = Instant::now(); + info!("Executing flow {flow_id} with query {new_query}"); + let res = self.frontend_client.sql(&new_query).await; + let elapsed = instant.elapsed(); + info!( + "Flow {flow_id} executed, result: {res:?}, elapsed: {:?}", + elapsed + ); + { + let mut state = state_ref.write().await; + state.set_next_sleep(elapsed, res.is_ok()); + } + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub struct RecordingRuleTask { + flow_id: FlowId, + query: String, + /// in millisecond + expire_after: Option, + sink_table_name: [String; 3], + state: Arc>, +} + +impl RecordingRuleTask { + pub fn new( + flow_id: FlowId, + query: &str, + expire_after: Option, + sink_table_name: [String; 3], + query_ctx: QueryContextRef, + shutdown_rx: oneshot::Receiver<()>, + ) -> Self { + Self { + flow_id, + query: query.to_string(), + expire_after, + sink_table_name, + state: Arc::new(RwLock::new(RecordingRuleState::new(query_ctx, shutdown_rx))), + } + } +} +impl RecordingRuleTask { + /// This should be called in a new tokio task + pub async fn start_executing( + &self, + engine: QueryEngineRef, + frontend_client: Arc, + ) -> Result<(), Error> { + // only first query don't need upper bound + let mut is_first = true; + + loop { + // FIXME(discord9): test if need upper bound also works + let new_query = self + .gen_query_with_time_window(engine.clone(), false) + .await?; + + let insert_into = format!( + "INSERT INTO {}.{}.{} {}", + self.sink_table_name[0], + self.sink_table_name[1], + self.sink_table_name[2], + new_query + ); + + if is_first { + is_first = false; + } + + let instant = Instant::now(); + let flow_id = 
self.flow_id; + info!("Executing flow {flow_id} with query {insert_into}"); + + let timer = METRIC_FLOW_RULE_ENGINE_QUERY_TIME + .with_label_values(&[flow_id.to_string().as_str()]) + .start_timer(); + + let res = frontend_client.sql(&insert_into).await; + drop(timer); + + let elapsed = instant.elapsed(); + info!( + "Flow {flow_id} executed, result: {res:?}, elapsed: {:?}", + elapsed + ); + { + let mut state = self.state.write().await; + state.set_next_sleep(elapsed, res.is_ok()); + } + let sleep_until = { + let mut state = self.state.write().await; + match state.shutdown_rx.try_recv() { + Ok(()) => break Ok(()), + Err(TryRecvError::Closed) => { + warn!("Unexpected shutdown flow {flow_id}, shutdown anyway"); + break Ok(()); + } + Err(TryRecvError::Empty) => (), + } + state.get_next_start_query_time(None) + }; + tokio::time::sleep_until(sleep_until).await; + } + } + + async fn gen_query_with_time_window( + &self, + engine: QueryEngineRef, + need_upper_bound: bool, + ) -> Result { + let query_ctx = self.state.read().await.query_ctx.clone(); + let start = SystemTime::now(); + let since_the_epoch = start + .duration_since(UNIX_EPOCH) + .expect("Time went backwards"); + let low_bound = self + .expire_after + .map(|e| since_the_epoch.as_millis() - e as u128); + + let Some(low_bound) = low_bound else { + return Ok(self.query.clone()); + }; + + let low_bound = Timestamp::new_millisecond(low_bound as i64); + + let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.query, true).await?; + + let (col_name, lower, upper) = + find_plan_time_window_bound(&plan, low_bound, query_ctx.clone(), engine.clone()) + .await?; + + let new_sql = { + let to_df_literal = |value| -> Result<_, Error> { + let value = Value::from(value); + let value = value + .try_to_scalar_value(&value.data_type()) + .with_context(|_| DatatypesSnafu { + extra: format!("Failed to convert to scalar value: {}", value), + })?; + Ok(value) + }; + let lower = lower.map(to_df_literal).transpose()?; + let upper = upper.map(to_df_literal).transpose()?.and_then(|u| { + if need_upper_bound { + Some(u) + } else { + None + } + }); + let expr = { + use datafusion_expr::{col, lit}; + match (lower, upper) { + (Some(l), Some(u)) => col(&col_name) + .gt_eq(lit(l)) + .and(col(&col_name).lt_eq(lit(u))), + (Some(l), None) => col(&col_name).gt_eq(lit(l)), + (None, Some(u)) => col(&col_name).lt(lit(u)), + // no time window, direct return + (None, None) => return Ok(self.query.clone()), + } + }; + + let mut add_filter = AddFilterRewriter::new(expr); + // make a not optimized plan for clearer unparse + let plan = + sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.query, false).await?; + let plan = plan + .clone() + .rewrite(&mut add_filter) + .with_context(|_| DatafusionSnafu { + context: format!("Failed to rewrite plan {plan:?}"), + })? + .data; + df_plan_to_sql(&plan)? 
+ }; + + Ok(new_sql) + } +} + +#[derive(Debug)] +pub struct RecordingRuleState { + query_ctx: QueryContextRef, + /// last query complete time + last_update_time: Instant, + /// last time query duration + last_query_duration: Duration, + exec_state: ExecState, + shutdown_rx: oneshot::Receiver<()>, +} + +impl RecordingRuleState { + pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self { + Self { + query_ctx, + last_update_time: Instant::now(), + last_query_duration: Duration::from_secs(0), + exec_state: ExecState::Idle, + shutdown_rx, + } + } + + /// called after last query is done + /// `is_succ` indicate whether the last query is successful + pub fn set_next_sleep(&mut self, elapsed: Duration, _is_succ: bool) { + self.exec_state = ExecState::Idle; + self.last_query_duration = elapsed; + self.last_update_time = Instant::now(); + } + + /// wait for at least `last_query_duration`, at most `max_timeout` to start next query + pub fn get_next_start_query_time(&self, max_timeout: Option) -> Instant { + let next_duration = max_timeout + .unwrap_or(self.last_query_duration) + .min(self.last_query_duration); + let next_duration = next_duration.max(MIN_REFRESH_DURATION); + + self.last_update_time + next_duration + } +} + +#[derive(Debug, Clone)] +enum ExecState { + Idle, + Executing, +} diff --git a/src/flow/src/recording_rules/frontend_client.rs b/src/flow/src/recording_rules/frontend_client.rs new file mode 100644 index 0000000000..3af09d515a --- /dev/null +++ b/src/flow/src/recording_rules/frontend_client.rs @@ -0,0 +1,173 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! 
Frontend client used to run flows as recording rules: time-window-aware normal queries triggered on every tick set by the user
+
+use std::collections::BTreeMap;
+use std::sync::Arc;
+
+use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use common_error::ext::BoxedError;
+use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
+use common_meta::peer::Peer;
+use common_meta::rpc::store::RangeRequest;
+use common_query::Output;
+use meta_client::client::MetaClient;
+use snafu::{OptionExt, ResultExt};
+use tokio::sync::Mutex;
+
+use crate::error::{ExternalSnafu, UnexpectedSnafu};
+use crate::Error;
+
+/// A simple frontend client able to execute SQL using the gRPC protocol
+#[derive(Debug)]
+pub enum FrontendClient {
+    Distributed {
+        meta_client: Arc<MetaClient>,
+        /// list of frontend nodes and the connections to them
+        database_clients: Mutex<RoundRobinClients>,
+    },
+    Standalone {
+        /// for the sake of simplicity, still use gRPC even in standalone mode
+        /// note the client here should be lazy, so it can wait until the frontend is booted before making a connection
+        /// TODO(discord9): don't use gRPC under standalone mode
+        database_client: Database,
+    },
+}
+
+#[derive(Debug, Default)]
+pub struct RoundRobinClients {
+    clients: BTreeMap<u64, (Peer, Database)>,
+    next: usize,
+}
+
+impl RoundRobinClients {
+    fn get_next_client(&mut self) -> Option<Database> {
+        if self.clients.is_empty() {
+            return None;
+        }
+        let idx = self.next % self.clients.len();
+        self.next = (self.next + 1) % self.clients.len();
+        Some(self.clients.iter().nth(idx).unwrap().1 .1.clone())
+    }
+}
+
+impl FrontendClient {
+    pub fn from_meta_client(meta_client: Arc<MetaClient>) -> Self {
+        Self::Distributed {
+            meta_client,
+            database_clients: Default::default(),
+        }
+    }
+
+    pub fn from_static_grpc_addr(addr: String) -> Self {
+        let client = Client::with_urls(vec![addr]);
+        let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
+        Self::Standalone {
+            database_client: database,
+        }
+    }
+}
+
+impl FrontendClient {
+    async fn update_frontend_addr(&self) -> Result<(), Error> {
+        // TODO(discord9): better error handling
+        let Self::Distributed {
+            meta_client,
+            database_clients,
+        } = self
+        else {
+            return Ok(());
+        };
+        let cluster_client = meta_client
+            .cluster_client()
+            .map_err(BoxedError::new)
+            .context(ExternalSnafu)?;
+        let cluster_id = meta_client.id().0;
+        let prefix = NodeInfoKey::key_prefix_with_role(cluster_id, Role::Frontend);
+        let req = RangeRequest::new().with_prefix(prefix);
+        let res = cluster_client
+            .range(req)
+            .await
+            .map_err(BoxedError::new)
+            .context(ExternalSnafu)?;
+
+        let mut clients = database_clients.lock().await;
+        for kv in res.kvs {
+            let key = NodeInfoKey::try_from(kv.key)
+                .map_err(BoxedError::new)
+                .context(ExternalSnafu)?;
+
+            let val = NodeInfo::try_from(kv.value)
+                .map_err(BoxedError::new)
+                .context(ExternalSnafu)?;
+
+            if key.role != Role::Frontend {
+                return UnexpectedSnafu {
+                    reason: format!(
+                        "Unexpected role (should be frontend) in key: {:?}, val: {:?}",
+                        key, val
+                    ),
+                }
+                .fail();
+            }
+            if let Some((peer, database)) = clients.clients.get_mut(&key.node_id) {
+                if val.peer != *peer {
+                    // update only if not the same
+                    *peer = val.peer.clone();
+                    let client = Client::with_urls(vec![peer.addr.clone()]);
+                    *database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
+                }
+            } else {
+                let peer = val.peer;
+                let client = Client::with_urls(vec![peer.addr.clone()]);
+                let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
+                clients
+                    .clients
+                    .insert(key.node_id, (peer.clone(),
database.clone())); + } + } + drop(clients); + + Ok(()) + } + + /// Get a database client, and possibly update it before returning. + async fn get_database_client(&self) -> Result { + match self { + Self::Standalone { database_client } => Ok(database_client.clone()), + Self::Distributed { + meta_client: _, + database_clients, + } => { + self.update_frontend_addr().await?; + database_clients + .lock() + .await + .get_next_client() + .context(UnexpectedSnafu { + reason: "Can't get any database client", + }) + } + } + } + + pub async fn sql(&self, sql: &str) -> Result { + let db = self.get_database_client().await?; + db.sql(sql) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu) + } +} diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 742359818a..8810689855 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -57,6 +57,7 @@ use crate::error::{ }; use crate::heartbeat::HeartbeatTask; use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS}; +use crate::recording_rules::{FrontendClient, RecordingRuleEngine}; use crate::transform::register_function_to_query_engine; use crate::utils::{SizeReportSender, StateReportHandler}; use crate::{Error, FlowWorkerManager, FlownodeOptions}; @@ -271,6 +272,8 @@ pub struct FlownodeBuilder { heartbeat_task: Option, /// receive a oneshot sender to send state size report state_report_handler: Option, + /// Client to send sql to frontend + frontend_client: Arc, } impl FlownodeBuilder { @@ -281,6 +284,7 @@ impl FlownodeBuilder { table_meta: TableMetadataManagerRef, catalog_manager: CatalogManagerRef, flow_metadata_manager: FlowMetadataManagerRef, + frontend_client: Arc, ) -> Self { Self { opts, @@ -290,6 +294,7 @@ impl FlownodeBuilder { flow_metadata_manager, heartbeat_task: None, state_report_handler: None, + frontend_client, } } @@ -447,7 +452,10 @@ impl FlownodeBuilder { let node_id = self.opts.node_id.map(|id| id as u32); - let mut man = FlowWorkerManager::new(node_id, query_engine, table_meta); + let rule_engine = + RecordingRuleEngine::new(self.frontend_client.clone(), query_engine.clone()); + + let mut man = FlowWorkerManager::new(node_id, query_engine, table_meta, rule_engine); for worker_id in 0..num_workers { let (tx, rx) = oneshot::channel(); diff --git a/src/flow/src/test_utils.rs b/src/flow/src/test_utils.rs index 3cc42efda5..3c4a614951 100644 --- a/src/flow/src/test_utils.rs +++ b/src/flow/src/test_utils.rs @@ -86,7 +86,8 @@ pub fn create_test_query_engine() -> Arc { let schema = vec![ datatypes::schema::ColumnSchema::new("number", CDT::uint32_datatype(), false), - datatypes::schema::ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false), + datatypes::schema::ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false) + .with_time_index(true), ]; let mut columns = vec![]; let numbers = (1..=10).collect_vec(); diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index ee7aebba52..b21a21c104 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -112,6 +112,7 @@ impl MetaClientBuilder { .enable_store() .enable_heartbeat() .enable_procedure() + .enable_access_cluster_info() } pub fn enable_heartbeat(self) -> Self { diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 2bd4304510..078cbe9c9f 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -68,6 +68,7 @@ pub struct Inserter { catalog_manager: CatalogManagerRef, partition_manager: PartitionRuleManagerRef, node_manager: 
NodeManagerRef, + #[allow(unused)] table_flownode_set_cache: TableFlownodeSetCacheRef, } @@ -335,9 +336,11 @@ impl Inserter { let InstantAndNormalInsertRequests { normal_requests, - instant_requests, + instant_requests: _, } = requests; + // TODO(discord9): mirror some + /* // Mirror requests for source table to flownode asynchronously let flow_mirror_task = FlowMirrorTask::new( &self.table_flownode_set_cache, @@ -347,7 +350,7 @@ impl Inserter { .chain(instant_requests.requests.iter()), ) .await?; - flow_mirror_task.detach(self.node_manager.clone())?; + flow_mirror_task.detach(self.node_manager.clone())?;*/ // Write requests to datanode and wait for response let write_tasks = self @@ -817,12 +820,14 @@ struct CreateAlterTableResult { table_infos: HashMap>, } +#[allow(unused)] struct FlowMirrorTask { requests: HashMap, num_rows: usize, } impl FlowMirrorTask { + #[allow(unused)] async fn new( cache: &TableFlownodeSetCacheRef, requests: impl Iterator, @@ -896,6 +901,7 @@ impl FlowMirrorTask { }) } + #[allow(unused)] fn detach(self, node_manager: NodeManagerRef) -> Result<()> { crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64); for (peer, inserts) in self.requests { diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index f2d6c3df23..1fbdf8d7ab 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -40,7 +40,7 @@ use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; use datanode::datanode::DatanodeBuilder; -use flow::FlownodeBuilder; +use flow::{FlownodeBuilder, FrontendClient}; use frontend::instance::builder::FrontendBuilder; use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager}; use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ}; @@ -164,12 +164,15 @@ impl GreptimeDbStandaloneBuilder { Some(procedure_manager.clone()), ); + let fe_server_addr = opts.frontend_options().grpc.bind_addr.clone(); + let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr); let flow_builder = FlownodeBuilder::new( Default::default(), plugins.clone(), table_metadata_manager.clone(), catalog_manager.clone(), flow_metadata_manager.clone(), + Arc::new(frontend_client), ); let flownode = Arc::new(flow_builder.build().await.unwrap());
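
A minimal standalone sketch of the probe-then-bisect idea used by `find_expr_time_window_lower_bound` / `find_expr_time_window_upper_bound` in src/flow/src/recording_rules.rs, shown on a plain integer timeline. The `window_start` helper and the 5-unit window size are hypothetical stand-ins for evaluating the real time window expr (e.g. `date_bin(INTERVAL '5 minutes', ts)`); the overflow and "no bound" handling from the patch is omitted. This is an illustration, not part of the patch.

// stand-in for evaluating the time window expr against one timestamp
fn window_start(t: i64) -> i64 {
    t.div_euclid(5) * 5
}

// smallest `t` that still maps to the same window as `current`:
// exponential probe backwards until the window changes, then bisect
fn lower_bound(current: i64) -> i64 {
    let cur_window = window_start(current);
    let mut offset = 1i64;
    let (mut low, mut high) = loop {
        let probe = current - offset;
        if window_start(probe) < cur_window {
            break (probe, current);
        }
        offset *= 2;
    };
    while low < high {
        let mid = (low + high) / 2;
        if window_start(mid) < cur_window {
            low = mid + 1;
        } else {
            high = mid;
        }
    }
    low
}

fn main() {
    assert_eq!(lower_bound(23), 20); // 23 falls in the [20, 25) window
    assert_eq!(lower_bound(20), 20); // already on the window start
    println!("lower bound of 23: {}", lower_bound(23));
}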
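
A small sketch of the pacing arithmetic behind `RecordingRuleState::get_next_start_query_time` in src/flow/src/recording_rules/engine.rs, assuming the same 5-second `MIN_REFRESH_DURATION`; the `next_wait` helper name is made up for illustration and is not part of the patch.

use std::time::Duration;

const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);

// how long to wait after a query finishes before the next run starts
fn next_wait(last_query_duration: Duration, max_timeout: Option<Duration>) -> Duration {
    max_timeout
        .unwrap_or(last_query_duration)
        .min(last_query_duration)
        .max(MIN_REFRESH_DURATION)
}

fn main() {
    // a 12s query is re-run 12s after it finished
    assert_eq!(next_wait(Duration::from_secs(12), None), Duration::from_secs(12));
    // a fast 1s query is still throttled to the 5s floor
    assert_eq!(next_wait(Duration::from_secs(1), None), Duration::from_secs(5));
    // an explicit 8s cap shortens the wait for a slow query
    assert_eq!(
        next_wait(Duration::from_secs(12), Some(Duration::from_secs(8))),
        Duration::from_secs(8)
    );
}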