chore: add wait_initialized method for frontend client (#7414)

* chore: add wait_initialized method for frontend client

* fix: some

* fix: cargo fmt

* add comment

* add unit test

* rename

* fix: cargo check

* fix: cr by copilot
This commit is contained in:
fys
2025-12-17 16:13:36 +08:00
committed by GitHub
parent 1afcddd5a9
commit 0bc5a305be
3 changed files with 110 additions and 27 deletions

View File

@@ -552,9 +552,8 @@ impl StartCommand {
let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>; let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
let weak_grpc_handler = Arc::downgrade(&grpc_handler); let weak_grpc_handler = Arc::downgrade(&grpc_handler);
frontend_instance_handler frontend_instance_handler
.lock() .set_handler(weak_grpc_handler)
.unwrap() .await;
.replace(weak_grpc_handler);
// set the frontend invoker for flownode // set the frontend invoker for flownode
let flow_streaming_engine = flownode.flow_engine().streaming_engine(); let flow_streaming_engine = flownode.flow_engine().streaming_engine();

View File

@@ -15,7 +15,7 @@
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user //! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Mutex, Weak};
use std::time::SystemTime; use std::time::SystemTime;
use api::v1::greptime_request::Request; use api::v1::greptime_request::Request;
@@ -38,6 +38,7 @@ use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef}; use session::context::{QueryContextBuilder, QueryContextRef};
use session::hints::READ_PREFERENCE_HINT; use session::hints::READ_PREFERENCE_HINT;
use snafu::{OptionExt, ResultExt}; use snafu::{OptionExt, ResultExt};
use tokio::sync::SetOnce;
use crate::batching_mode::BatchingModeOptions; use crate::batching_mode::BatchingModeOptions;
use crate::error::{ use crate::error::{
@@ -75,7 +76,19 @@ impl<E: ErrorExt + Send + Sync + 'static, T: GrpcQueryHandler<Error = E> + Send
} }
} }
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>; #[derive(Debug, Clone)]
pub struct HandlerMutable {
handler: Arc<Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>,
is_initialized: Arc<SetOnce<()>>,
}
impl HandlerMutable {
pub async fn set_handler(&self, handler: Weak<dyn GrpcQueryHandlerWithBoxedError>) {
*self.handler.lock().unwrap() = Some(handler);
// Ignore the error, as we allow the handler to be set multiple times.
let _ = self.is_initialized.set(());
}
}
/// A simple frontend client able to execute sql using grpc protocol /// A simple frontend client able to execute sql using grpc protocol
/// ///
@@ -100,7 +113,11 @@ pub enum FrontendClient {
impl FrontendClient { impl FrontendClient {
/// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later /// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later
pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) { pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
let handler = Arc::new(std::sync::Mutex::new(None)); let is_initialized = Arc::new(SetOnce::new());
let handler = HandlerMutable {
handler: Arc::new(Mutex::new(None)),
is_initialized,
};
( (
Self::Standalone { Self::Standalone {
database_client: handler.clone(), database_client: handler.clone(),
@@ -110,23 +127,13 @@ impl FrontendClient {
) )
} }
/// Check if the frontend client is initialized. /// Waits until the frontend client is initialized.
/// pub async fn wait_initialized(&self) {
/// In distributed mode, it is always initialized. if let FrontendClient::Standalone {
/// In standalone mode, it checks if the database client is set. database_client, ..
pub fn is_initialized(&self) -> bool { } = self
match self { {
FrontendClient::Distributed { .. } => true, database_client.is_initialized.wait().await;
FrontendClient::Standalone {
database_client, ..
} => {
let guard = database_client.lock();
if let Ok(guard) = guard {
guard.is_some()
} else {
false
}
}
} }
} }
@@ -158,8 +165,14 @@ impl FrontendClient {
grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>, grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
query: QueryOptions, query: QueryOptions,
) -> Self { ) -> Self {
let is_initialized = Arc::new(SetOnce::new_with(Some(())));
let handler = HandlerMutable {
handler: Arc::new(Mutex::new(Some(grpc_handler))),
is_initialized: is_initialized.clone(),
};
Self::Standalone { Self::Standalone {
database_client: Arc::new(std::sync::Mutex::new(Some(grpc_handler))), database_client: handler,
query, query,
} }
} }
@@ -341,6 +354,7 @@ impl FrontendClient {
{ {
let database_client = { let database_client = {
database_client database_client
.handler
.lock() .lock()
.map_err(|e| { .map_err(|e| {
UnexpectedSnafu { UnexpectedSnafu {
@@ -418,6 +432,7 @@ impl FrontendClient {
{ {
let database_client = { let database_client = {
database_client database_client
.handler
.lock() .lock()
.map_err(|e| { .map_err(|e| {
UnexpectedSnafu { UnexpectedSnafu {
@@ -480,3 +495,73 @@ impl std::fmt::Display for PeerDesc {
} }
} }
} }
#[cfg(test)]
mod tests {
use std::time::Duration;
use common_query::Output;
use tokio::time::timeout;
use super::*;
#[derive(Debug)]
struct NoopHandler;
#[async_trait::async_trait]
impl GrpcQueryHandlerWithBoxedError for NoopHandler {
async fn do_query(
&self,
_query: Request,
_ctx: QueryContextRef,
) -> std::result::Result<Output, BoxedError> {
Ok(Output::new_with_affected_rows(0))
}
}
#[tokio::test]
async fn wait_initialized() {
let (client, handler_mut) =
FrontendClient::from_empty_grpc_handler(QueryOptions::default());
assert!(
timeout(Duration::from_millis(50), client.wait_initialized())
.await
.is_err()
);
let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
handler_mut.set_handler(Arc::downgrade(&handler)).await;
timeout(Duration::from_secs(1), client.wait_initialized())
.await
.expect("wait_initialized should complete after handler is set");
timeout(Duration::from_millis(10), client.wait_initialized())
.await
.expect("wait_initialized should be a no-op once initialized");
let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
let client =
FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
assert!(
timeout(Duration::from_millis(10), client.wait_initialized())
.await
.is_ok()
);
let meta_client = Arc::new(MetaClient::default());
let client = FrontendClient::from_meta_client(
meta_client,
None,
QueryOptions::default(),
BatchingModeOptions::default(),
)
.unwrap();
assert!(
timeout(Duration::from_millis(10), client.wait_initialized())
.await
.is_ok()
);
}
}

View File

@@ -259,9 +259,8 @@ impl GreptimeDbStandaloneBuilder {
let grpc_handler = instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>; let grpc_handler = instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
let weak_grpc_handler = Arc::downgrade(&grpc_handler); let weak_grpc_handler = Arc::downgrade(&grpc_handler);
frontend_instance_handler frontend_instance_handler
.lock() .set_handler(weak_grpc_handler)
.unwrap() .await;
.replace(weak_grpc_handler);
let flow_streaming_engine = flownode.flow_engine().streaming_engine(); let flow_streaming_engine = flownode.flow_engine().streaming_engine();
let invoker = flow::FrontendInvoker::build_from( let invoker = flow::FrontendInvoker::build_from(