feat: resend req to flow

This commit is contained in:
Discord9
2023-11-30 14:08:58 +08:00
parent 0badb3715e
commit 1a1c093ad7
3 changed files with 36 additions and 1 deletions

View File

@@ -44,6 +44,7 @@ pub struct FrontendOptions {
pub logging: LoggingOptions,
pub datanode: DatanodeOptions,
pub user_provider: Option<String>,
pub flow_grpc_addr: Option<String>,
}
impl Default for FrontendOptions {
@@ -64,6 +65,7 @@ impl Default for FrontendOptions {
logging: LoggingOptions::default(),
datanode: DatanodeOptions::default(),
user_provider: None,
flow_grpc_addr: None,
}
}
}

View File

@@ -24,6 +24,7 @@ pub mod standalone;
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::greptime_database_client::GreptimeDatabaseClient;
use api::v1::meta::Role;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
@@ -82,6 +83,8 @@ use sql::statements::copy::CopyTable;
use sql::statements::statement::Statement;
use sqlparser::ast::ObjectName;
pub use standalone::StandaloneDatanodeManager;
use tokio::sync::Mutex;
use tonic::transport::Channel;
use self::region_query::FrontendRegionQueryHandler;
use self::standalone::StandaloneTableMetadataCreator;
@@ -117,6 +120,12 @@ pub trait FrontendInstance:
pub type FrontendInstanceRef = Arc<dyn FrontendInstance>;
pub type StatementExecutorRef = Arc<StatementExecutor>;
/// Send certain query to flow worker through grpc
pub struct FlowProxy {
flow_client: Mutex<GreptimeDatabaseClient<Channel>>,
}
pub type FlowProxyRef = Arc<FlowProxy>;
#[derive(Clone)]
pub struct Instance {
catalog_manager: CatalogManagerRef,
@@ -128,6 +137,7 @@ pub struct Instance {
heartbeat_task: Option<HeartbeatTask>,
inserter: InserterRef,
deleter: DeleterRef,
flow: Option<FlowProxyRef>,
}
impl Instance {
@@ -219,6 +229,7 @@ impl Instance {
heartbeat_task,
inserter,
deleter,
flow: None,
})
}
@@ -345,6 +356,11 @@ impl Instance {
inserter.clone(),
));
let addr = std::env::var("FLOW_ADDR").unwrap_or("[::1]:14514".to_string());
let conn = tonic::transport::Endpoint::new(addr)
.unwrap()
.connect_lazy();
let client = GreptimeDatabaseClient::new(conn);
Ok(Instance {
catalog_manager: catalog_manager.clone(),
script_executor,
@@ -355,6 +371,12 @@ impl Instance {
heartbeat_task: None,
inserter,
deleter,
flow: Some(
FlowProxy {
flow_client: Mutex::new(client),
}
.into(),
),
})
}

View File

@@ -15,7 +15,7 @@
use api::v1::ddl_request::{Expr as DdlExpr, Expr};
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{DeleteRequests, InsertRequests, RowDeleteRequests, RowInsertRequests};
use api::v1::{DeleteRequests, InsertRequests, RowDeleteRequests, RowInsertRequests, GreptimeRequest};
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_meta::table_name::TableName;
@@ -47,6 +47,17 @@ impl GrpcQueryHandler for Instance {
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
.context(PermissionSnafu)?;
// copy row inserts to flow worker
if let Request::RowInserts(_) = &request {
let full_req = GreptimeRequest{
header: None,
request: Some(request.clone())
};
if let Some(flow_proxy) = &self.flow{
flow_proxy.flow_client.lock().await.handle(full_req).await.unwrap();
}
};
let output = match request {
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,