From 032df4c5330a2b94515d38764c56c7da4fe4ee77 Mon Sep 17 00:00:00 2001
From: discord9 <55937128+discord9@users.noreply.github.com>
Date: Tue, 15 Apr 2025 15:03:12 +0800
Subject: [PATCH] feat(flow): dual engine (#5881)

* feat: partial use batch mode(WIP)

* feat: add flow engine trait

* refactor: more trait method

* dual engine

* feat: dual engine

* refactor: flow map cache

* chore: per review

* chore: per review
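
Route flow work through two engines behind a common `FlowEngine` trait:
`FlowDualEngine` wraps the existing streaming `FlowWorkerManager` together
with the new `BatchingEngine`. Create/drop/flush requests are dispatched by
the `FlowType` recorded when a flow is created, and insert requests are
fanned out to either or both engines depending on which flows use the
inserted table as a source.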
---
 Cargo.toml                             |   5 +
 src/cmd/src/flownode.rs                |   6 +-
 src/cmd/src/standalone.rs              |   8 +-
 src/common/meta/src/ddl/create_flow.rs |   2 +-
 src/flow/src/adapter.rs                |  53 +--
 src/flow/src/adapter/flownode_impl.rs  | 437 +++++++++++++++++++++----
 src/flow/src/batching_mode.rs          |   4 +-
 src/flow/src/batching_mode/engine.rs   |  48 ++-
 src/flow/src/batching_mode/state.rs    |   3 +-
 src/flow/src/batching_mode/task.rs     |   4 +-
 src/flow/src/engine.rs                 |  57 ++++
 src/flow/src/error.rs                  |   2 +-
 src/flow/src/lib.rs                    |   5 +-
 src/flow/src/server.rs                 |  10 +-
 src/meta-client/src/client.rs          |   1 +
 tests-integration/src/standalone.rs    |   5 +-
 16 files changed, 534 insertions(+), 116 deletions(-)
 create mode 100644 src/flow/src/engine.rs

diff --git a/Cargo.toml b/Cargo.toml
index 38b749e7b0..f3bd54a661 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -307,3 +307,8 @@ strip = true
 [profile.dev.package.tests-fuzz]
 debug = false
 strip = true
+
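+# Build-speed tweak: compile workspace crates with light optimization in dev
+# builds, while compiling all dependencies with full optimizations.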
+[profile.dev]
+opt-level = 1
+[profile.dev.package."*"]
+opt-level = 3
diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs
index fc23d37c23..a7b530e558 100644
--- a/src/cmd/src/flownode.rs
+++ b/src/cmd/src/flownode.rs
@@ -32,7 +32,9 @@ use common_meta::key::TableMetadataManager;
 use common_telemetry::info;
 use common_telemetry::logging::TracingOptions;
 use common_version::{short_version, version};
-use flow::{FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder, FrontendInvoker};
+use flow::{
+    FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder, FrontendClient, FrontendInvoker,
+};
 use meta_client::{MetaClientOptions, MetaClientType};
 use snafu::{ensure, OptionExt, ResultExt};
 use tracing_appender::non_blocking::WorkerGuard;
@@ -313,12 +315,14 @@ impl StartCommand {
         );
 
         let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
+        let frontend_client = FrontendClient::from_meta_client(meta_client.clone());
         let flownode_builder = FlownodeBuilder::new(
             opts.clone(),
             Plugins::new(),
             table_metadata_manager,
             catalog_manager.clone(),
             flow_metadata_manager,
+            Arc::new(frontend_client),
         )
         .with_heartbeat_task(heartbeat_task);
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index 4504927cc8..3177a2446f 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -57,7 +57,7 @@ use datanode::region_server::RegionServer;
 use file_engine::config::EngineConfig as FileEngineConfig;
 use flow::{
     FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeInstance, FlownodeOptions,
-    FrontendInvoker,
+    FrontendClient, FrontendInvoker,
 };
 use frontend::frontend::{Frontend, FrontendOptions};
 use frontend::instance::builder::FrontendBuilder;
@@ -523,12 +523,18 @@ impl StartCommand {
             flow: opts.flow.clone(),
             ..Default::default()
         };
+
+        // TODO(discord9): in standalone mode, avoid going through gRPC and instead somehow
+        // obtain a handle to the frontend gRPC client without actually making a connection
+        let fe_server_addr = fe_opts.grpc.bind_addr.clone();
+        let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr);
         let flow_builder = FlownodeBuilder::new(
             flownode_options,
             plugins.clone(),
             table_metadata_manager.clone(),
             catalog_manager.clone(),
             flow_metadata_manager.clone(),
+            Arc::new(frontend_client),
         );
         let flownode = flow_builder
             .build()
diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs
index 4e7d661c1d..8b1c0354d4 100644
--- a/src/common/meta/src/ddl/create_flow.rs
+++ b/src/common/meta/src/ddl/create_flow.rs
@@ -324,7 +324,7 @@ pub enum CreateFlowState {
 }
 
 /// The type of flow.
-#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
 pub enum FlowType {
     /// The flow is a batching task.
     Batching,
diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index 8fd62ee2a0..516254ae55 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -16,7 +16,7 @@
 //! and communicating with other parts of the database
 #![warn(unused_imports)]
 
-use std::collections::{BTreeMap, HashMap};
+use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::time::{Duration, Instant, SystemTime};
 
@@ -56,6 +56,7 @@ use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, U
 use crate::expr::Batch;
 use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
 use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
+use crate::{CreateFlowArgs, FlowId, TableName};
 
 mod flownode_impl;
 mod parse_expr;
@@ -78,11 +79,6 @@ pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";
 
 pub const AUTO_CREATED_UPDATE_AT_TS_COL: &str = "update_at";
 
-// TODO(discord9): refactor common types for flow to a separate module
-/// FlowId is a unique identifier for a flow task
-pub type FlowId = u64;
-pub type TableName = [String; 3];
-
 /// Flow config that exists both in standalone&distributed mode
 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
 #[serde(default)]
@@ -731,25 +727,10 @@ impl FlowWorkerManager {
     }
 }
 
-/// The arguments to create a flow in [`FlowWorkerManager`].
-#[derive(Debug, Clone)]
-pub struct CreateFlowArgs {
-    pub flow_id: FlowId,
-    pub sink_table_name: TableName,
-    pub source_table_ids: Vec<TableId>,
-    pub create_if_not_exists: bool,
-    pub or_replace: bool,
-    pub expire_after: Option<i64>,
-    pub comment: Option<String>,
-    pub sql: String,
-    pub flow_options: HashMap<String, String>,
-    pub query_ctx: Option<QueryContext>,
-}
-
 /// Create&Remove flow
 impl FlowWorkerManager {
     /// remove a flow by it's id
-    pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
+    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
         for handle in self.worker_handles.iter() {
             if handle.contains_flow(flow_id).await? {
                 handle.remove_flow(flow_id).await?;
@@ -766,7 +747,7 @@ impl FlowWorkerManager {
     /// 1. parse query into typed plan(and optional parse expire_after expr)
     /// 2. render source/sink with output table id and used input table id
     #[allow(clippy::too_many_arguments)]
-    pub async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
+    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
         let CreateFlowArgs {
             flow_id,
             sink_table_name,
@@ -905,6 +886,32 @@ impl FlowWorkerManager {
         info!("Successfully create flow with id={}", flow_id);
         Ok(Some(flow_id))
     }
+
+    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
+        debug!("Starting to flush flow_id={:?}", flow_id);
+        // lock to make sure writes before flush are written to flow,
+        // and drop it immediately so that subsequent writes are not blocked
+        drop(self.flush_lock.write().await);
+        let flushed_input_rows = self.node_context.read().await.flush_all_sender().await?;
+        let rows_send = self.run_available(true).await?;
+        let row = self.send_writeback_requests().await?;
+        debug!(
+            "Done flushing flow_id={:?} with {} input rows flushed, {} rows sent and {} output rows flushed",
+            flow_id, flushed_input_rows, rows_send, row
+        );
+        Ok(row)
+    }
+
+    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> Result<bool, Error> {
+        let mut exist = false;
+        for handle in self.worker_handles.iter() {
+            if handle.contains_flow(flow_id).await? {
+                exist = true;
+                break;
+            }
+        }
+        Ok(exist)
+    }
 }
 
 /// FlowTickManager is a manager for flow tick, which trakc flow execution progress
diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs
index 1daec77fbd..b7d218ef21 100644
--- a/src/flow/src/adapter/flownode_impl.rs
+++ b/src/flow/src/adapter/flownode_impl.rs
@@ -13,40 +13,228 @@
 // limitations under the License.
 
 //! impl `FlowNode` trait for FlowNodeManager so standalone can call them
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
 
 use api::v1::flow::{
     flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
 };
 use api::v1::region::InsertRequests;
 use common_error::ext::BoxedError;
-use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu};
-use common_telemetry::{debug, trace};
+use common_meta::ddl::create_flow::FlowType;
+use common_meta::error::{Result as MetaResult, UnexpectedSnafu};
+use common_runtime::JoinHandle;
+use common_telemetry::{trace, warn};
 use datatypes::value::Value;
 use itertools::Itertools;
 use snafu::{IntoError, OptionExt, ResultExt};
-use store_api::storage::RegionId;
+use store_api::storage::{RegionId, TableId};
 
 use crate::adapter::{CreateFlowArgs, FlowWorkerManager};
-use crate::error::{CreateFlowSnafu, InsertIntoFlowSnafu, InternalSnafu};
+use crate::batching_mode::engine::BatchingEngine;
+use crate::engine::FlowEngine;
+use crate::error::{CreateFlowSnafu, FlowNotFoundSnafu, InsertIntoFlowSnafu, InternalSnafu};
 use crate::metrics::METRIC_FLOW_TASK_COUNT;
 use crate::repr::{self, DiffRow};
+use crate::{Error, FlowId};
 
-/// return a function to convert `crate::error::Error` to `common_meta::error::Error`
-fn to_meta_err(
-    location: snafu::Location,
-) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error {
-    move |err: crate::error::Error| -> common_meta::error::Error {
-        common_meta::error::Error::External {
-            location,
-            source: BoxedError::new(err),
+/// Manages both the streaming mode and the batching mode engines.
+///
+/// Handles create/drop/flush flow operations and redirects insert requests
+/// to the appropriate engine.
+pub struct FlowDualEngine {
+    streaming_engine: Arc<FlowWorkerManager>,
+    batching_engine: Arc<BatchingEngine>,
+    /// helper struct for fast lookup from source table id to flow id and vice versa
+    src_table2flow: std::sync::RwLock<SrcTableToFlow>,
+}
+
+struct SrcTableToFlow {
+    /// mapping of table ids to flow ids for streaming mode
+    stream: HashMap<TableId, HashSet<FlowId>>,
+    /// mapping of table ids to flow ids for batching mode
+    batch: HashMap<TableId, HashSet<FlowId>>,
+    /// mapping of flow ids to (flow type, source table ids)
+    flow_infos: HashMap<FlowId, (FlowType, Vec<TableId>)>,
+}
+
+impl SrcTableToFlow {
+    fn in_stream(&self, table_id: TableId) -> bool {
+        self.stream.contains_key(&table_id)
+    }
+
+    fn in_batch(&self, table_id: TableId) -> bool {
+        self.batch.contains_key(&table_id)
+    }
+
+    fn add_flow(&mut self, flow_id: FlowId, flow_type: FlowType, src_table_ids: Vec<TableId>) {
+        let mapping = match flow_type {
+            FlowType::Streaming => &mut self.stream,
+            FlowType::Batching => &mut self.batch,
+        };
+
+        for src_table in src_table_ids.clone() {
+            mapping
+                .entry(src_table)
+                .and_modify(|flows| {
+                    flows.insert(flow_id);
+                })
+                .or_insert_with(|| {
+                    let mut set = HashSet::new();
+                    set.insert(flow_id);
+                    set
+                });
         }
+        self.flow_infos.insert(flow_id, (flow_type, src_table_ids));
+    }
+
+    fn remove_flow(&mut self, flow_id: FlowId) {
+        let mapping = match self.get_flow_type(flow_id) {
+            Some(FlowType::Streaming) => &mut self.stream,
+            Some(FlowType::Batching) => &mut self.batch,
+            None => return,
+        };
+        if let Some((_, src_table_ids)) = self.flow_infos.remove(&flow_id) {
+            for src_table in src_table_ids {
+                if let Some(flows) = mapping.get_mut(&src_table) {
+                    flows.remove(&flow_id);
+                }
+            }
+        }
+    }
+
+    fn get_flow_type(&self, flow_id: FlowId) -> Option<FlowType> {
+        self.flow_infos
+            .get(&flow_id)
+            .map(|(flow_type, _)| flow_type)
+            .cloned()
+    }
+}
+
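+// Illustrative example (hypothetical ids): after `add_flow(42, FlowType::Batching, vec![1017])`,
+// `in_batch(1017)` returns true and `get_flow_type(42)` returns `Some(FlowType::Batching)`;
+// `remove_flow(42)` then drops flow 42 from the reverse mapping again.
+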
+impl FlowEngine for FlowDualEngine {
+    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
+        let flow_type = args
+            .flow_options
+            .get(FlowType::FLOW_TYPE_KEY)
+            .map(|s| s.as_str());
+
+        let flow_type = match flow_type {
+            Some(FlowType::BATCHING) => FlowType::Batching,
+            Some(FlowType::STREAMING) => FlowType::Streaming,
+            None => FlowType::Batching,
+            Some(flow_type) => {
+                return InternalSnafu {
+                    reason: format!("Invalid flow type: {}", flow_type),
+                }
+                .fail()
+            }
+        };
+
+        let flow_id = args.flow_id;
+        let src_table_ids = args.source_table_ids.clone();
+
+        let res = match flow_type {
+            FlowType::Batching => self.batching_engine.create_flow(args).await,
+            FlowType::Streaming => self.streaming_engine.create_flow(args).await,
+        }?;
+
+        self.src_table2flow
+            .write()
+            .unwrap()
+            .add_flow(flow_id, flow_type, src_table_ids);
+
+        Ok(res)
+    }
+
+    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
+        let flow_type = self.src_table2flow.read().unwrap().get_flow_type(flow_id);
+        match flow_type {
+            Some(FlowType::Batching) => self.batching_engine.remove_flow(flow_id).await,
+            Some(FlowType::Streaming) => self.streaming_engine.remove_flow(flow_id).await,
+            None => FlowNotFoundSnafu { id: flow_id }.fail(),
+        }?;
+        // remove mapping
+        self.src_table2flow.write().unwrap().remove_flow(flow_id);
+        Ok(())
+    }
+
+    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
+        let flow_type = self.src_table2flow.read().unwrap().get_flow_type(flow_id);
+        match flow_type {
+            Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await,
+            Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await,
+            None => FlowNotFoundSnafu { id: flow_id }.fail(),
+        }
+    }
+
+    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
+        let flow_type = self.src_table2flow.read().unwrap().get_flow_type(flow_id);
+        // not using `flow_type.is_some()`, to make sure the flow actually exists in the underlying engine
+        match flow_type {
+            Some(FlowType::Batching) => self.batching_engine.flow_exist(flow_id).await,
+            Some(FlowType::Streaming) => self.streaming_engine.flow_exist(flow_id).await,
+            None => Ok(false),
+        }
+    }
+
+    async fn handle_flow_inserts(
+        &self,
+        request: api::v1::region::InsertRequests,
+    ) -> Result<(), Error> {
+        // TODO(discord9): clone as little as possible
+        let mut to_stream_engine = Vec::with_capacity(request.requests.len());
+        let mut to_batch_engine = request.requests;
+
+        {
+            let src_table2flow = self.src_table2flow.read().unwrap();
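+            // Route each request by its table id: clone it to the streaming engine if
+            // any streaming flow reads the table, and retain it for the batching engine
+            // if any batching flow does (a table may feed both kinds at once).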
+            to_batch_engine.retain(|req| {
+                let region_id = RegionId::from(req.region_id);
+                let table_id = region_id.table_id();
+                let is_in_stream = src_table2flow.in_stream(table_id);
+                let is_in_batch = src_table2flow.in_batch(table_id);
+                if is_in_stream {
+                    to_stream_engine.push(req.clone());
+                }
+                if is_in_batch {
+                    return true;
+                }
+                if !is_in_batch && !is_in_stream {
+                    // TODO(discord9): also put to centralized logging for flow once it is implemented
+                    warn!("Table {} is not a source table of any flow", table_id)
+                }
+                false
+            });
+            // drop(src_table2flow);
+            // can't use drop due to https://github.com/rust-lang/rust/pull/128846
+        }
+
+        let streaming_engine = self.streaming_engine.clone();
+        let stream_handler: JoinHandle<Result<(), Error>> =
+            common_runtime::spawn_global(async move {
+                streaming_engine
+                    .handle_flow_inserts(api::v1::region::InsertRequests {
+                        requests: to_stream_engine,
+                    })
+                    .await?;
+                Ok(())
+            });
+        self.batching_engine
+            .handle_flow_inserts(api::v1::region::InsertRequests {
+                requests: to_batch_engine,
+            })
+            .await?;
+        stream_handler.await.map_err(|e| {
+            crate::error::UnexpectedSnafu {
+                reason: format!("JoinError while handling inserts for the flow stream engine: {e:?}"),
+            }
+            .build()
+        })??;
+
+        Ok(())
     }
 }
 
 #[async_trait::async_trait]
-impl common_meta::node_manager::Flownode for FlowWorkerManager {
-    async fn handle(&self, request: FlowRequest) -> Result<FlowResponse> {
+impl common_meta::node_manager::Flownode for FlowDualEngine {
+    async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
         let query_ctx = request
             .header
             .and_then(|h| h.query_context)
             .map(|ctx| ctx.into());
@@ -109,31 +297,10 @@ impl common_meta::node_manager::Flownode for FlowWorkerManager {
             Some(flow_request::Body::Flush(FlushFlow {
                 flow_id: Some(flow_id),
             })) => {
-                // TODO(discord9): impl individual flush
-                debug!("Starting to flush flow_id={:?}", flow_id);
-                // lock to make sure writes before flush are written to flow
-                // and immediately drop to prevent following writes to be blocked
-                drop(self.flush_lock.write().await);
-                let flushed_input_rows = self
-                    .node_context
-                    .read()
-                    .await
-                    .flush_all_sender()
-                    .await
-                    .map_err(to_meta_err(snafu::location!()))?;
-                let rows_send = self
-                    .run_available(true)
-                    .await
-                    .map_err(to_meta_err(snafu::location!()))?;
                 let row = self
-                    .send_writeback_requests()
+                    .flush_flow(flow_id.id as u64)
                     .await
                     .map_err(to_meta_err(snafu::location!()))?;
-
-                debug!(
-                    "Done to flush flow_id={:?} with {} input rows flushed, {} rows sended and {} output rows flushed",
-                    flow_id, flushed_input_rows, rows_send, row
-                );
                 Ok(FlowResponse {
                     affected_flows: vec![flow_id],
                     affected_rows: row as u64,
                     ..Default::default()
                 })
@@ -151,7 +318,167 @@
         }
     }
 
-    async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse> {
+    async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
+        FlowEngine::handle_flow_inserts(self, request)
+            .await
+            .map(|_| Default::default())
+            .map_err(to_meta_err(snafu::location!()))
+    }
+}
+
+/// return a function to convert `crate::error::Error` to `common_meta::error::Error`
+fn to_meta_err(
+    location: snafu::Location,
+) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error {
+    move |err: crate::error::Error| -> common_meta::error::Error {
+        common_meta::error::Error::External {
+            location,
+            source: BoxedError::new(err),
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl common_meta::node_manager::Flownode for FlowWorkerManager {
+    async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
+        let query_ctx = request
+            .header
+            .and_then(|h| h.query_context)
+            .map(|ctx| ctx.into());
+        match request.body {
+            Some(flow_request::Body::Create(CreateRequest {
+                flow_id: Some(task_id),
+                source_table_ids,
+                sink_table_name: Some(sink_table_name),
+                create_if_not_exists,
+                expire_after,
+                comment,
+                sql,
+                flow_options,
+                or_replace,
+            })) => {
+                let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
+                let sink_table_name = [
+                    sink_table_name.catalog_name,
+                    sink_table_name.schema_name,
+                    sink_table_name.table_name,
+                ];
+                let expire_after = expire_after.map(|e| e.value);
+                let args = CreateFlowArgs {
+                    flow_id: task_id.id as u64,
+                    sink_table_name,
+                    source_table_ids,
+                    create_if_not_exists,
+                    or_replace,
+                    expire_after,
+                    comment: Some(comment),
+                    sql: sql.clone(),
+                    flow_options,
+                    query_ctx,
+                };
+                let ret = self
+                    .create_flow(args)
+                    .await
+                    .map_err(BoxedError::new)
+                    .with_context(|_| CreateFlowSnafu { sql: sql.clone() })
+                    .map_err(to_meta_err(snafu::location!()))?;
+                METRIC_FLOW_TASK_COUNT.inc();
+                Ok(FlowResponse {
+                    affected_flows: ret
+                        .map(|id| greptime_proto::v1::FlowId { id: id as u32 })
+                        .into_iter()
+                        .collect_vec(),
+                    ..Default::default()
+                })
+            }
+            Some(flow_request::Body::Drop(DropRequest {
+                flow_id: Some(flow_id),
+            })) => {
+                self.remove_flow(flow_id.id as u64)
+                    .await
+                    .map_err(to_meta_err(snafu::location!()))?;
+                METRIC_FLOW_TASK_COUNT.dec();
+                Ok(Default::default())
+            }
+            Some(flow_request::Body::Flush(FlushFlow {
+                flow_id: Some(flow_id),
+            })) => {
+                let row = self
+                    .flush_flow_inner(flow_id.id as u64)
+                    .await
+                    .map_err(to_meta_err(snafu::location!()))?;
+                Ok(FlowResponse {
+                    affected_flows: vec![flow_id],
+                    affected_rows: row as u64,
+                    ..Default::default()
+                })
+            }
+            None => UnexpectedSnafu {
+                err_msg: "Missing request body",
+            }
+            .fail(),
+            _ => UnexpectedSnafu {
+                err_msg: "Invalid request body.",
+            }
+            .fail(),
+        }
+    }
+
+    async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
+        self.handle_inserts_inner(request)
+            .await
+            .map(|_| Default::default())
+            .map_err(to_meta_err(snafu::location!()))
+    }
+}
+
+impl FlowEngine for FlowWorkerManager {
+    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
+        self.create_flow_inner(args).await
+    }
+
+    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
+        self.remove_flow_inner(flow_id).await
+    }
+
+    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
+        self.flush_flow_inner(flow_id).await
+    }
+
+    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
+        self.flow_exist_inner(flow_id).await
+    }
+
+    async fn handle_flow_inserts(
+        &self,
+        request: api::v1::region::InsertRequests,
+    ) -> Result<(), Error> {
+        self.handle_inserts_inner(request).await
+    }
+}
+
+/// Simple helper enum for fetching value from row with default value
+#[derive(Debug, Clone)]
+enum FetchFromRow {
+    Idx(usize),
+    Default(Value),
+}
+
+impl FetchFromRow {
+    /// Panics if idx is out of bounds
+    fn fetch(&self, row: &repr::Row) -> Value {
+        match self {
+            FetchFromRow::Idx(idx) => row.get(*idx).unwrap().clone(),
+            FetchFromRow::Default(v) => v.clone(),
+        }
+    }
+}
+
+impl FlowWorkerManager {
+    async fn handle_inserts_inner(
+        &self,
+        request: InsertRequests,
+    ) -> std::result::Result<(), Error> {
         // using try_read to ensure two things:
         // 1. flush wouldn't happen until inserts before it is inserted
         // 2. inserts happening concurrently with flush wouldn't be block by flush
@@ -172,11 +499,7 @@
         let ctx = self.node_context.read().await;
 
         // TODO(discord9): also check schema version so that altered table can be reported
-        let table_schema = ctx
-            .table_source
-            .table_from_id(&table_id)
-            .await
-            .map_err(to_meta_err(snafu::location!()))?;
+        let table_schema = ctx.table_source.table_from_id(&table_id).await?;
         let default_vals = table_schema
             .default_values
             .iter()
@@ -210,9 +533,9 @@
                     None => InternalSnafu {
                         reason: format!("Expect column {idx} of table id={table_id} to have name in table schema, found None"),
                     }
-                    .fail().map_err(BoxedError::new).context(ExternalSnafu),
+                    .fail(),
                 })
-                .collect::<Result<Vec<_>>>()?;
+                .collect::<Result<Vec<_>, _>>()?;
             let name_to_col = HashMap::<_, _>::from_iter(
                 insert_schema
                     .iter()
@@ -229,8 +552,8 @@
                         .copied()
                         .map(FetchFromRow::Idx)
                         .or_else(|| col_default_val.clone().map(FetchFromRow::Default))
-                        .with_context(|| UnexpectedSnafu {
-                            err_msg: format!(
+                        .with_context(|| crate::error::UnexpectedSnafu {
+                            reason: format!(
                                 "Column not found: {}, default_value: {:?}",
                                 col_name, col_default_val
                             ),
@@ -272,27 +595,9 @@
                 }
                 .into_error(err);
                 common_telemetry::error!(err; "Failed to handle write request");
-                let err = to_meta_err(snafu::location!())(err);
                 return Err(err);
             }
         }
-        Ok(Default::default())
-    }
-}
-
-/// Simple helper enum for fetching value from row with default value
-#[derive(Debug, Clone)]
-enum FetchFromRow {
-    Idx(usize),
-    Default(Value),
-}
-
-impl FetchFromRow {
-    /// Panic if idx is out of bound
-    fn fetch(&self, row: &repr::Row) -> Value {
-        match self {
-            FetchFromRow::Idx(idx) => row.get(*idx).unwrap().clone(),
-            FetchFromRow::Default(v) => v.clone(),
-        }
+        Ok(())
     }
 }
diff --git a/src/flow/src/batching_mode.rs b/src/flow/src/batching_mode.rs
index 138a44b633..152ad5781c 100644
--- a/src/flow/src/batching_mode.rs
+++ b/src/flow/src/batching_mode.rs
@@ -16,8 +16,8 @@
 
 use std::time::Duration;
 
-mod engine;
-mod frontend_client;
+pub(crate) mod engine;
+pub(crate) mod frontend_client;
 mod state;
 mod task;
 mod time_window;
diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs
index 72ab7042d2..c53107f695 100644
--- a/src/flow/src/batching_mode/engine.rs
+++ b/src/flow/src/batching_mode/engine.rs
@@ -12,10 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+//! Batching mode engine
+
 use std::collections::{BTreeMap, HashMap};
 use std::sync::Arc;
 
-use api::v1::flow::FlowResponse;
 use common_error::ext::BoxedError;
 use common_meta::ddl::create_flow::FlowType;
 use common_meta::key::flow::FlowMetadataManagerRef;
@@ -30,13 +31,13 @@ use store_api::storage::RegionId;
 use table::metadata::TableId;
 use tokio::sync::{oneshot, RwLock};
 
-use crate::adapter::{CreateFlowArgs, FlowId, TableName};
 use crate::batching_mode::frontend_client::FrontendClient;
 use crate::batching_mode::task::BatchingTask;
 use crate::batching_mode::time_window::{find_time_window_expr, TimeWindowExpr};
 use crate::batching_mode::utils::sql_to_df_plan;
+use crate::engine::FlowEngine;
 use crate::error::{ExternalSnafu, FlowAlreadyExistSnafu, TableNotFoundMetaSnafu, UnexpectedSnafu};
-use crate::Error;
+use crate::{CreateFlowArgs, Error, FlowId, TableName};
 
 /// Batching mode Engine, responsible for driving all the batching mode tasks
 ///
@@ -67,10 +68,10 @@ impl BatchingEngine {
         }
     }
 
-    pub async fn handle_inserts(
+    pub async fn handle_inserts_inner(
        &self,
         request: api::v1::region::InsertRequests,
-    ) -> Result<FlowResponse, Error> {
+    ) -> Result<(), Error> {
         let table_info_mgr = self.table_meta.table_info_manager();
 
         let mut group_by_table_id: HashMap<TableId, Vec<api::v1::region::InsertRequest>> = HashMap::new();
@@ -170,7 +171,7 @@ impl BatchingEngine {
         }
         drop(tasks);
 
-        Ok(Default::default())
+        Ok(())
     }
 }
@@ -191,7 +192,7 @@ async fn get_table_name(
 }
 
 impl BatchingEngine {
-    pub async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
+    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
         let CreateFlowArgs {
             flow_id,
             sink_table_name,
@@ -308,7 +309,7 @@ impl BatchingEngine {
         Ok(Some(flow_id))
     }
 
-    pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
+    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
         if self.tasks.write().await.remove(&flow_id).is_none() {
             warn!("Flow {flow_id} not found in tasks")
         }
@@ -324,19 +325,42 @@ impl BatchingEngine {
         Ok(())
     }
 
-    pub async fn flush_flow(&self, flow_id: FlowId) -> Result<(), Error> {
+    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
         let task = self.tasks.read().await.get(&flow_id).cloned();
         let task = task.with_context(|| UnexpectedSnafu {
             reason: format!("Can't found task for flow {flow_id}"),
         })?;
-        task.gen_exec_once(&self.query_engine, &self.frontend_client)
+        let res = task
+            .gen_exec_once(&self.query_engine, &self.frontend_client)
             .await?;
-        Ok(())
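+        // `gen_exec_once` returns `Option<(rows, _)>`; `None` (nothing to execute
+        // for this flow right now) counts as zero affected rows.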
+        let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize;
+        Ok(affected_rows)
     }
 
     /// Determine if the batching mode flow task exists with given flow id
-    pub async fn flow_exist(&self, flow_id: FlowId) -> bool {
+    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
         self.tasks.read().await.contains_key(&flow_id)
     }
 }
+
+impl FlowEngine for BatchingEngine {
+    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
+        self.create_flow_inner(args).await
+    }
+
+    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
+        self.remove_flow_inner(flow_id).await
+    }
+
+    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
+        self.flush_flow_inner(flow_id).await
+    }
+
+    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
+        Ok(self.flow_exist_inner(flow_id).await)
+    }
+
+    async fn handle_flow_inserts(
+        &self,
+        request: api::v1::region::InsertRequests,
+    ) -> Result<(), Error> {
+        self.handle_inserts_inner(request).await
+    }
+}
diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs
index a406dae798..3a9802713c 100644
--- a/src/flow/src/batching_mode/state.rs
+++ b/src/flow/src/batching_mode/state.rs
@@ -26,11 +26,10 @@ use snafu::ResultExt;
 use tokio::sync::oneshot;
 use tokio::time::Instant;
 
-use crate::adapter::FlowId;
 use crate::batching_mode::task::BatchingTask;
 use crate::batching_mode::MIN_REFRESH_DURATION;
 use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu};
-use crate::Error;
+use crate::{Error, FlowId};
 
 /// The state of the [`BatchingTask`].
 #[derive(Debug)]
diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs
index 44312509d4..f4280f54bd 100644
--- a/src/flow/src/batching_mode/task.rs
+++ b/src/flow/src/batching_mode/task.rs
@@ -43,7 +43,7 @@ use tokio::sync::oneshot;
 use tokio::sync::oneshot::error::TryRecvError;
 use tokio::time::Instant;
 
-use crate::adapter::{FlowId, AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
+use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
 use crate::batching_mode::frontend_client::FrontendClient;
 use crate::batching_mode::state::TaskState;
 use crate::batching_mode::time_window::TimeWindowExpr;
@@ -60,7 +60,7 @@ use crate::error::{
 use crate::metrics::{
     METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
 };
-use crate::Error;
+use crate::{Error, FlowId};
 
 /// The task's config, immutable once created
 #[derive(Clone)]
diff --git a/src/flow/src/engine.rs b/src/flow/src/engine.rs
new file mode 100644
index 0000000000..33da5252d7
--- /dev/null
+++ b/src/flow/src/engine.rs
@@ -0,0 +1,57 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Defines the flow engine trait, which is implemented by both the streaming engine and the batching engine
+
+use std::collections::HashMap;
+
+use session::context::QueryContext;
+use table::metadata::TableId;
+
+use crate::Error;
+
+// TODO(discord9): refactor common types for flow to a separate module
+/// FlowId is a unique identifier for a flow task
+pub type FlowId = u64;
+pub type TableName = [String; 3];
+
+/// The arguments to create a flow
+#[derive(Debug, Clone)]
+pub struct CreateFlowArgs {
+    pub flow_id: FlowId,
+    pub sink_table_name: TableName,
+    pub source_table_ids: Vec<TableId>,
+    pub create_if_not_exists: bool,
+    pub or_replace: bool,
+    pub expire_after: Option<i64>,
+    pub comment: Option<String>,
+    pub sql: String,
+    pub flow_options: HashMap<String, String>,
+    pub query_ctx: Option<QueryContext>,
+}
+
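+// A minimal usage sketch (illustrative only, with a hypothetical flow id 42):
+// given some `engine` that implements `FlowEngine` and a populated
+// `args: CreateFlowArgs` with `args.flow_id == 42`, a caller drives it like so:
+//
+//     engine.create_flow(args).await?;
+//     assert!(engine.flow_exist(42).await?);
+//     let _rows_flushed = engine.flush_flow(42).await?;
+//     engine.remove_flow(42).await?;
+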
+pub trait FlowEngine {
+    /// Create a flow using the provided arguments, returning the previous flow id if it exists and is replaced
+    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error>;
+    /// Remove a flow by its ID
+    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error>;
+    /// Flush the flow, returning the number of rows flushed
+    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error>;
+    /// Check if the flow exists
+    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error>;
+    /// Handle the insert requests for the flow
+    async fn handle_flow_inserts(
+        &self,
+        request: api::v1::region::InsertRequests,
+    ) -> Result<(), Error>;
+}
diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs
index 2488b0a677..1741f8cb1b 100644
--- a/src/flow/src/error.rs
+++ b/src/flow/src/error.rs
@@ -25,8 +25,8 @@ use common_telemetry::common_error::status_code::StatusCode;
 use snafu::{Location, ResultExt, Snafu};
 use tonic::metadata::MetadataMap;
 
-use crate::adapter::FlowId;
 use crate::expr::EvalError;
+use crate::FlowId;
 
 /// This error is used to represent all possible errors that can occur in the flow module.
 #[derive(Snafu)]
diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs
index 8ec464730b..5dc3c67491 100644
--- a/src/flow/src/lib.rs
+++ b/src/flow/src/lib.rs
@@ -26,9 +26,10 @@
 // allow unused for now because it should be use later
 mod adapter;
-mod batching_mode;
+pub(crate) mod batching_mode;
 mod compute;
 mod df_optimizer;
+pub(crate) mod engine;
 pub mod error;
 mod expr;
 pub mod heartbeat;
@@ -43,6 +44,8 @@ mod utils;
 mod test_utils;
 
 pub use adapter::{FlowConfig, FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions};
+pub use batching_mode::frontend_client::FrontendClient;
+pub(crate) use engine::{CreateFlowArgs, FlowId, TableName};
 pub use error::{Error, Result};
 pub use server::{
     FlownodeBuilder, FlownodeInstance, FlownodeServer, FlownodeServiceBuilder, FrontendInvoker,
diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs
index d0038e6ba1..53712ffb67 100644
--- a/src/flow/src/server.rs
+++ b/src/flow/src/server.rs
@@ -50,7 +50,7 @@ use tonic::codec::CompressionEncoding;
 use tonic::transport::server::TcpIncoming;
 use tonic::{Request, Response, Status};
 
-use crate::adapter::{create_worker, CreateFlowArgs, FlowWorkerManagerRef};
+use crate::adapter::{create_worker, FlowWorkerManagerRef};
 use crate::error::{
     to_status_with_last_err, CacheRequiredSnafu, CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu,
     ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
@@ -59,12 +59,13 @@ use crate::heartbeat::HeartbeatTask;
 use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
 use crate::transform::register_function_to_query_engine;
 use crate::utils::{SizeReportSender, StateReportHandler};
-use crate::{Error, FlowWorkerManager, FlownodeOptions};
+use crate::{CreateFlowArgs, Error, FlowWorkerManager, FlownodeOptions, FrontendClient};
 
 pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
 
 /// wrapping flow node manager to avoid orphan rule with Arc<...>
 #[derive(Clone)]
 pub struct FlowService {
+    /// TODO(discord9): replace with dual engine
     pub manager: FlowWorkerManagerRef,
 }
 
@@ -290,6 +291,7 @@ pub struct FlownodeBuilder {
     heartbeat_task: Option<HeartbeatTask>,
     /// receive a oneshot sender to send state size report
     state_report_handler: Option<StateReportHandler>,
+    frontend_client: Arc<FrontendClient>,
 }
 
 impl FlownodeBuilder {
@@ -300,6 +302,7 @@ impl FlownodeBuilder {
         table_meta: TableMetadataManagerRef,
         catalog_manager: CatalogManagerRef,
         flow_metadata_manager: FlowMetadataManagerRef,
+        frontend_client: Arc<FrontendClient>,
     ) -> Self {
         Self {
             opts,
@@ -309,6 +312,7 @@ impl FlownodeBuilder {
             flow_metadata_manager,
             heartbeat_task: None,
             state_report_handler: None,
+            frontend_client,
         }
     }
 
@@ -432,7 +436,7 @@ impl FlownodeBuilder {
             ),
         };
         manager
-            .create_flow(args)
+            .create_flow_inner(args)
             .await
             .map_err(BoxedError::new)
             .with_context(|_| CreateFlowSnafu {
diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs
index 3829ae2273..e022717ea1 100644
--- a/src/meta-client/src/client.rs
+++ b/src/meta-client/src/client.rs
@@ -117,6 +117,7 @@ impl MetaClientBuilder {
             .enable_store()
             .enable_heartbeat()
             .enable_procedure()
+            .enable_access_cluster_info()
     }
 
     pub fn enable_heartbeat(self) -> Self {
diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs
index 2d6c9bcf97..b85c848c88 100644
--- a/tests-integration/src/standalone.rs
+++ b/tests-integration/src/standalone.rs
@@ -41,7 +41,7 @@ use common_procedure::options::ProcedureConfig;
 use common_procedure::ProcedureManagerRef;
 use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
 use datanode::datanode::DatanodeBuilder;
-use flow::FlownodeBuilder;
+use flow::{FlownodeBuilder, FrontendClient};
 use frontend::frontend::Frontend;
 use frontend::instance::builder::FrontendBuilder;
 use frontend::instance::{Instance, StandaloneDatanodeManager};
@@ -174,12 +174,15 @@ impl GreptimeDbStandaloneBuilder {
             Some(procedure_manager.clone()),
         );
 
+        let fe_server_addr = opts.frontend_options().grpc.bind_addr.clone();
+        let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr);
         let flow_builder = FlownodeBuilder::new(
             Default::default(),
             plugins.clone(),
             table_metadata_manager.clone(),
             catalog_manager.clone(),
             flow_metadata_manager.clone(),
+            Arc::new(frontend_client),
         );
         let flownode = Arc::new(flow_builder.build().await.unwrap());