feat: flow's http server (#5372)

* feat: flow's http server

* feat: add cli options for http addr

* test: sqlness runner http addr

* feat: metrics

* chore: also shutdown http server
This commit is contained in:
discord9
2025-01-16 15:25:30 +08:00
committed by GitHub
parent a4761d6245
commit 317fe9eaa5
6 changed files with 57 additions and 0 deletions

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_extension::DistributedInformationExtension;
@@ -142,6 +143,11 @@ struct StartCommand {
/// The prefix of environment variables, default is `GREPTIMEDB_FLOWNODE`;
#[clap(long, default_value = "GREPTIMEDB_FLOWNODE")]
env_prefix: String,
#[clap(long)]
http_addr: Option<String>,
/// HTTP request timeout in seconds.
#[clap(long)]
http_timeout: Option<u64>,
}
impl StartCommand {
@@ -198,6 +204,14 @@ impl StartCommand {
opts.mode = Mode::Distributed;
}
if let Some(http_addr) = &self.http_addr {
opts.http.addr.clone_from(http_addr);
}
if let Some(http_timeout) = self.http_timeout {
opts.http.timeout = Duration::from_secs(http_timeout);
}
if let (Mode::Distributed, None) = (&opts.mode, &opts.node_id) {
return MissingConfigSnafu {
msg: "Missing node id option",

View File

@@ -36,6 +36,7 @@ use query::QueryEngine;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use servers::Mode;
use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};
@@ -106,6 +107,7 @@ pub struct FlownodeOptions {
pub node_id: Option<u64>,
pub flow: FlowConfig,
pub grpc: GrpcOptions,
pub http: HttpOptions,
pub meta_client: Option<MetaClientOptions>,
pub logging: LoggingOptions,
pub tracing: TracingOptions,
@@ -120,6 +122,7 @@ impl Default for FlownodeOptions {
node_id: None,
flow: FlowConfig::default(),
grpc: GrpcOptions::default().with_addr("127.0.0.1:3004"),
http: HttpOptions::default(),
meta_client: None,
logging: LoggingOptions::default(),
tracing: TracingOptions::default(),

View File

@@ -39,6 +39,8 @@ use operator::statement::StatementExecutor;
use partition::manager::PartitionRuleManager;
use query::{QueryEngine, QueryEngineFactory};
use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu};
use servers::http::{HttpServer, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::server::Server;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
@@ -210,6 +212,9 @@ impl servers::server::Server for FlownodeServer {
pub struct FlownodeInstance {
server: FlownodeServer,
addr: SocketAddr,
/// only used for health check
http_server: HttpServer,
http_addr: SocketAddr,
heartbeat_task: Option<HeartbeatTask>,
}
@@ -224,6 +229,12 @@ impl FlownodeInstance {
.start(self.addr)
.await
.context(StartServerSnafu)?;
self.http_server
.start(self.http_addr)
.await
.context(StartServerSnafu)?;
Ok(())
}
pub async fn shutdown(&self) -> Result<(), crate::Error> {
@@ -233,6 +244,11 @@ impl FlownodeInstance {
task.shutdown();
}
self.http_server
.shutdown()
.await
.context(ShutdownServerSnafu)?;
Ok(())
}
@@ -305,12 +321,21 @@ impl FlownodeBuilder {
let server = FlownodeServer::new(FlowService::new(manager.clone()));
let http_addr = self.opts.http.addr.parse().context(ParseAddrSnafu {
addr: self.opts.http.addr.clone(),
})?;
let http_server = HttpServerBuilder::new(self.opts.http)
.with_metrics_handler(MetricsHandler)
.build();
let heartbeat_task = self.heartbeat_task;
let addr = self.opts.grpc.addr;
let instance = FlownodeInstance {
server,
addr: addr.parse().context(ParseAddrSnafu { addr })?,
http_server,
http_addr,
heartbeat_task,
};
Ok(instance)