1use std::net::SocketAddr;
16use std::sync::Arc;
17
18use auth::UserProviderRef;
19use axum::extract::{Request, State};
20use axum::middleware::Next;
21use axum::response::IntoResponse;
22use common_base::Plugins;
23use common_config::Configurable;
24use common_telemetry::info;
25use meta_client::MetaClientOptions;
26use servers::error::Error as ServerError;
27use servers::grpc::builder::GrpcServerBuilder;
28use servers::grpc::flight::FlightCraftRef;
29use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
30use servers::grpc::greptime_handler::GreptimeRequestHandler;
31use servers::grpc::{GrpcOptions, GrpcServer};
32use servers::http::event::LogValidatorRef;
33use servers::http::result::error_result::ErrorResponse;
34use servers::http::utils::router::RouterConfigurator;
35use servers::http::{HttpServer, HttpServerBuilder};
36use servers::interceptor::LogIngestInterceptorRef;
37use servers::metrics_handler::MetricsHandler;
38use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
39use servers::otel_arrow::OtelArrowServiceHandler;
40use servers::postgres::PostgresServer;
41use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
42use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
43use servers::request_memory_limiter::ServerMemoryLimiter;
44use servers::server::{Server, ServerHandlers};
45use servers::tls::{ReloadableTlsServerConfig, maybe_watch_server_tls_config};
46use snafu::ResultExt;
47use tonic::Status;
48
49use crate::error::{self, Result, StartServerSnafu, TomlFormatSnafu};
50use crate::frontend::FrontendOptions;
51use crate::instance::Instance;
52
53pub struct Services<T>
54where
55 T: Into<FrontendOptions> + Configurable + Clone,
56{
57 opts: T,
58 instance: Arc<Instance>,
59 grpc_server_builder: Option<GrpcServerBuilder>,
60 http_server_builder: Option<HttpServerBuilder>,
61 plugins: Plugins,
62 flight_handler: Option<FlightCraftRef>,
63}
64
65impl<T> Services<T>
66where
67 T: Into<FrontendOptions> + Configurable + Clone,
68{
69 pub fn new(opts: T, instance: Arc<Instance>, plugins: Plugins) -> Self {
70 Self {
71 opts,
72 instance,
73 grpc_server_builder: None,
74 http_server_builder: None,
75 plugins,
76 flight_handler: None,
77 }
78 }
79
80 pub fn grpc_server_builder(
81 &self,
82 opts: &GrpcOptions,
83 request_memory_limiter: ServerMemoryLimiter,
84 ) -> Result<GrpcServerBuilder> {
85 let builder = GrpcServerBuilder::new(opts.as_config(), common_runtime::global_runtime())
86 .with_memory_limiter(request_memory_limiter)
87 .with_tls_config(opts.tls.clone())
88 .context(error::InvalidTlsConfigSnafu)?;
89 Ok(builder)
90 }
91
92 pub fn http_server_builder(
93 &self,
94 opts: &FrontendOptions,
95 request_memory_limiter: ServerMemoryLimiter,
96 ) -> HttpServerBuilder {
97 let mut builder = HttpServerBuilder::new(opts.http.clone())
98 .with_memory_limiter(request_memory_limiter)
99 .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(self.instance.clone()));
100
101 let validator = self.plugins.get::<LogValidatorRef>();
102 let ingest_interceptor = self.plugins.get::<LogIngestInterceptorRef<ServerError>>();
103 builder =
104 builder.with_log_ingest_handler(self.instance.clone(), validator, ingest_interceptor);
105 builder = builder.with_logs_handler(self.instance.clone());
106
107 if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
108 builder = builder.with_user_provider(user_provider);
109 }
110
111 if opts.opentsdb.enable {
112 builder = builder.with_opentsdb_handler(self.instance.clone());
113 }
114
115 if opts.influxdb.enable {
116 builder = builder.with_influxdb_handler(self.instance.clone());
117 }
118
119 if opts.prom_store.enable {
120 builder = builder
121 .with_prom_handler(
122 self.instance.clone(),
123 Some(self.instance.clone()),
124 opts.prom_store.with_metric_engine,
125 opts.http.prom_validation_mode,
126 )
127 .with_prometheus_handler(self.instance.clone());
128 }
129
130 if opts.otlp.enable {
131 builder = builder
132 .with_otlp_handler(self.instance.clone(), opts.prom_store.with_metric_engine);
133 }
134
135 if opts.jaeger.enable {
136 builder = builder.with_jaeger_handler(self.instance.clone());
137 }
138
139 if let Some(configurator) = self.plugins.get::<RouterConfigurator>() {
140 info!("Adding extra router from plugins");
141 builder = builder.with_extra_router(configurator.router());
142 }
143
144 builder.add_layer(axum::middleware::from_fn_with_state(
145 self.instance.clone(),
146 async move |State(state): State<Arc<Instance>>, request: Request, next: Next| {
147 if state.is_suspended() {
148 return ErrorResponse::from_error(servers::error::SuspendedSnafu.build())
149 .into_response();
150 }
151 next.run(request).await
152 },
153 ))
154 }
155
156 pub fn with_grpc_server_builder(self, builder: GrpcServerBuilder) -> Self {
157 Self {
158 grpc_server_builder: Some(builder),
159 ..self
160 }
161 }
162
163 pub fn with_http_server_builder(self, builder: HttpServerBuilder) -> Self {
164 Self {
165 http_server_builder: Some(builder),
166 ..self
167 }
168 }
169
170 pub fn with_flight_handler(self, flight_handler: FlightCraftRef) -> Self {
171 Self {
172 flight_handler: Some(flight_handler),
173 ..self
174 }
175 }
176
177 fn build_grpc_server(
178 &mut self,
179 grpc: &GrpcOptions,
180 meta_client: &Option<MetaClientOptions>,
181 name: Option<String>,
182 external: bool,
183 request_memory_limiter: ServerMemoryLimiter,
184 ) -> Result<GrpcServer> {
185 let builder = if let Some(builder) = self.grpc_server_builder.take() {
186 builder
187 } else {
188 self.grpc_server_builder(grpc, request_memory_limiter)?
189 };
190
191 let user_provider = if external {
192 self.plugins.get::<UserProviderRef>()
193 } else {
194 None
196 };
197
198 let runtime = if meta_client.is_none() {
200 Some(builder.runtime().clone())
201 } else {
202 None
203 };
204
205 let greptime_request_handler = GreptimeRequestHandler::new(
206 ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
207 user_provider.clone(),
208 runtime,
209 grpc.flight_compression,
210 );
211
212 let flight_handler = self
214 .flight_handler
215 .clone()
216 .unwrap_or_else(|| Arc::new(greptime_request_handler.clone()) as FlightCraftRef);
217
218 let grpc_server = builder
219 .name(name)
220 .database_handler(greptime_request_handler.clone())
221 .prometheus_handler(self.instance.clone(), user_provider.clone())
222 .otel_arrow_handler(OtelArrowServiceHandler::new(
223 self.instance.clone(),
224 user_provider.clone(),
225 ))
226 .flight_handler(flight_handler)
227 .add_layer(axum::middleware::from_fn_with_state(
228 self.instance.clone(),
229 async move |State(state): State<Arc<Instance>>, request: Request, next: Next| {
230 if state.is_suspended() {
231 let status = Status::from(servers::error::SuspendedSnafu.build());
232 return status.into_http();
233 }
234 next.run(request).await
235 },
236 ));
237
238 let grpc_server = if !external {
239 let frontend_grpc_handler =
240 FrontendGrpcHandler::new(self.instance.process_manager().clone());
241 grpc_server.frontend_grpc_handler(frontend_grpc_handler)
242 } else {
243 grpc_server
244 }
245 .build();
246
247 Ok(grpc_server)
248 }
249
250 fn build_http_server(
251 &mut self,
252 opts: &FrontendOptions,
253 toml: String,
254 request_memory_limiter: ServerMemoryLimiter,
255 ) -> Result<HttpServer> {
256 let builder = if let Some(builder) = self.http_server_builder.take() {
257 builder
258 } else {
259 self.http_server_builder(opts, request_memory_limiter)
260 };
261
262 let http_server = builder
263 .with_metrics_handler(MetricsHandler)
264 .with_plugins(self.plugins.clone())
265 .with_greptime_config_options(toml)
266 .build();
267 Ok(http_server)
268 }
269
270 pub fn build(mut self) -> Result<ServerHandlers> {
271 let opts = self.opts.clone();
272 let instance = self.instance.clone();
273
274 let toml = opts.to_toml().context(TomlFormatSnafu)?;
275 let opts: FrontendOptions = opts.into();
276
277 let request_memory_limiter = ServerMemoryLimiter::new(
279 opts.max_in_flight_write_bytes.as_bytes(),
280 opts.write_bytes_exhausted_policy,
281 );
282
283 let handlers = ServerHandlers::default();
284
285 let user_provider = self.plugins.get::<UserProviderRef>();
286
287 {
288 let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
290 let grpc_server = self.build_grpc_server(
291 &opts.grpc,
292 &opts.meta_client,
293 None,
294 true,
295 request_memory_limiter.clone(),
296 )?;
297 handlers.insert((Box::new(grpc_server), grpc_addr));
298 }
299
300 if let Some(internal_grpc) = &opts.internal_grpc {
301 let grpc_addr = parse_addr(&internal_grpc.bind_addr)?;
303 let grpc_server = self.build_grpc_server(
304 internal_grpc,
305 &opts.meta_client,
306 Some("INTERNAL_GRPC_SERVER".to_string()),
307 false,
308 request_memory_limiter.clone(),
309 )?;
310 handlers.insert((Box::new(grpc_server), grpc_addr));
311 }
312
313 {
314 let http_options = &opts.http;
316 let http_addr = parse_addr(&http_options.addr)?;
317 let http_server =
318 self.build_http_server(&opts, toml, request_memory_limiter.clone())?;
319 handlers.insert((Box::new(http_server), http_addr));
320 }
321
322 if opts.mysql.enable {
323 let opts = &opts.mysql;
325 let mysql_addr = parse_addr(&opts.addr)?;
326
327 let tls_server_config = Arc::new(
328 ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
329 );
330
331 maybe_watch_server_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
333
334 let mysql_server = MysqlServer::create_server(
335 common_runtime::global_runtime(),
336 Arc::new(MysqlSpawnRef::new(
337 ServerSqlQueryHandlerAdapter::arc(instance.clone()),
338 user_provider.clone(),
339 )),
340 Arc::new(MysqlSpawnConfig::new(
341 opts.tls.should_force_tls(),
342 tls_server_config,
343 opts.keep_alive.as_secs(),
344 opts.reject_no_database.unwrap_or(false),
345 opts.prepared_stmt_cache_size,
346 )),
347 Some(instance.process_manager().clone()),
348 );
349 handlers.insert((mysql_server, mysql_addr));
350 }
351
352 if opts.postgres.enable {
353 let opts = &opts.postgres;
355 let pg_addr = parse_addr(&opts.addr)?;
356
357 let tls_server_config = Arc::new(
358 ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
359 );
360
361 maybe_watch_server_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
362
363 let pg_server = Box::new(PostgresServer::new(
364 ServerSqlQueryHandlerAdapter::arc(instance.clone()),
365 opts.tls.should_force_tls(),
366 tls_server_config,
367 opts.keep_alive.as_secs(),
368 common_runtime::global_runtime(),
369 user_provider.clone(),
370 Some(self.instance.process_manager().clone()),
371 )) as Box<dyn Server>;
372
373 handlers.insert((pg_server, pg_addr));
374 }
375
376 Ok(handlers)
377 }
378}
379
380fn parse_addr(addr: &str) -> Result<SocketAddr> {
381 addr.parse().context(error::ParseAddrSnafu { addr })
382}