diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index 3a110ea523..7ba897877d 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -333,7 +333,7 @@ impl AlterTableProcedure { .await?; }; - Ok(Status::Done) + Ok(Status::done()) } fn lock_key_inner(&self) -> Vec { diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 110ccbeb65..1fe354d63d 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -123,7 +123,7 @@ impl CreateTableProcedure { } ); - return Ok(Status::Done); + return Ok(Status::done()); } self.creator.data.state = CreateTableState::DatanodeCreateRegions; @@ -313,7 +313,7 @@ impl CreateTableProcedure { .await?; info!("Created table metadata for table {table_id}"); - Ok(Status::Done) + Ok(Status::done()) } } diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index b1045abdea..ceb47193d7 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -100,7 +100,7 @@ impl DropTableProcedure { .await?; if !exist && self.data.task.drop_if_exists { - return Ok(Status::Done); + return Ok(Status::done()); } ensure!( @@ -236,7 +236,7 @@ impl DropTableProcedure { .into_iter() .collect::>>()?; - Ok(Status::Done) + Ok(Status::done()) } } diff --git a/src/common/meta/src/ddl/truncate_table.rs b/src/common/meta/src/ddl/truncate_table.rs index 973c151965..609feef26f 100644 --- a/src/common/meta/src/ddl/truncate_table.rs +++ b/src/common/meta/src/ddl/truncate_table.rs @@ -182,7 +182,7 @@ impl TruncateTableProcedure { .into_iter() .collect::>>()?; - Ok(Status::Done) + Ok(Status::done()) } } diff --git a/src/common/procedure-test/src/lib.rs b/src/common/procedure-test/src/lib.rs index 938a2ad91b..fb759b16a2 100644 --- a/src/common/procedure-test/src/lib.rs +++ b/src/common/procedure-test/src/lib.rs @@ -60,7 +60,7 @@ pub async fn execute_procedure_until_done(procedure: &mut dyn Procedure) { subprocedures.is_empty(), "Executing subprocedure is unsupported" ), - Status::Done => break, + Status::Done { .. } => break, } } } @@ -87,7 +87,7 @@ pub async fn execute_procedure_once( ); false } - Status::Done => true, + Status::Done { .. } => true, } } @@ -108,7 +108,7 @@ pub async fn execute_until_suspended_or_done( match procedure.execute(&ctx).await.unwrap() { Status::Executing { .. } => (), Status::Suspended { subprocedures, .. } => return Some(subprocedures), - Status::Done => break, + Status::Done { .. } => break, } } diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 624e98d181..c68005db59 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -662,7 +662,7 @@ mod tests { assert!(ctx.contains_procedure(meta.id)); assert!(ctx.state(meta.id).unwrap().is_running()); - meta.set_state(ProcedureState::Done); + meta.set_state(ProcedureState::Done { output: None }); assert!(ctx.state(meta.id).unwrap().is_done()); } @@ -723,7 +723,7 @@ mod tests { } async fn execute(&mut self, _ctx: &Context) -> Result { - Ok(Status::Done) + Ok(Status::done()) } fn dump(&self) -> Result { diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index 87f2e2f635..ef4e018de2 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -22,7 +22,7 @@ use tokio::time; use super::rwlock::OwnedKeyRwLockGuard; use crate::error::{self, ProcedurePanicSnafu, Result}; use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef}; -use crate::procedure::StringKey; +use crate::procedure::{Output, StringKey}; use crate::store::ProcedureStore; use crate::ProcedureState::Retrying; use crate::{BoxedProcedure, Context, Error, ProcedureId, ProcedureState, ProcedureWithId, Status}; @@ -288,13 +288,13 @@ impl Runner { Status::Suspended { subprocedures, .. } => { self.on_suspended(subprocedures).await; } - Status::Done => { + Status::Done { output } => { if let Err(e) = self.commit_procedure().await { self.meta.set_state(ProcedureState::retrying(Arc::new(e))); return ExecResult::RetryLater; } - self.done(); + self.done(output); return ExecResult::Done; } } @@ -481,7 +481,7 @@ impl Runner { Ok(()) } - fn done(&self) { + fn done(&self, output: Option) { // TODO(yingwen): Add files to remove list. logging::info!( "Procedure {}-{} done", @@ -490,7 +490,7 @@ impl Runner { ); // Mark the state of this procedure to done. - self.meta.set_state(ProcedureState::Done); + self.meta.set_state(ProcedureState::Done { output }); } } @@ -610,7 +610,7 @@ mod tests { if times == 1 { Ok(Status::Executing { persist }) } else { - Ok(Status::Done) + Ok(Status::done()) } } .boxed() @@ -703,7 +703,7 @@ mod tests { time::sleep(Duration::from_millis(200)).await; Ok(Status::Executing { persist: true }) } else { - Ok(Status::Done) + Ok(Status::done()) } } .boxed() @@ -764,7 +764,7 @@ mod tests { } } if all_child_done { - Ok(Status::Done) + Ok(Status::done()) } else { // Return suspended to wait for notify. Ok(Status::Suspended { @@ -923,7 +923,7 @@ mod tests { if times == 1 { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) } else { - Ok(Status::Done) + Ok(Status::done()) } } .boxed() diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index 2df005bdf0..a60d935c3e 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::Any; use std::fmt; use std::str::FromStr; use std::sync::Arc; @@ -25,6 +26,8 @@ use uuid::Uuid; use crate::error::{Error, Result}; use crate::watcher::Watcher; +pub type Output = Arc; + /// Procedure execution status. #[derive(Debug)] pub enum Status { @@ -40,7 +43,7 @@ pub enum Status { persist: bool, }, /// the procedure is done. - Done, + Done { output: Option }, } impl Status { @@ -49,13 +52,29 @@ impl Status { Status::Executing { persist } } + /// Returns a [Status::Done] without output. + pub fn done() -> Status { + Status::Done { output: None } + } + + /// Returns a [Status::Done] with output. + pub fn done_with_output(output: Output) -> Status { + Status::Done { + output: Some(output), + } + } + /// Returns `true` if the procedure is done. + pub fn is_done(&self) -> bool { + matches!(self, Status::Done { .. }) + } + /// Returns `true` if the procedure needs the framework to persist its intermediate state. pub fn need_persist(&self) -> bool { // If the procedure is done, the framework doesn't need to persist the procedure // anymore. It only needs to mark the procedure as committed. match self { Status::Executing { persist } | Status::Suspended { persist, .. } => *persist, - Status::Done => false, + Status::Done { .. } => false, } } } @@ -251,7 +270,7 @@ pub enum ProcedureState { #[default] Running, /// The procedure is finished. - Done, + Done { output: Option }, /// The procedure is failed and can be retried. Retrying { error: Arc }, /// The procedure is failed and cannot proceed anymore. @@ -276,7 +295,7 @@ impl ProcedureState { /// Returns true if the procedure state is done. pub fn is_done(&self) -> bool { - matches!(self, ProcedureState::Done) + matches!(self, ProcedureState::Done { .. }) } /// Returns true if the procedure state failed. @@ -360,7 +379,7 @@ mod tests { }; assert!(status.need_persist()); - let status = Status::Done; + let status = Status::done(); assert!(!status.need_persist()); } @@ -415,7 +434,7 @@ mod tests { fn test_procedure_state() { assert!(ProcedureState::Running.is_running()); assert!(ProcedureState::Running.error().is_none()); - assert!(ProcedureState::Done.is_done()); + assert!(ProcedureState::Done { output: None }.is_done()); let state = ProcedureState::failed(Arc::new(Error::external(MockError::new( StatusCode::Unexpected, diff --git a/src/common/procedure/src/watcher.rs b/src/common/procedure/src/watcher.rs index 584aae520d..93aa91d5bc 100644 --- a/src/common/procedure/src/watcher.rs +++ b/src/common/procedure/src/watcher.rs @@ -17,19 +17,19 @@ use snafu::ResultExt; use tokio::sync::watch::Receiver; use crate::error::{ProcedureExecSnafu, Result, WaitWatcherSnafu}; -use crate::procedure::ProcedureState; +use crate::procedure::{Output, ProcedureState}; /// Watcher to watch procedure state. pub type Watcher = Receiver; /// Wait the [Watcher] until the [ProcedureState] is done. -pub async fn wait(watcher: &mut Watcher) -> Result<()> { +pub async fn wait(watcher: &mut Watcher) -> Result> { loop { watcher.changed().await.context(WaitWatcherSnafu)?; match &*watcher.borrow() { ProcedureState::Running => (), - ProcedureState::Done => { - return Ok(()); + ProcedureState::Done { output } => { + return Ok(output.clone()); } ProcedureState::Failed { error } => { return Err(error.clone()).context(ProcedureExecSnafu); @@ -89,7 +89,7 @@ mod tests { self.error = !self.error; Err(Error::retry_later(MockError::new(StatusCode::Internal))) } else { - Ok(Status::Done) + Ok(Status::done_with_output(Arc::new("hello"))) } } @@ -111,6 +111,8 @@ mod tests { .await .unwrap(); - wait(&mut watcher).await.unwrap(); + let output = wait(&mut watcher).await.unwrap().unwrap(); + let output = output.downcast::<&str>().unwrap(); + assert_eq!(output.as_ref(), &"hello"); } } diff --git a/src/meta-srv/src/procedure/region_failover/failover_end.rs b/src/meta-srv/src/procedure/region_failover/failover_end.rs index fa299ee464..48d0a1fa18 100644 --- a/src/meta-srv/src/procedure/region_failover/failover_end.rs +++ b/src/meta-srv/src/procedure/region_failover/failover_end.rs @@ -31,6 +31,6 @@ impl State for RegionFailoverEnd { } fn status(&self) -> Status { - Status::Done + Status::done() } } diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index d1c73597b7..77e1493cfc 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -511,7 +511,7 @@ mod tests { let pc = &mut ctx.persistent_ctx; if pc.cluster_id == 2 { - Ok((Box::new(RegionMigrationEnd), Status::Done)) + Ok((Box::new(RegionMigrationEnd), Status::done())) } else { pc.cluster_id += 1; Ok((Box::new(MockState), Status::executing(false))) @@ -540,7 +540,7 @@ mod tests { for _ in 0..3 { status = Some(procedure.execute(&ctx).await.unwrap()); } - assert_matches!(status.unwrap(), Status::Done); + assert!(status.unwrap().is_done()); let ctx = TestingEnv::procedure_context(); let mut procedure = new_mock_procedure(&env); @@ -557,7 +557,7 @@ mod tests { status = Some(procedure.execute(&ctx).await.unwrap()); } assert_eq!(procedure.context.persistent_ctx.cluster_id, 2); - assert_matches!(status.unwrap(), Status::Done); + assert!(status.unwrap().is_done()); } #[tokio::test] diff --git a/src/meta-srv/src/procedure/region_migration/migration_end.rs b/src/meta-srv/src/procedure/region_migration/migration_end.rs index dd7efdc92b..0aebbd719c 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_end.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_end.rs @@ -27,7 +27,7 @@ pub struct RegionMigrationEnd; #[typetag::serde] impl State for RegionMigrationEnd { async fn next(&mut self, _: &mut Context) -> Result<(Box, Status)> { - Ok((Box::new(RegionMigrationEnd), Status::Done)) + Ok((Box::new(RegionMigrationEnd), Status::done())) } fn as_any(&self) -> &dyn Any { diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index 048a1a4cbb..3f81033410 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -55,14 +55,14 @@ impl State for RegionMigrationStart { let from_peer = &ctx.persistent_ctx.from_peer; if self.has_migrated(®ion_route, to_peer)? { - Ok((Box::new(RegionMigrationEnd), Status::Done)) + Ok((Box::new(RegionMigrationEnd), Status::done())) } else if self.invalid_leader_peer(®ion_route, from_peer)? { Ok(( Box::new(RegionMigrationAbort::new(&format!( "Invalid region leader peer: {from_peer:?}, expected: {:?}", region_route.leader_peer.as_ref().unwrap(), ))), - Status::Done, + Status::done(), )) } else if self.check_candidate_region_on_peer(®ion_route, to_peer) { Ok((Box::new(UpdateMetadata::Downgrade), Status::executing(true))) diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index c311977838..c09d18c896 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -520,7 +520,7 @@ pub(crate) fn assert_no_persist(status: Status) { /// Asserts the [Status] should be [Status::Done]. pub(crate) fn assert_done(status: Status) { - assert_matches!(status, Status::Done) + assert!(status.is_done()); } /// Asserts the [State] should be [OpenCandidateRegion]. diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata.rs b/src/meta-srv/src/procedure/region_migration/update_metadata.rs index 90b60621a4..180cf31fe1 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata.rs @@ -58,7 +58,7 @@ impl State for UpdateMetadata { if let Err(err) = ctx.invalidate_table_cache().await { warn!("Failed to broadcast the invalidate table cache message during the upgrade candidate, error: {err:?}"); }; - Ok((Box::new(RegionMigrationEnd), Status::Done)) + Ok((Box::new(RegionMigrationEnd), Status::done())) } UpdateMetadata::Rollback => { self.rollback_downgraded_region(ctx).await?; diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index d042cdc373..8c69faf2d2 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -275,7 +275,7 @@ async fn test_on_datanode_drop_regions() { }); let status = procedure.on_datanode_drop_regions().await.unwrap(); - assert!(matches!(status, Status::Done)); + assert!(status.is_done()); handle.await.unwrap();