mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-30 20:00:36 +00:00
feat: patch to resend insert to flow worker
This commit is contained in:
@@ -356,7 +356,7 @@ impl Instance {
|
||||
inserter.clone(),
|
||||
));
|
||||
|
||||
let addr = std::env::var("FLOW_ADDR").unwrap_or("[::1]:14514".to_string());
|
||||
let addr = std::env::var("FLOW_ADDR").unwrap_or("http://[::1]:14514".to_string());
|
||||
let conn = tonic::transport::Endpoint::new(addr)
|
||||
.unwrap()
|
||||
.connect_lazy();
|
||||
|
||||
@@ -50,6 +50,7 @@ impl GrpcQueryHandler for Instance {
|
||||
.as_ref()
|
||||
.check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
|
||||
.context(PermissionSnafu)?;
|
||||
/*
|
||||
// copy row inserts to flow worker
|
||||
if let Request::RowInserts(_) = &request {
|
||||
let full_req = GreptimeRequest {
|
||||
@@ -69,8 +70,8 @@ impl GrpcQueryHandler for Instance {
|
||||
info!("flow proxy is not initialized");
|
||||
}
|
||||
};
|
||||
let output = Output::AffectedRows(0);
|
||||
/*
|
||||
let output = Output::AffectedRows(0);*/
|
||||
|
||||
let output = match request {
|
||||
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
|
||||
Request::RowInserts(requests) => self.handle_row_inserts(requests, ctx.clone()).await?,
|
||||
@@ -156,7 +157,7 @@ impl GrpcQueryHandler for Instance {
|
||||
}
|
||||
};
|
||||
|
||||
let output = interceptor.post_execute(output, ctx)?;*/
|
||||
let output = interceptor.post_execute(output, ctx)?;
|
||||
Ok(output)
|
||||
}
|
||||
}
|
||||
@@ -210,10 +211,29 @@ impl Instance {
|
||||
requests: RowInsertRequests,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<Output> {
|
||||
let full_req = GreptimeRequest {
|
||||
header: None,
|
||||
request: Some(Request::RowInserts(requests.clone())),
|
||||
};
|
||||
|
||||
if let Some(flow_proxy) = &self.flow {
|
||||
flow_proxy
|
||||
.flow_client
|
||||
.lock()
|
||||
.await
|
||||
.handle(full_req)
|
||||
.await
|
||||
.unwrap();
|
||||
} else {
|
||||
info!("flow proxy is not initialized");
|
||||
}
|
||||
Ok(Output::AffectedRows(0))
|
||||
/*
|
||||
self.inserter
|
||||
.handle_row_inserts(requests, ctx, self.statement_executor.as_ref())
|
||||
.await
|
||||
.context(TableOperationSnafu)
|
||||
*/
|
||||
}
|
||||
|
||||
pub async fn handle_deletes(
|
||||
|
||||
Reference in New Issue
Block a user