diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index d43e3816fb..9d65e2284c 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -44,6 +44,7 @@ pub struct FrontendOptions { pub logging: LoggingOptions, pub datanode: DatanodeOptions, pub user_provider: Option, + pub flow_grpc_addr: Option, } impl Default for FrontendOptions { @@ -64,6 +65,7 @@ impl Default for FrontendOptions { logging: LoggingOptions::default(), datanode: DatanodeOptions::default(), user_provider: None, + flow_grpc_addr: None, } } } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 2ba0daa0a2..992d428197 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -24,6 +24,7 @@ pub mod standalone; use std::collections::HashMap; use std::sync::Arc; +use api::v1::greptime_database_client::GreptimeDatabaseClient; use api::v1::meta::Role; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; @@ -82,6 +83,8 @@ use sql::statements::copy::CopyTable; use sql::statements::statement::Statement; use sqlparser::ast::ObjectName; pub use standalone::StandaloneDatanodeManager; +use tokio::sync::Mutex; +use tonic::transport::Channel; use self::region_query::FrontendRegionQueryHandler; use self::standalone::StandaloneTableMetadataCreator; @@ -117,6 +120,12 @@ pub trait FrontendInstance: pub type FrontendInstanceRef = Arc; pub type StatementExecutorRef = Arc; +/// Send certain query to flow worker through grpc +pub struct FlowProxy { + flow_client: Mutex>, +} +pub type FlowProxyRef = Arc; + #[derive(Clone)] pub struct Instance { catalog_manager: CatalogManagerRef, @@ -128,6 +137,7 @@ pub struct Instance { heartbeat_task: Option, inserter: InserterRef, deleter: DeleterRef, + flow: Option, } impl Instance { @@ -219,6 +229,7 @@ impl Instance { heartbeat_task, inserter, deleter, + flow: None, }) } @@ -345,6 +356,11 @@ impl Instance { inserter.clone(), )); + let addr = std::env::var("FLOW_ADDR").unwrap_or("[::1]:14514".to_string()); + let conn = tonic::transport::Endpoint::new(addr) + .unwrap() + .connect_lazy(); + let client = GreptimeDatabaseClient::new(conn); Ok(Instance { catalog_manager: catalog_manager.clone(), script_executor, @@ -355,6 +371,12 @@ impl Instance { heartbeat_task: None, inserter, deleter, + flow: Some( + FlowProxy { + flow_client: Mutex::new(client), + } + .into(), + ), }) } diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 74947581dc..72ee702ccc 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -15,7 +15,7 @@ use api::v1::ddl_request::{Expr as DdlExpr, Expr}; use api::v1::greptime_request::Request; use api::v1::query_request::Query; -use api::v1::{DeleteRequests, InsertRequests, RowDeleteRequests, RowInsertRequests}; +use api::v1::{DeleteRequests, InsertRequests, RowDeleteRequests, RowInsertRequests, GreptimeRequest}; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use common_meta::table_name::TableName; @@ -47,6 +47,17 @@ impl GrpcQueryHandler for Instance { .as_ref() .check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request)) .context(PermissionSnafu)?; + // copy row inserts to flow worker + if let Request::RowInserts(_) = &request { + let full_req = GreptimeRequest{ + header: None, + request: Some(request.clone()) + }; + + if let Some(flow_proxy) = &self.flow{ + flow_proxy.flow_client.lock().await.handle(full_req).await.unwrap(); + } + }; let output = match request { Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,