From 44493e9d8cc38c92bd800b1b5426f786cca697c3 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 14 Mar 2023 18:29:38 +0800 Subject: [PATCH] feat: impl flush on shutdown (#14) * feat: impl flush on shutdown Signed-off-by: Ruihang Xia * powerful if-else! Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/cmd/src/error.rs | 7 +++++ src/cmd/src/standalone.rs | 17 ++++++++++-- src/datanode/src/instance.rs | 42 ++++++++++++++++++++++++++++- src/datanode/src/sql/flush_table.rs | 5 ++++ 4 files changed, 68 insertions(+), 3 deletions(-) diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index a4b42c7fda..2e47a56d5a 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -26,6 +26,12 @@ pub enum Error { source: datanode::error::Error, }, + #[snafu(display("Failed to stop datanode, source: {}", source))] + StopDatanode { + #[snafu(backtrace)] + source: BoxedError, + }, + #[snafu(display("Failed to start frontend, source: {}", source))] StartFrontend { #[snafu(backtrace)] @@ -163,6 +169,7 @@ impl ErrorExt for Error { source.status_code() } Error::SubstraitEncodeLogicalPlan { source } => source.status_code(), + Error::StopDatanode { source } => source.status_code(), } } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 1d53ca7b0f..0c97b7f467 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use clap::Parser; use common_base::Plugins; +use common_error::prelude::BoxedError; use common_telemetry::info; use datanode::datanode::{ CompactionConfig, Datanode, DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig, @@ -36,7 +37,9 @@ use servers::tls::{TlsMode, TlsOption}; use servers::Mode; use snafu::ResultExt; -use crate::error::{Error, IllegalConfigSnafu, Result, StartDatanodeSnafu, StartFrontendSnafu}; +use crate::error::{ + Error, IllegalConfigSnafu, Result, StartDatanodeSnafu, StartFrontendSnafu, StopDatanodeSnafu, +}; use crate::frontend::load_frontend_plugins; use crate::toml_loader; @@ -152,7 +155,17 @@ impl Instance { } pub async fn stop(&self) -> Result<()> { - // TODO: handle standalone shutdown + self.datanode + .shutdown() + .await + .map_err(BoxedError::new) + .context(StopDatanodeSnafu)?; + self.frontend + .shutdown() + .await + .map_err(BoxedError::new) + .context(StopDatanodeSnafu)?; + Ok(()) } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index ed2faa2f39..5a2ff5405d 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -37,12 +37,14 @@ use object_store::services::{Fs as FsBuilder, Oss as OSSBuilder, S3 as S3Builder use object_store::{util, ObjectStore, ObjectStoreBuilder}; use query::query_engine::{QueryEngineFactory, QueryEngineRef}; use servers::Mode; +use session::context::QueryContext; use snafu::prelude::*; use storage::compaction::{CompactionHandler, CompactionSchedulerRef, SimplePicker}; use storage::config::EngineConfig as StorageEngineConfig; use storage::scheduler::{LocalScheduler, SchedulerConfig}; use storage::EngineImpl; use store_api::logstore::LogStore; +use table::requests::FlushTableRequest; use table::table::numbers::NumbersTable; use table::table::TableIdProviderRef; use table::Table; @@ -56,7 +58,7 @@ use crate::error::{ }; use crate::heartbeat::HeartbeatTask; use crate::script::ScriptExecutor; -use crate::sql::SqlHandler; +use crate::sql::{SqlHandler, SqlRequest}; mod grpc; mod script; @@ -233,6 +235,8 @@ impl Instance { .context(ShutdownInstanceSnafu)?; } + self.flush_tables().await?; + self.sql_handler .close() .await @@ -240,6 +244,42 @@ impl Instance { .context(ShutdownInstanceSnafu) } + pub async fn flush_tables(&self) -> Result<()> { + info!("going to flush all schemas"); + let schema_list = self + .catalog_manager + .catalog(DEFAULT_CATALOG_NAME) + .map_err(BoxedError::new) + .context(ShutdownInstanceSnafu)? + .expect("Default schema not found") + .schema_names() + .map_err(BoxedError::new) + .context(ShutdownInstanceSnafu)?; + let flush_requests = schema_list + .into_iter() + .map(|schema_name| { + SqlRequest::FlushTable(FlushTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name, + table_name: None, + region_number: None, + }) + }) + .collect::>(); + let flush_result = futures::future::try_join_all( + flush_requests + .into_iter() + .map(|request| self.sql_handler.execute(request, QueryContext::arc())), + ) + .await + .map_err(BoxedError::new) + .context(ShutdownInstanceSnafu); + info!("flush success: {}", flush_result.is_ok()); + flush_result?; + + Ok(()) + } + pub fn sql_handler(&self) -> &SqlHandler { &self.sql_handler } diff --git a/src/datanode/src/sql/flush_table.rs b/src/datanode/src/sql/flush_table.rs index ab3b459311..57f0bd4483 100644 --- a/src/datanode/src/sql/flush_table.rs +++ b/src/datanode/src/sql/flush_table.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_query::Output; use snafu::{OptionExt, ResultExt}; use table::engine::TableReference; @@ -63,6 +64,10 @@ impl SqlHandler { table: &str, region: Option, ) -> Result<()> { + if schema == DEFAULT_SCHEMA_NAME && table == "numbers" { + return Ok(()); + } + let table_ref = TableReference { catalog, schema,