diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index 72fdaab956..fd3857a918 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -55,14 +55,32 @@ type FlownodeOptions = GreptimeOptions; pub struct Instance { flownode: FlownodeInstance, + // The components of flownode, which make it easier to expand based + // on the components. + #[cfg(feature = "enterprise")] + components: Components, + // Keep the logging guard to prevent the worker from being dropped. _guard: Vec, } +#[cfg(feature = "enterprise")] +pub struct Components { + pub catalog_manager: catalog::CatalogManagerRef, + pub fe_client: Arc, + pub kv_backend: common_meta::kv_backend::KvBackendRef, +} + impl Instance { - pub fn new(flownode: FlownodeInstance, guard: Vec) -> Self { + pub fn new( + flownode: FlownodeInstance, + #[cfg(feature = "enterprise")] components: Components, + guard: Vec, + ) -> Self { Self { flownode, + #[cfg(feature = "enterprise")] + components, _guard: guard, } } @@ -75,6 +93,11 @@ impl Instance { pub fn flownode_mut(&mut self) -> &mut FlownodeInstance { &mut self.flownode } + + #[cfg(feature = "enterprise")] + pub fn components(&self) -> &Components { + &self.components + } } #[async_trait::async_trait] @@ -350,13 +373,14 @@ impl StartCommand { let flow_auth_header = get_flow_auth_options(&opts).context(StartFlownodeSnafu)?; let frontend_client = FrontendClient::from_meta_client(meta_client.clone(), flow_auth_header); + let frontend_client = Arc::new(frontend_client); let flownode_builder = FlownodeBuilder::new( opts.clone(), plugins, table_metadata_manager, catalog_manager.clone(), flow_metadata_manager, - Arc::new(frontend_client), + frontend_client.clone(), ) .with_heartbeat_task(heartbeat_task); @@ -394,6 +418,16 @@ impl StartCommand { .set_frontend_invoker(invoker) .await; - Ok(Instance::new(flownode, guard)) + #[cfg(feature = "enterprise")] + let components = Components { + catalog_manager: catalog_manager.clone(), + fe_client: frontend_client, + kv_backend: cached_meta_backend, + }; + + #[cfg(not(feature = "enterprise"))] + return Ok(Instance::new(flownode, guard)); + #[cfg(feature = "enterprise")] + Ok(Instance::new(flownode, components, guard)) } } diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs index 0b7a1cbcc3..e70a58ebec 100644 --- a/src/flow/src/batching_mode/frontend_client.rs +++ b/src/flow/src/batching_mode/frontend_client.rs @@ -18,7 +18,8 @@ use std::sync::{Arc, Weak}; use std::time::SystemTime; use api::v1::greptime_request::Request; -use api::v1::CreateTableExpr; +use api::v1::query_request::Query; +use api::v1::{CreateTableExpr, QueryRequest}; use client::{Client, Database}; use common_error::ext::{BoxedError, ErrorExt}; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; @@ -269,6 +270,55 @@ impl FrontendClient { .await } + /// Execute a SQL statement on the frontend. + pub async fn sql(&self, catalog: &str, schema: &str, sql: &str) -> Result { + match self { + FrontendClient::Distributed { .. } => { + let db = self.get_random_active_frontend(catalog, schema).await?; + db.database + .sql(sql) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu) + } + FrontendClient::Standalone { database_client } => { + let ctx = QueryContextBuilder::default() + .current_catalog(catalog.to_string()) + .current_schema(schema.to_string()) + .build(); + let ctx = Arc::new(ctx); + { + let database_client = { + database_client + .lock() + .map_err(|e| { + UnexpectedSnafu { + reason: format!("Failed to lock database client: {e}"), + } + .build() + })? + .as_ref() + .context(UnexpectedSnafu { + reason: "Standalone's frontend instance is not set", + })? + .upgrade() + .context(UnexpectedSnafu { + reason: "Failed to upgrade database client", + })? + }; + let req = Request::Query(QueryRequest { + query: Some(Query::Sql(sql.to_string())), + }); + database_client + .do_query(req, ctx) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu) + } + } + } + } + /// Handle a request to frontend pub(crate) async fn handle( &self, @@ -318,7 +368,7 @@ impl FrontendClient { })? }; let resp: common_query::Output = database_client - .do_query(req.clone(), ctx) + .do_query(req, ctx) .await .map_err(BoxedError::new) .context(ExternalSnafu)?;