feat: support execute sql in frontend_client (#6355)

* feat: support execute sql in frontend_client

* chore: remove unnecessary clone

* add components for flownode instance

* add feature gate for component

* fix: enterprise feature
This commit is contained in:
fys
2025-06-19 17:47:16 +08:00
committed by GitHub
parent d9faa5c801
commit d4826b998d
2 changed files with 89 additions and 5 deletions

View File

@@ -55,14 +55,32 @@ type FlownodeOptions = GreptimeOptions<flow::FlownodeOptions>;
pub struct Instance {
flownode: FlownodeInstance,
// The components of flownode, which make it easier to expand based
// on the components.
#[cfg(feature = "enterprise")]
components: Components,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
#[cfg(feature = "enterprise")]
pub struct Components {
pub catalog_manager: catalog::CatalogManagerRef,
pub fe_client: Arc<FrontendClient>,
pub kv_backend: common_meta::kv_backend::KvBackendRef,
}
impl Instance {
pub fn new(flownode: FlownodeInstance, guard: Vec<WorkerGuard>) -> Self {
pub fn new(
flownode: FlownodeInstance,
#[cfg(feature = "enterprise")] components: Components,
guard: Vec<WorkerGuard>,
) -> Self {
Self {
flownode,
#[cfg(feature = "enterprise")]
components,
_guard: guard,
}
}
@@ -75,6 +93,11 @@ impl Instance {
pub fn flownode_mut(&mut self) -> &mut FlownodeInstance {
&mut self.flownode
}
#[cfg(feature = "enterprise")]
pub fn components(&self) -> &Components {
&self.components
}
}
#[async_trait::async_trait]
@@ -350,13 +373,14 @@ impl StartCommand {
let flow_auth_header = get_flow_auth_options(&opts).context(StartFlownodeSnafu)?;
let frontend_client =
FrontendClient::from_meta_client(meta_client.clone(), flow_auth_header);
let frontend_client = Arc::new(frontend_client);
let flownode_builder = FlownodeBuilder::new(
opts.clone(),
plugins,
table_metadata_manager,
catalog_manager.clone(),
flow_metadata_manager,
Arc::new(frontend_client),
frontend_client.clone(),
)
.with_heartbeat_task(heartbeat_task);
@@ -394,6 +418,16 @@ impl StartCommand {
.set_frontend_invoker(invoker)
.await;
Ok(Instance::new(flownode, guard))
#[cfg(feature = "enterprise")]
let components = Components {
catalog_manager: catalog_manager.clone(),
fe_client: frontend_client,
kv_backend: cached_meta_backend,
};
#[cfg(not(feature = "enterprise"))]
return Ok(Instance::new(flownode, guard));
#[cfg(feature = "enterprise")]
Ok(Instance::new(flownode, components, guard))
}
}

View File

@@ -18,7 +18,8 @@ use std::sync::{Arc, Weak};
use std::time::SystemTime;
use api::v1::greptime_request::Request;
use api::v1::CreateTableExpr;
use api::v1::query_request::Query;
use api::v1::{CreateTableExpr, QueryRequest};
use client::{Client, Database};
use common_error::ext::{BoxedError, ErrorExt};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
@@ -269,6 +270,55 @@ impl FrontendClient {
.await
}
/// Execute a SQL statement on the frontend.
pub async fn sql(&self, catalog: &str, schema: &str, sql: &str) -> Result<Output, Error> {
match self {
FrontendClient::Distributed { .. } => {
let db = self.get_random_active_frontend(catalog, schema).await?;
db.database
.sql(sql)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
}
FrontendClient::Standalone { database_client } => {
let ctx = QueryContextBuilder::default()
.current_catalog(catalog.to_string())
.current_schema(schema.to_string())
.build();
let ctx = Arc::new(ctx);
{
let database_client = {
database_client
.lock()
.map_err(|e| {
UnexpectedSnafu {
reason: format!("Failed to lock database client: {e}"),
}
.build()
})?
.as_ref()
.context(UnexpectedSnafu {
reason: "Standalone's frontend instance is not set",
})?
.upgrade()
.context(UnexpectedSnafu {
reason: "Failed to upgrade database client",
})?
};
let req = Request::Query(QueryRequest {
query: Some(Query::Sql(sql.to_string())),
});
database_client
.do_query(req, ctx)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
}
}
}
}
/// Handle a request to frontend
pub(crate) async fn handle(
&self,
@@ -318,7 +368,7 @@ impl FrontendClient {
})?
};
let resp: common_query::Output = database_client
.do_query(req.clone(), ctx)
.do_query(req, ctx)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;