flow/batching_mode/
frontend_client.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Frontend client used to run a flow as a batching task, i.e. a time-window-aware normal query
//! that is triggered on every tick configured by the user.
16
17use std::collections::HashMap;
18use std::sync::{Arc, Mutex, Weak};
19use std::time::SystemTime;
20
21use api::v1::greptime_request::Request;
22use api::v1::query_request::Query;
23use api::v1::{CreateTableExpr, QueryRequest};
24use client::{Client, Database};
25use common_error::ext::{BoxedError, ErrorExt};
26use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config};
27use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
28use common_meta::peer::Peer;
29use common_meta::rpc::store::RangeRequest;
30use common_query::Output;
31use common_telemetry::warn;
32use meta_client::client::MetaClient;
33use query::datafusion::QUERY_PARALLELISM_HINT;
34use query::options::QueryOptions;
35use rand::rng;
36use rand::seq::SliceRandom;
37use servers::query_handler::grpc::GrpcQueryHandler;
38use session::context::{QueryContextBuilder, QueryContextRef};
39use session::hints::READ_PREFERENCE_HINT;
40use snafu::{OptionExt, ResultExt};
41use tokio::sync::SetOnce;
42
43use crate::batching_mode::BatchingModeOptions;
44use crate::error::{
45    CreateSinkTableSnafu, ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu,
46    NoAvailableFrontendSnafu, UnexpectedSnafu,
47};
48use crate::{Error, FlowAuthHeader};
49
50/// Just like [`GrpcQueryHandler`] but use BoxedError
51///
52/// basically just a specialized `GrpcQueryHandler<Error=BoxedError>`
53///
54/// this is only useful for flownode to
55/// invoke frontend Instance in standalone mode
56#[async_trait::async_trait]
57pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
58    async fn do_query(
59        &self,
60        query: Request,
61        ctx: QueryContextRef,
62    ) -> std::result::Result<Output, BoxedError>;
63}
64
65/// auto impl
66#[async_trait::async_trait]
67impl<E: ErrorExt + Send + Sync + 'static, T: GrpcQueryHandler<Error = E> + Send + Sync + 'static>
68    GrpcQueryHandlerWithBoxedError for T
69{
70    async fn do_query(
71        &self,
72        query: Request,
73        ctx: QueryContextRef,
74    ) -> std::result::Result<Output, BoxedError> {
75        self.do_query(query, ctx).await.map_err(BoxedError::new)
76    }
77}
78
79#[derive(Debug, Clone)]
80pub struct HandlerMutable {
81    handler: Arc<Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>,
82    is_initialized: Arc<SetOnce<()>>,
83}
84
85impl HandlerMutable {
86    pub async fn set_handler(&self, handler: Weak<dyn GrpcQueryHandlerWithBoxedError>) {
87        *self.handler.lock().unwrap() = Some(handler);
88        // Ignore the error, as we allow the handler to be set multiple times.
89        let _ = self.is_initialized.set(());
90    }
91}
92
93/// A simple frontend client able to execute sql using grpc protocol
94///
95/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
96#[derive(Debug, Clone)]
97pub enum FrontendClient {
98    Distributed {
99        meta_client: Arc<MetaClient>,
100        chnl_mgr: ChannelManager,
101        auth: Option<FlowAuthHeader>,
102        query: QueryOptions,
103        batch_opts: BatchingModeOptions,
104    },
105    Standalone {
106        /// for the sake of simplicity still use grpc even in standalone mode
107        /// notice the client here should all be lazy, so that can wait after frontend is booted then make conn
108        database_client: HandlerMutable,
109        query: QueryOptions,
110    },
111}
112
113impl FrontendClient {
114    /// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later
115    pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
116        let is_initialized = Arc::new(SetOnce::new());
117        let handler = HandlerMutable {
118            handler: Arc::new(Mutex::new(None)),
119            is_initialized,
120        };
121        (
122            Self::Standalone {
123                database_client: handler.clone(),
124                query,
125            },
126            handler,
127        )
128    }
129
130    /// Waits until the frontend client is initialized.
131    pub async fn wait_initialized(&self) {
132        if let FrontendClient::Standalone {
133            database_client, ..
134        } = self
135        {
136            database_client.is_initialized.wait().await;
137        }
138    }
139
140    pub fn from_meta_client(
141        meta_client: Arc<MetaClient>,
142        auth: Option<FlowAuthHeader>,
143        query: QueryOptions,
144        batch_opts: BatchingModeOptions,
145    ) -> Result<Self, Error> {
146        common_telemetry::info!("Frontend client build with auth={:?}", auth);
147        Ok(Self::Distributed {
148            meta_client,
149            chnl_mgr: {
150                let cfg = ChannelConfig::new()
151                    .connect_timeout(batch_opts.grpc_conn_timeout)
152                    .timeout(batch_opts.query_timeout);
153
154                let tls_config = load_client_tls_config(batch_opts.frontend_tls.clone())
155                    .context(InvalidClientConfigSnafu)?;
156                ChannelManager::with_config(cfg, tls_config)
157            },
158            auth,
159            query,
160            batch_opts,
161        })
162    }
163
164    pub fn from_grpc_handler(
165        grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
166        query: QueryOptions,
167    ) -> Self {
168        let is_initialized = Arc::new(SetOnce::new_with(Some(())));
169        let handler = HandlerMutable {
170            handler: Arc::new(Mutex::new(Some(grpc_handler))),
171            is_initialized: is_initialized.clone(),
172        };
173
174        Self::Standalone {
175            database_client: handler,
176            query,
177        }
178    }
179}
180
181#[derive(Debug, Clone)]
182pub struct DatabaseWithPeer {
183    pub database: Database,
184    pub peer: Peer,
185}
186
187impl DatabaseWithPeer {
188    fn new(database: Database, peer: Peer) -> Self {
189        Self { database, peer }
190    }
191
192    /// Try sending a "SELECT 1" to the database
193    async fn try_select_one(&self) -> Result<(), Error> {
194        // notice here use `sql` for `SELECT 1` return 1 row
195        let _ = self
196            .database
197            .sql("SELECT 1")
198            .await
199            .with_context(|_| InvalidRequestSnafu {
200                context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
201            })?;
202        Ok(())
203    }
204}
205
206impl FrontendClient {
207    /// scan for available frontend from metadata
208    pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
209        let Self::Distributed { meta_client, .. } = self else {
210            return Ok(vec![]);
211        };
212        let cluster_client = meta_client
213            .cluster_client()
214            .map_err(BoxedError::new)
215            .context(ExternalSnafu)?;
216
217        let prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
218        let req = RangeRequest::new().with_prefix(prefix);
219        let resp = cluster_client
220            .range(req)
221            .await
222            .map_err(BoxedError::new)
223            .context(ExternalSnafu)?;
224        let mut res = Vec::with_capacity(resp.kvs.len());
225        for kv in resp.kvs {
226            let key = NodeInfoKey::try_from(kv.key)
227                .map_err(BoxedError::new)
228                .context(ExternalSnafu)?;
229
230            let val = NodeInfo::try_from(kv.value)
231                .map_err(BoxedError::new)
232                .context(ExternalSnafu)?;
233            res.push((key, val));
234        }
235        Ok(res)
236    }
237
238    /// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
239    /// and is able to process query
240    async fn get_random_active_frontend(
241        &self,
242        catalog: &str,
243        schema: &str,
244    ) -> Result<DatabaseWithPeer, Error> {
245        let Self::Distributed {
246            meta_client: _,
247            chnl_mgr,
248            auth,
249            query: _,
250            batch_opts,
251        } = self
252        else {
253            return UnexpectedSnafu {
254                reason: "Expect distributed mode",
255            }
256            .fail();
257        };
258
259        let mut interval = tokio::time::interval(batch_opts.grpc_conn_timeout);
260        interval.tick().await;
261        for retry in 0..batch_opts.experimental_grpc_max_retries {
262            let mut frontends = self.scan_for_frontend().await?;
263            let now_in_ms = SystemTime::now()
264                .duration_since(SystemTime::UNIX_EPOCH)
265                .unwrap()
266                .as_millis() as i64;
267            // shuffle the frontends to avoid always pick the same one
268            frontends.shuffle(&mut rng());
269
270            // found node with maximum last_activity_ts
271            for (_, node_info) in frontends
272                .iter()
273                // filter out frontend that have been down for more than 1 min
274                .filter(|(_, node_info)| {
275                    node_info.last_activity_ts
276                        + batch_opts
277                            .experimental_frontend_activity_timeout
278                            .as_millis() as i64
279                        > now_in_ms
280                })
281            {
282                let addr = &node_info.peer.addr;
283                let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
284                let database = {
285                    let mut db = Database::new(catalog, schema, client);
286                    if let Some(auth) = auth {
287                        db.set_auth(auth.auth().clone());
288                    }
289                    db
290                };
291                let db = DatabaseWithPeer::new(database, node_info.peer.clone());
292                match db.try_select_one().await {
293                    Ok(_) => return Ok(db),
294                    Err(e) => {
295                        warn!(
296                            "Failed to connect to frontend {} on retry={}: \n{e:?}",
297                            addr, retry
298                        );
299                    }
300                }
301            }
302            // no available frontend
303            // sleep and retry
304            interval.tick().await;
305        }
306
307        NoAvailableFrontendSnafu {
308            timeout: batch_opts.grpc_conn_timeout,
309            context: "No available frontend found that is able to process query",
310        }
311        .fail()
312    }
313
314    pub async fn create(
315        &self,
316        create: CreateTableExpr,
317        catalog: &str,
318        schema: &str,
319    ) -> Result<u32, Error> {
320        self.handle(
321            Request::Ddl(api::v1::DdlRequest {
322                expr: Some(api::v1::ddl_request::Expr::CreateTable(create.clone())),
323            }),
324            catalog,
325            schema,
326            &mut None,
327        )
328        .await
329        .map_err(BoxedError::new)
330        .with_context(|_| CreateSinkTableSnafu {
331            create: create.clone(),
332        })
333    }
334
335    /// Execute a SQL statement on the frontend.
336    pub async fn sql(&self, catalog: &str, schema: &str, sql: &str) -> Result<Output, Error> {
337        match self {
338            FrontendClient::Distributed { .. } => {
339                let db = self.get_random_active_frontend(catalog, schema).await?;
340                db.database
341                    .sql(sql)
342                    .await
343                    .map_err(BoxedError::new)
344                    .context(ExternalSnafu)
345            }
346            FrontendClient::Standalone {
347                database_client, ..
348            } => {
349                let ctx = QueryContextBuilder::default()
350                    .current_catalog(catalog.to_string())
351                    .current_schema(schema.to_string())
352                    .build();
353                let ctx = Arc::new(ctx);
354                {
355                    let database_client = {
356                        database_client
357                            .handler
358                            .lock()
359                            .map_err(|e| {
360                                UnexpectedSnafu {
361                                    reason: format!("Failed to lock database client: {e}"),
362                                }
363                                .build()
364                            })?
365                            .as_ref()
366                            .context(UnexpectedSnafu {
367                                reason: "Standalone's frontend instance is not set",
368                            })?
369                            .upgrade()
370                            .context(UnexpectedSnafu {
371                                reason: "Failed to upgrade database client",
372                            })?
373                    };
374                    let req = Request::Query(QueryRequest {
375                        query: Some(Query::Sql(sql.to_string())),
376                    });
377                    database_client
378                        .do_query(req, ctx)
379                        .await
380                        .map_err(BoxedError::new)
381                        .context(ExternalSnafu)
382                }
383            }
384        }
385    }
386
387    /// Handle a request to frontend
388    pub(crate) async fn handle(
389        &self,
390        req: api::v1::greptime_request::Request,
391        catalog: &str,
392        schema: &str,
393        peer_desc: &mut Option<PeerDesc>,
394    ) -> Result<u32, Error> {
395        match self {
396            FrontendClient::Distributed {
397                query, batch_opts, ..
398            } => {
399                let db = self.get_random_active_frontend(catalog, schema).await?;
400
401                *peer_desc = Some(PeerDesc::Dist {
402                    peer: db.peer.clone(),
403                });
404
405                db.database
406                    .handle_with_retry(
407                        req.clone(),
408                        batch_opts.experimental_grpc_max_retries,
409                        &[
410                            (QUERY_PARALLELISM_HINT, &query.parallelism.to_string()),
411                            (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
412                        ],
413                    )
414                    .await
415                    .with_context(|_| InvalidRequestSnafu {
416                        context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
417                    })
418            }
419            FrontendClient::Standalone {
420                database_client,
421                query,
422            } => {
423                let ctx = QueryContextBuilder::default()
424                    .current_catalog(catalog.to_string())
425                    .current_schema(schema.to_string())
426                    .extensions(HashMap::from([(
427                        QUERY_PARALLELISM_HINT.to_string(),
428                        query.parallelism.to_string(),
429                    )]))
430                    .build();
431                let ctx = Arc::new(ctx);
432                {
433                    let database_client = {
434                        database_client
435                            .handler
436                            .lock()
437                            .map_err(|e| {
438                                UnexpectedSnafu {
439                                    reason: format!("Failed to lock database client: {e}"),
440                                }
441                                .build()
442                            })?
443                            .as_ref()
444                            .context(UnexpectedSnafu {
445                                reason: "Standalone's frontend instance is not set",
446                            })?
447                            .upgrade()
448                            .context(UnexpectedSnafu {
449                                reason: "Failed to upgrade database client",
450                            })?
451                    };
452                    let resp: common_query::Output = database_client
453                        .do_query(req, ctx)
454                        .await
455                        .map_err(BoxedError::new)
456                        .context(ExternalSnafu)?;
457                    match resp.data {
458                        common_query::OutputData::AffectedRows(rows) => {
459                            Ok(rows.try_into().map_err(|_| {
460                                UnexpectedSnafu {
461                                    reason: format!("Failed to convert rows to u32: {}", rows),
462                                }
463                                .build()
464                            })?)
465                        }
466                        _ => UnexpectedSnafu {
467                            reason: "Unexpected output data",
468                        }
469                        .fail(),
470                    }
471                }
472            }
473        }
474    }
475}
476
477/// Describe a peer of frontend
478#[derive(Debug, Default)]
479pub(crate) enum PeerDesc {
480    /// Distributed mode's frontend peer address
481    Dist {
482        /// frontend peer address
483        peer: Peer,
484    },
485    /// Standalone mode
486    #[default]
487    Standalone,
488}
489
490impl std::fmt::Display for PeerDesc {
491    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
492        match self {
493            PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
494            PeerDesc::Standalone => write!(f, "standalone"),
495        }
496    }
497}
498
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use common_query::Output;
    use tokio::time::timeout;

    use super::*;

    /// Handler stub that accepts any request and reports zero affected rows.
    #[derive(Debug)]
    struct NoopHandler;

    #[async_trait::async_trait]
    impl GrpcQueryHandlerWithBoxedError for NoopHandler {
        async fn do_query(
            &self,
            _query: Request,
            _ctx: QueryContextRef,
        ) -> std::result::Result<Output, BoxedError> {
            Ok(Output::new_with_affected_rows(0))
        }
    }

    /// Returns `true` when `client.wait_initialized()` completes within `limit`.
    async fn ready_within(client: &FrontendClient, limit: Duration) -> bool {
        timeout(limit, client.wait_initialized()).await.is_ok()
    }

    #[tokio::test]
    async fn wait_initialized() {
        // A client created without a handler must block until one is set.
        let (lazy_client, handler_mut) =
            FrontendClient::from_empty_grpc_handler(QueryOptions::default());
        assert!(!ready_within(&lazy_client, Duration::from_millis(50)).await);

        let late_handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
        handler_mut.set_handler(Arc::downgrade(&late_handler)).await;

        assert!(
            ready_within(&lazy_client, Duration::from_secs(1)).await,
            "wait_initialized should complete after handler is set"
        );
        assert!(
            ready_within(&lazy_client, Duration::from_millis(10)).await,
            "wait_initialized should be a no-op once initialized"
        );

        // A client created with a handler is initialized from the start.
        let eager_handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
        let eager_client = FrontendClient::from_grpc_handler(
            Arc::downgrade(&eager_handler),
            QueryOptions::default(),
        );
        assert!(ready_within(&eager_client, Duration::from_millis(10)).await);

        // A distributed client has nothing to wait for.
        let meta_client = Arc::new(MetaClient::default());
        let distributed_client = FrontendClient::from_meta_client(
            meta_client,
            None,
            QueryOptions::default(),
            BatchingModeOptions::default(),
        )
        .unwrap();
        assert!(ready_within(&distributed_client, Duration::from_millis(10)).await);
    }
}