feat: add shutdown for datanode (#1160)

This commit is contained in:
Weny Xu
2023-03-13 17:49:26 +08:00
committed by GitHub
parent 3377930a50
commit 8a83de4ea5
12 changed files with 136 additions and 4 deletions

View File

@@ -221,4 +221,8 @@ impl TableEngine for MockTableEngine {
) -> table::Result<bool> {
unimplemented!()
}
async fn close(&self) -> table::Result<()> {
Ok(())
}
}

View File

@@ -240,6 +240,20 @@ impl Datanode {
pub fn get_instance(&self) -> InstanceRef {
self.instance.clone()
}
async fn shutdown_instance(&self) -> Result<()> {
self.instance.shutdown().await
}
async fn shutdown_services(&self) -> Result<()> {
self.services.shutdown().await
}
pub async fn shutdown(&self) -> Result<()> {
// We must shutdown services first
self.shutdown_services().await?;
self.shutdown_instance().await
}
}
#[cfg(test)]

View File

@@ -482,6 +482,24 @@ pub enum Error {
#[snafu(backtrace)]
source: common_procedure::error::Error,
},
#[snafu(display("Failed to close table engine, source: {}", source))]
CloseTableEngine {
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Failed to shutdown server, source: {}", source))]
ShutdownServer {
#[snafu(backtrace)]
source: servers::error::Error,
},
#[snafu(display("Failed to shutdown instance, source: {}", source))]
ShutdownInstance {
#[snafu(backtrace)]
source: BoxedError,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -550,7 +568,10 @@ impl ErrorExt for Error {
| BuildParquetRecordBatchStream { .. }
| InvalidSchema { .. }
| ParseDataTypes { .. }
| IncorrectInternalState { .. } => StatusCode::Internal,
| IncorrectInternalState { .. }
| ShutdownServer { .. }
| ShutdownInstance { .. }
| CloseTableEngine { .. } => StatusCode::Internal,
BuildBackend { .. }
| InitBackend { .. }

View File

@@ -144,6 +144,18 @@ impl HeartbeatTask {
Ok(())
}
pub async fn close(&self) -> Result<()> {
let running = self.running.clone();
if running
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
warn!("Call close heartbeat task multiple times");
}
Ok(())
}
}
/// Resolves hostname:port address for meta registration

View File

@@ -20,6 +20,7 @@ use catalog::remote::MetaKvBackend;
use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest};
use common_base::readable_size::ReadableSize;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_error::prelude::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
@@ -51,7 +52,7 @@ use crate::datanode::{
};
use crate::error::{
self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result,
NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result, ShutdownInstanceSnafu,
};
use crate::heartbeat::HeartbeatTask;
use crate::script::ScriptExecutor;
@@ -220,6 +221,22 @@ impl Instance {
Ok(())
}
pub async fn shutdown(&self) -> Result<()> {
if let Some(heartbeat_task) = &self.heartbeat_task {
heartbeat_task
.close()
.await
.map_err(BoxedError::new)
.context(ShutdownInstanceSnafu)?;
}
self.sql_handler
.close()
.await
.map_err(BoxedError::new)
.context(ShutdownInstanceSnafu)
}
pub fn sql_handler(&self) -> &SqlHandler {
&self.sql_handler
}

View File

@@ -30,7 +30,9 @@ use snafu::ResultExt;
use crate::datanode::DatanodeOptions;
use crate::error::Error::StartServer;
use crate::error::{ParseAddrSnafu, Result, RuntimeResourceSnafu, StartServerSnafu};
use crate::error::{
ParseAddrSnafu, Result, RuntimeResourceSnafu, ShutdownServerSnafu, StartServerSnafu,
};
use crate::instance::InstanceRef;
pub mod grpc;
@@ -115,4 +117,17 @@ impl Services {
.context(StartServerSnafu)?;
Ok(())
}
pub async fn shutdown(&self) -> Result<()> {
let mut res = vec![self.grpc_server.shutdown()];
if let Some(mysql_server) = &self.mysql_server {
res.push(mysql_server.shutdown());
}
futures::future::try_join_all(res)
.await
.context(ShutdownServerSnafu)?;
Ok(())
}
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use catalog::CatalogManagerRef;
use common_error::prelude::BoxedError;
use common_procedure::ProcedureManagerRef;
use common_query::Output;
use common_telemetry::error;
@@ -28,7 +29,9 @@ use table::engine::{EngineContext, TableEngineProcedureRef, TableEngineRef, Tabl
use table::requests::*;
use table::TableRef;
use crate::error::{self, ExecuteSqlSnafu, GetTableSnafu, Result, TableNotFoundSnafu};
use crate::error::{
self, CloseTableEngineSnafu, ExecuteSqlSnafu, GetTableSnafu, Result, TableNotFoundSnafu,
};
use crate::instance::sql::table_idents_to_full_name;
mod alter;
@@ -139,6 +142,14 @@ impl SqlHandler {
pub fn table_engine(&self) -> TableEngineRef {
self.table_engine.clone()
}
pub async fn close(&self) -> Result<()> {
self.table_engine
.close()
.await
.map_err(BoxedError::new)
.context(CloseTableEngineSnafu)
}
}
#[cfg(test)]

View File

@@ -165,6 +165,10 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)
}
async fn close(&self) -> TableResult<()> {
self.inner.close().await
}
}
impl<S: StorageEngine> TableEngineProcedure for MitoEngine<S> {
@@ -623,6 +627,19 @@ impl<S: StorageEngine> MitoEngineInner<S> {
.remove(&table_reference.to_string())
.is_some())
}
async fn close(&self) -> TableResult<()> {
let _lock = self.table_mutex.lock().await;
let tables = self.tables.write().unwrap().clone();
futures::future::try_join_all(tables.values().map(|t| t.close()))
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
Ok(())
}
}
impl<S: StorageEngine> MitoEngineInner<S> {

View File

@@ -322,6 +322,15 @@ impl<R: Region> Table for MitoTable<R> {
}
Ok(rows_deleted)
}
async fn close(&self) -> TableResult<()> {
futures::future::try_join_all(self.regions.values().map(|region| region.close()))
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
Ok(())
}
}
struct ChunkStream {

View File

@@ -100,6 +100,9 @@ pub trait TableEngine: Send + Sync {
/// Drops the given table. Return true if the table is dropped, or false if the table doesn't exist.
async fn drop_table(&self, ctx: &EngineContext, request: DropTableRequest) -> Result<bool>;
/// Close the table.
async fn close(&self) -> Result<()>;
}
pub type TableEngineRef = Arc<dyn TableEngine>;

View File

@@ -93,6 +93,11 @@ pub trait Table: Send + Sync {
}
.fail()?
}
/// Close the table.
async fn close(&self) -> Result<()> {
Ok(())
}
}
pub type TableRef = Arc<dyn Table>;

View File

@@ -96,4 +96,8 @@ impl TableEngine for MockTableEngine {
async fn drop_table(&self, _ctx: &EngineContext, _request: DropTableRequest) -> Result<bool> {
unimplemented!()
}
async fn close(&self) -> Result<()> {
Ok(())
}
}