diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index aed05f4f41..c27854d22e 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -381,7 +381,7 @@ impl FlownodeManager { pub async fn run_available(&self) { let now = self.tick_manager.tick(); for worker in self.worker_handles.iter() { - worker.lock().await.run_available(now); + worker.lock().await.run_available(now).await; } } @@ -513,8 +513,8 @@ impl FlownodeManager { pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> { for handle in self.worker_handles.iter() { let handle = handle.lock().await; - if handle.contains_flow(flow_id)? { - handle.remove_flow(flow_id)?; + if handle.contains_flow(flow_id).await? { + handle.remove_flow(flow_id).await?; break; } } @@ -541,7 +541,7 @@ impl FlownodeManager { if create_if_not_exist { // check if the task already exists for handle in self.worker_handles.iter() { - if handle.lock().await.contains_flow(flow_id)? { + if handle.lock().await.contains_flow(flow_id).await? { return Ok(None); } } @@ -581,15 +581,17 @@ impl FlownodeManager { .collect::, _>>()?; let handle = &self.worker_handles[0].lock().await; - handle.create_flow( - flow_id, - flow_plan, - sink_id, - sink_sender, - &source_ids, - source_senders, - create_if_not_exist, - )?; + handle + .create_flow( + flow_id, + flow_plan, + sink_id, + sink_sender, + &source_ids, + source_senders, + create_if_not_exist, + ) + .await?; Ok(Some(flow_id)) } @@ -780,8 +782,8 @@ impl FlowNodeContext { } else { let global_id = self.new_global_id(); if let Some(table_id) = table_id { - let schema = srv_map.get_table_schema(&table_id).await.unwrap(); - self.schema.insert(global_id, schema); + let schema = srv_map.get_table_schema(&table_id).await; + let _ = schema.map(|schema| self.schema.insert(global_id, schema)); } self.table_repr.insert(table_name, table_id, global_id); diff --git a/src/flow/src/adapter/standalone.rs b/src/flow/src/adapter/standalone.rs index 3fec54a726..d1d5b8967c 100644 --- a/src/flow/src/adapter/standalone.rs +++ b/src/flow/src/adapter/standalone.rs @@ -16,7 +16,8 @@ use api::v1::flow::{flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse}; use api::v1::region::InsertRequests; -use common_meta::error::{Result, UnexpectedSnafu}; +use common_error::ext::{BoxedError, ErrorExt, StackError}; +use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu}; use common_meta::node_manager::Flownode; use itertools::Itertools; use snafu::ResultExt; @@ -24,11 +25,10 @@ use snafu::ResultExt; use crate::adapter::FlownodeManager; use crate::repr::{self, DiffRow}; -fn to_meta_err(err: impl ToString) -> common_meta::error::Error { - UnexpectedSnafu { - err_msg: err.to_string(), - } - .build() +fn to_meta_err(err: crate::adapter::error::Error) -> common_meta::error::Error { + Err::<(), _>(BoxedError::new(err)) + .with_context(|_| ExternalSnafu) + .unwrap_err() } #[async_trait::async_trait] diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs index f7fa78624f..8f3b459afa 100644 --- a/src/flow/src/adapter/worker.rs +++ b/src/flow/src/adapter/worker.rs @@ -116,7 +116,7 @@ impl WorkerHandle { /// create task, return task id /// #[allow(clippy::too_many_arguments)] - pub fn create_flow( + pub async fn create_flow( &self, task_id: FlowId, plan: TypedPlan, @@ -136,7 +136,7 @@ impl WorkerHandle { create_if_not_exist, }; - let ret = self.itc_client.blocking_lock().call_blocking(req)?; + let ret = self.itc_client.lock().await.call_blocking(req).await?; if let Response::Create { result: task_create_result, } = ret @@ -154,9 +154,9 @@ impl WorkerHandle { } /// remove task, return task id - pub fn remove_flow(&self, task_id: FlowId) -> Result { + pub async fn remove_flow(&self, task_id: FlowId) -> Result { let req = Request::Remove { task_id }; - let ret = self.itc_client.blocking_lock().call_blocking(req)?; + let ret = self.itc_client.lock().await.call_blocking(req).await?; if let Response::Remove { result } = ret { Ok(result) } else { @@ -171,15 +171,23 @@ impl WorkerHandle { /// trigger running the worker, will not block, and will run the worker parallelly /// /// will set the current timestamp to `now` for all dataflows before running them - pub fn run_available(&self, now: repr::Timestamp) { + pub async fn run_available(&self, now: repr::Timestamp) { self.itc_client - .blocking_lock() - .call_non_blocking(Request::RunAvail { now }); + .lock() + .await + .call_non_blocking(Request::RunAvail { now }) + .await; } - pub fn contains_flow(&self, task_id: FlowId) -> Result { + pub async fn contains_flow(&self, task_id: FlowId) -> Result { let req = Request::ContainTask { task_id }; - let ret = self.itc_client.blocking_lock().call_blocking(req).unwrap(); + let ret = self + .itc_client + .lock() + .await + .call_blocking(req) + .await + .unwrap(); if let Response::ContainTask { result: task_contain_result, } = ret @@ -197,10 +205,12 @@ impl WorkerHandle { } /// shutdown the worker - pub fn shutdown(&self) { + pub async fn shutdown(&self) { self.itc_client - .blocking_lock() - .call_non_blocking(Request::Shutdown); + .lock() + .await + .call_non_blocking(Request::Shutdown) + .await; } } @@ -384,24 +394,24 @@ struct InterThreadCallClient { impl InterThreadCallClient { /// call without expecting responses or blocking - fn call_non_blocking(&mut self, req: Request) { + async fn call_non_blocking(&mut self, req: Request) { let call_id = { - let mut call_id = self.call_id.blocking_lock(); + let mut call_id = self.call_id.lock().await; *call_id += 1; *call_id }; self.arg_sender.send((call_id, req)).unwrap(); } /// call blocking, and return the result - fn call_blocking(&mut self, req: Request) -> Result { + async fn call_blocking(&mut self, req: Request) -> Result { let call_id = { - let mut call_id = self.call_id.blocking_lock(); + let mut call_id = self.call_id.lock().await; *call_id += 1; *call_id }; self.arg_sender.send((call_id, req)).unwrap(); // TODO(discord9): better inter thread call impl - let (ret_call_id, ret) = self.ret_recv.blocking_recv().unwrap(); + let (ret_call_id, ret) = self.ret_recv.recv().await.unwrap(); if ret_call_id != call_id { return InternalSnafu { reason: "call id mismatch, worker/worker handler should be in sync", @@ -442,8 +452,8 @@ mod test { use crate::expr::Id; use crate::plan::Plan; use crate::repr::{RelationType, Row}; - #[test] - pub fn test_simple_get_with_worker_and_handle() { + #[tokio::test] + pub async fn test_simple_get_with_worker_and_handle() { let flow_tick = FlowTickManager::new(); let (tx, rx) = oneshot::channel(); let worker_thread_handle = std::thread::spawn(move || { @@ -451,7 +461,7 @@ mod test { tx.send(handle).unwrap(); worker.run(); }); - let handle = rx.blocking_recv().unwrap(); + let handle = rx.await.unwrap(); let src_ids = vec![GlobalId::User(1)]; let (tx, rx) = broadcast::channel::(1024); let (sink_tx, mut sink_rx) = mpsc::unbounded_channel::(); @@ -474,11 +484,12 @@ mod test { vec![rx], true, ) + .await .unwrap(); tx.send((Row::empty(), 0, 0)).unwrap(); - handle.run_available(flow_tick.tick()); - assert_eq!(sink_rx.blocking_recv().unwrap().0, Row::empty()); - handle.shutdown(); + handle.run_available(flow_tick.tick()).await; + assert_eq!(sink_rx.recv().await.unwrap().0, Row::empty()); + handle.shutdown().await; worker_thread_handle.join().unwrap(); } } diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index b302e0ae5d..718e826415 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::sync::Arc; +use common_telemetry::info; use datatypes::data_type::ConcreteDataType as CDT; use prost::Message; use query::parser::QueryLanguageParser;