fix: get true QueryContext& dedup code

This commit is contained in:
discord9
2024-05-09 15:45:47 +08:00
parent 76aadb2223
commit 684850f451
3 changed files with 36 additions and 71 deletions

View File

@@ -638,6 +638,7 @@ impl FlownodeManager {
comment: Option<String>,
sql: String,
flow_options: HashMap<String, String>,
query_ctx: Option<QueryContext>,
) -> Result<Option<FlowId>, Error> {
if create_if_not_exist {
// check if the task already exists
@@ -662,7 +663,7 @@ impl FlownodeManager {
node_ctx.register_task_src_sink(flow_id, source_table_ids, sink_table_name.clone());
// TODO(discord9): pass the actual `QueryContext` in here
node_ctx.query_context = Some(Arc::new(QueryContext::with("greptime", "public")));
node_ctx.query_context = query_ctx.map(Arc::new);
// construct a active dataflow state with it
let flow_plan = sql_to_flow_plan(node_ctx.borrow_mut(), &self.query_engine, &sql).await?;
info!("Flow Plan is {:?}", flow_plan);

View File

@@ -18,6 +18,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use api::v1::flow::{CreateRequest, DropRequest};
use common_meta::node_manager::Flownode;
use common_telemetry::tracing::info;
use futures::FutureExt;
use greptime_proto::v1::flow::{
@@ -34,7 +35,6 @@ use tonic::{Request, Response, Status};
use crate::adapter::{FlownodeManager, FlownodeManagerRef};
use crate::repr::{self, DiffRow};
pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
#[derive(Clone)]
@@ -48,82 +48,41 @@ impl flow_server::Flow for FlowService {
&self,
request: Request<FlowRequest>,
) -> Result<Response<FlowResponse>, Status> {
match request.into_inner().body {
Some(flow_request::Body::Create(CreateRequest {
flow_id: Some(task_id),
source_table_ids,
sink_table_name: Some(sink_table_name),
create_if_not_exists,
expire_when,
comment,
sql,
flow_options,
})) => {
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
let sink_table_name = [
sink_table_name.catalog_name,
sink_table_name.schema_name,
sink_table_name.table_name,
];
let ret = self
.manager
.create_flow(
task_id.id as u64,
sink_table_name,
&source_table_ids,
create_if_not_exists,
Some(expire_when),
Some(comment),
sql,
flow_options,
)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;
Ok(Response::new(FlowResponse {
affected_tasks: ret
.map(|id| greptime_proto::v1::flow::TaskId { id: id as u32 })
.into_iter()
.collect_vec(),
..Default::default()
}))
}
Some(flow_request::Body::Drop(DropRequest {
flow_id: Some(flow_id),
})) => {
self.manager
.remove_flow(flow_id.id as u64)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;
Ok(Response::new(Default::default()))
}
None => Err(Status::invalid_argument("Missing request body.")),
_ => Err(Status::invalid_argument("Invalid request body.")),
}
let request = request.into_inner();
self.manager
.handle(request)
.await
.map(Response::new)
.map_err(|e| {
let msg = format!("failed to handle request: {:?}", e);
Status::internal(msg)
})
}
async fn handle_mirror_request(
&self,
request: Request<InsertRequests>,
) -> Result<Response<FlowResponse>, Status> {
for write_request in request.into_inner().requests {
let region_id = write_request.region_id;
let rows_proto = write_request.rows.map(|r| r.rows).unwrap_or(vec![]);
// TODO(discord9): reconsider time assignment mechanism
let now = self.manager.tick_manager.tick();
let rows: Vec<DiffRow> = rows_proto
let request = request.into_inner();
// TODO(discord9): fix protobuf import order shenanigans to remove this duplicated define
let request = api::v1::region::InsertRequests {
requests: request
.requests
.into_iter()
.map(repr::Row::from)
.map(|r| (r, now, 1))
.collect_vec();
self.manager
.handle_write_request(region_id.into(), rows)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;
}
// since `run_available` doesn't blocking, we can just trigger a run here
self.manager.run_available().await;
// write back should be config to be timed in somewhere else like one attempt per second
Ok(Response::new(Default::default()))
.map(|insert| api::v1::region::InsertRequest {
region_id: insert.region_id,
rows: insert.rows,
})
.collect_vec(),
};
self.manager
.handle_inserts(request)
.await
.map(Response::new)
.map_err(|e| {
let msg = format!("failed to handle request: {:?}", e);
Status::internal(msg)
})
}
}

View File

@@ -34,6 +34,10 @@ fn to_meta_err(err: crate::adapter::error::Error) -> common_meta::error::Error {
#[async_trait::async_trait]
impl Flownode for FlownodeManager {
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse> {
let query_ctx = request
.header
.and_then(|h| h.query_context)
.map(|ctx| ctx.into());
match request.body {
Some(flow_request::Body::Create(CreateRequest {
flow_id: Some(task_id),
@@ -61,6 +65,7 @@ impl Flownode for FlownodeManager {
Some(comment),
sql,
flow_options,
query_ctx,
)
.await
.map_err(to_meta_err)?;