diff --git a/src/common/procedure/src/error.rs b/src/common/procedure/src/error.rs index 8b048a3f3c..44d0da8b79 100644 --- a/src/common/procedure/src/error.rs +++ b/src/common/procedure/src/error.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::any::Any; +use std::sync::Arc; use common_error::prelude::*; @@ -81,6 +82,21 @@ pub enum Error { #[snafu(backtrace)] source: BoxedError, }, + + #[snafu(display("Procedure panics, procedure_id: {}", procedure_id))] + ProcedurePanic { procedure_id: ProcedureId }, + + #[snafu(display("Failed to wait watcher, source: {}", source))] + WaitWatcher { + source: tokio::sync::watch::error::RecvError, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to execute procedure, source: {}", source))] + ProcedureExec { + source: Arc, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; @@ -95,10 +111,13 @@ impl ErrorExt for Error { | Error::ListState { .. } | Error::ReadState { .. } | Error::FromJson { .. } - | Error::RetryLater { .. } => StatusCode::Internal, + | Error::RetryLater { .. } + | Error::WaitWatcher { .. } => StatusCode::Internal, Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => { StatusCode::InvalidArguments } + Error::ProcedurePanic { .. } => StatusCode::Unexpected, + Error::ProcedureExec { source, .. } => source.status_code(), } } diff --git a/src/common/procedure/src/lib.rs b/src/common/procedure/src/lib.rs index 967c7fefe7..50134e2a69 100644 --- a/src/common/procedure/src/lib.rs +++ b/src/common/procedure/src/lib.rs @@ -18,9 +18,11 @@ pub mod error; pub mod local; mod procedure; mod store; +pub mod watcher; pub use crate::error::{Error, Result}; pub use crate::procedure::{ BoxedProcedure, Context, ContextProvider, LockKey, Procedure, ProcedureId, ProcedureManager, - ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, Watcher, + ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, }; +pub use crate::watcher::Watcher; diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 94686a01aa..aa74032303 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -334,7 +334,7 @@ impl LocalManager { common_runtime::spawn_bg(async move { // Run the root procedure. - let _ = runner.run().await; + runner.run().await; }); Ok(watcher) @@ -447,9 +447,9 @@ mod tests { assert!(ctx.try_insert_procedure(meta.clone())); assert!(ctx.contains_procedure(meta.id)); - assert_eq!(ProcedureState::Running, ctx.state(meta.id).unwrap()); + assert!(ctx.state(meta.id).unwrap().is_running()); meta.set_state(ProcedureState::Done); - assert_eq!(ProcedureState::Done, ctx.state(meta.id).unwrap()); + assert!(ctx.state(meta.id).unwrap().is_done()); } #[test] @@ -634,7 +634,7 @@ mod tests { // Wait for the procedure done. let mut watcher = manager.procedure_watcher(procedure_id).unwrap(); watcher.changed().await.unwrap(); - assert_eq!(ProcedureState::Done, *watcher.borrow()); + assert!(watcher.borrow().is_done()); // Try to submit procedure with same id again. let err = manager @@ -697,7 +697,7 @@ mod tests { .unwrap(); // Wait for the notification. watcher.changed().await.unwrap(); - assert_eq!(ProcedureState::Failed, *watcher.borrow()); + assert!(watcher.borrow().is_failed()); } }; diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index 233b2e665a..46d2effa13 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -18,7 +18,7 @@ use std::time::Duration; use common_telemetry::logging; use tokio::time; -use crate::error::{Error, Result}; +use crate::error::{ProcedurePanicSnafu, Result}; use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef}; use crate::store::ProcedureStore; use crate::{BoxedProcedure, Context, ProcedureId, ProcedureState, ProcedureWithId, Status}; @@ -30,7 +30,7 @@ enum ExecResult { Continue, Done, RetryLater, - Failed(Error), + Failed, } #[cfg(test)] @@ -48,7 +48,7 @@ impl ExecResult { } fn is_failed(&self) -> bool { - matches!(self, ExecResult::Failed(_)) + matches!(self, ExecResult::Failed) } } @@ -83,7 +83,11 @@ impl Drop for ProcedureGuard { // Set state to failed. This is useful in test as runtime may not abort when the runner task panics. // See https://github.com/tokio-rs/tokio/issues/2002 . // We set set_panic_hook() in the application's main function. But our tests don't have this panic hook. - self.meta.set_state(ProcedureState::Failed); + let err = ProcedurePanicSnafu { + procedure_id: self.meta.id, + } + .build(); + self.meta.set_state(ProcedureState::failed(Arc::new(err))); } // Notify parent procedure. @@ -109,7 +113,7 @@ pub(crate) struct Runner { impl Runner { /// Run the procedure. - pub(crate) async fn run(mut self) -> Result<()> { + pub(crate) async fn run(mut self) { // Ensure we can update the procedure state. let guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone()); @@ -129,12 +133,9 @@ impl Runner { .await; } - let mut result = Ok(()); // Execute the procedure. We need to release the lock whenever the the execution // is successful or fail. - if let Err(e) = self.execute_procedure_in_loop().await { - result = Err(e); - } + self.execute_procedure_in_loop().await; // We can't remove the metadata of the procedure now as users and its parent might // need to query its state. @@ -155,11 +156,9 @@ impl Runner { self.procedure.type_name(), self.meta.id ); - - result } - async fn execute_procedure_in_loop(&mut self) -> Result<()> { + async fn execute_procedure_in_loop(&mut self) { let ctx = Context { procedure_id: self.meta.id, provider: self.manager_ctx.clone(), @@ -168,11 +167,10 @@ impl Runner { loop { match self.execute_once(&ctx).await { ExecResult::Continue => (), - ExecResult::Done => return Ok(()), + ExecResult::Done | ExecResult::Failed => return, ExecResult::RetryLater => { self.wait_on_err().await; } - ExecResult::Failed(e) => return Err(e), } } } @@ -222,14 +220,14 @@ impl Runner { return ExecResult::RetryLater; } - self.meta.set_state(ProcedureState::Failed); + self.meta.set_state(ProcedureState::failed(Arc::new(e))); // Write rollback key so we can skip this procedure while recovering procedures. if self.rollback_procedure().await.is_err() { return ExecResult::RetryLater; } - ExecResult::Failed(e) + ExecResult::Failed } } } @@ -404,7 +402,7 @@ mod tests { use super::*; use crate::local::test_util; - use crate::{ContextProvider, LockKey, Procedure}; + use crate::{ContextProvider, Error, LockKey, Procedure}; const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1"; @@ -630,9 +628,14 @@ mod tests { // Wait for subprocedures. let mut all_child_done = true; for id in children_ids { - if ctx.provider.procedure_state(id).await.unwrap() - != Some(ProcedureState::Done) - { + let is_not_done = ctx + .provider + .procedure_state(id) + .await + .unwrap() + .map(|s| !s.is_done()) + .unwrap_or(true); + if is_not_done { all_child_done = false; } } @@ -668,7 +671,7 @@ mod tests { // Replace the manager ctx. runner.manager_ctx = manager_ctx; - runner.run().await.unwrap(); + runner.run().await; // Check files on store. for child_id in children_ids { @@ -706,7 +709,7 @@ mod tests { let res = runner.execute_once(&ctx).await; assert!(res.is_failed(), "{res:?}"); - assert_eq!(ProcedureState::Failed, meta.state()); + assert!(meta.state().is_failed()); check_files(&object_store, ctx.procedure_id, &["0000000000.rollback"]).await; } @@ -741,11 +744,11 @@ mod tests { let res = runner.execute_once(&ctx).await; assert!(res.is_retry_later(), "{res:?}"); - assert_eq!(ProcedureState::Running, meta.state()); + assert!(meta.state().is_running()); let res = runner.execute_once(&ctx).await; assert!(res.is_done(), "{res:?}"); - assert_eq!(ProcedureState::Done, meta.state()); + assert!(meta.state().is_done()); check_files(&object_store, ctx.procedure_id, &["0000000000.commit"]).await; } @@ -779,7 +782,8 @@ mod tests { } else { // Wait for subprocedures. let state = ctx.provider.procedure_state(child_id).await.unwrap(); - if state == Some(ProcedureState::Failed) { + let is_failed = state.map(|s| s.is_failed()).unwrap_or(false); + if is_failed { // The parent procedure to abort itself if child procedure is failed. Err(Error::from_error_ext(PlainError::new( "subprocedure failed".to_string(), @@ -811,12 +815,13 @@ mod tests { let manager_ctx = Arc::new(ManagerContext::new()); // Manually add this procedure to the manager ctx. - assert!(manager_ctx.try_insert_procedure(meta)); + assert!(manager_ctx.try_insert_procedure(meta.clone())); // Replace the manager ctx. runner.manager_ctx = manager_ctx; // Run the runer and execute the procedure. - let err = runner.run().await.unwrap_err(); - assert!(err.to_string().contains("subprocedure failed"), "{err}"); + runner.run().await; + let err = meta.state().error().unwrap().to_string(); + assert!(err.contains("subprocedure failed"), "{err}"); } } diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index e5b8dc8491..dce404eda6 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -20,10 +20,10 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use smallvec::{smallvec, SmallVec}; use snafu::{ResultExt, Snafu}; -use tokio::sync::watch::Receiver; use uuid::Uuid; -use crate::error::Result; +use crate::error::{Error, Result}; +use crate::watcher::Watcher; /// Procedure execution status. #[derive(Debug)] @@ -198,20 +198,47 @@ impl FromStr for ProcedureId { /// Loader to recover the [Procedure] instance from serialized data. pub type BoxedProcedureLoader = Box Result + Send>; -// TODO(yingwen): Find a way to return the error message if the procedure is failed. /// State of a submitted procedure. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Default, Clone)] pub enum ProcedureState { /// The procedure is running. + #[default] Running, /// The procedure is finished. Done, /// The procedure is failed and cannot proceed anymore. - Failed, + Failed { error: Arc }, } -/// Watcher to watch procedure state. -pub type Watcher = Receiver; +impl ProcedureState { + /// Returns a [ProcedureState] with failed state. + pub fn failed(error: Arc) -> ProcedureState { + ProcedureState::Failed { error } + } + + /// Returns true if the procedure state is running. + pub fn is_running(&self) -> bool { + matches!(self, ProcedureState::Running) + } + + /// Returns true if the procedure state is done. + pub fn is_done(&self) -> bool { + matches!(self, ProcedureState::Done) + } + + /// Returns true if the procedure state failed. + pub fn is_failed(&self) -> bool { + matches!(self, ProcedureState::Failed { .. }) + } + + /// Returns the error. + pub fn error(&self) -> Option<&Arc> { + match self { + ProcedureState::Failed { error } => Some(error), + _ => None, + } + } +} // TODO(yingwen): Shutdown /// `ProcedureManager` executes [Procedure] submitted to it. @@ -244,6 +271,9 @@ pub type ProcedureManagerRef = Arc; #[cfg(test)] mod tests { + use common_error::mock::MockError; + use common_error::prelude::StatusCode; + use super::*; #[test] @@ -311,4 +341,17 @@ mod tests { let parsed = serde_json::from_str(&json).unwrap(); assert_eq!(id, parsed); } + + #[test] + fn test_procedure_state() { + assert!(ProcedureState::Running.is_running()); + assert!(ProcedureState::Running.error().is_none()); + assert!(ProcedureState::Done.is_done()); + + let state = ProcedureState::failed(Arc::new(Error::external(MockError::new( + StatusCode::Unexpected, + )))); + assert!(state.is_failed()); + assert!(state.error().is_some()); + } } diff --git a/src/common/procedure/src/watcher.rs b/src/common/procedure/src/watcher.rs new file mode 100644 index 0000000000..ea1b763268 --- /dev/null +++ b/src/common/procedure/src/watcher.rs @@ -0,0 +1,38 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snafu::ResultExt; +use tokio::sync::watch::Receiver; + +use crate::error::{ProcedureExecSnafu, Result, WaitWatcherSnafu}; +use crate::procedure::ProcedureState; + +/// Watcher to watch procedure state. +pub type Watcher = Receiver; + +/// Wait the [Watcher] until the [ProcedureState] is done. +pub async fn wait(watcher: &mut Watcher) -> Result<()> { + loop { + watcher.changed().await.context(WaitWatcherSnafu)?; + match &*watcher.borrow() { + ProcedureState::Running => (), + ProcedureState::Done => { + return Ok(()); + } + ProcedureState::Failed { error } => { + return Err(error.clone()).context(ProcedureExecSnafu); + } + } + } +} diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index b7a7bc1848..c7e583c46d 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -408,23 +408,18 @@ pub enum Error { source: common_procedure::error::Error, }, - #[snafu(display("Failed to submit procedure, source: {}", source))] + #[snafu(display("Failed to submit procedure {}, source: {}", procedure_id, source))] SubmitProcedure { + procedure_id: ProcedureId, #[snafu(backtrace)] source: common_procedure::error::Error, }, - #[snafu(display("Failed to wait procedure done, source: {}", source))] + #[snafu(display("Failed to wait procedure {} done, source: {}", procedure_id, source))] WaitProcedure { - source: tokio::sync::watch::error::RecvError, - backtrace: Backtrace, - }, - - // TODO(yingwen): Use procedure's error. - #[snafu(display("Failed to execute procedure, procedure_id: {}", procedure_id))] - ProcedureExec { procedure_id: ProcedureId, - backtrace: Backtrace, + #[snafu(backtrace)] + source: common_procedure::error::Error, }, } @@ -507,7 +502,7 @@ impl ErrorExt for Error { RecoverProcedure { source, .. } | SubmitProcedure { source, .. } => { source.status_code() } - WaitProcedure { .. } | ProcedureExec { .. } => StatusCode::Internal, + WaitProcedure { source, .. } => source.status_code(), } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 5c12c44aac..53beff1ead 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -177,7 +177,6 @@ impl Instance { }; let procedure_manager = create_procedure_manager(&opts.procedure).await?; - // Recover procedures. if let Some(procedure_manager) = &procedure_manager { table_engine.register_procedure_loaders(&**procedure_manager); table_procedure::register_procedure_loaders( @@ -187,6 +186,7 @@ impl Instance { &**procedure_manager, ); + // Recover procedures. procedure_manager .recover() .await @@ -423,7 +423,7 @@ pub(crate) async fn create_log_store(wal_config: &WalConfig) -> Result, ) -> Result> { let Some(procedure_config) = procedure_config else { diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs index c083514bca..0a46d2dbcc 100644 --- a/src/datanode/src/mock.rs +++ b/src/datanode/src/mock.rs @@ -31,9 +31,11 @@ use table::metadata::TableId; use table::table::TableIdProvider; use crate::datanode::DatanodeOptions; -use crate::error::{CatalogSnafu, Result}; +use crate::error::{CatalogSnafu, RecoverProcedureSnafu, Result}; use crate::heartbeat::HeartbeatTask; -use crate::instance::{create_log_store, new_object_store, DefaultEngine, Instance}; +use crate::instance::{ + create_log_store, create_procedure_manager, new_object_store, DefaultEngine, Instance, +}; use crate::script::ScriptExecutor; use crate::sql::SqlHandler; @@ -93,6 +95,16 @@ impl Instance { let script_executor = ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?; + let procedure_manager = create_procedure_manager(&opts.procedure).await?; + if let Some(procedure_manager) = &procedure_manager { + table_engine.register_procedure_loaders(&**procedure_manager); + // Recover procedures. + procedure_manager + .recover() + .await + .context(RecoverProcedureSnafu)?; + } + Ok(Self { query_engine: query_engine.clone(), sql_handler: SqlHandler::new( @@ -100,7 +112,7 @@ impl Instance { catalog_manager.clone(), query_engine.clone(), table_engine, - None, + procedure_manager, ), catalog_manager, script_executor, diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index 2657f45bdb..5ac6c22b7c 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use catalog::{RegisterSchemaRequest, RegisterTableRequest}; -use common_procedure::{ProcedureManagerRef, ProcedureState, ProcedureWithId}; +use common_procedure::{watcher, ProcedureManagerRef, ProcedureWithId}; use common_query::Output; use common_telemetry::tracing::{error, info}; use datatypes::schema::RawSchema; @@ -33,8 +33,8 @@ use table_procedure::CreateTableProcedure; use crate::error::{ self, CatalogNotFoundSnafu, CatalogSnafu, ConstraintNotSupportedSnafu, CreateTableSnafu, IllegalPrimaryKeysDefSnafu, InsertSystemCatalogSnafu, KeyColumnNotFoundSnafu, - ProcedureExecSnafu, RegisterSchemaSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu, - SubmitProcedureSnafu, UnrecognizedTableOptionSnafu, WaitProcedureSnafu, + RegisterSchemaSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu, SubmitProcedureSnafu, + UnrecognizedTableOptionSnafu, WaitProcedureSnafu, }; use crate::sql::SqlHandler; @@ -152,21 +152,13 @@ impl SqlHandler { let mut watcher = procedure_manager .submit(procedure_with_id) .await - .context(SubmitProcedureSnafu)?; + .context(SubmitProcedureSnafu { procedure_id })?; - // TODO(yingwen): Wrap this into a function and add error to ProcedureState::Failed. - loop { - watcher.changed().await.context(WaitProcedureSnafu)?; - match *watcher.borrow() { - ProcedureState::Running => (), - ProcedureState::Done => { - return Ok(Output::AffectedRows(0)); - } - ProcedureState::Failed => { - return ProcedureExecSnafu { procedure_id }.fail(); - } - } - } + watcher::wait(&mut watcher) + .await + .context(WaitProcedureSnafu { procedure_id })?; + + Ok(Output::AffectedRows(0)) } /// Converts [CreateTable] to [SqlRequest::CreateTable]. diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index 7c6c2f3ac9..5c1f48d3b9 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -795,6 +795,42 @@ async fn test_execute_copy_to() { assert!(matches!(output, Output::AffectedRows(2))); } +#[tokio::test(flavor = "multi_thread")] +async fn test_create_by_procedure() { + common_telemetry::init_default_ut_logging(); + + let instance = MockInstance::with_procedure_enabled("create_by_procedure").await; + + let output = execute_sql( + &instance, + r#"create table test_table( + host string, + ts timestamp, + cpu double default 0, + memory double, + TIME INDEX (ts), + PRIMARY KEY(host) + ) engine=mito with(regions=1);"#, + ) + .await; + assert!(matches!(output, Output::AffectedRows(0))); + + // Create if not exists + let output = execute_sql( + &instance, + r#"create table if not exists test_table( + host string, + ts timestamp, + cpu double default 0, + memory double, + TIME INDEX (ts), + PRIMARY KEY(host) + ) engine=mito with(regions=1);"#, + ) + .await; + assert!(matches!(output, Output::AffectedRows(0))); +} + async fn execute_sql(instance: &MockInstance, sql: &str) -> Output { execute_sql_in_db(instance, sql, DEFAULT_SCHEMA_NAME).await } diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index 5b1c700040..d544ae682c 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -28,7 +28,7 @@ use table::engine::{EngineContext, TableEngineRef}; use table::requests::{CreateTableRequest, TableOptions}; use tempdir::TempDir; -use crate::datanode::{DatanodeOptions, FileConfig, ObjectStoreConfig, WalConfig}; +use crate::datanode::{DatanodeOptions, FileConfig, ObjectStoreConfig, ProcedureConfig, WalConfig}; use crate::error::{CreateTableSnafu, Result}; use crate::instance::Instance; use crate::sql::SqlHandler; @@ -36,6 +36,7 @@ use crate::sql::SqlHandler; pub(crate) struct MockInstance { instance: Instance, _guard: TestGuard, + _procedure_dir: Option, } impl MockInstance { @@ -45,7 +46,30 @@ impl MockInstance { let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); instance.start().await.unwrap(); - MockInstance { instance, _guard } + MockInstance { + instance, + _guard, + _procedure_dir: None, + } + } + + pub(crate) async fn with_procedure_enabled(name: &str) -> Self { + let (mut opts, _guard) = create_tmp_dir_and_datanode_opts(name); + let procedure_dir = TempDir::new(&format!("gt_procedure_{name}")).unwrap(); + opts.procedure = Some(ProcedureConfig { + store: ObjectStoreConfig::File(FileConfig { + data_dir: procedure_dir.path().to_str().unwrap().to_string(), + }), + }); + + let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); + instance.start().await.unwrap(); + + MockInstance { + instance, + _guard, + _procedure_dir: Some(procedure_dir), + } } pub(crate) fn inner(&self) -> &Instance { diff --git a/src/table-procedure/src/create.rs b/src/table-procedure/src/create.rs index d7ebf9010e..357337ed3f 100644 --- a/src/table-procedure/src/create.rs +++ b/src/table-procedure/src/create.rs @@ -211,7 +211,7 @@ impl CreateTableProcedure { self.data.state = CreateTableState::RegisterCatalog; Ok(Status::executing(true)) } - ProcedureState::Failed => { + ProcedureState::Failed { .. } => { // Return error if the subprocedure is failed. SubprocedureFailedSnafu { subprocedure_id: sub_id,