From 0bc5a305be70cd9eef87a7c64f7b70aa43c26d41 Mon Sep 17 00:00:00 2001 From: fys <40801205+fengys1996@users.noreply.github.com> Date: Wed, 17 Dec 2025 16:13:36 +0800 Subject: [PATCH] chore: add wait_initialized method for frontend client (#7414) * chore: add wait_initialized method for frontend client * fix: some * fix: cargo fmt * add comment * add unit test * rename * fix: cargo check * fix: cr by copilot --- src/cmd/src/standalone.rs | 5 +- src/flow/src/batching_mode/frontend_client.rs | 127 +++++++++++++++--- tests-integration/src/standalone.rs | 5 +- 3 files changed, 110 insertions(+), 27 deletions(-) diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 012680ac08..dc00e4a20a 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -552,9 +552,8 @@ impl StartCommand { let grpc_handler = fe_instance.clone() as Arc; let weak_grpc_handler = Arc::downgrade(&grpc_handler); frontend_instance_handler - .lock() - .unwrap() - .replace(weak_grpc_handler); + .set_handler(weak_grpc_handler) + .await; // set the frontend invoker for flownode let flow_streaming_engine = flownode.flow_engine().streaming_engine(); diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs index 174fa25671..d07c92efb2 100644 --- a/src/flow/src/batching_mode/frontend_client.rs +++ b/src/flow/src/batching_mode/frontend_client.rs @@ -15,7 +15,7 @@ //! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user use std::collections::HashMap; -use std::sync::{Arc, Weak}; +use std::sync::{Arc, Mutex, Weak}; use std::time::SystemTime; use api::v1::greptime_request::Request; @@ -38,6 +38,7 @@ use servers::query_handler::grpc::GrpcQueryHandler; use session::context::{QueryContextBuilder, QueryContextRef}; use session::hints::READ_PREFERENCE_HINT; use snafu::{OptionExt, ResultExt}; +use tokio::sync::SetOnce; use crate::batching_mode::BatchingModeOptions; use crate::error::{ @@ -75,7 +76,19 @@ impl + Send } } -type HandlerMutable = Arc>>>; +#[derive(Debug, Clone)] +pub struct HandlerMutable { + handler: Arc>>>, + is_initialized: Arc>, +} + +impl HandlerMutable { + pub async fn set_handler(&self, handler: Weak) { + *self.handler.lock().unwrap() = Some(handler); + // Ignore the error, as we allow the handler to be set multiple times. + let _ = self.is_initialized.set(()); + } +} /// A simple frontend client able to execute sql using grpc protocol /// @@ -100,7 +113,11 @@ pub enum FrontendClient { impl FrontendClient { /// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) { - let handler = Arc::new(std::sync::Mutex::new(None)); + let is_initialized = Arc::new(SetOnce::new()); + let handler = HandlerMutable { + handler: Arc::new(Mutex::new(None)), + is_initialized, + }; ( Self::Standalone { database_client: handler.clone(), @@ -110,23 +127,13 @@ impl FrontendClient { ) } - /// Check if the frontend client is initialized. - /// - /// In distributed mode, it is always initialized. - /// In standalone mode, it checks if the database client is set. - pub fn is_initialized(&self) -> bool { - match self { - FrontendClient::Distributed { .. } => true, - FrontendClient::Standalone { - database_client, .. - } => { - let guard = database_client.lock(); - if let Ok(guard) = guard { - guard.is_some() - } else { - false - } - } + /// Waits until the frontend client is initialized. + pub async fn wait_initialized(&self) { + if let FrontendClient::Standalone { + database_client, .. + } = self + { + database_client.is_initialized.wait().await; } } @@ -158,8 +165,14 @@ impl FrontendClient { grpc_handler: Weak, query: QueryOptions, ) -> Self { + let is_initialized = Arc::new(SetOnce::new_with(Some(()))); + let handler = HandlerMutable { + handler: Arc::new(Mutex::new(Some(grpc_handler))), + is_initialized: is_initialized.clone(), + }; + Self::Standalone { - database_client: Arc::new(std::sync::Mutex::new(Some(grpc_handler))), + database_client: handler, query, } } @@ -341,6 +354,7 @@ impl FrontendClient { { let database_client = { database_client + .handler .lock() .map_err(|e| { UnexpectedSnafu { @@ -418,6 +432,7 @@ impl FrontendClient { { let database_client = { database_client + .handler .lock() .map_err(|e| { UnexpectedSnafu { @@ -480,3 +495,73 @@ impl std::fmt::Display for PeerDesc { } } } + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use common_query::Output; + use tokio::time::timeout; + + use super::*; + + #[derive(Debug)] + struct NoopHandler; + + #[async_trait::async_trait] + impl GrpcQueryHandlerWithBoxedError for NoopHandler { + async fn do_query( + &self, + _query: Request, + _ctx: QueryContextRef, + ) -> std::result::Result { + Ok(Output::new_with_affected_rows(0)) + } + } + + #[tokio::test] + async fn wait_initialized() { + let (client, handler_mut) = + FrontendClient::from_empty_grpc_handler(QueryOptions::default()); + + assert!( + timeout(Duration::from_millis(50), client.wait_initialized()) + .await + .is_err() + ); + + let handler: Arc = Arc::new(NoopHandler); + handler_mut.set_handler(Arc::downgrade(&handler)).await; + + timeout(Duration::from_secs(1), client.wait_initialized()) + .await + .expect("wait_initialized should complete after handler is set"); + + timeout(Duration::from_millis(10), client.wait_initialized()) + .await + .expect("wait_initialized should be a no-op once initialized"); + + let handler: Arc = Arc::new(NoopHandler); + let client = + FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default()); + assert!( + timeout(Duration::from_millis(10), client.wait_initialized()) + .await + .is_ok() + ); + + let meta_client = Arc::new(MetaClient::default()); + let client = FrontendClient::from_meta_client( + meta_client, + None, + QueryOptions::default(), + BatchingModeOptions::default(), + ) + .unwrap(); + assert!( + timeout(Duration::from_millis(10), client.wait_initialized()) + .await + .is_ok() + ); + } +} diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index b43c000189..f50bf67e33 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -259,9 +259,8 @@ impl GreptimeDbStandaloneBuilder { let grpc_handler = instance.clone() as Arc; let weak_grpc_handler = Arc::downgrade(&grpc_handler); frontend_instance_handler - .lock() - .unwrap() - .replace(weak_grpc_handler); + .set_handler(weak_grpc_handler) + .await; let flow_streaming_engine = flownode.flow_engine().streaming_engine(); let invoker = flow::FrontendInvoker::build_from(