1use std::env;
16use std::fmt::Display;
17use std::net::SocketAddr;
18use std::sync::Arc;
19
20use auth::{DefaultPermissionChecker, PermissionCheckerRef, UserProviderRef};
21use axum::Router;
22use catalog::kvbackend::KvBackendCatalogManager;
23use common_base::Plugins;
24use common_config::Configurable;
25use common_meta::key::catalog_name::CatalogNameKey;
26use common_meta::key::schema_name::SchemaNameKey;
27use common_query::Output;
28use common_runtime::runtime::BuilderBuild;
29use common_runtime::{Builder as RuntimeBuilder, Runtime};
30use common_test_util::ports;
31use common_test_util::temp_dir::{TempDir, create_temp_dir};
32use common_wal::config::DatanodeWalConfig;
33use datanode::config::{DatanodeOptions, StorageConfig};
34use frontend::instance::Instance;
35use frontend::service_config::{MysqlOptions, PostgresOptions};
36use mito2::gc::GcConfig;
37use object_store::config::{
38 AzblobConfig, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, S3Config,
39};
40use object_store::services::{Azblob, Gcs, Oss, S3};
41use object_store::test_util::TempFolder;
42use object_store::{AzblobConnection, GcsConnection, ObjectStore, OssConnection, S3Connection};
43use servers::grpc::builder::GrpcServerBuilder;
44use servers::grpc::greptime_handler::GreptimeRequestHandler;
45use servers::grpc::{FlightCompression, GrpcOptions, GrpcServer, GrpcServerConfig};
46use servers::http::{HttpOptions, HttpServerBuilder};
47use servers::metrics_handler::MetricsHandler;
48use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
49use servers::otel_arrow::OtelArrowServiceHandler;
50use servers::postgres::PostgresServer;
51use servers::prom_remote_write::validation::PromValidationMode;
52use servers::query_handler::sql::SqlQueryHandler;
53use servers::request_memory_limiter::ServerMemoryLimiter;
54use servers::server::Server;
55use servers::tls::ReloadableTlsServerConfig;
56use session::context::QueryContext;
57
58use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder};
59
/// Placeholder address that `create_datanode_opts` assigns to both the gRPC
/// bind address and the gRPC server address of the datanode options it builds.
pub const PEER_PLACEHOLDER_ADDR: &str = "127.0.0.1:3001";
61
/// Object-store backends that the test helpers in this file can configure.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StorageType {
    /// S3; `get_test_store_config` disables the read cache for this variant.
    S3,
    /// S3 with the cache path set to `/tmp/greptimedb_cache` by
    /// `get_test_store_config`. Displayed as `"S3"`, same as [`StorageType::S3`].
    S3WithCache,
    /// Local file storage; needs no environment configuration.
    File,
    /// OSS, configured from `GT_OSS_*` environment variables.
    Oss,
    /// Azure Blob, configured from `GT_AZBLOB_*` environment variables.
    Azblob,
    /// GCS, configured from `GT_GCS_*` environment variables.
    Gcs,
}
71
72impl Display for StorageType {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 match self {
75 StorageType::S3 => write!(f, "S3"),
76 StorageType::S3WithCache => write!(f, "S3"),
77 StorageType::File => write!(f, "File"),
78 StorageType::Oss => write!(f, "Oss"),
79 StorageType::Azblob => write!(f, "Azblob"),
80 StorageType::Gcs => write!(f, "Gcs"),
81 }
82 }
83}
84
85impl StorageType {
86 pub fn build_storage_types_based_on_env() -> Vec<StorageType> {
87 let mut storage_types = Vec::with_capacity(4);
88 storage_types.push(StorageType::File);
89 if let Ok(bucket) = env::var("GT_S3_BUCKET")
90 && !bucket.is_empty()
91 {
92 storage_types.push(StorageType::S3);
93 }
94 if env::var("GT_OSS_BUCKET").is_ok() {
95 storage_types.push(StorageType::Oss);
96 }
97 if env::var("GT_AZBLOB_CONTAINER").is_ok() {
98 storage_types.push(StorageType::Azblob);
99 }
100 if env::var("GT_GCS_BUCKET").is_ok() {
101 storage_types.push(StorageType::Gcs);
102 }
103 storage_types
104 }
105
106 pub fn test_on(&self) -> bool {
107 let _ = dotenv::dotenv();
108
109 match self {
110 StorageType::File => true, StorageType::S3 | StorageType::S3WithCache => {
112 if let Ok(b) = env::var("GT_S3_BUCKET") {
113 !b.is_empty()
114 } else {
115 false
116 }
117 }
118 StorageType::Oss => {
119 if let Ok(b) = env::var("GT_OSS_BUCKET") {
120 !b.is_empty()
121 } else {
122 false
123 }
124 }
125 StorageType::Azblob => {
126 if let Ok(b) = env::var("GT_AZBLOB_CONTAINER") {
127 !b.is_empty()
128 } else {
129 false
130 }
131 }
132 StorageType::Gcs => {
133 if let Ok(b) = env::var("GT_GCS_BUCKET") {
134 !b.is_empty()
135 } else {
136 false
137 }
138 }
139 }
140 }
141}
142
143fn s3_test_config() -> S3Config {
144 S3Config {
145 connection: S3Connection {
146 root: uuid::Uuid::new_v4().to_string(),
147 access_key_id: env::var("GT_S3_ACCESS_KEY_ID").unwrap().into(),
148 secret_access_key: env::var("GT_S3_ACCESS_KEY").unwrap().into(),
149 bucket: env::var("GT_S3_BUCKET").unwrap(),
150 region: Some(env::var("GT_S3_REGION").unwrap()),
151 endpoint: env::var("GT_S3_ENDPOINT_URL").ok(),
152 ..Default::default()
153 },
154 ..Default::default()
155 }
156}
157
158pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, TempDirGuard) {
159 let _ = dotenv::dotenv();
160
161 match store_type {
162 StorageType::Gcs => {
163 let gcs_config = GcsConfig {
164 connection: GcsConnection {
165 root: uuid::Uuid::new_v4().to_string(),
166 bucket: env::var("GT_GCS_BUCKET").unwrap(),
167 scope: env::var("GT_GCS_SCOPE").unwrap(),
168 credential_path: env::var("GT_GCS_CREDENTIAL_PATH").unwrap().into(),
169 credential: env::var("GT_GCS_CREDENTIAL").unwrap().into(),
170 endpoint: env::var("GT_GCS_ENDPOINT").unwrap_or_default(),
171 },
172 ..Default::default()
173 };
174
175 let builder = Gcs::from(&gcs_config.connection);
176 let config = ObjectStoreConfig::Gcs(gcs_config);
177 let store = ObjectStore::new(builder).unwrap().finish();
178 (config, TempDirGuard::Gcs(TempFolder::new(&store, "/")))
179 }
180 StorageType::Azblob => {
181 let azblob_config = AzblobConfig {
182 connection: AzblobConnection {
183 root: uuid::Uuid::new_v4().to_string(),
184 container: env::var("GT_AZBLOB_CONTAINER").unwrap(),
185 account_name: env::var("GT_AZBLOB_ACCOUNT_NAME").unwrap().into(),
186 account_key: env::var("GT_AZBLOB_ACCOUNT_KEY").unwrap().into(),
187 endpoint: env::var("GT_AZBLOB_ENDPOINT").unwrap(),
188 ..Default::default()
189 },
190 ..Default::default()
191 };
192
193 let builder = Azblob::from(&azblob_config.connection);
194 let config = ObjectStoreConfig::Azblob(azblob_config);
195 let store = ObjectStore::new(builder).unwrap().finish();
196 (config, TempDirGuard::Azblob(TempFolder::new(&store, "/")))
197 }
198 StorageType::Oss => {
199 let oss_config = OssConfig {
200 connection: OssConnection {
201 root: uuid::Uuid::new_v4().to_string(),
202 access_key_id: env::var("GT_OSS_ACCESS_KEY_ID").unwrap().into(),
203 access_key_secret: env::var("GT_OSS_ACCESS_KEY").unwrap().into(),
204 bucket: env::var("GT_OSS_BUCKET").unwrap(),
205 endpoint: env::var("GT_OSS_ENDPOINT").unwrap(),
206 },
207 ..Default::default()
208 };
209
210 let builder = Oss::from(&oss_config.connection);
211 let config = ObjectStoreConfig::Oss(oss_config);
212 let store = ObjectStore::new(builder).unwrap().finish();
213 (config, TempDirGuard::Oss(TempFolder::new(&store, "/")))
214 }
215 StorageType::S3 | StorageType::S3WithCache => {
216 let mut s3_config = s3_test_config();
217
218 if *store_type == StorageType::S3WithCache {
219 s3_config.cache.cache_path = "/tmp/greptimedb_cache".to_string();
220 } else {
221 s3_config.cache.enable_read_cache = false;
222 }
223
224 let builder = S3::from(&s3_config.connection);
225 let config = ObjectStoreConfig::S3(s3_config);
226 let store = ObjectStore::new(builder).unwrap().finish();
227 (config, TempDirGuard::S3(TempFolder::new(&store, "/")))
228 }
229 StorageType::File => (ObjectStoreConfig::File(FileConfig {}), TempDirGuard::None),
230 }
231}
232
/// Handle to the temporary folder that `get_test_store_config` creates for a
/// store, tagged by backend.
pub enum TempDirGuard {
    /// Nothing to clean up; returned for file storage.
    None,
    /// Temporary folder inside an S3 store.
    S3(TempFolder),
    /// Temporary folder inside an OSS store.
    Oss(TempFolder),
    /// Temporary folder inside an Azure Blob store.
    Azblob(TempFolder),
    /// Temporary folder inside a GCS store.
    Gcs(TempFolder),
}
240
/// Bundles the resources created for a test instance: the temporary home
/// directory and one guard per configured object store.
pub struct TestGuard {
    /// Guard for the temporary data home directory.
    pub home_guard: FileDirGuard,
    /// Guards for the stores; `create_tmp_dir_and_datanode_opts` puts the
    /// default store first, followed by the additional providers in order.
    pub storage_guards: Vec<StorageGuard>,
}
245
/// Owns the temporary home directory created for a test instance.
pub struct FileDirGuard {
    /// Handle to the temporary directory.
    pub temp_dir: TempDir,
}

