From 8a83de4ea52e1bac63e213a239e411865585f679 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Mon, 13 Mar 2023 17:49:26 +0800 Subject: [PATCH] feat: add shutdown for datanode (#1160) --- src/catalog/tests/mock.rs | 4 ++++ src/datanode/src/datanode.rs | 14 ++++++++++++++ src/datanode/src/error.rs | 23 ++++++++++++++++++++++- src/datanode/src/heartbeat.rs | 12 ++++++++++++ src/datanode/src/instance.rs | 19 ++++++++++++++++++- src/datanode/src/server.rs | 17 ++++++++++++++++- src/datanode/src/sql.rs | 13 ++++++++++++- src/mito/src/engine.rs | 17 +++++++++++++++++ src/mito/src/table.rs | 9 +++++++++ src/table/src/engine.rs | 3 +++ src/table/src/table.rs | 5 +++++ src/table/src/test_util/mock_engine.rs | 4 ++++ 12 files changed, 136 insertions(+), 4 deletions(-) diff --git a/src/catalog/tests/mock.rs b/src/catalog/tests/mock.rs index 6cc06fbd77..d34a284b89 100644 --- a/src/catalog/tests/mock.rs +++ b/src/catalog/tests/mock.rs @@ -221,4 +221,8 @@ impl TableEngine for MockTableEngine { ) -> table::Result { unimplemented!() } + + async fn close(&self) -> table::Result<()> { + Ok(()) + } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 19d5d8a735..729a682d4e 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -240,6 +240,20 @@ impl Datanode { pub fn get_instance(&self) -> InstanceRef { self.instance.clone() } + + async fn shutdown_instance(&self) -> Result<()> { + self.instance.shutdown().await + } + + async fn shutdown_services(&self) -> Result<()> { + self.services.shutdown().await + } + + pub async fn shutdown(&self) -> Result<()> { + // We must shutdown services first + self.shutdown_services().await?; + self.shutdown_instance().await + } } #[cfg(test)] diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 2eeabb12e6..6f6ab2b24a 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -482,6 +482,24 @@ pub enum Error { #[snafu(backtrace)] source: common_procedure::error::Error, }, + + #[snafu(display("Failed to close table engine, source: {}", source))] + CloseTableEngine { + #[snafu(backtrace)] + source: BoxedError, + }, + + #[snafu(display("Failed to shutdown server, source: {}", source))] + ShutdownServer { + #[snafu(backtrace)] + source: servers::error::Error, + }, + + #[snafu(display("Failed to shutdown instance, source: {}", source))] + ShutdownInstance { + #[snafu(backtrace)] + source: BoxedError, + }, } pub type Result = std::result::Result; @@ -550,7 +568,10 @@ impl ErrorExt for Error { | BuildParquetRecordBatchStream { .. } | InvalidSchema { .. } | ParseDataTypes { .. } - | IncorrectInternalState { .. } => StatusCode::Internal, + | IncorrectInternalState { .. } + | ShutdownServer { .. } + | ShutdownInstance { .. } + | CloseTableEngine { .. } => StatusCode::Internal, BuildBackend { .. } | InitBackend { .. } diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index e2650ae049..842cbe6eed 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -144,6 +144,18 @@ impl HeartbeatTask { Ok(()) } + + pub async fn close(&self) -> Result<()> { + let running = self.running.clone(); + if running + .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire) + .is_err() + { + warn!("Call close heartbeat task multiple times"); + } + + Ok(()) + } } /// Resolves hostname:port address for meta registration diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 53beff1ead..66d1db095c 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -20,6 +20,7 @@ use catalog::remote::MetaKvBackend; use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest}; use common_base::readable_size::ReadableSize; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; +use common_error::prelude::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::ProcedureManagerRef; @@ -51,7 +52,7 @@ use crate::datanode::{ }; use crate::error::{ self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu, - NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result, + NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result, ShutdownInstanceSnafu, }; use crate::heartbeat::HeartbeatTask; use crate::script::ScriptExecutor; @@ -220,6 +221,22 @@ impl Instance { Ok(()) } + pub async fn shutdown(&self) -> Result<()> { + if let Some(heartbeat_task) = &self.heartbeat_task { + heartbeat_task + .close() + .await + .map_err(BoxedError::new) + .context(ShutdownInstanceSnafu)?; + } + + self.sql_handler + .close() + .await + .map_err(BoxedError::new) + .context(ShutdownInstanceSnafu) + } + pub fn sql_handler(&self) -> &SqlHandler { &self.sql_handler } diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs index 482e4f1a10..b62e20c17d 100644 --- a/src/datanode/src/server.rs +++ b/src/datanode/src/server.rs @@ -30,7 +30,9 @@ use snafu::ResultExt; use crate::datanode::DatanodeOptions; use crate::error::Error::StartServer; -use crate::error::{ParseAddrSnafu, Result, RuntimeResourceSnafu, StartServerSnafu}; +use crate::error::{ + ParseAddrSnafu, Result, RuntimeResourceSnafu, ShutdownServerSnafu, StartServerSnafu, +}; use crate::instance::InstanceRef; pub mod grpc; @@ -115,4 +117,17 @@ impl Services { .context(StartServerSnafu)?; Ok(()) } + + pub async fn shutdown(&self) -> Result<()> { + let mut res = vec![self.grpc_server.shutdown()]; + if let Some(mysql_server) = &self.mysql_server { + res.push(mysql_server.shutdown()); + } + + futures::future::try_join_all(res) + .await + .context(ShutdownServerSnafu)?; + + Ok(()) + } } diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index aa7632ab8b..e3e10f5c26 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -13,6 +13,7 @@ // limitations under the License. use catalog::CatalogManagerRef; +use common_error::prelude::BoxedError; use common_procedure::ProcedureManagerRef; use common_query::Output; use common_telemetry::error; @@ -28,7 +29,9 @@ use table::engine::{EngineContext, TableEngineProcedureRef, TableEngineRef, Tabl use table::requests::*; use table::TableRef; -use crate::error::{self, ExecuteSqlSnafu, GetTableSnafu, Result, TableNotFoundSnafu}; +use crate::error::{ + self, CloseTableEngineSnafu, ExecuteSqlSnafu, GetTableSnafu, Result, TableNotFoundSnafu, +}; use crate::instance::sql::table_idents_to_full_name; mod alter; @@ -139,6 +142,14 @@ impl SqlHandler { pub fn table_engine(&self) -> TableEngineRef { self.table_engine.clone() } + + pub async fn close(&self) -> Result<()> { + self.table_engine + .close() + .await + .map_err(BoxedError::new) + .context(CloseTableEngineSnafu) + } } #[cfg(test)] diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index 9ef5154f2e..ed77a72763 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -165,6 +165,10 @@ impl TableEngine for MitoEngine { .map_err(BoxedError::new) .context(table_error::TableOperationSnafu) } + + async fn close(&self) -> TableResult<()> { + self.inner.close().await + } } impl TableEngineProcedure for MitoEngine { @@ -623,6 +627,19 @@ impl MitoEngineInner { .remove(&table_reference.to_string()) .is_some()) } + + async fn close(&self) -> TableResult<()> { + let _lock = self.table_mutex.lock().await; + + let tables = self.tables.write().unwrap().clone(); + + futures::future::try_join_all(tables.values().map(|t| t.close())) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + + Ok(()) + } } impl MitoEngineInner { diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 1b176bf274..92251b8b2c 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -322,6 +322,15 @@ impl Table for MitoTable { } Ok(rows_deleted) } + + async fn close(&self) -> TableResult<()> { + futures::future::try_join_all(self.regions.values().map(|region| region.close())) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + + Ok(()) + } } struct ChunkStream { diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs index 098789e475..2f46e16eec 100644 --- a/src/table/src/engine.rs +++ b/src/table/src/engine.rs @@ -100,6 +100,9 @@ pub trait TableEngine: Send + Sync { /// Drops the given table. Return true if the table is dropped, or false if the table doesn't exist. async fn drop_table(&self, ctx: &EngineContext, request: DropTableRequest) -> Result; + + /// Close the table. + async fn close(&self) -> Result<()>; } pub type TableEngineRef = Arc; diff --git a/src/table/src/table.rs b/src/table/src/table.rs index f440d6b950..b59841b8ed 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -93,6 +93,11 @@ pub trait Table: Send + Sync { } .fail()? } + + /// Close the table. + async fn close(&self) -> Result<()> { + Ok(()) + } } pub type TableRef = Arc; diff --git a/src/table/src/test_util/mock_engine.rs b/src/table/src/test_util/mock_engine.rs index ddd5b32b6d..ce30dddee2 100644 --- a/src/table/src/test_util/mock_engine.rs +++ b/src/table/src/test_util/mock_engine.rs @@ -96,4 +96,8 @@ impl TableEngine for MockTableEngine { async fn drop_table(&self, _ctx: &EngineContext, _request: DropTableRequest) -> Result { unimplemented!() } + + async fn close(&self) -> Result<()> { + Ok(()) + } }