diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 20191080d7..bfa4922020 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -638,6 +638,7 @@ impl FlownodeManager { comment: Option, sql: String, flow_options: HashMap, + query_ctx: Option, ) -> Result, Error> { if create_if_not_exist { // check if the task already exists @@ -662,7 +663,7 @@ impl FlownodeManager { node_ctx.register_task_src_sink(flow_id, source_table_ids, sink_table_name.clone()); // TODO(discord9): pass the actual `QueryContext` in here - node_ctx.query_context = Some(Arc::new(QueryContext::with("greptime", "public"))); + node_ctx.query_context = query_ctx.map(Arc::new); // construct a active dataflow state with it let flow_plan = sql_to_flow_plan(node_ctx.borrow_mut(), &self.query_engine, &sql).await?; info!("Flow Plan is {:?}", flow_plan); diff --git a/src/flow/src/adapter/server.rs b/src/flow/src/adapter/server.rs index e27942c970..d8e2714a70 100644 --- a/src/flow/src/adapter/server.rs +++ b/src/flow/src/adapter/server.rs @@ -18,6 +18,7 @@ use std::net::SocketAddr; use std::sync::Arc; use api::v1::flow::{CreateRequest, DropRequest}; +use common_meta::node_manager::Flownode; use common_telemetry::tracing::info; use futures::FutureExt; use greptime_proto::v1::flow::{ @@ -34,7 +35,6 @@ use tonic::{Request, Response, Status}; use crate::adapter::{FlownodeManager, FlownodeManagerRef}; use crate::repr::{self, DiffRow}; - pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER"; #[derive(Clone)] @@ -48,82 +48,41 @@ impl flow_server::Flow for FlowService { &self, request: Request, ) -> Result, Status> { - match request.into_inner().body { - Some(flow_request::Body::Create(CreateRequest { - flow_id: Some(task_id), - source_table_ids, - sink_table_name: Some(sink_table_name), - create_if_not_exists, - expire_when, - comment, - sql, - flow_options, - })) => { - let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec(); - let sink_table_name = [ - sink_table_name.catalog_name, - sink_table_name.schema_name, - sink_table_name.table_name, - ]; - let ret = self - .manager - .create_flow( - task_id.id as u64, - sink_table_name, - &source_table_ids, - create_if_not_exists, - Some(expire_when), - Some(comment), - sql, - flow_options, - ) - .await - .map_err(|e| tonic::Status::internal(e.to_string()))?; - Ok(Response::new(FlowResponse { - affected_tasks: ret - .map(|id| greptime_proto::v1::flow::TaskId { id: id as u32 }) - .into_iter() - .collect_vec(), - ..Default::default() - })) - } - Some(flow_request::Body::Drop(DropRequest { - flow_id: Some(flow_id), - })) => { - self.manager - .remove_flow(flow_id.id as u64) - .await - .map_err(|e| tonic::Status::internal(e.to_string()))?; - Ok(Response::new(Default::default())) - } - None => Err(Status::invalid_argument("Missing request body.")), - _ => Err(Status::invalid_argument("Invalid request body.")), - } + let request = request.into_inner(); + self.manager + .handle(request) + .await + .map(Response::new) + .map_err(|e| { + let msg = format!("failed to handle request: {:?}", e); + Status::internal(msg) + }) } async fn handle_mirror_request( &self, request: Request, ) -> Result, Status> { - for write_request in request.into_inner().requests { - let region_id = write_request.region_id; - let rows_proto = write_request.rows.map(|r| r.rows).unwrap_or(vec![]); - // TODO(discord9): reconsider time assignment mechanism - let now = self.manager.tick_manager.tick(); - let rows: Vec = rows_proto + let request = request.into_inner(); + // TODO(discord9): fix protobuf import order shenanigans to remove this duplicated define + let request = api::v1::region::InsertRequests { + requests: request + .requests .into_iter() - .map(repr::Row::from) - .map(|r| (r, now, 1)) - .collect_vec(); - self.manager - .handle_write_request(region_id.into(), rows) - .await - .map_err(|e| tonic::Status::internal(e.to_string()))?; - } - // since `run_available` doesn't blocking, we can just trigger a run here - self.manager.run_available().await; - // write back should be config to be timed in somewhere else like one attempt per second - Ok(Response::new(Default::default())) + .map(|insert| api::v1::region::InsertRequest { + region_id: insert.region_id, + rows: insert.rows, + }) + .collect_vec(), + }; + self.manager + .handle_inserts(request) + .await + .map(Response::new) + .map_err(|e| { + let msg = format!("failed to handle request: {:?}", e); + Status::internal(msg) + }) } } diff --git a/src/flow/src/adapter/standalone.rs b/src/flow/src/adapter/standalone.rs index cc4340507a..773d3d0bc8 100644 --- a/src/flow/src/adapter/standalone.rs +++ b/src/flow/src/adapter/standalone.rs @@ -34,6 +34,10 @@ fn to_meta_err(err: crate::adapter::error::Error) -> common_meta::error::Error { #[async_trait::async_trait] impl Flownode for FlownodeManager { async fn handle(&self, request: FlowRequest) -> Result { + let query_ctx = request + .header + .and_then(|h| h.query_context) + .map(|ctx| ctx.into()); match request.body { Some(flow_request::Body::Create(CreateRequest { flow_id: Some(task_id), @@ -61,6 +65,7 @@ impl Flownode for FlownodeManager { Some(comment), sql, flow_options, + query_ctx, ) .await .map_err(to_meta_err)?;