diff --git a/src/catalog/src/process_manager.rs b/src/catalog/src/process_manager.rs index ff2db26f46..9ee6744323 100644 --- a/src/catalog/src/process_manager.rs +++ b/src/catalog/src/process_manager.rs @@ -21,7 +21,7 @@ use std::sync::{Arc, RwLock}; use api::v1::frontend::{KillProcessRequest, ListProcessRequest, ProcessInfo}; use common_base::cancellation::CancellationHandle; use common_frontend::selector::{FrontendSelector, MetaClientSelector}; -use common_telemetry::{debug, info}; +use common_telemetry::{debug, info, warn}; use common_time::util::current_time_millis; use meta_client::MetaClientRef; use snafu::{ensure, OptionExt, ResultExt}; @@ -141,14 +141,20 @@ impl ProcessManager { .await .context(error::InvokeFrontendSnafu)?; for mut f in frontends { - processes.extend( - f.list_process(ListProcessRequest { + let result = f + .list_process(ListProcessRequest { catalog: catalog.unwrap_or_default().to_string(), }) .await - .context(error::InvokeFrontendSnafu)? - .processes, - ); + .context(error::InvokeFrontendSnafu); + match result { + Ok(resp) => { + processes.extend(resp.processes); + } + Err(e) => { + warn!(e; "Skipping failing node: {:?}", f) + } + } } } processes.extend(self.local_processes(catalog)?); diff --git a/src/common/frontend/src/selector.rs b/src/common/frontend/src/selector.rs index 3536ec85d8..e70f622fa0 100644 --- a/src/common/frontend/src/selector.rs +++ b/src/common/frontend/src/selector.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::Debug; use std::time::Duration; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; @@ -30,7 +31,7 @@ use crate::error::{MetaSnafu, Result}; pub type FrontendClientPtr = Box; #[async_trait::async_trait] -pub trait FrontendClient: Send { +pub trait FrontendClient: Send + Debug { async fn list_process(&mut self, req: ListProcessRequest) -> Result; async fn kill_process(&mut self, req: KillProcessRequest) -> Result;