feat: impl Flownode for FlowNodeManager

This commit is contained in:
discord9
2024-05-07 14:44:48 +08:00
parent e7801abd0c
commit d88cff6f51
2 changed files with 116 additions and 1 deletions

View File

@@ -52,6 +52,7 @@ use crate::transform::sql_to_flow_plan;
pub(crate) mod error;
mod parse_expr;
mod server;
mod standalone;
#[cfg(test)]
mod tests;
mod util;
@@ -67,7 +68,7 @@ pub type FlowId = u64;
pub type TableName = Vec<String>;
/// This function will create a new thread for flow worker and return a handle to the flow node manager
pub fn start_flow_node_and_one_worker(
pub fn start_flow_node_with_one_worker(
frontend_invoker: Box<dyn FrontendInvoker + Send + Sync>,
query_engine: Arc<dyn QueryEngine>,
table_meta: TableMetadataManagerRef,

View File

@@ -0,0 +1,114 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! impl `FlowNode` trait for FlowNodeManager so standalone can easily call them
use api::v1::flow::{flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse};
use api::v1::region::InsertRequests;
use common_meta::error::{Result, UnexpectedSnafu};
use common_meta::node_manager::Flownode;
use itertools::Itertools;
use snafu::ResultExt;
use crate::adapter::FlowNodeManager;
use crate::repr::{self, DiffRow};
fn to_meta_err(err: impl ToString) -> common_meta::error::Error {
UnexpectedSnafu {
err_msg: err.to_string(),
}
.build()
}
#[async_trait::async_trait]
impl Flownode for FlowNodeManager {
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse> {
match request.body {
Some(flow_request::Body::Create(CreateRequest {
flow_id: Some(task_id),
source_table_ids,
sink_table_name: Some(sink_table_name),
create_if_not_exists,
expire_when,
comment,
sql,
flow_options,
})) => {
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
let sink_table_id = self
.table_info_source
.get_table_id_from_proto_name(&sink_table_name)
.await
.map_err(to_meta_err)?;
let ret = self
.create_flow(
task_id.id as u64,
sink_table_id,
&source_table_ids,
create_if_not_exists,
Some(expire_when),
Some(comment),
sql,
flow_options,
)
.await
.map_err(to_meta_err)?;
Ok(FlowResponse {
affected_tasks: ret
.map(|id| greptime_proto::v1::flow::TaskId { id: id as u32 })
.into_iter()
.collect_vec(),
..Default::default()
})
}
Some(flow_request::Body::Drop(DropRequest {
flow_id: Some(flow_id),
})) => {
self.remove_flow(flow_id.id as u64)
.await
.map_err(to_meta_err)?;
Ok(Default::default())
}
None => UnexpectedSnafu {
err_msg: "Missing request body",
}
.fail(),
_ => UnexpectedSnafu {
err_msg: "Invalid request body.",
}
.fail(),
}
}
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse> {
for write_request in request.requests {
let region_id = write_request.region_id;
let rows_proto = write_request.rows.map(|r| r.rows).unwrap_or(vec![]);
// TODO(discord9): reconsider time assignment mechanism
let now = self.tick_manager.tick();
let rows: Vec<DiffRow> = rows_proto
.into_iter()
.map(repr::Row::from)
.map(|r| (r, now, 1))
.collect_vec();
self.handle_write_request(region_id.into(), rows)
.await
.map_err(to_meta_err)?;
}
// since `run_available` doesn't blocking, we can just trigger a run here
self.run_available().await;
// write back should be config to be timed in somewhere else like one attempt per second
Ok(Default::default())
}
}