diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index bd0ca573c8..42f1e0a71e 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -193,7 +193,6 @@ async fn build_frontend( datanode_instance: InstanceRef, ) -> Result> { let mut frontend_instance = FeInstance::new_standalone(datanode_instance.clone()); - frontend_instance.set_catalog_manager(datanode_instance.catalog_manager().clone()); frontend_instance.set_script_handler(datanode_instance); Ok(Frontend::new(fe_opts, frontend_instance, plugins)) } diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs index f8a0460012..cff79afad1 100644 --- a/src/datanode/src/mock.rs +++ b/src/datanode/src/mock.rs @@ -24,7 +24,7 @@ use query::QueryEngineFactory; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; use table::metadata::TableId; -use table::table::{TableIdProvider, TableIdProviderRef}; +use table::table::TableIdProvider; use crate::datanode::DatanodeOptions; use crate::error::Result; @@ -34,57 +34,6 @@ use crate::script::ScriptExecutor; use crate::sql::SqlHandler; impl Instance { - // This method is used in other crate's testing codes, so move it out of "cfg(test)". - // TODO(LFC): Delete it when callers no longer need it. - pub async fn new_mock() -> Result { - use mito::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine}; - let mock_info = meta_srv::mocks::mock_with_memstore().await; - let meta_client = Arc::new(mock_meta_client(mock_info, 0).await); - let (dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await; - - let logstore = Arc::new(create_local_file_log_store(dir.path().to_str().unwrap()).await?); - let mock_engine = Arc::new(MockMitoEngine::new( - TableEngineConfig::default(), - MockEngine::default(), - object_store, - )); - - let catalog_manager = Arc::new( - catalog::local::manager::LocalCatalogManager::try_new(mock_engine.clone()) - .await - .unwrap(), - ); - - let factory = QueryEngineFactory::new(catalog_manager.clone()); - let query_engine = factory.query_engine(); - - let sql_handler = SqlHandler::new( - mock_engine.clone(), - catalog_manager.clone(), - query_engine.clone(), - ); - let script_executor = ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()) - .await - .unwrap(); - - let heartbeat_task = Some(HeartbeatTask::new( - 0, - "127.0.0.1:3302".to_string(), - meta_client, - )); - - let table_id_provider = Some(catalog_manager.clone() as TableIdProviderRef); - Ok(Self { - query_engine, - sql_handler, - catalog_manager, - script_executor, - heartbeat_task, - table_id_provider, - logstore, - }) - } - pub async fn with_mock_meta_client(opts: &DatanodeOptions) -> Result { let mock_info = meta_srv::mocks::mock_with_memstore().await; Self::with_mock_meta_server(opts, mock_info).await diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index b93759b3c7..1b01d05eae 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -173,10 +173,10 @@ async fn assert_query_result(instance: &Instance, sql: &str, ts: i64, host: &str } } -async fn setup_test_instance() -> Instance { +async fn setup_test_instance(test_name: &str) -> Instance { common_telemetry::init_default_ut_logging(); - let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts("execute_insert"); + let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(test_name); let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); instance.start().await.unwrap(); @@ -193,7 +193,7 @@ async fn setup_test_instance() -> Instance { #[tokio::test(flavor = "multi_thread")] async fn test_execute_insert() { - let instance = setup_test_instance().await; + let instance = setup_test_instance("test_execute_insert").await; let output = execute_sql( &instance, r#"insert into demo(host, cpu, memory, ts) values @@ -409,18 +409,10 @@ async fn check_output_stream(output: Output, expected: Vec<&str>) { assert_eq!(pretty_print, expected); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_alter_table() { - let instance = Instance::new_mock().await.unwrap(); - instance.start().await.unwrap(); + let instance = setup_test_instance("test_alter_table").await; - test_util::create_test_table( - instance.catalog_manager(), - instance.sql_handler(), - ConcreteDataType::timestamp_millis_datatype(), - ) - .await - .unwrap(); // make sure table insertion is ok before altering table execute_sql( &instance, diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 1616825157..64b2bac22a 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -56,13 +56,13 @@ use sql::parser::ParserContext; use sql::statements::create::Partitions; use sql::statements::insert::Insert; use sql::statements::statement::Statement; +use table::TableRef; use crate::catalog::FrontendCatalogManager; use crate::datanode::DatanodeClients; use crate::error::{ - self, AlterTableOnInsertionSnafu, CatalogNotFoundSnafu, CatalogSnafu, CreateDatabaseSnafu, - CreateTableSnafu, FindNewColumnsOnInsertionSnafu, InsertSnafu, MissingMetasrvOptsSnafu, Result, - SchemaNotFoundSnafu, + self, AlterTableOnInsertionSnafu, CatalogSnafu, CreateDatabaseSnafu, CreateTableSnafu, + FindNewColumnsOnInsertionSnafu, InsertSnafu, MissingMetasrvOptsSnafu, Result, }; use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory}; use crate::frontend::FrontendOptions; @@ -90,8 +90,7 @@ pub type FrontendInstanceRef = Arc; #[derive(Clone)] pub struct Instance { - /// catalog manager is None in standalone mode, datanode will keep their own - catalog_manager: Option, + catalog_manager: CatalogManagerRef, /// Script handler is None in distributed mode, only works on standalone mode. script_handler: Option, create_expr_factory: CreateExprFactoryRef, @@ -128,7 +127,7 @@ impl Instance { let dist_instance_ref = Arc::new(dist_instance.clone()); Ok(Instance { - catalog_manager: Some(catalog_manager), + catalog_manager, script_handler: None, create_expr_factory: Arc::new(DefaultCreateExprFactory), mode: Mode::Distributed, @@ -171,7 +170,7 @@ impl Instance { pub fn new_standalone(dn_instance: DnInstanceRef) -> Self { Instance { - catalog_manager: None, + catalog_manager: dn_instance.catalog_manager().clone(), script_handler: None, create_expr_factory: Arc::new(DefaultCreateExprFactory), mode: Mode::Standalone, @@ -182,18 +181,10 @@ impl Instance { } } - pub fn catalog_manager(&self) -> &Option { + pub fn catalog_manager(&self) -> &CatalogManagerRef { &self.catalog_manager } - pub fn set_catalog_manager(&mut self, catalog_manager: CatalogManagerRef) { - debug_assert!( - self.catalog_manager.is_none(), - "Catalog manager can be set only once!" - ); - self.catalog_manager = Some(catalog_manager); - } - pub fn set_script_handler(&mut self, handler: ScriptHandlerRef) { debug_assert!( self.script_handler.is_none(), @@ -293,21 +284,7 @@ impl Instance { table_name: &str, columns: &[Column], ) -> Result<()> { - match self - .catalog_manager - .as_ref() - .expect("catalog manager cannot be None") - .catalog(catalog_name) - .context(CatalogSnafu)? - .context(CatalogNotFoundSnafu { catalog_name })? - .schema(schema_name) - .context(CatalogSnafu)? - .context(SchemaNotFoundSnafu { - schema_info: schema_name, - })? - .table(table_name) - .context(CatalogSnafu)? - { + match self.find_table(catalog_name, schema_name, table_name)? { None => { info!( "Table {}.{}.{} does not exist, try create table", @@ -403,8 +380,6 @@ impl Instance { fn get_catalog(&self, catalog_name: &str) -> Result { self.catalog_manager - .as_ref() - .context(error::CatalogManagerSnafu)? .catalog(catalog_name) .context(error::CatalogSnafu)? .context(error::CatalogNotFoundSnafu { catalog_name }) @@ -419,6 +394,12 @@ impl Instance { }) } + fn find_table(&self, catalog: &str, schema: &str, table: &str) -> Result> { + self.catalog_manager + .table(catalog, schema, table) + .context(CatalogSnafu) + } + async fn sql_dist_insert(&self, insert: Box) -> Result { let (catalog, schema, table) = insert.full_table_name().context(error::ParseSqlSnafu)?; @@ -458,23 +439,17 @@ impl Instance { } fn handle_use(&self, db: String, query_ctx: QueryContextRef) -> Result { - let catalog_manager = &self.catalog_manager; - if let Some(catalog_manager) = catalog_manager { - ensure!( - catalog_manager - .schema(DEFAULT_CATALOG_NAME, &db) - .context(error::CatalogSnafu)? - .is_some(), - error::SchemaNotFoundSnafu { schema_info: &db } - ); + ensure!( + self.catalog_manager + .schema(DEFAULT_CATALOG_NAME, &db) + .context(error::CatalogSnafu)? + .is_some(), + error::SchemaNotFoundSnafu { schema_info: &db } + ); - query_ctx.set_current_schema(&db); + query_ctx.set_current_schema(&db); - Ok(Output::RecordBatches(RecordBatches::empty())) - } else { - // TODO(LFC): Handle "use" stmt here. - unimplemented!() - } + Ok(Output::RecordBatches(RecordBatches::empty())) } } @@ -679,11 +654,11 @@ mod tests { use super::*; use crate::tests; - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_execute_sql() { let query_ctx = Arc::new(QueryContext::new()); - let instance = tests::create_frontend_instance().await; + let (instance, _guard) = tests::create_frontend_instance("test_execute_sql").await; let sql = r#"CREATE TABLE demo( host STRING, @@ -761,9 +736,9 @@ mod tests { }; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_execute_grpc() { - let instance = tests::create_frontend_instance().await; + let (instance, _guard) = tests::create_frontend_instance("test_execute_grpc").await; // testing data: let expected_host_col = Column { diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index e2c0c91ee0..842a45240e 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -70,9 +70,9 @@ mod tests { use super::*; use crate::tests; - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_exec() { - let instance = tests::create_frontend_instance().await; + let (instance, _guard) = tests::create_frontend_instance("test_exec").await; instance .exec( &DataPoint::try_create( @@ -88,9 +88,10 @@ mod tests { .unwrap(); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_insert_opentsdb_metric() { - let instance = tests::create_frontend_instance().await; + let (instance, _guard) = + tests::create_frontend_instance("test_insert_opentsdb_metric").await; let data_point1 = DataPoint::new( "my_metric_1".to_string(), @@ -124,7 +125,10 @@ mod tests { assert!(result.is_ok()); let output = instance - .do_query("select * from my_metric_1", Arc::new(QueryContext::new())) + .do_query( + "select * from my_metric_1 order by greptime_timestamp", + Arc::new(QueryContext::new()), + ) .await .unwrap(); match output { diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs index b1ad7ad53c..1257d186c8 100644 --- a/src/frontend/src/instance/prometheus.rs +++ b/src/frontend/src/instance/prometheus.rs @@ -182,10 +182,11 @@ mod tests { use super::*; use crate::tests; - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_prometheus_remote_write_and_read() { common_telemetry::init_default_ut_logging(); - let instance = tests::create_frontend_instance().await; + let (instance, _guard) = + tests::create_frontend_instance("test_prometheus_remote_write_and_read").await; let write_request = WriteRequest { timeseries: prometheus::mock_timeseries(), diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs index 4cb1360fea..37292458f3 100644 --- a/src/frontend/src/tests.rs +++ b/src/frontend/src/tests.rs @@ -29,6 +29,7 @@ use meta_srv::mocks::MockInfo; use meta_srv::service::store::kv::KvStoreRef; use meta_srv::service::store::memory::MemStore; use servers::grpc::GrpcServer; +use servers::Mode; use tempdir::TempDir; use tonic::transport::Server; use tower::service_fn; @@ -39,21 +40,42 @@ use crate::instance::distributed::DistInstance; use crate::instance::Instance; use crate::table::route::TableRoutes; -async fn create_datanode_instance() -> Arc { - // TODO(LFC) Use real Mito engine when we can alter its region schema, - // and delete the `new_mock` method. - let instance = Arc::new(DatanodeInstance::new_mock().await.unwrap()); - instance.start().await.unwrap(); - instance +/// Guard against the `TempDir`s that used in unit tests. +/// (The `TempDir` will be deleted once it goes out of scope.) +pub struct TestGuard { + _wal_tmp_dir: TempDir, + _data_tmp_dir: TempDir, } -pub(crate) async fn create_frontend_instance() -> Arc { - let datanode_instance: Arc = create_datanode_instance().await; - let dn_catalog_manager = datanode_instance.catalog_manager().clone(); +pub(crate) async fn create_frontend_instance(test_name: &str) -> (Arc, TestGuard) { + let (opts, guard) = create_tmp_dir_and_datanode_opts(test_name); + let datanode_instance = DatanodeInstance::with_mock_meta_client(&opts) + .await + .unwrap(); + datanode_instance.start().await.unwrap(); - let mut frontend_instance = Instance::new_standalone(datanode_instance); - frontend_instance.set_catalog_manager(dn_catalog_manager); - Arc::new(frontend_instance) + let frontend_instance = Instance::new_standalone(Arc::new(datanode_instance)); + (Arc::new(frontend_instance), guard) +} + +fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) { + let wal_tmp_dir = TempDir::new(&format!("gt_wal_{}", name)).unwrap(); + let data_tmp_dir = TempDir::new(&format!("gt_data_{}", name)).unwrap(); + let opts = DatanodeOptions { + wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + storage: ObjectStoreConfig::File { + data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), + }, + mode: Mode::Standalone, + ..Default::default() + }; + ( + opts, + TestGuard { + _wal_tmp_dir: wal_tmp_dir, + _data_tmp_dir: data_tmp_dir, + }, + ) } pub(crate) async fn create_datanode_client( diff --git a/src/servers/src/postgres/server.rs b/src/servers/src/postgres/server.rs index 8b994342db..d2b8844992 100644 --- a/src/servers/src/postgres/server.rs +++ b/src/servers/src/postgres/server.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use async_trait::async_trait; use common_runtime::Runtime; use common_telemetry::logging::error; +use common_telemetry::{debug, warn}; use futures::StreamExt; use pgwire::tokio::process_socket; use tokio; @@ -79,6 +80,11 @@ impl PostgresServer { match tcp_stream { Err(error) => error!("Broken pipe: {}", error), // IoError doesn't impl ErrorExt. Ok(io_stream) => { + match io_stream.peer_addr() { + Ok(addr) => debug!("PostgreSQL client coming from {}", addr), + Err(e) => warn!("Failed to get PostgreSQL client addr, err: {}", e), + } + io_runtime.spawn(process_socket( io_stream, tls_acceptor.clone(), @@ -102,6 +108,7 @@ impl Server for PostgresServer { async fn start(&self, listening: SocketAddr) -> Result { let (stream, addr) = self.base_server.bind(listening).await?; + debug!("Starting PostgreSQL with TLS option: {:?}", self.tls); let tls_acceptor = self .tls .setup()? diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 501879023b..958bcf2fb8 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -214,7 +214,6 @@ pub async fn create_test_table( async fn build_frontend_instance(datanode_instance: InstanceRef) -> FeInstance { let mut frontend_instance = FeInstance::new_standalone(datanode_instance.clone()); - frontend_instance.set_catalog_manager(datanode_instance.catalog_manager().clone()); frontend_instance.set_script_handler(datanode_instance); frontend_instance } @@ -243,7 +242,7 @@ pub async fn setup_test_app_with_frontend( let mut frontend = build_frontend_instance(instance.clone()).await; instance.start().await.unwrap(); create_test_table( - frontend.catalog_manager().as_ref().unwrap(), + frontend.catalog_manager(), instance.sql_handler(), ConcreteDataType::timestamp_millis_datatype(), ) @@ -276,9 +275,7 @@ pub async fn setup_grpc_server( let fe_grpc_addr = format!("127.0.0.1:{}", get_port()); - let mut fe_instance = frontend::instance::Instance::new_standalone(instance.clone()); - fe_instance.set_catalog_manager(instance.catalog_manager().clone()); - + let fe_instance = frontend::instance::Instance::new_standalone(instance.clone()); let fe_instance_ref = Arc::new(fe_instance); let fe_grpc_server = Arc::new(GrpcServer::new( fe_instance_ref.clone(),