refactor(datanode): move Instance heartbeat task to Datanode struct (#1832)

* refactor(datanode): move Instance heartbeat to Datanode struct

* chore: apply suggestions from CR

* fix: start heartbeat task after instance starts
This commit is contained in:
Weny Xu
2023-06-27 13:32:20 +09:00
committed by GitHub
parent 78b07996b1
commit dcfce49cff
8 changed files with 163 additions and 82 deletions

View File

@@ -14,10 +14,10 @@
//! Datanode configurations //! Datanode configurations
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use common_base::readable_size::ReadableSize; use common_base::readable_size::ReadableSize;
use common_error::prelude::BoxedError;
use common_telemetry::info; use common_telemetry::info;
use common_telemetry::logging::LoggingOptions; use common_telemetry::logging::LoggingOptions;
use meta_client::MetaClientOptions; use meta_client::MetaClientOptions;
@@ -25,13 +25,15 @@ use secrecy::SecretString;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use servers::http::HttpOptions; use servers::http::HttpOptions;
use servers::Mode; use servers::Mode;
use snafu::ResultExt;
use storage::config::{ use storage::config::{
EngineConfig as StorageEngineConfig, DEFAULT_AUTO_FLUSH_INTERVAL, DEFAULT_MAX_FLUSH_TASKS, EngineConfig as StorageEngineConfig, DEFAULT_AUTO_FLUSH_INTERVAL, DEFAULT_MAX_FLUSH_TASKS,
DEFAULT_PICKER_SCHEDULE_INTERVAL, DEFAULT_REGION_WRITE_BUFFER_SIZE, DEFAULT_PICKER_SCHEDULE_INTERVAL, DEFAULT_REGION_WRITE_BUFFER_SIZE,
}; };
use storage::scheduler::SchedulerConfig; use storage::scheduler::SchedulerConfig;
use crate::error::Result; use crate::error::{Result, ShutdownInstanceSnafu};
use crate::heartbeat::HeartbeatTask;
use crate::instance::{Instance, InstanceRef}; use crate::instance::{Instance, InstanceRef};
use crate::server::Services; use crate::server::Services;
@@ -340,6 +342,7 @@ pub struct DatanodeOptions {
pub rpc_addr: String, pub rpc_addr: String,
pub rpc_hostname: Option<String>, pub rpc_hostname: Option<String>,
pub rpc_runtime_size: usize, pub rpc_runtime_size: usize,
pub heartbeat_interval_millis: u64,
pub http_opts: HttpOptions, pub http_opts: HttpOptions,
pub meta_client_options: Option<MetaClientOptions>, pub meta_client_options: Option<MetaClientOptions>,
pub wal: WalConfig, pub wal: WalConfig,
@@ -363,6 +366,7 @@ impl Default for DatanodeOptions {
storage: StorageConfig::default(), storage: StorageConfig::default(),
procedure: ProcedureConfig::default(), procedure: ProcedureConfig::default(),
logging: LoggingOptions::default(), logging: LoggingOptions::default(),
heartbeat_interval_millis: 5000,
} }
} }
} }
@@ -378,11 +382,12 @@ pub struct Datanode {
opts: DatanodeOptions, opts: DatanodeOptions,
services: Option<Services>, services: Option<Services>,
instance: InstanceRef, instance: InstanceRef,
heartbeat_task: Option<HeartbeatTask>,
} }
impl Datanode { impl Datanode {
pub async fn new(opts: DatanodeOptions) -> Result<Datanode> { pub async fn new(opts: DatanodeOptions) -> Result<Datanode> {
let instance = Arc::new(Instance::with_opts(&opts).await?); let (instance, heartbeat_task) = Instance::with_opts(&opts).await?;
let services = match opts.mode { let services = match opts.mode {
Mode::Distributed => Some(Services::try_new(instance.clone(), &opts).await?), Mode::Distributed => Some(Services::try_new(instance.clone(), &opts).await?),
Mode::Standalone => None, Mode::Standalone => None,
@@ -391,6 +396,7 @@ impl Datanode {
opts, opts,
services, services,
instance, instance,
heartbeat_task,
}) })
} }
@@ -402,7 +408,11 @@ impl Datanode {
/// Start only the internal component of datanode. /// Start only the internal component of datanode.
pub async fn start_instance(&mut self) -> Result<()> { pub async fn start_instance(&mut self) -> Result<()> {
self.instance.start().await let _ = self.instance.start().await;
if let Some(task) = &self.heartbeat_task {
task.start().await?;
}
Ok(())
} }
/// Start services of datanode. This method call will block until services are shutdown. /// Start services of datanode. This method call will block until services are shutdown.
@@ -419,7 +429,15 @@ impl Datanode {
} }
pub async fn shutdown_instance(&self) -> Result<()> { pub async fn shutdown_instance(&self) -> Result<()> {
self.instance.shutdown().await if let Some(heartbeat_task) = &self.heartbeat_task {
heartbeat_task
.close()
.await
.map_err(BoxedError::new)
.context(ShutdownInstanceSnafu)?;
}
let _ = self.instance.shutdown().await;
Ok(())
} }
async fn shutdown_services(&self) -> Result<()> { async fn shutdown_services(&self) -> Result<()> {

View File

@@ -48,7 +48,7 @@ use storage::config::EngineConfig as StorageEngineConfig;
use storage::scheduler::{LocalScheduler, SchedulerConfig}; use storage::scheduler::{LocalScheduler, SchedulerConfig};
use storage::EngineImpl; use storage::EngineImpl;
use store_api::logstore::LogStore; use store_api::logstore::LogStore;
use table::engine::manager::MemoryTableEngineManager; use table::engine::manager::{MemoryTableEngineManager, TableEngineManagerRef};
use table::engine::{TableEngine, TableEngineProcedureRef}; use table::engine::{TableEngine, TableEngineProcedureRef};
use table::requests::FlushTableRequest; use table::requests::FlushTableRequest;
use table::table::numbers::NumbersTable; use table::table::numbers::NumbersTable;
@@ -78,14 +78,13 @@ pub struct Instance {
pub(crate) sql_handler: SqlHandler, pub(crate) sql_handler: SqlHandler,
pub(crate) catalog_manager: CatalogManagerRef, pub(crate) catalog_manager: CatalogManagerRef,
pub(crate) table_id_provider: Option<TableIdProviderRef>, pub(crate) table_id_provider: Option<TableIdProviderRef>,
pub(crate) heartbeat_task: Option<HeartbeatTask>,
procedure_manager: ProcedureManagerRef, procedure_manager: ProcedureManagerRef,
} }
pub type InstanceRef = Arc<Instance>; pub type InstanceRef = Arc<Instance>;
impl Instance { impl Instance {
pub async fn with_opts(opts: &DatanodeOptions) -> Result<Self> { pub async fn with_opts(opts: &DatanodeOptions) -> Result<(InstanceRef, Option<HeartbeatTask>)> {
let meta_client = match opts.mode { let meta_client = match opts.mode {
Mode::Standalone => None, Mode::Standalone => None,
Mode::Distributed => { Mode::Distributed => {
@@ -105,11 +104,57 @@ impl Instance {
Self::new(opts, meta_client, compaction_scheduler).await Self::new(opts, meta_client, compaction_scheduler).await
} }
fn build_heartbeat_task(
opts: &DatanodeOptions,
meta_client: Option<Arc<MetaClient>>,
catalog_manager: CatalogManagerRef,
engine_manager: TableEngineManagerRef,
region_alive_keepers: Option<Arc<RegionAliveKeepers>>,
) -> Result<Option<HeartbeatTask>> {
Ok(match opts.mode {
Mode::Standalone => None,
Mode::Distributed => {
let node_id = opts.node_id.context(MissingNodeIdSnafu)?;
let meta_client = meta_client.context(IncorrectInternalStateSnafu {
state: "meta client is not provided when building heartbeat task",
})?;
let region_alive_keepers =
region_alive_keepers.context(IncorrectInternalStateSnafu {
state: "region_alive_keepers is not provided when building heartbeat task",
})?;
let handlers_executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler::default()),
Arc::new(OpenRegionHandler::new(
catalog_manager.clone(),
engine_manager.clone(),
region_alive_keepers.clone(),
)),
Arc::new(CloseRegionHandler::new(
catalog_manager.clone(),
engine_manager,
region_alive_keepers.clone(),
)),
region_alive_keepers.clone(),
]);
Some(HeartbeatTask::new(
node_id,
opts,
meta_client,
catalog_manager,
Arc::new(handlers_executor),
opts.heartbeat_interval_millis,
region_alive_keepers,
))
}
})
}
pub(crate) async fn new( pub(crate) async fn new(
opts: &DatanodeOptions, opts: &DatanodeOptions,
meta_client: Option<Arc<MetaClient>>, meta_client: Option<Arc<MetaClient>>,
compaction_scheduler: CompactionSchedulerRef<RaftEngineLogStore>, compaction_scheduler: CompactionSchedulerRef<RaftEngineLogStore>,
) -> Result<Self> { ) -> Result<(InstanceRef, Option<HeartbeatTask>)> {
let object_store = store::new_object_store(&opts.storage.store).await?; let object_store = store::new_object_store(&opts.storage.store).await?;
let log_store = Arc::new(create_log_store(&opts.storage.store, &opts.wal).await?); let log_store = Arc::new(create_log_store(&opts.storage.store, &opts.wal).await?);
@@ -151,7 +196,7 @@ impl Instance {
); );
// create remote catalog manager // create remote catalog manager
let (catalog_manager, table_id_provider, heartbeat_task) = match opts.mode { let (catalog_manager, table_id_provider, region_alive_keepers) = match opts.mode {
Mode::Standalone => { Mode::Standalone => {
if opts.enable_memory_catalog { if opts.enable_memory_catalog {
let catalog = Arc::new(catalog::local::MemoryCatalogManager::default()); let catalog = Arc::new(catalog::local::MemoryCatalogManager::default());
@@ -189,17 +234,15 @@ impl Instance {
} }
Mode::Distributed => { Mode::Distributed => {
let meta_client = meta_client.context(IncorrectInternalStateSnafu { let meta_client = meta_client.clone().context(IncorrectInternalStateSnafu {
state: "meta client is not provided when creating distributed Datanode", state: "meta client is not provided when creating distributed Datanode",
})?; })?;
let kv_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone())); let kv_backend = Arc::new(CachedMetaKvBackend::new(meta_client));
let heartbeat_interval_millis = 5000;
let region_alive_keepers = Arc::new(RegionAliveKeepers::new( let region_alive_keepers = Arc::new(RegionAliveKeepers::new(
engine_manager.clone(), engine_manager.clone(),
heartbeat_interval_millis, opts.heartbeat_interval_millis,
)); ));
let catalog_manager = Arc::new(RemoteCatalogManager::new( let catalog_manager = Arc::new(RemoteCatalogManager::new(
@@ -209,32 +252,11 @@ impl Instance {
region_alive_keepers.clone(), region_alive_keepers.clone(),
)); ));
let handlers_executor = HandlerGroupExecutor::new(vec![ (
Arc::new(ParseMailboxMessageHandler::default()), catalog_manager as CatalogManagerRef,
Arc::new(OpenRegionHandler::new( None,
catalog_manager.clone(), Some(region_alive_keepers),
engine_manager.clone(), )
region_alive_keepers.clone(),
)),
Arc::new(CloseRegionHandler::new(
catalog_manager.clone(),
engine_manager.clone(),
region_alive_keepers.clone(),
)),
region_alive_keepers.clone(),
]);
let heartbeat_task = Some(HeartbeatTask::new(
opts.node_id.context(MissingNodeIdSnafu)?,
opts,
meta_client,
catalog_manager.clone(),
Arc::new(handlers_executor),
heartbeat_interval_millis,
region_alive_keepers,
));
(catalog_manager as CatalogManagerRef, None, heartbeat_task)
} }
}; };
@@ -258,18 +280,27 @@ impl Instance {
&*procedure_manager, &*procedure_manager,
); );
Ok(Self { let instance = Arc::new(Self {
query_engine: query_engine.clone(), query_engine: query_engine.clone(),
sql_handler: SqlHandler::new( sql_handler: SqlHandler::new(
engine_manager, engine_manager.clone(),
catalog_manager.clone(), catalog_manager.clone(),
procedure_manager.clone(), procedure_manager.clone(),
), ),
catalog_manager, catalog_manager: catalog_manager.clone(),
heartbeat_task,
table_id_provider, table_id_provider,
procedure_manager, procedure_manager,
}) });
let heartbeat_task = Instance::build_heartbeat_task(
opts,
meta_client,
catalog_manager,
engine_manager,
region_alive_keepers,
)?;
Ok((instance, heartbeat_task))
} }
pub async fn start(&self) -> Result<()> { pub async fn start(&self) -> Result<()> {
@@ -277,9 +308,6 @@ impl Instance {
.start() .start()
.await .await
.context(NewCatalogSnafu)?; .context(NewCatalogSnafu)?;
if let Some(task) = &self.heartbeat_task {
task.start().await?;
}
// Recover procedures after the catalog manager is started, so we can // Recover procedures after the catalog manager is started, so we can
// ensure we can access all tables from the catalog manager. // ensure we can access all tables from the catalog manager.
@@ -298,13 +326,6 @@ impl Instance {
.stop() .stop()
.await .await
.context(StopProcedureManagerSnafu)?; .context(StopProcedureManagerSnafu)?;
if let Some(heartbeat_task) = &self.heartbeat_task {
heartbeat_task
.close()
.await
.map_err(BoxedError::new)
.context(ShutdownInstanceSnafu)?;
}
self.flush_tables().await?; self.flush_tables().await?;

View File

@@ -17,7 +17,7 @@
pub mod datanode; pub mod datanode;
pub mod error; pub mod error;
mod heartbeat; pub mod heartbeat;
pub mod instance; pub mod instance;
pub mod metrics; pub mod metrics;
mod mock; mod mock;

View File

@@ -21,15 +21,21 @@ use storage::compaction::noop::NoopCompactionScheduler;
use crate::datanode::DatanodeOptions; use crate::datanode::DatanodeOptions;
use crate::error::Result; use crate::error::Result;
use crate::instance::Instance; use crate::heartbeat::HeartbeatTask;
use crate::instance::{Instance, InstanceRef};
impl Instance { impl Instance {
pub async fn with_mock_meta_client(opts: &DatanodeOptions) -> Result<Self> { pub async fn with_mock_meta_client(
opts: &DatanodeOptions,
) -> Result<(InstanceRef, Option<HeartbeatTask>)> {
let mock_info = meta_srv::mocks::mock_with_memstore().await; let mock_info = meta_srv::mocks::mock_with_memstore().await;
Self::with_mock_meta_server(opts, mock_info).await Self::with_mock_meta_server(opts, mock_info).await
} }
pub async fn with_mock_meta_server(opts: &DatanodeOptions, meta_srv: MockInfo) -> Result<Self> { pub async fn with_mock_meta_server(
opts: &DatanodeOptions,
meta_srv: MockInfo,
) -> Result<(InstanceRef, Option<HeartbeatTask>)> {
let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await); let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await);
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
Instance::new(opts, Some(meta_client), compaction_scheduler).await Instance::new(opts, Some(meta_client), compaction_scheduler).await

View File

@@ -29,10 +29,12 @@ use crate::datanode::{
DatanodeOptions, FileConfig, ObjectStoreConfig, ProcedureConfig, StorageConfig, WalConfig, DatanodeOptions, FileConfig, ObjectStoreConfig, ProcedureConfig, StorageConfig, WalConfig,
}; };
use crate::error::{CreateTableSnafu, Result}; use crate::error::{CreateTableSnafu, Result};
use crate::instance::Instance; use crate::heartbeat::HeartbeatTask;
use crate::instance::{Instance, InstanceRef};
pub(crate) struct MockInstance { pub(crate) struct MockInstance {
instance: Instance, instance: InstanceRef,
_heartbeat: Option<HeartbeatTask>,
_guard: TestGuard, _guard: TestGuard,
} }
@@ -40,10 +42,17 @@ impl MockInstance {
pub(crate) async fn new(name: &str) -> Self { pub(crate) async fn new(name: &str) -> Self {
let (opts, _guard) = create_tmp_dir_and_datanode_opts(name); let (opts, _guard) = create_tmp_dir_and_datanode_opts(name);
let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap(); instance.start().await.unwrap();
if let Some(task) = heartbeat.as_ref() {
task.start().await.unwrap();
}
MockInstance { instance, _guard } MockInstance {
instance,
_guard,
_heartbeat: heartbeat,
}
} }
pub(crate) fn inner(&self) -> &Instance { pub(crate) fn inner(&self) -> &Instance {

View File

@@ -27,6 +27,7 @@ use common_meta::DatanodeId;
use common_runtime::Builder as RuntimeBuilder; use common_runtime::Builder as RuntimeBuilder;
use common_test_util::temp_dir::create_temp_dir; use common_test_util::temp_dir::create_temp_dir;
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig}; use datanode::datanode::{DatanodeOptions, ObjectStoreConfig};
use datanode::heartbeat::HeartbeatTask;
use datanode::instance::Instance as DatanodeInstance; use datanode::instance::Instance as DatanodeInstance;
use frontend::instance::{FrontendInstance, Instance as FeInstance}; use frontend::instance::{FrontendInstance, Instance as FeInstance};
use meta_client::client::MetaClientBuilder; use meta_client::client::MetaClientBuilder;
@@ -49,6 +50,7 @@ pub struct GreptimeDbCluster {
_wal_guards: Vec<WalGuard>, _wal_guards: Vec<WalGuard>,
pub datanode_instances: HashMap<DatanodeId, Arc<DatanodeInstance>>, pub datanode_instances: HashMap<DatanodeId, Arc<DatanodeInstance>>,
pub datanode_heartbeat_tasks: HashMap<DatanodeId, Option<HeartbeatTask>>,
pub kv_store: KvStoreRef, pub kv_store: KvStoreRef,
pub meta_srv: MetaSrv, pub meta_srv: MetaSrv,
pub frontend: Arc<FeInstance>, pub frontend: Arc<FeInstance>,
@@ -86,7 +88,7 @@ impl GreptimeDbClusterBuilder {
let meta_srv = self.build_metasrv().await; let meta_srv = self.build_metasrv().await;
let (datanode_instances, storage_guards, wal_guards) = let (datanode_instances, heartbeat_tasks, storage_guards, wal_guards) =
self.build_datanodes(meta_srv.clone(), datanodes).await; self.build_datanodes(meta_srv.clone(), datanodes).await;
let datanode_clients = build_datanode_clients(&datanode_instances, datanodes).await; let datanode_clients = build_datanode_clients(&datanode_instances, datanodes).await;
@@ -103,6 +105,7 @@ impl GreptimeDbClusterBuilder {
storage_guards, storage_guards,
_wal_guards: wal_guards, _wal_guards: wal_guards,
datanode_instances, datanode_instances,
datanode_heartbeat_tasks: heartbeat_tasks,
kv_store: self.kv_store.clone(), kv_store: self.kv_store.clone(),
meta_srv: meta_srv.meta_srv, meta_srv: meta_srv.meta_srv,
frontend, frontend,
@@ -119,10 +122,12 @@ impl GreptimeDbClusterBuilder {
datanodes: u32, datanodes: u32,
) -> ( ) -> (
HashMap<DatanodeId, Arc<DatanodeInstance>>, HashMap<DatanodeId, Arc<DatanodeInstance>>,
HashMap<DatanodeId, Option<HeartbeatTask>>,
Vec<StorageGuard>, Vec<StorageGuard>,
Vec<WalGuard>, Vec<WalGuard>,
) { ) {
let mut instances = HashMap::with_capacity(datanodes as usize); let mut instances = HashMap::with_capacity(datanodes as usize);
let mut heartbeat_tasks = HashMap::with_capacity(datanodes as usize);
let mut storage_guards = Vec::with_capacity(datanodes as usize); let mut storage_guards = Vec::with_capacity(datanodes as usize);
let mut wal_guards = Vec::with_capacity(datanodes as usize); let mut wal_guards = Vec::with_capacity(datanodes as usize);
@@ -151,9 +156,10 @@ impl GreptimeDbClusterBuilder {
let dn_instance = self.create_datanode(&opts, meta_srv.clone()).await; let dn_instance = self.create_datanode(&opts, meta_srv.clone()).await;
instances.insert(datanode_id, dn_instance.clone()); instances.insert(datanode_id, dn_instance.0.clone());
heartbeat_tasks.insert(datanode_id, dn_instance.1);
} }
(instances, storage_guards, wal_guards) (instances, heartbeat_tasks, storage_guards, wal_guards)
} }
async fn wait_datanodes_alive(&self, expected_datanodes: u32) { async fn wait_datanodes_alive(&self, expected_datanodes: u32) {
@@ -175,14 +181,14 @@ impl GreptimeDbClusterBuilder {
&self, &self,
opts: &DatanodeOptions, opts: &DatanodeOptions,
meta_srv: MockInfo, meta_srv: MockInfo,
) -> Arc<DatanodeInstance> { ) -> (Arc<DatanodeInstance>, Option<HeartbeatTask>) {
let instance = Arc::new( let (instance, heartbeat) = DatanodeInstance::with_mock_meta_server(opts, meta_srv)
DatanodeInstance::with_mock_meta_server(opts, meta_srv) .await
.await .unwrap();
.unwrap(),
);
instance.start().await.unwrap(); instance.start().await.unwrap();
if let Some(heartbeat) = heartbeat.as_ref() {
heartbeat.start().await.unwrap();
}
// create another catalog and schema for testing // create another catalog and schema for testing
instance instance
.catalog_manager() .catalog_manager()
@@ -192,7 +198,7 @@ impl GreptimeDbClusterBuilder {
.create_catalog_and_schema("another_catalog", "another_schema") .create_catalog_and_schema("another_catalog", "another_schema")
.await .await
.unwrap(); .unwrap();
instance (instance, heartbeat)
} }
async fn build_frontend( async fn build_frontend(

View File

@@ -321,7 +321,7 @@ pub async fn create_test_table(
pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router, TestGuard) { pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router, TestGuard) {
let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap();
create_test_table( create_test_table(
instance.catalog_manager(), instance.catalog_manager(),
instance.sql_handler(), instance.sql_handler(),
@@ -334,6 +334,9 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
.await .await
.unwrap(); .unwrap();
instance.start().await.unwrap(); instance.start().await.unwrap();
if let Some(heartbeat) = heartbeat {
heartbeat.start().await.unwrap();
}
let http_opts = HttpOptions { let http_opts = HttpOptions {
addr: format!("127.0.0.1:{}", ports::get_port()), addr: format!("127.0.0.1:{}", ports::get_port()),
@@ -354,11 +357,14 @@ pub async fn setup_test_http_app_with_frontend(
name: &str, name: &str,
) -> (Router, TestGuard) { ) -> (Router, TestGuard) {
let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap();
let frontend = FeInstance::try_new_standalone(instance.clone()) let frontend = FeInstance::try_new_standalone(instance.clone())
.await .await
.unwrap(); .unwrap();
instance.start().await.unwrap(); instance.start().await.unwrap();
if let Some(heartbeat) = heartbeat {
heartbeat.start().await.unwrap();
}
create_test_table( create_test_table(
frontend.catalog_manager(), frontend.catalog_manager(),
instance.sql_handler(), instance.sql_handler(),
@@ -416,11 +422,14 @@ pub async fn setup_test_prom_app_with_frontend(
) -> (Router, TestGuard) { ) -> (Router, TestGuard) {
std::env::set_var("TZ", "UTC"); std::env::set_var("TZ", "UTC");
let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap();
let frontend = FeInstance::try_new_standalone(instance.clone()) let frontend = FeInstance::try_new_standalone(instance.clone())
.await .await
.unwrap(); .unwrap();
instance.start().await.unwrap(); instance.start().await.unwrap();
if let Some(heartbeat) = heartbeat {
heartbeat.start().await.unwrap();
}
create_test_table( create_test_table(
frontend.catalog_manager(), frontend.catalog_manager(),
@@ -470,7 +479,7 @@ pub async fn setup_grpc_server(
common_telemetry::init_default_ut_logging(); common_telemetry::init_default_ut_logging();
let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap();
let runtime = Arc::new( let runtime = Arc::new(
RuntimeBuilder::default() RuntimeBuilder::default()
@@ -484,6 +493,9 @@ pub async fn setup_grpc_server(
.await .await
.unwrap(); .unwrap();
instance.start().await.unwrap(); instance.start().await.unwrap();
if let Some(heartbeat) = heartbeat {
heartbeat.start().await.unwrap();
}
let fe_instance_ref = Arc::new(fe_instance); let fe_instance_ref = Arc::new(fe_instance);
let fe_grpc_server = Arc::new(GrpcServer::new( let fe_grpc_server = Arc::new(GrpcServer::new(
ServerGrpcQueryHandlerAdaptor::arc(fe_instance_ref.clone()), ServerGrpcQueryHandlerAdaptor::arc(fe_instance_ref.clone()),
@@ -522,7 +534,7 @@ pub async fn setup_mysql_server(
common_telemetry::init_default_ut_logging(); common_telemetry::init_default_ut_logging();
let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap();
let runtime = Arc::new( let runtime = Arc::new(
RuntimeBuilder::default() RuntimeBuilder::default()
@@ -538,6 +550,9 @@ pub async fn setup_mysql_server(
.await .await
.unwrap(); .unwrap();
instance.start().await.unwrap(); instance.start().await.unwrap();
if let Some(heartbeat) = heartbeat {
heartbeat.start().await.unwrap();
}
let fe_instance_ref = Arc::new(fe_instance); let fe_instance_ref = Arc::new(fe_instance);
let opts = MysqlOptions { let opts = MysqlOptions {
addr: fe_mysql_addr.clone(), addr: fe_mysql_addr.clone(),
@@ -575,7 +590,7 @@ pub async fn setup_pg_server(
common_telemetry::init_default_ut_logging(); common_telemetry::init_default_ut_logging();
let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap();
let runtime = Arc::new( let runtime = Arc::new(
RuntimeBuilder::default() RuntimeBuilder::default()
@@ -591,6 +606,9 @@ pub async fn setup_pg_server(
.await .await
.unwrap(); .unwrap();
instance.start().await.unwrap(); instance.start().await.unwrap();
if let Some(heartbeat) = heartbeat {
heartbeat.start().await.unwrap();
}
let fe_instance_ref = Arc::new(fe_instance); let fe_instance_ref = Arc::new(fe_instance);
let opts = PostgresOptions { let opts = PostgresOptions {
addr: fe_pg_addr.clone(), addr: fe_pg_addr.clone(),

View File

@@ -65,7 +65,7 @@ impl MockStandaloneInstance {
pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandaloneInstance { pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandaloneInstance {
let (opts, guard) = create_tmp_dir_and_datanode_opts(StorageType::File, test_name); let (opts, guard) = create_tmp_dir_and_datanode_opts(StorageType::File, test_name);
let dn_instance = Arc::new(DatanodeInstance::with_opts(&opts).await.unwrap()); let (dn_instance, heartbeat) = DatanodeInstance::with_opts(&opts).await.unwrap();
let frontend_instance = Instance::try_new_standalone(dn_instance.clone()) let frontend_instance = Instance::try_new_standalone(dn_instance.clone())
.await .await
@@ -87,6 +87,9 @@ pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandalon
.unwrap(); .unwrap();
dn_instance.start().await.unwrap(); dn_instance.start().await.unwrap();
if let Some(heartbeat) = heartbeat {
heartbeat.start().await.unwrap();
};
MockStandaloneInstance { MockStandaloneInstance {
instance: Arc::new(frontend_instance), instance: Arc::new(frontend_instance),
_guard: guard, _guard: guard,