From dcfce49cff3ed36d4384f738c06120b508162136 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 27 Jun 2023 13:32:20 +0900 Subject: [PATCH] refactor(datanode): move Instance heartbeat task to Datanode struct (#1832) * refactor(datanode): move Instance heartbeat to Datanode struct * chore: apply suggestions from CR * fix: start heartbeat task after instance starts --- src/datanode/src/datanode.rs | 28 +++++-- src/datanode/src/instance.rs | 123 ++++++++++++++++------------ src/datanode/src/lib.rs | 2 +- src/datanode/src/mock.rs | 12 ++- src/datanode/src/tests/test_util.rs | 17 +++- tests-integration/src/cluster.rs | 28 ++++--- tests-integration/src/test_util.rs | 30 +++++-- tests-integration/src/tests.rs | 5 +- 8 files changed, 163 insertions(+), 82 deletions(-) diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 5f6c7d2562..59bf68557a 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -14,10 +14,10 @@ //! Datanode configurations -use std::sync::Arc; use std::time::Duration; use common_base::readable_size::ReadableSize; +use common_error::prelude::BoxedError; use common_telemetry::info; use common_telemetry::logging::LoggingOptions; use meta_client::MetaClientOptions; @@ -25,13 +25,15 @@ use secrecy::SecretString; use serde::{Deserialize, Serialize}; use servers::http::HttpOptions; use servers::Mode; +use snafu::ResultExt; use storage::config::{ EngineConfig as StorageEngineConfig, DEFAULT_AUTO_FLUSH_INTERVAL, DEFAULT_MAX_FLUSH_TASKS, DEFAULT_PICKER_SCHEDULE_INTERVAL, DEFAULT_REGION_WRITE_BUFFER_SIZE, }; use storage::scheduler::SchedulerConfig; -use crate::error::Result; +use crate::error::{Result, ShutdownInstanceSnafu}; +use crate::heartbeat::HeartbeatTask; use crate::instance::{Instance, InstanceRef}; use crate::server::Services; @@ -340,6 +342,7 @@ pub struct DatanodeOptions { pub rpc_addr: String, pub rpc_hostname: Option, pub rpc_runtime_size: usize, + pub heartbeat_interval_millis: u64, pub http_opts: HttpOptions, pub meta_client_options: Option, pub wal: WalConfig, @@ -363,6 +366,7 @@ impl Default for DatanodeOptions { storage: StorageConfig::default(), procedure: ProcedureConfig::default(), logging: LoggingOptions::default(), + heartbeat_interval_millis: 5000, } } } @@ -378,11 +382,12 @@ pub struct Datanode { opts: DatanodeOptions, services: Option, instance: InstanceRef, + heartbeat_task: Option, } impl Datanode { pub async fn new(opts: DatanodeOptions) -> Result { - let instance = Arc::new(Instance::with_opts(&opts).await?); + let (instance, heartbeat_task) = Instance::with_opts(&opts).await?; let services = match opts.mode { Mode::Distributed => Some(Services::try_new(instance.clone(), &opts).await?), Mode::Standalone => None, @@ -391,6 +396,7 @@ impl Datanode { opts, services, instance, + heartbeat_task, }) } @@ -402,7 +408,11 @@ impl Datanode { /// Start only the internal component of datanode. pub async fn start_instance(&mut self) -> Result<()> { - self.instance.start().await + let _ = self.instance.start().await; + if let Some(task) = &self.heartbeat_task { + task.start().await?; + } + Ok(()) } /// Start services of datanode. This method call will block until services are shutdown. @@ -419,7 +429,15 @@ impl Datanode { } pub async fn shutdown_instance(&self) -> Result<()> { - self.instance.shutdown().await + if let Some(heartbeat_task) = &self.heartbeat_task { + heartbeat_task + .close() + .await + .map_err(BoxedError::new) + .context(ShutdownInstanceSnafu)?; + } + let _ = self.instance.shutdown().await; + Ok(()) } async fn shutdown_services(&self) -> Result<()> { diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 194cc047c7..50d9cce35c 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -48,7 +48,7 @@ use storage::config::EngineConfig as StorageEngineConfig; use storage::scheduler::{LocalScheduler, SchedulerConfig}; use storage::EngineImpl; use store_api::logstore::LogStore; -use table::engine::manager::MemoryTableEngineManager; +use table::engine::manager::{MemoryTableEngineManager, TableEngineManagerRef}; use table::engine::{TableEngine, TableEngineProcedureRef}; use table::requests::FlushTableRequest; use table::table::numbers::NumbersTable; @@ -78,14 +78,13 @@ pub struct Instance { pub(crate) sql_handler: SqlHandler, pub(crate) catalog_manager: CatalogManagerRef, pub(crate) table_id_provider: Option, - pub(crate) heartbeat_task: Option, procedure_manager: ProcedureManagerRef, } pub type InstanceRef = Arc; impl Instance { - pub async fn with_opts(opts: &DatanodeOptions) -> Result { + pub async fn with_opts(opts: &DatanodeOptions) -> Result<(InstanceRef, Option)> { let meta_client = match opts.mode { Mode::Standalone => None, Mode::Distributed => { @@ -105,11 +104,57 @@ impl Instance { Self::new(opts, meta_client, compaction_scheduler).await } + fn build_heartbeat_task( + opts: &DatanodeOptions, + meta_client: Option>, + catalog_manager: CatalogManagerRef, + engine_manager: TableEngineManagerRef, + region_alive_keepers: Option>, + ) -> Result> { + Ok(match opts.mode { + Mode::Standalone => None, + Mode::Distributed => { + let node_id = opts.node_id.context(MissingNodeIdSnafu)?; + let meta_client = meta_client.context(IncorrectInternalStateSnafu { + state: "meta client is not provided when building heartbeat task", + })?; + let region_alive_keepers = + region_alive_keepers.context(IncorrectInternalStateSnafu { + state: "region_alive_keepers is not provided when building heartbeat task", + })?; + let handlers_executor = HandlerGroupExecutor::new(vec![ + Arc::new(ParseMailboxMessageHandler::default()), + Arc::new(OpenRegionHandler::new( + catalog_manager.clone(), + engine_manager.clone(), + region_alive_keepers.clone(), + )), + Arc::new(CloseRegionHandler::new( + catalog_manager.clone(), + engine_manager, + region_alive_keepers.clone(), + )), + region_alive_keepers.clone(), + ]); + + Some(HeartbeatTask::new( + node_id, + opts, + meta_client, + catalog_manager, + Arc::new(handlers_executor), + opts.heartbeat_interval_millis, + region_alive_keepers, + )) + } + }) + } + pub(crate) async fn new( opts: &DatanodeOptions, meta_client: Option>, compaction_scheduler: CompactionSchedulerRef, - ) -> Result { + ) -> Result<(InstanceRef, Option)> { let object_store = store::new_object_store(&opts.storage.store).await?; let log_store = Arc::new(create_log_store(&opts.storage.store, &opts.wal).await?); @@ -151,7 +196,7 @@ impl Instance { ); // create remote catalog manager - let (catalog_manager, table_id_provider, heartbeat_task) = match opts.mode { + let (catalog_manager, table_id_provider, region_alive_keepers) = match opts.mode { Mode::Standalone => { if opts.enable_memory_catalog { let catalog = Arc::new(catalog::local::MemoryCatalogManager::default()); @@ -189,17 +234,15 @@ impl Instance { } Mode::Distributed => { - let meta_client = meta_client.context(IncorrectInternalStateSnafu { + let meta_client = meta_client.clone().context(IncorrectInternalStateSnafu { state: "meta client is not provided when creating distributed Datanode", })?; - let kv_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone())); - - let heartbeat_interval_millis = 5000; + let kv_backend = Arc::new(CachedMetaKvBackend::new(meta_client)); let region_alive_keepers = Arc::new(RegionAliveKeepers::new( engine_manager.clone(), - heartbeat_interval_millis, + opts.heartbeat_interval_millis, )); let catalog_manager = Arc::new(RemoteCatalogManager::new( @@ -209,32 +252,11 @@ impl Instance { region_alive_keepers.clone(), )); - let handlers_executor = HandlerGroupExecutor::new(vec![ - Arc::new(ParseMailboxMessageHandler::default()), - Arc::new(OpenRegionHandler::new( - catalog_manager.clone(), - engine_manager.clone(), - region_alive_keepers.clone(), - )), - Arc::new(CloseRegionHandler::new( - catalog_manager.clone(), - engine_manager.clone(), - region_alive_keepers.clone(), - )), - region_alive_keepers.clone(), - ]); - - let heartbeat_task = Some(HeartbeatTask::new( - opts.node_id.context(MissingNodeIdSnafu)?, - opts, - meta_client, - catalog_manager.clone(), - Arc::new(handlers_executor), - heartbeat_interval_millis, - region_alive_keepers, - )); - - (catalog_manager as CatalogManagerRef, None, heartbeat_task) + ( + catalog_manager as CatalogManagerRef, + None, + Some(region_alive_keepers), + ) } }; @@ -258,18 +280,27 @@ impl Instance { &*procedure_manager, ); - Ok(Self { + let instance = Arc::new(Self { query_engine: query_engine.clone(), sql_handler: SqlHandler::new( - engine_manager, + engine_manager.clone(), catalog_manager.clone(), procedure_manager.clone(), ), - catalog_manager, - heartbeat_task, + catalog_manager: catalog_manager.clone(), table_id_provider, procedure_manager, - }) + }); + + let heartbeat_task = Instance::build_heartbeat_task( + opts, + meta_client, + catalog_manager, + engine_manager, + region_alive_keepers, + )?; + + Ok((instance, heartbeat_task)) } pub async fn start(&self) -> Result<()> { @@ -277,9 +308,6 @@ impl Instance { .start() .await .context(NewCatalogSnafu)?; - if let Some(task) = &self.heartbeat_task { - task.start().await?; - } // Recover procedures after the catalog manager is started, so we can // ensure we can access all tables from the catalog manager. @@ -298,13 +326,6 @@ impl Instance { .stop() .await .context(StopProcedureManagerSnafu)?; - if let Some(heartbeat_task) = &self.heartbeat_task { - heartbeat_task - .close() - .await - .map_err(BoxedError::new) - .context(ShutdownInstanceSnafu)?; - } self.flush_tables().await?; diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index 46129fd6d3..26ebb69295 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -17,7 +17,7 @@ pub mod datanode; pub mod error; -mod heartbeat; +pub mod heartbeat; pub mod instance; pub mod metrics; mod mock; diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs index 43153e8e15..36ebac5187 100644 --- a/src/datanode/src/mock.rs +++ b/src/datanode/src/mock.rs @@ -21,15 +21,21 @@ use storage::compaction::noop::NoopCompactionScheduler; use crate::datanode::DatanodeOptions; use crate::error::Result; -use crate::instance::Instance; +use crate::heartbeat::HeartbeatTask; +use crate::instance::{Instance, InstanceRef}; impl Instance { - pub async fn with_mock_meta_client(opts: &DatanodeOptions) -> Result { + pub async fn with_mock_meta_client( + opts: &DatanodeOptions, + ) -> Result<(InstanceRef, Option)> { let mock_info = meta_srv::mocks::mock_with_memstore().await; Self::with_mock_meta_server(opts, mock_info).await } - pub async fn with_mock_meta_server(opts: &DatanodeOptions, meta_srv: MockInfo) -> Result { + pub async fn with_mock_meta_server( + opts: &DatanodeOptions, + meta_srv: MockInfo, + ) -> Result<(InstanceRef, Option)> { let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await); let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); Instance::new(opts, Some(meta_client), compaction_scheduler).await diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index f82001432a..9be758439e 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -29,10 +29,12 @@ use crate::datanode::{ DatanodeOptions, FileConfig, ObjectStoreConfig, ProcedureConfig, StorageConfig, WalConfig, }; use crate::error::{CreateTableSnafu, Result}; -use crate::instance::Instance; +use crate::heartbeat::HeartbeatTask; +use crate::instance::{Instance, InstanceRef}; pub(crate) struct MockInstance { - instance: Instance, + instance: InstanceRef, + _heartbeat: Option, _guard: TestGuard, } @@ -40,10 +42,17 @@ impl MockInstance { pub(crate) async fn new(name: &str) -> Self { let (opts, _guard) = create_tmp_dir_and_datanode_opts(name); - let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); + let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap(); instance.start().await.unwrap(); + if let Some(task) = heartbeat.as_ref() { + task.start().await.unwrap(); + } - MockInstance { instance, _guard } + MockInstance { + instance, + _guard, + _heartbeat: heartbeat, + } } pub(crate) fn inner(&self) -> &Instance { diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 67c46d7f35..6900fa5f0f 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -27,6 +27,7 @@ use common_meta::DatanodeId; use common_runtime::Builder as RuntimeBuilder; use common_test_util::temp_dir::create_temp_dir; use datanode::datanode::{DatanodeOptions, ObjectStoreConfig}; +use datanode::heartbeat::HeartbeatTask; use datanode::instance::Instance as DatanodeInstance; use frontend::instance::{FrontendInstance, Instance as FeInstance}; use meta_client::client::MetaClientBuilder; @@ -49,6 +50,7 @@ pub struct GreptimeDbCluster { _wal_guards: Vec, pub datanode_instances: HashMap>, + pub datanode_heartbeat_tasks: HashMap>, pub kv_store: KvStoreRef, pub meta_srv: MetaSrv, pub frontend: Arc, @@ -86,7 +88,7 @@ impl GreptimeDbClusterBuilder { let meta_srv = self.build_metasrv().await; - let (datanode_instances, storage_guards, wal_guards) = + let (datanode_instances, heartbeat_tasks, storage_guards, wal_guards) = self.build_datanodes(meta_srv.clone(), datanodes).await; let datanode_clients = build_datanode_clients(&datanode_instances, datanodes).await; @@ -103,6 +105,7 @@ impl GreptimeDbClusterBuilder { storage_guards, _wal_guards: wal_guards, datanode_instances, + datanode_heartbeat_tasks: heartbeat_tasks, kv_store: self.kv_store.clone(), meta_srv: meta_srv.meta_srv, frontend, @@ -119,10 +122,12 @@ impl GreptimeDbClusterBuilder { datanodes: u32, ) -> ( HashMap>, + HashMap>, Vec, Vec, ) { let mut instances = HashMap::with_capacity(datanodes as usize); + let mut heartbeat_tasks = HashMap::with_capacity(datanodes as usize); let mut storage_guards = Vec::with_capacity(datanodes as usize); let mut wal_guards = Vec::with_capacity(datanodes as usize); @@ -151,9 +156,10 @@ impl GreptimeDbClusterBuilder { let dn_instance = self.create_datanode(&opts, meta_srv.clone()).await; - instances.insert(datanode_id, dn_instance.clone()); + instances.insert(datanode_id, dn_instance.0.clone()); + heartbeat_tasks.insert(datanode_id, dn_instance.1); } - (instances, storage_guards, wal_guards) + (instances, heartbeat_tasks, storage_guards, wal_guards) } async fn wait_datanodes_alive(&self, expected_datanodes: u32) { @@ -175,14 +181,14 @@ impl GreptimeDbClusterBuilder { &self, opts: &DatanodeOptions, meta_srv: MockInfo, - ) -> Arc { - let instance = Arc::new( - DatanodeInstance::with_mock_meta_server(opts, meta_srv) - .await - .unwrap(), - ); + ) -> (Arc, Option) { + let (instance, heartbeat) = DatanodeInstance::with_mock_meta_server(opts, meta_srv) + .await + .unwrap(); instance.start().await.unwrap(); - + if let Some(heartbeat) = heartbeat.as_ref() { + heartbeat.start().await.unwrap(); + } // create another catalog and schema for testing instance .catalog_manager() @@ -192,7 +198,7 @@ impl GreptimeDbClusterBuilder { .create_catalog_and_schema("another_catalog", "another_schema") .await .unwrap(); - instance + (instance, heartbeat) } async fn build_frontend( diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 975105fa4b..a6d6812989 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -321,7 +321,7 @@ pub async fn create_test_table( pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router, TestGuard) { let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); - let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); + let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap(); create_test_table( instance.catalog_manager(), instance.sql_handler(), @@ -334,6 +334,9 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router .await .unwrap(); instance.start().await.unwrap(); + if let Some(heartbeat) = heartbeat { + heartbeat.start().await.unwrap(); + } let http_opts = HttpOptions { addr: format!("127.0.0.1:{}", ports::get_port()), @@ -354,11 +357,14 @@ pub async fn setup_test_http_app_with_frontend( name: &str, ) -> (Router, TestGuard) { let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); - let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); + let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap(); let frontend = FeInstance::try_new_standalone(instance.clone()) .await .unwrap(); instance.start().await.unwrap(); + if let Some(heartbeat) = heartbeat { + heartbeat.start().await.unwrap(); + } create_test_table( frontend.catalog_manager(), instance.sql_handler(), @@ -416,11 +422,14 @@ pub async fn setup_test_prom_app_with_frontend( ) -> (Router, TestGuard) { std::env::set_var("TZ", "UTC"); let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); - let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); + let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap(); let frontend = FeInstance::try_new_standalone(instance.clone()) .await .unwrap(); instance.start().await.unwrap(); + if let Some(heartbeat) = heartbeat { + heartbeat.start().await.unwrap(); + } create_test_table( frontend.catalog_manager(), @@ -470,7 +479,7 @@ pub async fn setup_grpc_server( common_telemetry::init_default_ut_logging(); let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); - let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); + let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap(); let runtime = Arc::new( RuntimeBuilder::default() @@ -484,6 +493,9 @@ pub async fn setup_grpc_server( .await .unwrap(); instance.start().await.unwrap(); + if let Some(heartbeat) = heartbeat { + heartbeat.start().await.unwrap(); + } let fe_instance_ref = Arc::new(fe_instance); let fe_grpc_server = Arc::new(GrpcServer::new( ServerGrpcQueryHandlerAdaptor::arc(fe_instance_ref.clone()), @@ -522,7 +534,7 @@ pub async fn setup_mysql_server( common_telemetry::init_default_ut_logging(); let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); - let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); + let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap(); let runtime = Arc::new( RuntimeBuilder::default() @@ -538,6 +550,9 @@ pub async fn setup_mysql_server( .await .unwrap(); instance.start().await.unwrap(); + if let Some(heartbeat) = heartbeat { + heartbeat.start().await.unwrap(); + } let fe_instance_ref = Arc::new(fe_instance); let opts = MysqlOptions { addr: fe_mysql_addr.clone(), @@ -575,7 +590,7 @@ pub async fn setup_pg_server( common_telemetry::init_default_ut_logging(); let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); - let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); + let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap(); let runtime = Arc::new( RuntimeBuilder::default() @@ -591,6 +606,9 @@ pub async fn setup_pg_server( .await .unwrap(); instance.start().await.unwrap(); + if let Some(heartbeat) = heartbeat { + heartbeat.start().await.unwrap(); + } let fe_instance_ref = Arc::new(fe_instance); let opts = PostgresOptions { addr: fe_pg_addr.clone(), diff --git a/tests-integration/src/tests.rs b/tests-integration/src/tests.rs index 24b0843559..2557dc8201 100644 --- a/tests-integration/src/tests.rs +++ b/tests-integration/src/tests.rs @@ -65,7 +65,7 @@ impl MockStandaloneInstance { pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandaloneInstance { let (opts, guard) = create_tmp_dir_and_datanode_opts(StorageType::File, test_name); - let dn_instance = Arc::new(DatanodeInstance::with_opts(&opts).await.unwrap()); + let (dn_instance, heartbeat) = DatanodeInstance::with_opts(&opts).await.unwrap(); let frontend_instance = Instance::try_new_standalone(dn_instance.clone()) .await @@ -87,6 +87,9 @@ pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandalon .unwrap(); dn_instance.start().await.unwrap(); + if let Some(heartbeat) = heartbeat { + heartbeat.start().await.unwrap(); + }; MockStandaloneInstance { instance: Arc::new(frontend_instance), _guard: guard,