diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index b535542a3d..5ed3dec499 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -52,6 +52,7 @@ use crate::transform::sql_to_flow_plan; pub(crate) mod error; mod parse_expr; mod server; +mod standalone; #[cfg(test)] mod tests; mod util; @@ -67,7 +68,7 @@ pub type FlowId = u64; pub type TableName = Vec; /// This function will create a new thread for flow worker and return a handle to the flow node manager -pub fn start_flow_node_and_one_worker( +pub fn start_flow_node_with_one_worker( frontend_invoker: Box, query_engine: Arc, table_meta: TableMetadataManagerRef, diff --git a/src/flow/src/adapter/standalone.rs b/src/flow/src/adapter/standalone.rs new file mode 100644 index 0000000000..174116908b --- /dev/null +++ b/src/flow/src/adapter/standalone.rs @@ -0,0 +1,114 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! impl `FlowNode` trait for FlowNodeManager so standalone can easily call them + +use api::v1::flow::{flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse}; +use api::v1::region::InsertRequests; +use common_meta::error::{Result, UnexpectedSnafu}; +use common_meta::node_manager::Flownode; +use itertools::Itertools; +use snafu::ResultExt; + +use crate::adapter::FlowNodeManager; +use crate::repr::{self, DiffRow}; + +fn to_meta_err(err: impl ToString) -> common_meta::error::Error { + UnexpectedSnafu { + err_msg: err.to_string(), + } + .build() +} + +#[async_trait::async_trait] +impl Flownode for FlowNodeManager { + async fn handle(&self, request: FlowRequest) -> Result { + match request.body { + Some(flow_request::Body::Create(CreateRequest { + flow_id: Some(task_id), + source_table_ids, + sink_table_name: Some(sink_table_name), + create_if_not_exists, + expire_when, + comment, + sql, + flow_options, + })) => { + let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec(); + let sink_table_id = self + .table_info_source + .get_table_id_from_proto_name(&sink_table_name) + .await + .map_err(to_meta_err)?; + let ret = self + .create_flow( + task_id.id as u64, + sink_table_id, + &source_table_ids, + create_if_not_exists, + Some(expire_when), + Some(comment), + sql, + flow_options, + ) + .await + .map_err(to_meta_err)?; + Ok(FlowResponse { + affected_tasks: ret + .map(|id| greptime_proto::v1::flow::TaskId { id: id as u32 }) + .into_iter() + .collect_vec(), + ..Default::default() + }) + } + Some(flow_request::Body::Drop(DropRequest { + flow_id: Some(flow_id), + })) => { + self.remove_flow(flow_id.id as u64) + .await + .map_err(to_meta_err)?; + Ok(Default::default()) + } + None => UnexpectedSnafu { + err_msg: "Missing request body", + } + .fail(), + _ => UnexpectedSnafu { + err_msg: "Invalid request body.", + } + .fail(), + } + } + + async fn handle_inserts(&self, request: InsertRequests) -> Result { + for write_request in request.requests { + let region_id = write_request.region_id; + let rows_proto = write_request.rows.map(|r| r.rows).unwrap_or(vec![]); + // TODO(discord9): reconsider time assignment mechanism + let now = self.tick_manager.tick(); + let rows: Vec = rows_proto + .into_iter() + .map(repr::Row::from) + .map(|r| (r, now, 1)) + .collect_vec(); + self.handle_write_request(region_id.into(), rows) + .await + .map_err(to_meta_err)?; + } + // since `run_available` doesn't blocking, we can just trigger a run here + self.run_available().await; + // write back should be config to be timed in somewhere else like one attempt per second + Ok(Default::default()) + } +}