From 2e101bad53264737a867de6775b3106551e0ddd7 Mon Sep 17 00:00:00 2001 From: Discord9 Date: Tue, 5 Dec 2023 10:25:28 +0800 Subject: [PATCH] feat: patch to resend insert to flow worker --- src/frontend/src/instance.rs | 2 +- src/frontend/src/instance/grpc.rs | 26 +++++++++++++++++++++++--- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 992d428197..aa3ab9c883 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -356,7 +356,7 @@ impl Instance { inserter.clone(), )); - let addr = std::env::var("FLOW_ADDR").unwrap_or("[::1]:14514".to_string()); + let addr = std::env::var("FLOW_ADDR").unwrap_or("http://[::1]:14514".to_string()); let conn = tonic::transport::Endpoint::new(addr) .unwrap() .connect_lazy(); diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 6f29427cbf..49c3e4c6e0 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -50,6 +50,7 @@ impl GrpcQueryHandler for Instance { .as_ref() .check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request)) .context(PermissionSnafu)?; + /* // copy row inserts to flow worker if let Request::RowInserts(_) = &request { let full_req = GreptimeRequest { @@ -69,8 +70,8 @@ impl GrpcQueryHandler for Instance { info!("flow proxy is not initialized"); } }; - let output = Output::AffectedRows(0); - /* + let output = Output::AffectedRows(0);*/ + let output = match request { Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?, Request::RowInserts(requests) => self.handle_row_inserts(requests, ctx.clone()).await?, @@ -156,7 +157,7 @@ impl GrpcQueryHandler for Instance { } }; - let output = interceptor.post_execute(output, ctx)?;*/ + let output = interceptor.post_execute(output, ctx)?; Ok(output) } } @@ -210,10 +211,29 @@ impl Instance { requests: RowInsertRequests, ctx: QueryContextRef, ) -> Result { + let full_req = GreptimeRequest { + header: None, + request: Some(Request::RowInserts(requests.clone())), + }; + + if let Some(flow_proxy) = &self.flow { + flow_proxy + .flow_client + .lock() + .await + .handle(full_req) + .await + .unwrap(); + } else { + info!("flow proxy is not initialized"); + } + Ok(Output::AffectedRows(0)) + /* self.inserter .handle_row_inserts(requests, ctx, self.statement_executor.as_ref()) .await .context(TableOperationSnafu) + */ } pub async fn handle_deletes(