1use std::collections::HashMap;
18use std::sync::{Arc, Mutex, Weak};
19use std::time::SystemTime;
20
21use api::v1::greptime_request::Request;
22use api::v1::query_request::Query;
23use api::v1::{CreateTableExpr, QueryRequest};
24use client::{Client, Database};
25use common_error::ext::{BoxedError, ErrorExt};
26use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config};
27use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
28use common_meta::peer::Peer;
29use common_meta::rpc::store::RangeRequest;
30use common_query::Output;
31use common_telemetry::warn;
32use meta_client::client::MetaClient;
33use query::datafusion::QUERY_PARALLELISM_HINT;
34use query::options::QueryOptions;
35use rand::rng;
36use rand::seq::SliceRandom;
37use servers::query_handler::grpc::GrpcQueryHandler;
38use session::context::{QueryContextBuilder, QueryContextRef};
39use session::hints::READ_PREFERENCE_HINT;
40use snafu::{OptionExt, ResultExt};
41use tokio::sync::SetOnce;
42
43use crate::batching_mode::BatchingModeOptions;
44use crate::error::{
45 CreateSinkTableSnafu, ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu,
46 NoAvailableFrontendSnafu, UnexpectedSnafu,
47};
48use crate::{Error, FlowAuthHeader};
49
50#[async_trait::async_trait]
57pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
58 async fn do_query(
59 &self,
60 query: Request,
61 ctx: QueryContextRef,
62 ) -> std::result::Result<Output, BoxedError>;
63}
64
65#[async_trait::async_trait]
67impl<E: ErrorExt + Send + Sync + 'static, T: GrpcQueryHandler<Error = E> + Send + Sync + 'static>
68 GrpcQueryHandlerWithBoxedError for T
69{
70 async fn do_query(
71 &self,
72 query: Request,
73 ctx: QueryContextRef,
74 ) -> std::result::Result<Output, BoxedError> {
75 self.do_query(query, ctx).await.map_err(BoxedError::new)
76 }
77}
78
79#[derive(Debug, Clone)]
80pub struct HandlerMutable {
81 handler: Arc<Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>,
82 is_initialized: Arc<SetOnce<()>>,
83}
84
85impl HandlerMutable {
86 pub async fn set_handler(&self, handler: Weak<dyn GrpcQueryHandlerWithBoxedError>) {
87 *self.handler.lock().unwrap() = Some(handler);
88 let _ = self.is_initialized.set(());
90 }
91}
92
93#[derive(Debug, Clone)]
97pub enum FrontendClient {
98 Distributed {
99 meta_client: Arc<MetaClient>,
100 chnl_mgr: ChannelManager,
101 auth: Option<FlowAuthHeader>,
102 query: QueryOptions,
103 batch_opts: BatchingModeOptions,
104 },
105 Standalone {
106 database_client: HandlerMutable,
109 query: QueryOptions,
110 },
111}
112
113impl FrontendClient {
114 pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
116 let is_initialized = Arc::new(SetOnce::new());
117 let handler = HandlerMutable {
118 handler: Arc::new(Mutex::new(None)),
119 is_initialized,
120 };
121 (
122 Self::Standalone {
123 database_client: handler.clone(),
124 query,
125 },
126 handler,
127 )
128 }
129
130 pub async fn wait_initialized(&self) {
132 if let FrontendClient::Standalone {
133 database_client, ..
134 } = self
135 {
136 database_client.is_initialized.wait().await;
137 }
138 }
139
140 pub fn from_meta_client(
141 meta_client: Arc<MetaClient>,
142 auth: Option<FlowAuthHeader>,
143 query: QueryOptions,
144 batch_opts: BatchingModeOptions,
145 ) -> Result<Self, Error> {
146 common_telemetry::info!("Frontend client build with auth={:?}", auth);
147 Ok(Self::Distributed {
148 meta_client,
149 chnl_mgr: {
150 let cfg = ChannelConfig::new()
151 .connect_timeout(batch_opts.grpc_conn_timeout)
152 .timeout(batch_opts.query_timeout);
153
154 let tls_config = load_client_tls_config(batch_opts.frontend_tls.clone())
155 .context(InvalidClientConfigSnafu)?;
156 ChannelManager::with_config(cfg, tls_config)
157 },
158 auth,
159 query,
160 batch_opts,
161 })
162 }
163
164 pub fn from_grpc_handler(
165 grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
166 query: QueryOptions,
167 ) -> Self {
168 let is_initialized = Arc::new(SetOnce::new_with(Some(())));
169 let handler = HandlerMutable {
170 handler: Arc::new(Mutex::new(Some(grpc_handler))),
171 is_initialized: is_initialized.clone(),
172 };
173
174 Self::Standalone {
175 database_client: handler,
176 query,
177 }
178 }
179}
180
181#[derive(Debug, Clone)]
182pub struct DatabaseWithPeer {
183 pub database: Database,
184 pub peer: Peer,
185}
186
187impl DatabaseWithPeer {
188 fn new(database: Database, peer: Peer) -> Self {
189 Self { database, peer }
190 }
191
192 async fn try_select_one(&self) -> Result<(), Error> {
194 let _ = self
196 .database
197 .sql("SELECT 1")
198 .await
199 .with_context(|_| InvalidRequestSnafu {
200 context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
201 })?;
202 Ok(())
203 }
204}
205
206impl FrontendClient {
207 pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
209 let Self::Distributed { meta_client, .. } = self else {
210 return Ok(vec![]);
211 };
212 let cluster_client = meta_client
213 .cluster_client()
214 .map_err(BoxedError::new)
215 .context(ExternalSnafu)?;
216
217 let prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
218 let req = RangeRequest::new().with_prefix(prefix);
219 let resp = cluster_client
220 .range(req)
221 .await
222 .map_err(BoxedError::new)
223 .context(ExternalSnafu)?;
224 let mut res = Vec::with_capacity(resp.kvs.len());
225 for kv in resp.kvs {
226 let key = NodeInfoKey::try_from(kv.key)
227 .map_err(BoxedError::new)
228 .context(ExternalSnafu)?;
229
230 let val = NodeInfo::try_from(kv.value)
231 .map_err(BoxedError::new)
232 .context(ExternalSnafu)?;
233 res.push((key, val));
234 }
235 Ok(res)
236 }
237
238 async fn get_random_active_frontend(
241 &self,
242 catalog: &str,
243 schema: &str,
244 ) -> Result<DatabaseWithPeer, Error> {
245 let Self::Distributed {
246 meta_client: _,
247 chnl_mgr,
248 auth,
249 query: _,
250 batch_opts,
251 } = self
252 else {
253 return UnexpectedSnafu {
254 reason: "Expect distributed mode",
255 }
256 .fail();
257 };
258
259 let mut interval = tokio::time::interval(batch_opts.grpc_conn_timeout);
260 interval.tick().await;
261 for retry in 0..batch_opts.experimental_grpc_max_retries {
262 let mut frontends = self.scan_for_frontend().await?;
263 let now_in_ms = SystemTime::now()
264 .duration_since(SystemTime::UNIX_EPOCH)
265 .unwrap()
266 .as_millis() as i64;
267 frontends.shuffle(&mut rng());
269
270 for (_, node_info) in frontends
272 .iter()
273 .filter(|(_, node_info)| {
275 node_info.last_activity_ts
276 + batch_opts
277 .experimental_frontend_activity_timeout
278 .as_millis() as i64
279 > now_in_ms
280 })
281 {
282 let addr = &node_info.peer.addr;
283 let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
284 let database = {
285 let mut db = Database::new(catalog, schema, client);
286 if let Some(auth) = auth {
287 db.set_auth(auth.auth().clone());
288 }
289 db
290 };
291 let db = DatabaseWithPeer::new(database, node_info.peer.clone());
292 match db.try_select_one().await {
293 Ok(_) => return Ok(db),
294 Err(e) => {
295 warn!(
296 "Failed to connect to frontend {} on retry={}: \n{e:?}",
297 addr, retry
298 );
299 }
300 }
301 }
302 interval.tick().await;
305 }
306
307 NoAvailableFrontendSnafu {
308 timeout: batch_opts.grpc_conn_timeout,
309 context: "No available frontend found that is able to process query",
310 }
311 .fail()
312 }
313
314 pub async fn create(
315 &self,
316 create: CreateTableExpr,
317 catalog: &str,
318 schema: &str,
319 ) -> Result<u32, Error> {
320 self.handle(
321 Request::Ddl(api::v1::DdlRequest {
322 expr: Some(api::v1::ddl_request::Expr::CreateTable(create.clone())),
323 }),
324 catalog,
325 schema,
326 &mut None,
327 )
328 .await
329 .map_err(BoxedError::new)
330 .with_context(|_| CreateSinkTableSnafu {
331 create: create.clone(),
332 })
333 }
334
335 pub async fn sql(&self, catalog: &str, schema: &str, sql: &str) -> Result<Output, Error> {
337 match self {
338 FrontendClient::Distributed { .. } => {
339 let db = self.get_random_active_frontend(catalog, schema).await?;
340 db.database
341 .sql(sql)
342 .await
343 .map_err(BoxedError::new)
344 .context(ExternalSnafu)
345 }
346 FrontendClient::Standalone {
347 database_client, ..
348 } => {
349 let ctx = QueryContextBuilder::default()
350 .current_catalog(catalog.to_string())
351 .current_schema(schema.to_string())
352 .build();
353 let ctx = Arc::new(ctx);
354 {
355 let database_client = {
356 database_client
357 .handler
358 .lock()
359 .map_err(|e| {
360 UnexpectedSnafu {
361 reason: format!("Failed to lock database client: {e}"),
362 }
363 .build()
364 })?
365 .as_ref()
366 .context(UnexpectedSnafu {
367 reason: "Standalone's frontend instance is not set",
368 })?
369 .upgrade()
370 .context(UnexpectedSnafu {
371 reason: "Failed to upgrade database client",
372 })?
373 };
374 let req = Request::Query(QueryRequest {
375 query: Some(Query::Sql(sql.to_string())),
376 });
377 database_client
378 .do_query(req, ctx)
379 .await
380 .map_err(BoxedError::new)
381 .context(ExternalSnafu)
382 }
383 }
384 }
385 }
386
387 pub(crate) async fn handle(
389 &self,
390 req: api::v1::greptime_request::Request,
391 catalog: &str,
392 schema: &str,
393 peer_desc: &mut Option<PeerDesc>,
394 ) -> Result<u32, Error> {
395 match self {
396 FrontendClient::Distributed {
397 query, batch_opts, ..
398 } => {
399 let db = self.get_random_active_frontend(catalog, schema).await?;
400
401 *peer_desc = Some(PeerDesc::Dist {
402 peer: db.peer.clone(),
403 });
404
405 db.database
406 .handle_with_retry(
407 req.clone(),
408 batch_opts.experimental_grpc_max_retries,
409 &[
410 (QUERY_PARALLELISM_HINT, &query.parallelism.to_string()),
411 (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
412 ],
413 )
414 .await
415 .with_context(|_| InvalidRequestSnafu {
416 context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
417 })
418 }
419 FrontendClient::Standalone {
420 database_client,
421 query,
422 } => {
423 let ctx = QueryContextBuilder::default()
424 .current_catalog(catalog.to_string())
425 .current_schema(schema.to_string())
426 .extensions(HashMap::from([(
427 QUERY_PARALLELISM_HINT.to_string(),
428 query.parallelism.to_string(),
429 )]))
430 .build();
431 let ctx = Arc::new(ctx);
432 {
433 let database_client = {
434 database_client
435 .handler
436 .lock()
437 .map_err(|e| {
438 UnexpectedSnafu {
439 reason: format!("Failed to lock database client: {e}"),
440 }
441 .build()
442 })?
443 .as_ref()
444 .context(UnexpectedSnafu {
445 reason: "Standalone's frontend instance is not set",
446 })?
447 .upgrade()
448 .context(UnexpectedSnafu {
449 reason: "Failed to upgrade database client",
450 })?
451 };
452 let resp: common_query::Output = database_client
453 .do_query(req, ctx)
454 .await
455 .map_err(BoxedError::new)
456 .context(ExternalSnafu)?;
457 match resp.data {
458 common_query::OutputData::AffectedRows(rows) => {
459 Ok(rows.try_into().map_err(|_| {
460 UnexpectedSnafu {
461 reason: format!("Failed to convert rows to u32: {}", rows),
462 }
463 .build()
464 })?)
465 }
466 _ => UnexpectedSnafu {
467 reason: "Unexpected output data",
468 }
469 .fail(),
470 }
471 }
472 }
473 }
474 }
475}
476
477#[derive(Debug, Default)]
479pub(crate) enum PeerDesc {
480 Dist {
482 peer: Peer,
484 },
485 #[default]
487 Standalone,
488}
489
490impl std::fmt::Display for PeerDesc {
491 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
492 match self {
493 PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
494 PeerDesc::Standalone => write!(f, "standalone"),
495 }
496 }
497}
498
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use common_query::Output;
    use tokio::time::timeout;

    use super::*;

    /// Handler stub that reports zero affected rows for every request.
    #[derive(Debug)]
    struct NoopHandler;

    #[async_trait::async_trait]
    impl GrpcQueryHandlerWithBoxedError for NoopHandler {
        async fn do_query(
            &self,
            _query: Request,
            _ctx: QueryContextRef,
        ) -> std::result::Result<Output, BoxedError> {
            Ok(Output::new_with_affected_rows(0))
        }
    }

    #[tokio::test]
    async fn wait_initialized() {
        // Standalone client without a handler: waiting must not complete.
        let (pending_client, handler_slot) =
            FrontendClient::from_empty_grpc_handler(QueryOptions::default());
        let before_set = timeout(
            Duration::from_millis(50),
            pending_client.wait_initialized(),
        )
        .await;
        assert!(before_set.is_err());

        // Once the handler is installed, waiting completes...
        let first_handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
        handler_slot
            .set_handler(Arc::downgrade(&first_handler))
            .await;

        timeout(Duration::from_secs(1), pending_client.wait_initialized())
            .await
            .expect("wait_initialized should complete after handler is set");

        // ...and keeps completing immediately afterwards.
        timeout(
            Duration::from_millis(10),
            pending_client.wait_initialized(),
        )
        .await
        .expect("wait_initialized should be a no-op once initialized");

        // Standalone client built with a handler is initialized from the start.
        let second_handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
        let ready_client = FrontendClient::from_grpc_handler(
            Arc::downgrade(&second_handler),
            QueryOptions::default(),
        );
        let ready_wait = timeout(Duration::from_millis(10), ready_client.wait_initialized()).await;
        assert!(ready_wait.is_ok());

        // Distributed client has nothing to wait for.
        let distributed_client = FrontendClient::from_meta_client(
            Arc::new(MetaClient::default()),
            None,
            QueryOptions::default(),
            BatchingModeOptions::default(),
        )
        .unwrap();
        let distributed_wait = timeout(
            Duration::from_millis(10),
            distributed_client.wait_initialized(),
        )
        .await;
        assert!(distributed_wait.is_ok());
    }
}