mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-16 02:02:56 +00:00
fix: make worker handle async
This commit is contained in:
@@ -381,7 +381,7 @@ impl FlownodeManager {
|
||||
pub async fn run_available(&self) {
|
||||
let now = self.tick_manager.tick();
|
||||
for worker in self.worker_handles.iter() {
|
||||
worker.lock().await.run_available(now);
|
||||
worker.lock().await.run_available(now).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -513,8 +513,8 @@ impl FlownodeManager {
|
||||
pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
|
||||
for handle in self.worker_handles.iter() {
|
||||
let handle = handle.lock().await;
|
||||
if handle.contains_flow(flow_id)? {
|
||||
handle.remove_flow(flow_id)?;
|
||||
if handle.contains_flow(flow_id).await? {
|
||||
handle.remove_flow(flow_id).await?;
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -541,7 +541,7 @@ impl FlownodeManager {
|
||||
if create_if_not_exist {
|
||||
// check if the task already exists
|
||||
for handle in self.worker_handles.iter() {
|
||||
if handle.lock().await.contains_flow(flow_id)? {
|
||||
if handle.lock().await.contains_flow(flow_id).await? {
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
@@ -581,15 +581,17 @@ impl FlownodeManager {
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
let handle = &self.worker_handles[0].lock().await;
|
||||
handle.create_flow(
|
||||
flow_id,
|
||||
flow_plan,
|
||||
sink_id,
|
||||
sink_sender,
|
||||
&source_ids,
|
||||
source_senders,
|
||||
create_if_not_exist,
|
||||
)?;
|
||||
handle
|
||||
.create_flow(
|
||||
flow_id,
|
||||
flow_plan,
|
||||
sink_id,
|
||||
sink_sender,
|
||||
&source_ids,
|
||||
source_senders,
|
||||
create_if_not_exist,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(Some(flow_id))
|
||||
}
|
||||
@@ -780,8 +782,8 @@ impl FlowNodeContext {
|
||||
} else {
|
||||
let global_id = self.new_global_id();
|
||||
if let Some(table_id) = table_id {
|
||||
let schema = srv_map.get_table_schema(&table_id).await.unwrap();
|
||||
self.schema.insert(global_id, schema);
|
||||
let schema = srv_map.get_table_schema(&table_id).await;
|
||||
let _ = schema.map(|schema| self.schema.insert(global_id, schema));
|
||||
}
|
||||
|
||||
self.table_repr.insert(table_name, table_id, global_id);
|
||||
|
||||
@@ -16,7 +16,8 @@
|
||||
|
||||
use api::v1::flow::{flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse};
|
||||
use api::v1::region::InsertRequests;
|
||||
use common_meta::error::{Result, UnexpectedSnafu};
|
||||
use common_error::ext::{BoxedError, ErrorExt, StackError};
|
||||
use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu};
|
||||
use common_meta::node_manager::Flownode;
|
||||
use itertools::Itertools;
|
||||
use snafu::ResultExt;
|
||||
@@ -24,11 +25,10 @@ use snafu::ResultExt;
|
||||
use crate::adapter::FlownodeManager;
|
||||
use crate::repr::{self, DiffRow};
|
||||
|
||||
fn to_meta_err(err: impl ToString) -> common_meta::error::Error {
|
||||
UnexpectedSnafu {
|
||||
err_msg: err.to_string(),
|
||||
}
|
||||
.build()
|
||||
fn to_meta_err(err: crate::adapter::error::Error) -> common_meta::error::Error {
|
||||
Err::<(), _>(BoxedError::new(err))
|
||||
.with_context(|_| ExternalSnafu)
|
||||
.unwrap_err()
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
|
||||
@@ -116,7 +116,7 @@ impl WorkerHandle {
|
||||
/// create task, return task id
|
||||
///
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn create_flow(
|
||||
pub async fn create_flow(
|
||||
&self,
|
||||
task_id: FlowId,
|
||||
plan: TypedPlan,
|
||||
@@ -136,7 +136,7 @@ impl WorkerHandle {
|
||||
create_if_not_exist,
|
||||
};
|
||||
|
||||
let ret = self.itc_client.blocking_lock().call_blocking(req)?;
|
||||
let ret = self.itc_client.lock().await.call_blocking(req).await?;
|
||||
if let Response::Create {
|
||||
result: task_create_result,
|
||||
} = ret
|
||||
@@ -154,9 +154,9 @@ impl WorkerHandle {
|
||||
}
|
||||
|
||||
/// remove task, return task id
|
||||
pub fn remove_flow(&self, task_id: FlowId) -> Result<bool, Error> {
|
||||
pub async fn remove_flow(&self, task_id: FlowId) -> Result<bool, Error> {
|
||||
let req = Request::Remove { task_id };
|
||||
let ret = self.itc_client.blocking_lock().call_blocking(req)?;
|
||||
let ret = self.itc_client.lock().await.call_blocking(req).await?;
|
||||
if let Response::Remove { result } = ret {
|
||||
Ok(result)
|
||||
} else {
|
||||
@@ -171,15 +171,23 @@ impl WorkerHandle {
|
||||
/// trigger running the worker, will not block, and will run the worker parallelly
|
||||
///
|
||||
/// will set the current timestamp to `now` for all dataflows before running them
|
||||
pub fn run_available(&self, now: repr::Timestamp) {
|
||||
pub async fn run_available(&self, now: repr::Timestamp) {
|
||||
self.itc_client
|
||||
.blocking_lock()
|
||||
.call_non_blocking(Request::RunAvail { now });
|
||||
.lock()
|
||||
.await
|
||||
.call_non_blocking(Request::RunAvail { now })
|
||||
.await;
|
||||
}
|
||||
|
||||
pub fn contains_flow(&self, task_id: FlowId) -> Result<bool, Error> {
|
||||
pub async fn contains_flow(&self, task_id: FlowId) -> Result<bool, Error> {
|
||||
let req = Request::ContainTask { task_id };
|
||||
let ret = self.itc_client.blocking_lock().call_blocking(req).unwrap();
|
||||
let ret = self
|
||||
.itc_client
|
||||
.lock()
|
||||
.await
|
||||
.call_blocking(req)
|
||||
.await
|
||||
.unwrap();
|
||||
if let Response::ContainTask {
|
||||
result: task_contain_result,
|
||||
} = ret
|
||||
@@ -197,10 +205,12 @@ impl WorkerHandle {
|
||||
}
|
||||
|
||||
/// shutdown the worker
|
||||
pub fn shutdown(&self) {
|
||||
pub async fn shutdown(&self) {
|
||||
self.itc_client
|
||||
.blocking_lock()
|
||||
.call_non_blocking(Request::Shutdown);
|
||||
.lock()
|
||||
.await
|
||||
.call_non_blocking(Request::Shutdown)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -384,24 +394,24 @@ struct InterThreadCallClient {
|
||||
|
||||
impl InterThreadCallClient {
|
||||
/// call without expecting responses or blocking
|
||||
fn call_non_blocking(&mut self, req: Request) {
|
||||
async fn call_non_blocking(&mut self, req: Request) {
|
||||
let call_id = {
|
||||
let mut call_id = self.call_id.blocking_lock();
|
||||
let mut call_id = self.call_id.lock().await;
|
||||
*call_id += 1;
|
||||
*call_id
|
||||
};
|
||||
self.arg_sender.send((call_id, req)).unwrap();
|
||||
}
|
||||
/// call blocking, and return the result
|
||||
fn call_blocking(&mut self, req: Request) -> Result<Response, Error> {
|
||||
async fn call_blocking(&mut self, req: Request) -> Result<Response, Error> {
|
||||
let call_id = {
|
||||
let mut call_id = self.call_id.blocking_lock();
|
||||
let mut call_id = self.call_id.lock().await;
|
||||
*call_id += 1;
|
||||
*call_id
|
||||
};
|
||||
self.arg_sender.send((call_id, req)).unwrap();
|
||||
// TODO(discord9): better inter thread call impl
|
||||
let (ret_call_id, ret) = self.ret_recv.blocking_recv().unwrap();
|
||||
let (ret_call_id, ret) = self.ret_recv.recv().await.unwrap();
|
||||
if ret_call_id != call_id {
|
||||
return InternalSnafu {
|
||||
reason: "call id mismatch, worker/worker handler should be in sync",
|
||||
@@ -442,8 +452,8 @@ mod test {
|
||||
use crate::expr::Id;
|
||||
use crate::plan::Plan;
|
||||
use crate::repr::{RelationType, Row};
|
||||
#[test]
|
||||
pub fn test_simple_get_with_worker_and_handle() {
|
||||
#[tokio::test]
|
||||
pub async fn test_simple_get_with_worker_and_handle() {
|
||||
let flow_tick = FlowTickManager::new();
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let worker_thread_handle = std::thread::spawn(move || {
|
||||
@@ -451,7 +461,7 @@ mod test {
|
||||
tx.send(handle).unwrap();
|
||||
worker.run();
|
||||
});
|
||||
let handle = rx.blocking_recv().unwrap();
|
||||
let handle = rx.await.unwrap();
|
||||
let src_ids = vec![GlobalId::User(1)];
|
||||
let (tx, rx) = broadcast::channel::<DiffRow>(1024);
|
||||
let (sink_tx, mut sink_rx) = mpsc::unbounded_channel::<DiffRow>();
|
||||
@@ -474,11 +484,12 @@ mod test {
|
||||
vec![rx],
|
||||
true,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
tx.send((Row::empty(), 0, 0)).unwrap();
|
||||
handle.run_available(flow_tick.tick());
|
||||
assert_eq!(sink_rx.blocking_recv().unwrap().0, Row::empty());
|
||||
handle.shutdown();
|
||||
handle.run_available(flow_tick.tick()).await;
|
||||
assert_eq!(sink_rx.recv().await.unwrap().0, Row::empty());
|
||||
handle.shutdown().await;
|
||||
worker_thread_handle.join().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::info;
|
||||
use datatypes::data_type::ConcreteDataType as CDT;
|
||||
use prost::Message;
|
||||
use query::parser::QueryLanguageParser;
|
||||
|
||||
Reference in New Issue
Block a user