impl FileDirGuard {
    /// Wraps `temp_dir` in a guard.
    pub fn new(temp_dir: TempDir) -> Self {
        Self { temp_dir }
    }
}
255
/// Newtype around the [`TempDirGuard`] of a single object store.
pub struct StorageGuard(pub TempDirGuard);
257
258impl TestGuard {
259 pub async fn remove_all(&mut self) {
260 for storage_guard in self.storage_guards.iter_mut() {
261 if let TempDirGuard::S3(guard)
262 | TempDirGuard::Oss(guard)
263 | TempDirGuard::Azblob(guard)
264 | TempDirGuard::Gcs(guard) = &mut storage_guard.0
265 {
266 guard.remove_all().await.unwrap()
267 }
268 }
269 }
270}
271
272impl Drop for TestGuard {
273 fn drop(&mut self) {
274 let (tx, rx) = std::sync::mpsc::channel();
275
276 let guards = std::mem::take(&mut self.storage_guards);
277 common_runtime::spawn_global(async move {
278 let mut errors = vec![];
279 for guard in guards {
280 if let TempDirGuard::S3(guard)
281 | TempDirGuard::Oss(guard)
282 | TempDirGuard::Azblob(guard)
283 | TempDirGuard::Gcs(guard) = guard.0
284 && let Err(e) = guard.remove_all().await
285 {
286 errors.push(e);
287 }
288 }
289 if errors.is_empty() {
290 tx.send(Ok(())).unwrap();
291 } else {
292 tx.send(Err(errors)).unwrap();
293 }
294 });
295 rx.recv().unwrap().unwrap_or_else(|e| panic!("{:?}", e));
296 }
297}
298
299pub fn create_tmp_dir_and_datanode_opts(
300 default_store_type: StorageType,
301 store_provider_types: Vec<StorageType>,
302 name: &str,
303 wal_config: DatanodeWalConfig,
304 gc_config: GcConfig,
305) -> (DatanodeOptions, TestGuard) {
306 let home_tmp_dir = create_temp_dir(&format!("gt_data_{name}"));
307 let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();
308
309 let mut store_providers = Vec::with_capacity(store_provider_types.len());
311 let mut storage_guards = Vec::with_capacity(store_provider_types.len() + 1);
313
314 let (default_store, data_tmp_dir) = get_test_store_config(&default_store_type);
315 storage_guards.push(StorageGuard(data_tmp_dir));
316
317 for store_type in store_provider_types {
318 let (store, data_tmp_dir) = get_test_store_config(&store_type);
319 store_providers.push(store);
320 storage_guards.push(StorageGuard(data_tmp_dir))
321 }
322 let opts = create_datanode_opts(
323 default_store,
324 store_providers,
325 home_dir,
326 wal_config,
327 gc_config,
328 );
329
330 (
331 opts,
332 TestGuard {
333 home_guard: FileDirGuard::new(home_tmp_dir),
334 storage_guards,
335 },
336 )
337}
338
339pub(crate) fn create_datanode_opts(
340 default_store: ObjectStoreConfig,
341 providers: Vec<ObjectStoreConfig>,
342 home_dir: String,
343 wal_config: DatanodeWalConfig,
344 gc_config: GcConfig,
345) -> DatanodeOptions {
346 let region_engine = DatanodeOptions::default()
347 .region_engine
348 .into_iter()
349 .map(|mut v| {
350 if let datanode::config::RegionEngineConfig::Mito(mito_config) = &mut v {
351 mito_config.gc = gc_config.clone();
352 }
353 v
354 })
355 .collect();
356 DatanodeOptions {
357 node_id: Some(0),
358 require_lease_before_startup: true,
359 storage: StorageConfig {
360 data_home: home_dir,
361 providers,
362 store: default_store,
363 },
364 grpc: GrpcOptions::default()
365 .with_bind_addr(PEER_PLACEHOLDER_ADDR)
366 .with_server_addr(PEER_PLACEHOLDER_ADDR),
367 wal: wal_config,
368 region_engine,
369 ..Default::default()
370 }
371}
372
373pub(crate) async fn create_test_table(instance: &Instance, table_name: &str) {
374 let sql = format!(
375 r#"
376CREATE TABLE IF NOT EXISTS {table_name} (
377 host String NOT NULL PRIMARY KEY,
378 cpu DOUBLE NULL,
379 memory DOUBLE NULL,
380 ts TIMESTAMP NOT NULL TIME INDEX,
381)
382"#
383 );
384
385 let result = instance.do_query(&sql, QueryContext::arc()).await;
386 let _ = result.first().unwrap().as_ref().unwrap();
387}
388
389async fn setup_standalone_instance(
390 test_name: &str,
391 store_type: StorageType,
392) -> GreptimeDbStandalone {
393 GreptimeDbStandaloneBuilder::new(test_name)
394 .with_default_store_type(store_type)
395 .build()
396 .await
397}
398
399async fn setup_standalone_instance_with_slow_query_threshold(
400 test_name: &str,
401 store_type: StorageType,
402 slow_query_threshold: std::time::Duration,
403) -> GreptimeDbStandalone {
404 GreptimeDbStandaloneBuilder::new(test_name)
405 .with_default_store_type(store_type)
406 .with_slow_query_threshold(slow_query_threshold)
407 .build()
408 .await
409}
410
411async fn setup_standalone_instance_with_plugins(
412 test_name: &str,
413 store_type: StorageType,
414 plugins: Plugins,
415) -> GreptimeDbStandalone {
416 GreptimeDbStandaloneBuilder::new(test_name)
417 .with_default_store_type(store_type)
418 .with_plugin(plugins)
419 .build()
420 .await
421}
422
423async fn setup_standalone_instance_with_plugins_and_slow_query_threshold(
424 test_name: &str,
425 store_type: StorageType,
426 plugins: Plugins,
427 slow_query_threshold: std::time::Duration,
428) -> GreptimeDbStandalone {
429 GreptimeDbStandaloneBuilder::new(test_name)
430 .with_default_store_type(store_type)
431 .with_plugin(plugins)
432 .with_slow_query_threshold(slow_query_threshold)
433 .build()
434 .await
435}
436
437pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router, TestGuard) {
438 let instance = setup_standalone_instance(name, store_type).await;
439
440 let http_opts = HttpOptions {
441 addr: format!("127.0.0.1:{}", ports::get_port()),
442 ..Default::default()
443 };
444 let http_server = HttpServerBuilder::new(http_opts)
445 .with_sql_handler(instance.fe_instance().clone())
446 .with_logs_handler(instance.fe_instance().clone())
447 .with_metrics_handler(MetricsHandler)
448 .with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
449 .build();
450 (
451 http_server.build(http_server.make_app()).unwrap(),
452 instance.guard,
453 )
454}
455
456pub async fn setup_test_http_app_with_frontend(
457 store_type: StorageType,
458 name: &str,
459) -> (Router, TestGuard) {
460 setup_test_http_app_with_frontend_and_user_provider(store_type, name, None).await
461}
462
463pub async fn setup_test_http_app_with_frontend_and_slow_query_threshold(
464 store_type: StorageType,
465 name: &str,
466 slow_query_threshold: std::time::Duration,
467) -> (Router, TestGuard) {
468 let instance =
469 setup_standalone_instance_with_slow_query_threshold(name, store_type, slow_query_threshold)
470 .await;
471
472 create_test_table(instance.fe_instance(), "demo").await;
473
474 let http_opts = HttpOptions {
475 addr: format!("127.0.0.1:{}", ports::get_port()),
476 ..Default::default()
477 };
478
479 let http_server = HttpServerBuilder::new(http_opts)
480 .with_sql_handler(instance.fe_instance().clone())
481 .with_log_ingest_handler(instance.fe_instance().clone(), None, None)
482 .with_logs_handler(instance.fe_instance().clone())
483 .with_influxdb_handler(instance.fe_instance().clone())
484 .with_otlp_handler(instance.fe_instance().clone(), true)
485 .with_jaeger_handler(instance.fe_instance().clone())
486 .with_greptime_config_options(instance.opts.to_toml().unwrap())
487 .build();
488
489 let app = http_server.build(http_server.make_app()).unwrap();
490 (app, instance.guard)
491}
492
493pub async fn setup_test_http_app_with_frontend_and_user_provider(
494 store_type: StorageType,
495 name: &str,
496 user_provider: Option<UserProviderRef>,
497) -> (Router, TestGuard) {
498 setup_test_http_app_with_frontend_and_custom_options(
499 store_type,
500 name,
501 user_provider,
502 None,
503 None,
504 )
505 .await
506}
507
508pub async fn setup_test_http_app_with_frontend_and_custom_options(
509 store_type: StorageType,
510 name: &str,
511 user_provider: Option<UserProviderRef>,
512 http_opts: Option<HttpOptions>,
513 memory_limiter: Option<ServerMemoryLimiter>,
514) -> (Router, TestGuard) {
515 let plugins = Plugins::new();
516 if let Some(user_provider) = user_provider.clone() {
517 plugins.insert::<UserProviderRef>(user_provider.clone());
518 plugins.insert::<PermissionCheckerRef>(DefaultPermissionChecker::arc());
519 }
520
521 let instance = setup_standalone_instance_with_plugins(name, store_type, plugins).await;
522
523 create_test_table(instance.fe_instance(), "demo").await;
524
525 let http_opts = http_opts.unwrap_or_else(|| HttpOptions {
526 addr: format!("127.0.0.1:{}", ports::get_port()),
527 ..Default::default()
528 });
529
530 let mut http_server = HttpServerBuilder::new(http_opts)
531 .with_sql_handler(instance.fe_instance().clone())
532 .with_log_ingest_handler(instance.fe_instance().clone(), None, None)
533 .with_logs_handler(instance.fe_instance().clone())
534 .with_influxdb_handler(instance.fe_instance().clone())
535 .with_otlp_handler(instance.fe_instance().clone(), true)
536 .with_jaeger_handler(instance.fe_instance().clone())
537 .with_greptime_config_options(instance.opts.to_toml().unwrap());
538
539 if let Some(user_provider) = user_provider {
540 http_server = http_server.with_user_provider(user_provider);
541 }
542
543 if let Some(limiter) = memory_limiter {
544 http_server = http_server.with_memory_limiter(limiter);
545 }
546
547 let http_server = http_server.build();
548
549 let app = http_server.build(http_server.make_app()).unwrap();
550 (app, instance.guard)
551}
552
553async fn run_sql(sql: &str, instance: &GreptimeDbStandalone) {
554 let result = instance
555 .fe_instance()
556 .do_query(sql, QueryContext::arc())
557 .await;
558 let _ = result.first().unwrap().as_ref().unwrap();
559}
560
561pub async fn setup_test_prom_app_with_frontend(
562 store_type: StorageType,
563 name: &str,
564) -> (Router, TestGuard) {
565 unsafe {
566 std::env::set_var("TZ", "UTC");
567 }
568
569 let instance = setup_standalone_instance(name, store_type).await;
570
571 let sql = "CREATE TABLE phy (ts timestamp time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
573 run_sql(sql, &instance).await;
574 let sql = "CREATE TABLE phy_ns (ts timestamp(0) time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
575 run_sql(sql, &instance).await;
576 let sql = "CREATE TABLE demo (ts timestamp time index, val double, host string primary key) engine=metric with ('on_physical_table' = 'phy')";
578 run_sql(sql, &instance).await;
579 let sql = "CREATE TABLE demo_metrics (ts timestamp time index, val double, idc string primary key) engine=metric with ('on_physical_table' = 'phy')";
580 run_sql(sql, &instance).await;
581 let sql = "CREATE TABLE multi_labels (ts timestamp(0) time index, val double, idc string, env string, host string, primary key (idc, env, host)) engine=metric with ('on_physical_table' = 'phy_ns')";
582 run_sql(sql, &instance).await;
583
584 let sql = "INSERT INTO demo(host, val, ts) VALUES ('host1', 1.1, 0), ('host2', 2.1, 600000)";
586 run_sql(sql, &instance).await;
587 let sql =
588 "INSERT INTO demo_metrics(idc, val, ts) VALUES ('idc1', 1.1, 0), ('idc2', 2.1, 600000)";
589 run_sql(sql, &instance).await;
590 let sql = "INSERT INTO demo_metrics(val, ts) VALUES (1.1, 0)";
592 run_sql(sql, &instance).await;
593
594 let sql = "INSERT INTO multi_labels(idc, env, host, val, ts) VALUES ('idc1', 'dev', 'host1', 1.1, 0), ('idc1', 'dev', 'host2', 2.1, 0), ('idc2', 'dev', 'host1', 1.1, 0), ('idc2', 'test', 'host3', 2.1, 0)";
596 run_sql(sql, &instance).await;
597
598 let sql = "CREATE TABLE phy2 (ts timestamp(9) time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
600 run_sql(sql, &instance).await;
601 let sql = "CREATE TABLE demo_metrics_with_nanos(ts timestamp(9) time index, val double, idc string primary key) engine=metric with ('on_physical_table' = 'phy2')";
602 run_sql(sql, &instance).await;
603 let sql = "INSERT INTO demo_metrics_with_nanos(idc, val, ts) VALUES ('idc1', 1.1, 0)";
604 run_sql(sql, &instance).await;
605
606 let sql = "CREATE TABLE mito (ts timestamp(9) time index, val double, host bigint primary key) engine=mito";
608 run_sql(sql, &instance).await;
609 let sql = "INSERT INTO mito(host, val, ts) VALUES (1, 1.1, 0)";
610 run_sql(sql, &instance).await;
611
612 let http_opts = HttpOptions {
613 addr: format!("127.0.0.1:{}", ports::get_port()),
614 ..Default::default()
615 };
616 let frontend_ref = instance.fe_instance().clone();
617 let http_server = HttpServerBuilder::new(http_opts)
618 .with_sql_handler(frontend_ref.clone())
619 .with_logs_handler(instance.fe_instance().clone())
620 .with_prom_handler(
621 frontend_ref.clone(),
622 Some(frontend_ref.clone()),
623 true,
624 PromValidationMode::Strict,
625 )
626 .with_prometheus_handler(frontend_ref)
627 .with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
628 .build();
629 let app = http_server.build(http_server.make_app()).unwrap();
630 (app, instance.guard)
631}
632
633pub async fn setup_grpc_server(
634 store_type: StorageType,
635 name: &str,
636) -> (GreptimeDbStandalone, Arc<GrpcServer>) {
637 setup_grpc_server_with(store_type, name, None, None, None).await
638}
639
640pub async fn setup_grpc_server_with_user_provider(
641 store_type: StorageType,
642 name: &str,
643 user_provider: Option<UserProviderRef>,
644) -> (GreptimeDbStandalone, Arc<GrpcServer>) {
645 setup_grpc_server_with(store_type, name, user_provider, None, None).await
646}
647
648pub async fn setup_grpc_server_with(
649 store_type: StorageType,
650 name: &str,
651 user_provider: Option<UserProviderRef>,
652 grpc_config: Option<GrpcServerConfig>,
653 memory_limiter: Option<servers::request_memory_limiter::ServerMemoryLimiter>,
654) -> (GreptimeDbStandalone, Arc<GrpcServer>) {
655 let instance = setup_standalone_instance(name, store_type).await;
656
657 let runtime: Runtime = RuntimeBuilder::default()
658 .worker_threads(2)
659 .thread_name("grpc-handlers")
660 .build()
661 .unwrap();
662
663 let fe_instance_ref = instance.fe_instance().clone();
664
665 let greptime_request_handler = GreptimeRequestHandler::new(
666 fe_instance_ref.clone(),
667 user_provider.clone(),
668 Some(runtime.clone()),
669 FlightCompression::default(),
670 );
671
672 let flight_handler = Arc::new(greptime_request_handler.clone());
673
674 let grpc_config = grpc_config.unwrap_or_default();
675 let mut grpc_builder = GrpcServerBuilder::new(grpc_config.clone(), runtime);
676
677 if let Some(limiter) = memory_limiter {
678 grpc_builder = grpc_builder.with_memory_limiter(limiter);
679 }
680
681 let grpc_builder = grpc_builder
682 .database_handler(greptime_request_handler)
683 .flight_handler(flight_handler)
684 .prometheus_handler(fe_instance_ref.clone(), user_provider.clone())
685 .otel_arrow_handler(OtelArrowServiceHandler::new(fe_instance_ref, user_provider))
686 .with_tls_config(grpc_config.tls)
687 .unwrap();
688
689 let mut grpc_server = grpc_builder.build();
690
691 let fe_grpc_addr = "127.0.0.1:0".parse::<SocketAddr>().unwrap();
692 grpc_server.start(fe_grpc_addr).await.unwrap();
693
694 (instance, Arc::new(grpc_server))
695}
696
697pub async fn setup_mysql_server(
698 store_type: StorageType,
699 name: &str,
700) -> (TestGuard, Arc<Box<dyn Server>>) {
701 setup_mysql_server_with_user_provider(store_type, name, None).await
702}
703
704pub async fn setup_mysql_server_with_slow_query_threshold(
705 store_type: StorageType,
706 name: &str,
707 slow_query_threshold: std::time::Duration,
708) -> (TestGuard, Arc<Box<dyn Server>>) {
709 let plugins = Plugins::new();
710 let instance = setup_standalone_instance_with_plugins_and_slow_query_threshold(
711 name,
712 store_type,
713 plugins,
714 slow_query_threshold,
715 )
716 .await;
717
718 let runtime = RuntimeBuilder::default()
719 .worker_threads(2)
720 .thread_name("mysql-runtime")
721 .build()
722 .unwrap();
723
724 let fe_mysql_addr = format!("127.0.0.1:{}", ports::get_port());
725
726 let fe_instance_ref = instance.fe_instance().clone();
727 let opts = MysqlOptions {
728 addr: fe_mysql_addr.clone(),
729 ..Default::default()
730 };
731 let mut mysql_server = MysqlServer::create_server(
732 runtime,
733 Arc::new(MysqlSpawnRef::new(fe_instance_ref, None)),
734 Arc::new(MysqlSpawnConfig::new(
735 false,
736 Arc::new(
737 ReloadableTlsServerConfig::try_new(opts.tls.clone())
738 .expect("Failed to load certificates and keys"),
739 ),
740 0,
741 opts.reject_no_database.unwrap_or(false),
742 opts.prepared_stmt_cache_size,
743 )),
744 None,
745 );
746
747 mysql_server
748 .start(fe_mysql_addr.parse::<SocketAddr>().unwrap())
749 .await
750 .unwrap();
751
752 (instance.guard, Arc::new(mysql_server))
753}
754
755pub async fn setup_mysql_server_with_user_provider(
756 store_type: StorageType,
757 name: &str,
758 user_provider: Option<UserProviderRef>,
759) -> (TestGuard, Arc<Box<dyn Server>>) {
760 let plugins = Plugins::new();
761 if let Some(user_provider) = user_provider.clone() {
762 plugins.insert::<UserProviderRef>(user_provider.clone());
763 plugins.insert::<PermissionCheckerRef>(DefaultPermissionChecker::arc());
764 }
765
766 let instance = setup_standalone_instance_with_plugins(name, store_type, plugins).await;
767
768 let runtime = RuntimeBuilder::default()
769 .worker_threads(2)
770 .thread_name("mysql-runtime")
771 .build()
772 .unwrap();
773
774 let fe_mysql_addr = format!("127.0.0.1:{}", ports::get_port());
775
776 let fe_instance_ref = instance.fe_instance().clone();
777 let opts = MysqlOptions {
778 addr: fe_mysql_addr.clone(),
779 ..Default::default()
780 };
781 let mut mysql_server = MysqlServer::create_server(
782 runtime,
783 Arc::new(MysqlSpawnRef::new(fe_instance_ref, user_provider)),
784 Arc::new(MysqlSpawnConfig::new(
785 false,
786 Arc::new(
787 ReloadableTlsServerConfig::try_new(opts.tls.clone())
788 .expect("Failed to load certificates and keys"),
789 ),
790 0,
791 opts.reject_no_database.unwrap_or(false),
792 opts.prepared_stmt_cache_size,
793 )),
794 None,
795 );
796
797 mysql_server
798 .start(fe_mysql_addr.parse::<SocketAddr>().unwrap())
799 .await
800 .unwrap();
801
802 (instance.guard, Arc::new(mysql_server))
803}
804
805pub async fn setup_pg_server(
806 store_type: StorageType,
807 name: &str,
808) -> (TestGuard, Arc<Box<dyn Server>>) {
809 setup_pg_server_with_user_provider(store_type, name, None).await
810}
811
812pub async fn setup_pg_server_with_slow_query_threshold(
813 store_type: StorageType,
814 name: &str,
815 slow_query_threshold: std::time::Duration,
816) -> (TestGuard, Arc<Box<dyn Server>>) {
817 let instance =
818 setup_standalone_instance_with_slow_query_threshold(name, store_type, slow_query_threshold)
819 .await;
820
821 let runtime = RuntimeBuilder::default()
822 .worker_threads(2)
823 .thread_name("pg-runtime")
824 .build()
825 .unwrap();
826
827 let fe_pg_addr = format!("127.0.0.1:{}", ports::get_port());
828
829 let fe_instance_ref = instance.fe_instance().clone();
830 let opts = PostgresOptions {
831 addr: fe_pg_addr.clone(),
832 ..Default::default()
833 };
834 let tls_server_config = Arc::new(
835 ReloadableTlsServerConfig::try_new(opts.tls.clone())
836 .expect("Failed to load certificates and keys"),
837 );
838
839 let mut pg_server = Box::new(PostgresServer::new(
840 fe_instance_ref,
841 opts.tls.should_force_tls(),
842 tls_server_config,
843 0,
844 runtime,
845 None,
846 None,
847 ));
848
849 pg_server
850 .start(fe_pg_addr.parse::<SocketAddr>().unwrap())
851 .await
852 .unwrap();
853
854 (instance.guard, Arc::new(pg_server))
855}
856
857pub async fn setup_pg_server_with_user_provider(
858 store_type: StorageType,
859 name: &str,
860 user_provider: Option<UserProviderRef>,
861) -> (TestGuard, Arc<Box<dyn Server>>) {
862 let instance = setup_standalone_instance(name, store_type).await;
863
864 let runtime = RuntimeBuilder::default()
865 .worker_threads(2)
866 .thread_name("pg-runtime")
867 .build()
868 .unwrap();
869
870 let fe_pg_addr = format!("127.0.0.1:{}", ports::get_port());
871
872 let fe_instance_ref = instance.fe_instance().clone();
873 let opts = PostgresOptions {
874 addr: fe_pg_addr.clone(),
875 ..Default::default()
876 };
877 let tls_server_config = Arc::new(
878 ReloadableTlsServerConfig::try_new(opts.tls.clone())
879 .expect("Failed to load certificates and keys"),
880 );
881
882 let mut pg_server = Box::new(PostgresServer::new(
883 fe_instance_ref,
884 opts.tls.should_force_tls(),
885 tls_server_config,
886 0,
887 runtime,
888 user_provider,
889 None,
890 ));
891
892 pg_server
893 .start(fe_pg_addr.parse::<SocketAddr>().unwrap())
894 .await
895 .unwrap();
896
897 (instance.guard, Arc::new(pg_server))
898}
899
900pub(crate) async fn prepare_another_catalog_and_schema(instance: &Instance) {
901 let catalog_manager = instance
902 .catalog_manager()
903 .as_any()
904 .downcast_ref::<KvBackendCatalogManager>()
905 .unwrap();
906
907 let table_metadata_manager = catalog_manager.table_metadata_manager_ref();
908 table_metadata_manager
909 .catalog_manager()
910 .create(CatalogNameKey::new("another_catalog"), true)
911 .await
912 .unwrap();
913 table_metadata_manager
914 .schema_manager()
915 .create(
916 SchemaNameKey::new("another_catalog", "another_schema"),
917 None,
918 true,
919 )
920 .await
921 .unwrap();
922}
923
924pub async fn execute_sql(instance: &Arc<Instance>, sql: &str) -> Output {
925 SqlQueryHandler::do_query(instance.as_ref(), sql, QueryContext::arc())
926 .await
927 .remove(0)
928 .unwrap()
929}
930
931pub async fn try_execute_sql(
932 instance: &Arc<Instance>,
933 sql: &str,
934) -> servers::error::Result<Output> {
935 SqlQueryHandler::do_query(instance.as_ref(), sql, QueryContext::arc())
936 .await
937 .remove(0)
938}
939
940pub async fn execute_sql_and_expect(instance: &Arc<Instance>, sql: &str, expected: &str) {
941 let output = execute_sql(instance, sql).await;
942 let output = output.data.pretty_print().await;
943 assert_eq!(output, expected.trim());
944}