feat: impl flush on shutdown (#14)

* feat: impl flush on shutdown

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* powerful if-else!

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-03-14 18:29:38 +08:00
committed by GitHub
parent efd15839d4
commit 44493e9d8c
4 changed files with 68 additions and 3 deletions

View File

@@ -26,6 +26,12 @@ pub enum Error {
source: datanode::error::Error,
},
#[snafu(display("Failed to stop datanode, source: {}", source))]
StopDatanode {
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Failed to start frontend, source: {}", source))]
StartFrontend {
#[snafu(backtrace)]
@@ -163,6 +169,7 @@ impl ErrorExt for Error {
source.status_code()
}
Error::SubstraitEncodeLogicalPlan { source } => source.status_code(),
Error::StopDatanode { source } => source.status_code(),
}
}

View File

@@ -16,6 +16,7 @@ use std::sync::Arc;
use clap::Parser;
use common_base::Plugins;
use common_error::prelude::BoxedError;
use common_telemetry::info;
use datanode::datanode::{
CompactionConfig, Datanode, DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig,
@@ -36,7 +37,9 @@ use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::ResultExt;
use crate::error::{Error, IllegalConfigSnafu, Result, StartDatanodeSnafu, StartFrontendSnafu};
use crate::error::{
Error, IllegalConfigSnafu, Result, StartDatanodeSnafu, StartFrontendSnafu, StopDatanodeSnafu,
};
use crate::frontend::load_frontend_plugins;
use crate::toml_loader;
@@ -152,7 +155,17 @@ impl Instance {
}
pub async fn stop(&self) -> Result<()> {
// TODO: handle standalone shutdown
self.datanode
.shutdown()
.await
.map_err(BoxedError::new)
.context(StopDatanodeSnafu)?;
self.frontend
.shutdown()
.await
.map_err(BoxedError::new)
.context(StopDatanodeSnafu)?;
Ok(())
}
}

View File

@@ -37,12 +37,14 @@ use object_store::services::{Fs as FsBuilder, Oss as OSSBuilder, S3 as S3Builder
use object_store::{util, ObjectStore, ObjectStoreBuilder};
use query::query_engine::{QueryEngineFactory, QueryEngineRef};
use servers::Mode;
use session::context::QueryContext;
use snafu::prelude::*;
use storage::compaction::{CompactionHandler, CompactionSchedulerRef, SimplePicker};
use storage::config::EngineConfig as StorageEngineConfig;
use storage::scheduler::{LocalScheduler, SchedulerConfig};
use storage::EngineImpl;
use store_api::logstore::LogStore;
use table::requests::FlushTableRequest;
use table::table::numbers::NumbersTable;
use table::table::TableIdProviderRef;
use table::Table;
@@ -56,7 +58,7 @@ use crate::error::{
};
use crate::heartbeat::HeartbeatTask;
use crate::script::ScriptExecutor;
use crate::sql::SqlHandler;
use crate::sql::{SqlHandler, SqlRequest};
mod grpc;
mod script;
@@ -233,6 +235,8 @@ impl Instance {
.context(ShutdownInstanceSnafu)?;
}
self.flush_tables().await?;
self.sql_handler
.close()
.await
@@ -240,6 +244,42 @@ impl Instance {
.context(ShutdownInstanceSnafu)
}
pub async fn flush_tables(&self) -> Result<()> {
info!("going to flush all schemas");
let schema_list = self
.catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.map_err(BoxedError::new)
.context(ShutdownInstanceSnafu)?
.expect("Default schema not found")
.schema_names()
.map_err(BoxedError::new)
.context(ShutdownInstanceSnafu)?;
let flush_requests = schema_list
.into_iter()
.map(|schema_name| {
SqlRequest::FlushTable(FlushTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name,
table_name: None,
region_number: None,
})
})
.collect::<Vec<_>>();
let flush_result = futures::future::try_join_all(
flush_requests
.into_iter()
.map(|request| self.sql_handler.execute(request, QueryContext::arc())),
)
.await
.map_err(BoxedError::new)
.context(ShutdownInstanceSnafu);
info!("flush success: {}", flush_result.is_ok());
flush_result?;
Ok(())
}
pub fn sql_handler(&self) -> &SqlHandler {
&self.sql_handler
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_query::Output;
use snafu::{OptionExt, ResultExt};
use table::engine::TableReference;
@@ -63,6 +64,10 @@ impl SqlHandler {
table: &str,
region: Option<u32>,
) -> Result<()> {
if schema == DEFAULT_SCHEMA_NAME && table == "numbers" {
return Ok(());
}
let table_ref = TableReference {
catalog,
schema,