mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 13:52:59 +00:00
refactor: make http server built flexibly (#3225)
* refactor: make http server built flexibly * Apply suggestions from code review Co-authored-by: JeremyHi <jiachun_feng@proton.me> * fix: resolve PR comments * Fix CI. --------- Co-authored-by: JeremyHi <jiachun_feng@proton.me>
This commit is contained in:
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -2659,9 +2659,6 @@ dependencies = [
|
||||
"async-compat",
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"axum",
|
||||
"axum-macros",
|
||||
"axum-test-helper",
|
||||
"bytes",
|
||||
"catalog",
|
||||
"client",
|
||||
|
||||
@@ -73,6 +73,7 @@ arrow-flight = "47.0"
|
||||
arrow-schema = { version = "47.0", features = ["serde"] }
|
||||
async-stream = "0.3"
|
||||
async-trait = "0.1"
|
||||
axum = { version = "0.6", features = ["headers"] }
|
||||
base64 = "0.21"
|
||||
bigdecimal = "0.4.2"
|
||||
bitflags = "2.4.1"
|
||||
|
||||
@@ -46,6 +46,10 @@ impl Instance {
|
||||
fn new(frontend: FeInstance) -> Self {
|
||||
Self { frontend }
|
||||
}
|
||||
|
||||
pub fn mut_inner(&mut self) -> &mut FeInstance {
|
||||
&mut self.frontend
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -253,13 +257,11 @@ impl StartCommand {
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
|
||||
let servers = Services::new(plugins)
|
||||
.build(opts.clone(), Arc::new(instance.clone()))
|
||||
.await
|
||||
let servers = Services::new(opts.clone(), Arc::new(instance.clone()), plugins)
|
||||
.build()
|
||||
.context(StartFrontendSnafu)?;
|
||||
instance
|
||||
.build_servers(opts, servers)
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
|
||||
Ok(Instance::new(instance))
|
||||
|
||||
@@ -48,6 +48,8 @@ pub trait App {
|
||||
pub async fn start_app(mut app: Box<dyn App>) -> error::Result<()> {
|
||||
let name = app.name().to_string();
|
||||
|
||||
app.pre_start()?;
|
||||
|
||||
tokio::select! {
|
||||
result = app.start() => {
|
||||
if let Err(err) = result {
|
||||
|
||||
@@ -437,13 +437,11 @@ impl StartCommand {
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
|
||||
let servers = Services::new(fe_plugins)
|
||||
.build(opts.clone(), Arc::new(frontend.clone()))
|
||||
.await
|
||||
let servers = Services::new(opts.clone(), Arc::new(frontend.clone()), fe_plugins)
|
||||
.build()
|
||||
.context(StartFrontendSnafu)?;
|
||||
frontend
|
||||
.build_servers(opts, servers)
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
|
||||
Ok(Instance {
|
||||
|
||||
@@ -13,8 +13,6 @@ arrow-flight.workspace = true
|
||||
async-compat = "0.2"
|
||||
async-stream.workspace = true
|
||||
async-trait.workspace = true
|
||||
axum = "0.6"
|
||||
axum-macros = "0.3"
|
||||
bytes.workspace = true
|
||||
catalog.workspace = true
|
||||
client.workspace = true
|
||||
@@ -77,7 +75,6 @@ url = "2.3.1"
|
||||
uuid.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" }
|
||||
client.workspace = true
|
||||
common-meta = { workspace = true, features = ["testing"] }
|
||||
common-query.workspace = true
|
||||
|
||||
@@ -32,6 +32,7 @@ use common_base::Plugins;
|
||||
use common_config::KvBackendConfig;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
|
||||
use common_meta::key::TableMetadataManagerRef;
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
use common_meta::state_store::KvStateStore;
|
||||
use common_procedure::local::{LocalManager, ManagerConfig};
|
||||
@@ -119,6 +120,7 @@ pub struct Instance {
|
||||
inserter: InserterRef,
|
||||
deleter: DeleterRef,
|
||||
export_metrics_task: Option<ExportMetricsTask>,
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
}
|
||||
|
||||
impl Instance {
|
||||
@@ -186,7 +188,7 @@ impl Instance {
|
||||
Ok((kv_backend, procedure_manager))
|
||||
}
|
||||
|
||||
pub async fn build_servers(
|
||||
pub fn build_servers(
|
||||
&mut self,
|
||||
opts: impl Into<FrontendOptions> + TomlSerializable,
|
||||
servers: ServerHandlers,
|
||||
@@ -219,6 +221,10 @@ impl Instance {
|
||||
pub fn statement_executor(&self) -> Arc<StatementExecutor> {
|
||||
self.statement_executor.clone()
|
||||
}
|
||||
|
||||
pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
|
||||
&self.table_metadata_manager
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
||||
@@ -20,6 +20,7 @@ use common_base::Plugins;
|
||||
use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
|
||||
use common_meta::datanode_manager::DatanodeManagerRef;
|
||||
use common_meta::ddl::DdlTaskExecutorRef;
|
||||
use common_meta::key::TableMetadataManager;
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
use operator::delete::Deleter;
|
||||
use operator::insert::Inserter;
|
||||
@@ -127,7 +128,7 @@ impl FrontendBuilder {
|
||||
catalog_manager.clone(),
|
||||
query_engine.clone(),
|
||||
self.ddl_task_executor,
|
||||
kv_backend,
|
||||
kv_backend.clone(),
|
||||
catalog_manager.clone(),
|
||||
inserter.clone(),
|
||||
));
|
||||
@@ -145,6 +146,7 @@ impl FrontendBuilder {
|
||||
inserter,
|
||||
deleter,
|
||||
export_metrics_task: None,
|
||||
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,8 +21,8 @@ use common_runtime::Builder as RuntimeBuilder;
|
||||
use servers::error::InternalIoSnafu;
|
||||
use servers::grpc::builder::GrpcServerBuilder;
|
||||
use servers::grpc::greptime_handler::GreptimeRequestHandler;
|
||||
use servers::grpc::GrpcServerConfig;
|
||||
use servers::http::HttpServerBuilder;
|
||||
use servers::grpc::{GrpcServer, GrpcServerConfig};
|
||||
use servers::http::{HttpServer, HttpServerBuilder};
|
||||
use servers::metrics_handler::MetricsHandler;
|
||||
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
|
||||
use servers::opentsdb::OpentsdbServer;
|
||||
@@ -37,16 +37,34 @@ use crate::frontend::{FrontendOptions, TomlSerializable};
|
||||
use crate::instance::FrontendInstance;
|
||||
use crate::service_config::GrpcOptions;
|
||||
|
||||
pub struct Services {
|
||||
pub struct Services<T, U>
|
||||
where
|
||||
T: Into<FrontendOptions> + TomlSerializable + Clone,
|
||||
U: FrontendInstance,
|
||||
{
|
||||
opts: T,
|
||||
instance: Arc<U>,
|
||||
grpc_server_builder: Option<GrpcServerBuilder>,
|
||||
http_server_builder: Option<HttpServerBuilder>,
|
||||
plugins: Plugins,
|
||||
}
|
||||
|
||||
impl Services {
|
||||
pub fn new(plugins: Plugins) -> Self {
|
||||
Self { plugins }
|
||||
impl<T, U> Services<T, U>
|
||||
where
|
||||
T: Into<FrontendOptions> + TomlSerializable + Clone,
|
||||
U: FrontendInstance,
|
||||
{
|
||||
pub fn new(opts: T, instance: Arc<U>, plugins: Plugins) -> Self {
|
||||
Self {
|
||||
opts,
|
||||
instance,
|
||||
grpc_server_builder: None,
|
||||
http_server_builder: None,
|
||||
plugins,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn grpc_server_builder(opts: &GrpcOptions) -> Result<GrpcServerBuilder> {
|
||||
pub fn grpc_server_builder(&self, opts: &GrpcOptions) -> Result<GrpcServerBuilder> {
|
||||
let grpc_runtime = Arc::new(
|
||||
RuntimeBuilder::default()
|
||||
.worker_threads(opts.runtime_size)
|
||||
@@ -63,26 +81,93 @@ impl Services {
|
||||
Ok(GrpcServerBuilder::new(grpc_config, grpc_runtime))
|
||||
}
|
||||
|
||||
pub async fn build<T, U>(&self, opts: T, instance: Arc<U>) -> Result<ServerHandlers>
|
||||
where
|
||||
T: Into<FrontendOptions> + TomlSerializable + Clone,
|
||||
U: FrontendInstance,
|
||||
{
|
||||
let grpc_options = &opts.clone().into().grpc;
|
||||
let builder = Self::grpc_server_builder(grpc_options)?;
|
||||
self.build_with(opts, instance, builder).await
|
||||
pub fn http_server_builder(&self, opts: &FrontendOptions) -> HttpServerBuilder {
|
||||
let mut builder = HttpServerBuilder::new(opts.http.clone()).with_sql_handler(
|
||||
ServerSqlQueryHandlerAdapter::arc(self.instance.clone()),
|
||||
Some(self.instance.clone()),
|
||||
);
|
||||
|
||||
if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
|
||||
builder = builder.with_user_provider(user_provider);
|
||||
}
|
||||
|
||||
if opts.opentsdb.enable {
|
||||
builder = builder.with_opentsdb_handler(self.instance.clone());
|
||||
}
|
||||
|
||||
if opts.influxdb.enable {
|
||||
builder = builder.with_influxdb_handler(self.instance.clone());
|
||||
}
|
||||
|
||||
if opts.prom_store.enable {
|
||||
builder = builder
|
||||
.with_prom_handler(self.instance.clone(), opts.prom_store.with_metric_engine)
|
||||
.with_prometheus_handler(self.instance.clone());
|
||||
}
|
||||
|
||||
if opts.otlp.enable {
|
||||
builder = builder.with_otlp_handler(self.instance.clone());
|
||||
}
|
||||
builder
|
||||
}
|
||||
|
||||
pub async fn build_with<T, U>(
|
||||
&self,
|
||||
opts: T,
|
||||
instance: Arc<U>,
|
||||
builder: GrpcServerBuilder,
|
||||
) -> Result<ServerHandlers>
|
||||
where
|
||||
T: Into<FrontendOptions> + TomlSerializable,
|
||||
U: FrontendInstance,
|
||||
{
|
||||
pub fn with_grpc_server_builder(self, builder: GrpcServerBuilder) -> Self {
|
||||
Self {
|
||||
grpc_server_builder: Some(builder),
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_http_server_builder(self, builder: HttpServerBuilder) -> Self {
|
||||
Self {
|
||||
http_server_builder: Some(builder),
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
fn build_grpc_server(&mut self, opts: &FrontendOptions) -> Result<GrpcServer> {
|
||||
let builder = if let Some(builder) = self.grpc_server_builder.take() {
|
||||
builder
|
||||
} else {
|
||||
self.grpc_server_builder(&opts.grpc)?
|
||||
};
|
||||
|
||||
let user_provider = self.plugins.get::<UserProviderRef>();
|
||||
|
||||
let greptime_request_handler = GreptimeRequestHandler::new(
|
||||
ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
|
||||
user_provider.clone(),
|
||||
builder.runtime().clone(),
|
||||
);
|
||||
|
||||
let grpc_server = builder
|
||||
.database_handler(greptime_request_handler.clone())
|
||||
.prometheus_handler(self.instance.clone(), user_provider.clone())
|
||||
.otlp_handler(self.instance.clone(), user_provider)
|
||||
.flight_handler(Arc::new(greptime_request_handler))
|
||||
.build();
|
||||
Ok(grpc_server)
|
||||
}
|
||||
|
||||
fn build_http_server(&mut self, opts: &FrontendOptions, toml: String) -> Result<HttpServer> {
|
||||
let builder = if let Some(builder) = self.http_server_builder.take() {
|
||||
builder
|
||||
} else {
|
||||
self.http_server_builder(opts)
|
||||
};
|
||||
|
||||
let http_server = builder
|
||||
.with_metrics_handler(MetricsHandler)
|
||||
.with_plugins(self.plugins.clone())
|
||||
.with_greptime_config_options(toml)
|
||||
.build();
|
||||
Ok(http_server)
|
||||
}
|
||||
|
||||
pub fn build(mut self) -> Result<ServerHandlers> {
|
||||
let opts = self.opts.clone();
|
||||
let instance = self.instance.clone();
|
||||
|
||||
let toml = opts.to_toml()?;
|
||||
let opts: FrontendOptions = opts.into();
|
||||
|
||||
@@ -92,21 +177,8 @@ impl Services {
|
||||
|
||||
{
|
||||
// Always init GRPC server
|
||||
let opts = &opts.grpc;
|
||||
let grpc_addr = parse_addr(&opts.addr)?;
|
||||
|
||||
let greptime_request_handler = GreptimeRequestHandler::new(
|
||||
ServerGrpcQueryHandlerAdapter::arc(instance.clone()),
|
||||
user_provider.clone(),
|
||||
builder.runtime().clone(),
|
||||
);
|
||||
let grpc_server = builder
|
||||
.database_handler(greptime_request_handler.clone())
|
||||
.prometheus_handler(instance.clone(), user_provider.clone())
|
||||
.otlp_handler(instance.clone(), user_provider.clone())
|
||||
.flight_handler(Arc::new(greptime_request_handler))
|
||||
.build();
|
||||
|
||||
let grpc_addr = parse_addr(&opts.grpc.addr)?;
|
||||
let grpc_server = self.build_grpc_server(&opts)?;
|
||||
result.push((Box::new(grpc_server), grpc_addr));
|
||||
}
|
||||
|
||||
@@ -114,42 +186,7 @@ impl Services {
|
||||
// Always init HTTP server
|
||||
let http_options = &opts.http;
|
||||
let http_addr = parse_addr(&http_options.addr)?;
|
||||
|
||||
let mut http_server_builder = HttpServerBuilder::new(http_options.clone());
|
||||
let _ = http_server_builder
|
||||
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(instance.clone()))
|
||||
.with_grpc_handler(ServerGrpcQueryHandlerAdapter::arc(instance.clone()));
|
||||
|
||||
if let Some(user_provider) = user_provider.clone() {
|
||||
let _ = http_server_builder.with_user_provider(user_provider);
|
||||
}
|
||||
|
||||
if opts.opentsdb.enable {
|
||||
let _ = http_server_builder.with_opentsdb_handler(instance.clone());
|
||||
}
|
||||
|
||||
if opts.influxdb.enable {
|
||||
let _ = http_server_builder.with_influxdb_handler(instance.clone());
|
||||
}
|
||||
|
||||
if opts.prom_store.enable {
|
||||
let _ = http_server_builder
|
||||
.with_prom_handler(instance.clone())
|
||||
.with_prometheus_handler(instance.clone());
|
||||
http_server_builder
|
||||
.set_prom_store_with_metric_engine(opts.prom_store.with_metric_engine);
|
||||
}
|
||||
|
||||
if opts.otlp.enable {
|
||||
let _ = http_server_builder.with_otlp_handler(instance.clone());
|
||||
}
|
||||
|
||||
let http_server = http_server_builder
|
||||
.with_metrics_handler(MetricsHandler)
|
||||
.with_script_handler(instance.clone())
|
||||
.with_plugins(self.plugins.clone())
|
||||
.with_greptime_config_options(toml)
|
||||
.build();
|
||||
let http_server = self.build_http_server(&opts, toml)?;
|
||||
result.push((Box::new(http_server), http_addr));
|
||||
}
|
||||
|
||||
|
||||
@@ -37,7 +37,7 @@ pub mod read;
|
||||
pub mod region;
|
||||
mod region_write_ctx;
|
||||
pub mod request;
|
||||
mod row_converter;
|
||||
pub mod row_converter;
|
||||
pub(crate) mod schedule;
|
||||
pub mod sst;
|
||||
pub mod wal;
|
||||
|
||||
@@ -16,8 +16,8 @@ api.workspace = true
|
||||
arrow-flight.workspace = true
|
||||
async-trait = "0.1"
|
||||
auth.workspace = true
|
||||
axum = { version = "0.6", features = ["headers"] }
|
||||
axum-macros = "0.3.8"
|
||||
axum.workspace = true
|
||||
base64.workspace = true
|
||||
bytes.workspace = true
|
||||
catalog.workspace = true
|
||||
|
||||
@@ -37,12 +37,18 @@ use crate::grpc::prom_query_gateway::PrometheusGatewayService;
|
||||
use crate::prometheus_handler::PrometheusHandlerRef;
|
||||
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
|
||||
|
||||
/// Add a gRPC service (`service`) to a `builder`([RoutesBuilder]).
|
||||
/// This macro will automatically add some gRPC properties to the service.
|
||||
#[macro_export]
|
||||
macro_rules! add_service {
|
||||
($builder: ident, $service: expr) => {
|
||||
$builder.routes_builder.add_service(
|
||||
let max_recv_message_size = $builder.config().max_recv_message_size;
|
||||
let max_send_message_size = $builder.config().max_send_message_size;
|
||||
|
||||
$builder.routes_builder_mut().add_service(
|
||||
$service
|
||||
.max_decoding_message_size($builder.config.max_recv_message_size)
|
||||
.max_encoding_message_size($builder.config.max_send_message_size),
|
||||
.max_decoding_message_size(max_recv_message_size)
|
||||
.max_encoding_message_size(max_send_message_size),
|
||||
)
|
||||
};
|
||||
}
|
||||
@@ -62,6 +68,10 @@ impl GrpcServerBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn config(&self) -> &GrpcServerConfig {
|
||||
&self.config
|
||||
}
|
||||
|
||||
pub fn runtime(&self) -> &Arc<Runtime> {
|
||||
&self.runtime
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
use std::fmt::Display;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Mutex as StdMutex;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use aide::axum::{routing as apirouting, ApiRouter, IntoApiResponse};
|
||||
@@ -62,7 +63,6 @@ use crate::metrics::{
|
||||
};
|
||||
use crate::metrics_handler::MetricsHandler;
|
||||
use crate::prometheus_handler::PrometheusHandlerRef;
|
||||
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
|
||||
use crate::query_handler::sql::ServerSqlQueryHandlerRef;
|
||||
use crate::query_handler::{
|
||||
InfluxdbLineProtocolHandlerRef, OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef,
|
||||
@@ -99,26 +99,15 @@ pub static PUBLIC_APIS: [&str; 2] = ["/v1/influxdb/ping", "/v1/influxdb/health"]
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct HttpServer {
|
||||
// server handlers
|
||||
sql_handler: Option<ServerSqlQueryHandlerRef>,
|
||||
grpc_handler: Option<ServerGrpcQueryHandlerRef>,
|
||||
influxdb_handler: Option<InfluxdbLineProtocolHandlerRef>,
|
||||
opentsdb_handler: Option<OpentsdbProtocolHandlerRef>,
|
||||
prom_handler: Option<PromStoreProtocolHandlerRef>,
|
||||
prometheus_handler: Option<PrometheusHandlerRef>,
|
||||
otlp_handler: Option<OpenTelemetryProtocolHandlerRef>,
|
||||
script_handler: Option<ScriptHandlerRef>,
|
||||
router: StdMutex<Router>,
|
||||
shutdown_tx: Mutex<Option<Sender<()>>>,
|
||||
user_provider: Option<UserProviderRef>,
|
||||
metrics_handler: Option<MetricsHandler>,
|
||||
|
||||
// plugins
|
||||
plugins: Plugins,
|
||||
|
||||
// server configs
|
||||
options: HttpOptions,
|
||||
greptime_config_options: Option<String>,
|
||||
prom_store_with_metric_engine: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -406,105 +395,16 @@ pub struct GreptimeOptionsConfigState {
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct HttpServerBuilder {
|
||||
inner: HttpServer,
|
||||
options: HttpOptions,
|
||||
plugins: Plugins,
|
||||
user_provider: Option<UserProviderRef>,
|
||||
api: OpenApi,
|
||||
router: Router,
|
||||
}
|
||||
|
||||
impl HttpServerBuilder {
|
||||
pub fn new(options: HttpOptions) -> Self {
|
||||
Self {
|
||||
inner: HttpServer {
|
||||
sql_handler: None,
|
||||
grpc_handler: None,
|
||||
options,
|
||||
opentsdb_handler: None,
|
||||
influxdb_handler: None,
|
||||
prom_handler: None,
|
||||
prometheus_handler: None,
|
||||
otlp_handler: None,
|
||||
user_provider: None,
|
||||
script_handler: None,
|
||||
metrics_handler: None,
|
||||
shutdown_tx: Mutex::new(None),
|
||||
greptime_config_options: None,
|
||||
plugins: Default::default(),
|
||||
prom_store_with_metric_engine: false,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_sql_handler(&mut self, handler: ServerSqlQueryHandlerRef) -> &mut Self {
|
||||
let _ = self.inner.sql_handler.get_or_insert(handler);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_grpc_handler(&mut self, handler: ServerGrpcQueryHandlerRef) -> &mut Self {
|
||||
let _ = self.inner.grpc_handler.get_or_insert(handler);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_opentsdb_handler(&mut self, handler: OpentsdbProtocolHandlerRef) -> &mut Self {
|
||||
let _ = self.inner.opentsdb_handler.get_or_insert(handler);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_script_handler(&mut self, handler: ScriptHandlerRef) -> &mut Self {
|
||||
let _ = self.inner.script_handler.get_or_insert(handler);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_influxdb_handler(&mut self, handler: InfluxdbLineProtocolHandlerRef) -> &mut Self {
|
||||
let _ = self.inner.influxdb_handler.get_or_insert(handler);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_prom_handler(&mut self, handler: PromStoreProtocolHandlerRef) -> &mut Self {
|
||||
let _ = self.inner.prom_handler.get_or_insert(handler);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_prometheus_handler(&mut self, handler: PrometheusHandlerRef) -> &mut Self {
|
||||
let _ = self.inner.prometheus_handler.get_or_insert(handler);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_otlp_handler(&mut self, handler: OpenTelemetryProtocolHandlerRef) -> &mut Self {
|
||||
let _ = self.inner.otlp_handler.get_or_insert(handler);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_user_provider(&mut self, user_provider: UserProviderRef) -> &mut Self {
|
||||
let _ = self.inner.user_provider.get_or_insert(user_provider);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_metrics_handler(&mut self, handler: MetricsHandler) -> &mut Self {
|
||||
let _ = self.inner.metrics_handler.get_or_insert(handler);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_plugins(&mut self, plugins: Plugins) -> &mut Self {
|
||||
self.inner.plugins = plugins;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_greptime_config_options(&mut self, opts: String) -> &mut Self {
|
||||
self.inner.greptime_config_options = Some(opts);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn set_prom_store_with_metric_engine(&mut self, with_metric_engine: bool) -> &mut Self {
|
||||
self.inner.prom_store_with_metric_engine = with_metric_engine;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(&mut self) -> HttpServer {
|
||||
std::mem::take(self).inner
|
||||
}
|
||||
}
|
||||
|
||||
impl HttpServer {
|
||||
pub fn make_app(&self) -> Router {
|
||||
let mut api = OpenApi {
|
||||
let api = OpenApi {
|
||||
info: Info {
|
||||
title: "GreptimeDB HTTP API".to_string(),
|
||||
description: Some("HTTP APIs to interact with GreptimeDB".to_string()),
|
||||
@@ -517,72 +417,149 @@ impl HttpServer {
|
||||
}],
|
||||
..OpenApi::default()
|
||||
};
|
||||
|
||||
let mut router = Router::new();
|
||||
|
||||
if let Some(sql_handler) = self.sql_handler.clone() {
|
||||
let sql_router = self
|
||||
.route_sql(ApiState {
|
||||
sql_handler,
|
||||
script_handler: self.script_handler.clone(),
|
||||
})
|
||||
.finish_api(&mut api)
|
||||
.layer(Extension(api.clone()));
|
||||
router = router.nest(&format!("/{HTTP_API_VERSION}"), sql_router);
|
||||
Self {
|
||||
options,
|
||||
plugins: Plugins::default(),
|
||||
user_provider: None,
|
||||
api,
|
||||
router: Router::new(),
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(opentsdb_handler) = self.opentsdb_handler.clone() {
|
||||
router = router.nest(
|
||||
pub fn with_sql_handler(
|
||||
mut self,
|
||||
sql_handler: ServerSqlQueryHandlerRef,
|
||||
script_handler: Option<ScriptHandlerRef>,
|
||||
) -> Self {
|
||||
let sql_router = HttpServer::route_sql(ApiState {
|
||||
sql_handler,
|
||||
script_handler,
|
||||
})
|
||||
.finish_api(&mut self.api)
|
||||
.layer(Extension(self.api.clone()));
|
||||
|
||||
Self {
|
||||
router: self
|
||||
.router
|
||||
.nest(&format!("/{HTTP_API_VERSION}"), sql_router),
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_opentsdb_handler(self, handler: OpentsdbProtocolHandlerRef) -> Self {
|
||||
Self {
|
||||
router: self.router.nest(
|
||||
&format!("/{HTTP_API_VERSION}/opentsdb"),
|
||||
self.route_opentsdb(opentsdb_handler),
|
||||
);
|
||||
HttpServer::route_opentsdb(handler),
|
||||
),
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(influxdb_handler) = self.influxdb_handler.clone() {
|
||||
router = router.nest(
|
||||
pub fn with_influxdb_handler(self, handler: InfluxdbLineProtocolHandlerRef) -> Self {
|
||||
Self {
|
||||
router: self.router.nest(
|
||||
&format!("/{HTTP_API_VERSION}/influxdb"),
|
||||
self.route_influxdb(influxdb_handler),
|
||||
);
|
||||
HttpServer::route_influxdb(handler),
|
||||
),
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(prom_handler) = self.prom_handler.clone() {
|
||||
router = router.nest(
|
||||
pub fn with_prom_handler(
|
||||
self,
|
||||
handler: PromStoreProtocolHandlerRef,
|
||||
prom_store_with_metric_engine: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
router: self.router.nest(
|
||||
&format!("/{HTTP_API_VERSION}/prometheus"),
|
||||
self.route_prom(prom_handler),
|
||||
);
|
||||
HttpServer::route_prom(handler, prom_store_with_metric_engine),
|
||||
),
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(prometheus_handler) = self.prometheus_handler.clone() {
|
||||
router = router.nest(
|
||||
pub fn with_prometheus_handler(self, handler: PrometheusHandlerRef) -> Self {
|
||||
Self {
|
||||
router: self.router.nest(
|
||||
&format!("/{HTTP_API_VERSION}/prometheus/api/v1"),
|
||||
self.route_prometheus(prometheus_handler),
|
||||
);
|
||||
HttpServer::route_prometheus(handler),
|
||||
),
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(otlp_handler) = self.otlp_handler.clone() {
|
||||
router = router.nest(
|
||||
pub fn with_otlp_handler(self, handler: OpenTelemetryProtocolHandlerRef) -> Self {
|
||||
Self {
|
||||
router: self.router.nest(
|
||||
&format!("/{HTTP_API_VERSION}/otlp"),
|
||||
self.route_otlp(otlp_handler),
|
||||
);
|
||||
HttpServer::route_otlp(handler),
|
||||
),
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(metrics_handler) = self.metrics_handler {
|
||||
router = router.nest("", self.route_metrics(metrics_handler));
|
||||
pub fn with_user_provider(self, user_provider: UserProviderRef) -> Self {
|
||||
Self {
|
||||
user_provider: Some(user_provider),
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_metrics_handler(self, handler: MetricsHandler) -> Self {
|
||||
Self {
|
||||
router: self.router.nest("", HttpServer::route_metrics(handler)),
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_plugins(self, plugins: Plugins) -> Self {
|
||||
Self { plugins, ..self }
|
||||
}
|
||||
|
||||
pub fn with_greptime_config_options(mut self, opts: String) -> Self {
|
||||
let config_router = HttpServer::route_config(GreptimeOptionsConfigState {
|
||||
greptime_config_options: opts,
|
||||
})
|
||||
.finish_api(&mut self.api);
|
||||
|
||||
Self {
|
||||
router: self.router.nest("", config_router),
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_extra_router(self, router: Router) -> Self {
|
||||
Self {
|
||||
router: self.router.nest("", router),
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build(self) -> HttpServer {
|
||||
HttpServer {
|
||||
options: self.options,
|
||||
user_provider: self.user_provider,
|
||||
shutdown_tx: Mutex::new(None),
|
||||
plugins: self.plugins,
|
||||
router: StdMutex::new(self.router),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl HttpServer {
|
||||
pub fn make_app(&self) -> Router {
|
||||
let mut router = {
|
||||
let router = self.router.lock().unwrap();
|
||||
router.clone()
|
||||
};
|
||||
|
||||
router = router.route(
|
||||
"/health",
|
||||
routing::get(handler::health).post(handler::health),
|
||||
);
|
||||
|
||||
let config_router = self
|
||||
.route_config(GreptimeOptionsConfigState {
|
||||
greptime_config_options: self.greptime_config_options.clone().unwrap_or_default(),
|
||||
})
|
||||
.finish_api(&mut api);
|
||||
|
||||
router = router.nest("", config_router);
|
||||
|
||||
router = router.route("/status", routing::get(handler::status));
|
||||
|
||||
#[cfg(feature = "dashboard")]
|
||||
@@ -643,13 +620,13 @@ impl HttpServer {
|
||||
)
|
||||
}
|
||||
|
||||
fn route_metrics<S>(&self, metrics_handler: MetricsHandler) -> Router<S> {
|
||||
fn route_metrics<S>(metrics_handler: MetricsHandler) -> Router<S> {
|
||||
Router::new()
|
||||
.route("/metrics", routing::get(handler::metrics))
|
||||
.with_state(metrics_handler)
|
||||
}
|
||||
|
||||
fn route_sql<S>(&self, api_state: ApiState) -> ApiRouter<S> {
|
||||
fn route_sql<S>(api_state: ApiState) -> ApiRouter<S> {
|
||||
ApiRouter::new()
|
||||
.api_route(
|
||||
"/sql",
|
||||
@@ -668,7 +645,7 @@ impl HttpServer {
|
||||
.with_state(api_state)
|
||||
}
|
||||
|
||||
fn route_prometheus<S>(&self, prometheus_handler: PrometheusHandlerRef) -> Router<S> {
|
||||
fn route_prometheus<S>(prometheus_handler: PrometheusHandlerRef) -> Router<S> {
|
||||
Router::new()
|
||||
.route(
|
||||
"/format_query",
|
||||
@@ -685,9 +662,12 @@ impl HttpServer {
|
||||
.with_state(prometheus_handler)
|
||||
}
|
||||
|
||||
fn route_prom<S>(&self, prom_handler: PromStoreProtocolHandlerRef) -> Router<S> {
|
||||
fn route_prom<S>(
|
||||
prom_handler: PromStoreProtocolHandlerRef,
|
||||
prom_store_with_metric_engine: bool,
|
||||
) -> Router<S> {
|
||||
let mut router = Router::new().route("/read", routing::post(prom_store::remote_read));
|
||||
if self.prom_store_with_metric_engine {
|
||||
if prom_store_with_metric_engine {
|
||||
router = router.route("/write", routing::post(prom_store::remote_write));
|
||||
} else {
|
||||
router = router.route(
|
||||
@@ -698,7 +678,7 @@ impl HttpServer {
|
||||
router.with_state(prom_handler)
|
||||
}
|
||||
|
||||
fn route_influxdb<S>(&self, influxdb_handler: InfluxdbLineProtocolHandlerRef) -> Router<S> {
|
||||
fn route_influxdb<S>(influxdb_handler: InfluxdbLineProtocolHandlerRef) -> Router<S> {
|
||||
Router::new()
|
||||
.route("/write", routing::post(influxdb_write_v1))
|
||||
.route("/api/v2/write", routing::post(influxdb_write_v2))
|
||||
@@ -707,20 +687,20 @@ impl HttpServer {
|
||||
.with_state(influxdb_handler)
|
||||
}
|
||||
|
||||
fn route_opentsdb<S>(&self, opentsdb_handler: OpentsdbProtocolHandlerRef) -> Router<S> {
|
||||
fn route_opentsdb<S>(opentsdb_handler: OpentsdbProtocolHandlerRef) -> Router<S> {
|
||||
Router::new()
|
||||
.route("/api/put", routing::post(opentsdb::put))
|
||||
.with_state(opentsdb_handler)
|
||||
}
|
||||
|
||||
fn route_otlp<S>(&self, otlp_handler: OpenTelemetryProtocolHandlerRef) -> Router<S> {
|
||||
fn route_otlp<S>(otlp_handler: OpenTelemetryProtocolHandlerRef) -> Router<S> {
|
||||
Router::new()
|
||||
.route("/v1/metrics", routing::post(otlp::metrics))
|
||||
.route("/v1/traces", routing::post(otlp::traces))
|
||||
.with_state(otlp_handler)
|
||||
}
|
||||
|
||||
fn route_config<S>(&self, state: GreptimeOptionsConfigState) -> ApiRouter<S> {
|
||||
fn route_config<S>(state: GreptimeOptionsConfigState) -> ApiRouter<S> {
|
||||
ApiRouter::new()
|
||||
.route("/config", apirouting::get(handler::config))
|
||||
.with_state(state)
|
||||
@@ -841,7 +821,7 @@ mod test {
|
||||
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
use crate::query_handler::grpc::{GrpcQueryHandler, ServerGrpcQueryHandlerAdapter};
|
||||
use crate::query_handler::grpc::GrpcQueryHandler;
|
||||
use crate::query_handler::sql::{ServerSqlQueryHandlerAdapter, SqlQueryHandler};
|
||||
|
||||
struct DummyInstance {
|
||||
@@ -909,10 +889,8 @@ mod test {
|
||||
fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
|
||||
let instance = Arc::new(DummyInstance { _tx: tx });
|
||||
let sql_instance = ServerSqlQueryHandlerAdapter::arc(instance.clone());
|
||||
let grpc_instance = ServerGrpcQueryHandlerAdapter::arc(instance);
|
||||
let server = HttpServerBuilder::new(HttpOptions::default())
|
||||
.with_sql_handler(sql_instance)
|
||||
.with_grpc_handler(grpc_instance)
|
||||
.with_sql_handler(sql_instance, None)
|
||||
.build();
|
||||
server.build(server.make_app()).route(
|
||||
"/test/timeout",
|
||||
|
||||
@@ -18,7 +18,7 @@ use common_test_util::ports;
|
||||
use servers::http::{HttpOptions, HttpServerBuilder};
|
||||
use table::test_util::MemTable;
|
||||
|
||||
use crate::{create_testing_grpc_query_handler, create_testing_sql_query_handler};
|
||||
use crate::create_testing_sql_query_handler;
|
||||
|
||||
fn make_test_app() -> Router {
|
||||
let http_opts = HttpOptions {
|
||||
@@ -27,12 +27,10 @@ fn make_test_app() -> Router {
|
||||
};
|
||||
|
||||
let server = HttpServerBuilder::new(http_opts)
|
||||
.with_sql_handler(create_testing_sql_query_handler(
|
||||
MemTable::default_numbers_table(),
|
||||
))
|
||||
.with_grpc_handler(create_testing_grpc_query_handler(
|
||||
MemTable::default_numbers_table(),
|
||||
))
|
||||
.with_sql_handler(
|
||||
create_testing_sql_query_handler(MemTable::default_numbers_table()),
|
||||
None,
|
||||
)
|
||||
.build();
|
||||
server.build(server.make_app())
|
||||
}
|
||||
|
||||
@@ -120,8 +120,7 @@ fn make_test_app(tx: Arc<mpsc::Sender<(String, String)>>, db_name: Option<&str>)
|
||||
})
|
||||
}
|
||||
let server = HttpServerBuilder::new(http_opts)
|
||||
.with_sql_handler(instance.clone())
|
||||
.with_grpc_handler(instance.clone())
|
||||
.with_sql_handler(instance.clone(), None)
|
||||
.with_user_provider(Arc::new(user_provider))
|
||||
.with_influxdb_handler(instance)
|
||||
.build();
|
||||
|
||||
@@ -109,8 +109,7 @@ fn make_test_app(tx: mpsc::Sender<String>) -> Router {
|
||||
|
||||
let instance = Arc::new(DummyInstance { tx });
|
||||
let server = HttpServerBuilder::new(http_opts)
|
||||
.with_grpc_handler(instance.clone())
|
||||
.with_sql_handler(instance.clone())
|
||||
.with_sql_handler(instance.clone(), None)
|
||||
.with_opentsdb_handler(instance)
|
||||
.build();
|
||||
server.build(server.make_app())
|
||||
|
||||
@@ -133,9 +133,8 @@ fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
|
||||
|
||||
let instance = Arc::new(DummyInstance { tx });
|
||||
let server = HttpServerBuilder::new(http_opts)
|
||||
.with_grpc_handler(instance.clone())
|
||||
.with_sql_handler(instance.clone())
|
||||
.with_prom_handler(instance)
|
||||
.with_sql_handler(instance.clone(), None)
|
||||
.with_prom_handler(instance, true)
|
||||
.build();
|
||||
server.build(server.make_app())
|
||||
}
|
||||
|
||||
@@ -12,8 +12,8 @@ api.workspace = true
|
||||
arrow-flight.workspace = true
|
||||
async-trait = "0.1"
|
||||
auth.workspace = true
|
||||
axum = "0.6"
|
||||
axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" }
|
||||
axum.workspace = true
|
||||
catalog.workspace = true
|
||||
chrono.workspace = true
|
||||
client = { workspace = true, features = ["testing"] }
|
||||
|
||||
@@ -387,10 +387,10 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
|
||||
..Default::default()
|
||||
};
|
||||
let http_server = HttpServerBuilder::new(http_opts)
|
||||
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(instance.instance.clone()))
|
||||
.with_grpc_handler(ServerGrpcQueryHandlerAdapter::arc(
|
||||
instance.instance.clone(),
|
||||
))
|
||||
.with_sql_handler(
|
||||
ServerSqlQueryHandlerAdapter::arc(instance.instance.clone()),
|
||||
None,
|
||||
)
|
||||
.with_metrics_handler(MetricsHandler)
|
||||
.with_greptime_config_options(instance.datanode_opts.to_toml_string())
|
||||
.build();
|
||||
@@ -420,16 +420,15 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
|
||||
|
||||
let mut http_server = HttpServerBuilder::new(http_opts);
|
||||
|
||||
http_server
|
||||
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(instance.instance.clone()))
|
||||
.with_grpc_handler(ServerGrpcQueryHandlerAdapter::arc(
|
||||
instance.instance.clone(),
|
||||
))
|
||||
.with_script_handler(instance.instance.clone())
|
||||
http_server = http_server
|
||||
.with_sql_handler(
|
||||
ServerSqlQueryHandlerAdapter::arc(instance.instance.clone()),
|
||||
Some(instance.instance.clone()),
|
||||
)
|
||||
.with_greptime_config_options(instance.mix_options.to_toml().unwrap());
|
||||
|
||||
if let Some(user_provider) = user_provider {
|
||||
http_server.with_user_provider(user_provider);
|
||||
http_server = http_server.with_user_provider(user_provider);
|
||||
}
|
||||
|
||||
let http_server = http_server.build();
|
||||
@@ -458,10 +457,11 @@ pub async fn setup_test_prom_app_with_frontend(
|
||||
};
|
||||
let frontend_ref = instance.instance.clone();
|
||||
let http_server = HttpServerBuilder::new(http_opts)
|
||||
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone()))
|
||||
.with_grpc_handler(ServerGrpcQueryHandlerAdapter::arc(frontend_ref.clone()))
|
||||
.with_script_handler(frontend_ref.clone())
|
||||
.with_prom_handler(frontend_ref.clone())
|
||||
.with_sql_handler(
|
||||
ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone()),
|
||||
Some(frontend_ref.clone()),
|
||||
)
|
||||
.with_prom_handler(frontend_ref.clone(), true)
|
||||
.with_prometheus_handler(frontend_ref)
|
||||
.with_greptime_config_options(instance.datanode_opts.to_toml_string())
|
||||
.build();
|
||||
|
||||
Reference in New Issue
Block a user