feat: make procedure able to return output (#3201)

* feat: make procedure able to return output

* refactor: change Output to Any
This commit is contained in:
Weny Xu
2024-01-21 15:56:45 +09:00
committed by GitHub
parent 986f3bb07d
commit 4278c858f3
16 changed files with 63 additions and 42 deletions

View File

@@ -333,7 +333,7 @@ impl AlterTableProcedure {
.await?;
};
Ok(Status::Done)
Ok(Status::done())
}
fn lock_key_inner(&self) -> Vec<StringKey> {

View File

@@ -123,7 +123,7 @@ impl CreateTableProcedure {
}
);
return Ok(Status::Done);
return Ok(Status::done());
}
self.creator.data.state = CreateTableState::DatanodeCreateRegions;
@@ -313,7 +313,7 @@ impl CreateTableProcedure {
.await?;
info!("Created table metadata for table {table_id}");
Ok(Status::Done)
Ok(Status::done())
}
}

View File

@@ -100,7 +100,7 @@ impl DropTableProcedure {
.await?;
if !exist && self.data.task.drop_if_exists {
return Ok(Status::Done);
return Ok(Status::done());
}
ensure!(
@@ -236,7 +236,7 @@ impl DropTableProcedure {
.into_iter()
.collect::<Result<Vec<_>>>()?;
Ok(Status::Done)
Ok(Status::done())
}
}

View File

@@ -182,7 +182,7 @@ impl TruncateTableProcedure {
.into_iter()
.collect::<Result<Vec<_>>>()?;
Ok(Status::Done)
Ok(Status::done())
}
}

View File

@@ -60,7 +60,7 @@ pub async fn execute_procedure_until_done(procedure: &mut dyn Procedure) {
subprocedures.is_empty(),
"Executing subprocedure is unsupported"
),
Status::Done => break,
Status::Done { .. } => break,
}
}
}
@@ -87,7 +87,7 @@ pub async fn execute_procedure_once(
);
false
}
Status::Done => true,
Status::Done { .. } => true,
}
}
@@ -108,7 +108,7 @@ pub async fn execute_until_suspended_or_done(
match procedure.execute(&ctx).await.unwrap() {
Status::Executing { .. } => (),
Status::Suspended { subprocedures, .. } => return Some(subprocedures),
Status::Done => break,
Status::Done { .. } => break,
}
}

View File

@@ -662,7 +662,7 @@ mod tests {
assert!(ctx.contains_procedure(meta.id));
assert!(ctx.state(meta.id).unwrap().is_running());
meta.set_state(ProcedureState::Done);
meta.set_state(ProcedureState::Done { output: None });
assert!(ctx.state(meta.id).unwrap().is_done());
}
@@ -723,7 +723,7 @@ mod tests {
}
async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
Ok(Status::Done)
Ok(Status::done())
}
fn dump(&self) -> Result<String> {

View File

@@ -22,7 +22,7 @@ use tokio::time;
use super::rwlock::OwnedKeyRwLockGuard;
use crate::error::{self, ProcedurePanicSnafu, Result};
use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
use crate::procedure::StringKey;
use crate::procedure::{Output, StringKey};
use crate::store::ProcedureStore;
use crate::ProcedureState::Retrying;
use crate::{BoxedProcedure, Context, Error, ProcedureId, ProcedureState, ProcedureWithId, Status};
@@ -288,13 +288,13 @@ impl Runner {
Status::Suspended { subprocedures, .. } => {
self.on_suspended(subprocedures).await;
}
Status::Done => {
Status::Done { output } => {
if let Err(e) = self.commit_procedure().await {
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return ExecResult::RetryLater;
}
self.done();
self.done(output);
return ExecResult::Done;
}
}
@@ -481,7 +481,7 @@ impl Runner {
Ok(())
}
fn done(&self) {
fn done(&self, output: Option<Output>) {
// TODO(yingwen): Add files to remove list.
logging::info!(
"Procedure {}-{} done",
@@ -490,7 +490,7 @@ impl Runner {
);
// Mark the state of this procedure to done.
self.meta.set_state(ProcedureState::Done);
self.meta.set_state(ProcedureState::Done { output });
}
}
@@ -610,7 +610,7 @@ mod tests {
if times == 1 {
Ok(Status::Executing { persist })
} else {
Ok(Status::Done)
Ok(Status::done())
}
}
.boxed()
@@ -703,7 +703,7 @@ mod tests {
time::sleep(Duration::from_millis(200)).await;
Ok(Status::Executing { persist: true })
} else {
Ok(Status::Done)
Ok(Status::done())
}
}
.boxed()
@@ -764,7 +764,7 @@ mod tests {
}
}
if all_child_done {
Ok(Status::Done)
Ok(Status::done())
} else {
// Return suspended to wait for notify.
Ok(Status::Suspended {
@@ -923,7 +923,7 @@ mod tests {
if times == 1 {
Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
} else {
Ok(Status::Done)
Ok(Status::done())
}
}
.boxed()

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
@@ -25,6 +26,8 @@ use uuid::Uuid;
use crate::error::{Error, Result};
use crate::watcher::Watcher;
pub type Output = Arc<dyn Any + Send + Sync>;
/// Procedure execution status.
#[derive(Debug)]
pub enum Status {
@@ -40,7 +43,7 @@ pub enum Status {
persist: bool,
},
/// the procedure is done.
Done,
Done { output: Option<Output> },
}
impl Status {
@@ -49,13 +52,29 @@ impl Status {
Status::Executing { persist }
}
/// Returns a [Status::Done] without output.
pub fn done() -> Status {
Status::Done { output: None }
}
/// Returns a [Status::Done] with output.
pub fn done_with_output(output: Output) -> Status {
Status::Done {
output: Some(output),
}
}
/// Returns `true` if the procedure is done.
pub fn is_done(&self) -> bool {
matches!(self, Status::Done { .. })
}
/// Returns `true` if the procedure needs the framework to persist its intermediate state.
pub fn need_persist(&self) -> bool {
// If the procedure is done, the framework doesn't need to persist the procedure
// anymore. It only needs to mark the procedure as committed.
match self {
Status::Executing { persist } | Status::Suspended { persist, .. } => *persist,
Status::Done => false,
Status::Done { .. } => false,
}
}
}
@@ -251,7 +270,7 @@ pub enum ProcedureState {
#[default]
Running,
/// The procedure is finished.
Done,
Done { output: Option<Output> },
/// The procedure is failed and can be retried.
Retrying { error: Arc<Error> },
/// The procedure is failed and cannot proceed anymore.
@@ -276,7 +295,7 @@ impl ProcedureState {
/// Returns true if the procedure state is done.
pub fn is_done(&self) -> bool {
matches!(self, ProcedureState::Done)
matches!(self, ProcedureState::Done { .. })
}
/// Returns true if the procedure state failed.
@@ -360,7 +379,7 @@ mod tests {
};
assert!(status.need_persist());
let status = Status::Done;
let status = Status::done();
assert!(!status.need_persist());
}
@@ -415,7 +434,7 @@ mod tests {
fn test_procedure_state() {
assert!(ProcedureState::Running.is_running());
assert!(ProcedureState::Running.error().is_none());
assert!(ProcedureState::Done.is_done());
assert!(ProcedureState::Done { output: None }.is_done());
let state = ProcedureState::failed(Arc::new(Error::external(MockError::new(
StatusCode::Unexpected,

View File

@@ -17,19 +17,19 @@ use snafu::ResultExt;
use tokio::sync::watch::Receiver;
use crate::error::{ProcedureExecSnafu, Result, WaitWatcherSnafu};
use crate::procedure::ProcedureState;
use crate::procedure::{Output, ProcedureState};
/// Watcher to watch procedure state.
pub type Watcher = Receiver<ProcedureState>;
/// Wait the [Watcher] until the [ProcedureState] is done.
pub async fn wait(watcher: &mut Watcher) -> Result<()> {
pub async fn wait(watcher: &mut Watcher) -> Result<Option<Output>> {
loop {
watcher.changed().await.context(WaitWatcherSnafu)?;
match &*watcher.borrow() {
ProcedureState::Running => (),
ProcedureState::Done => {
return Ok(());
ProcedureState::Done { output } => {
return Ok(output.clone());
}
ProcedureState::Failed { error } => {
return Err(error.clone()).context(ProcedureExecSnafu);
@@ -89,7 +89,7 @@ mod tests {
self.error = !self.error;
Err(Error::retry_later(MockError::new(StatusCode::Internal)))
} else {
Ok(Status::Done)
Ok(Status::done_with_output(Arc::new("hello")))
}
}
@@ -111,6 +111,8 @@ mod tests {
.await
.unwrap();
wait(&mut watcher).await.unwrap();
let output = wait(&mut watcher).await.unwrap().unwrap();
let output = output.downcast::<&str>().unwrap();
assert_eq!(output.as_ref(), &"hello");
}
}

View File

@@ -31,6 +31,6 @@ impl State for RegionFailoverEnd {
}
fn status(&self) -> Status {
Status::Done
Status::done()
}
}

View File

@@ -511,7 +511,7 @@ mod tests {
let pc = &mut ctx.persistent_ctx;
if pc.cluster_id == 2 {
Ok((Box::new(RegionMigrationEnd), Status::Done))
Ok((Box::new(RegionMigrationEnd), Status::done()))
} else {
pc.cluster_id += 1;
Ok((Box::new(MockState), Status::executing(false)))
@@ -540,7 +540,7 @@ mod tests {
for _ in 0..3 {
status = Some(procedure.execute(&ctx).await.unwrap());
}
assert_matches!(status.unwrap(), Status::Done);
assert!(status.unwrap().is_done());
let ctx = TestingEnv::procedure_context();
let mut procedure = new_mock_procedure(&env);
@@ -557,7 +557,7 @@ mod tests {
status = Some(procedure.execute(&ctx).await.unwrap());
}
assert_eq!(procedure.context.persistent_ctx.cluster_id, 2);
assert_matches!(status.unwrap(), Status::Done);
assert!(status.unwrap().is_done());
}
#[tokio::test]

View File

@@ -27,7 +27,7 @@ pub struct RegionMigrationEnd;
#[typetag::serde]
impl State for RegionMigrationEnd {
async fn next(&mut self, _: &mut Context) -> Result<(Box<dyn State>, Status)> {
Ok((Box::new(RegionMigrationEnd), Status::Done))
Ok((Box::new(RegionMigrationEnd), Status::done()))
}
fn as_any(&self) -> &dyn Any {

View File

@@ -55,14 +55,14 @@ impl State for RegionMigrationStart {
let from_peer = &ctx.persistent_ctx.from_peer;
if self.has_migrated(&region_route, to_peer)? {
Ok((Box::new(RegionMigrationEnd), Status::Done))
Ok((Box::new(RegionMigrationEnd), Status::done()))
} else if self.invalid_leader_peer(&region_route, from_peer)? {
Ok((
Box::new(RegionMigrationAbort::new(&format!(
"Invalid region leader peer: {from_peer:?}, expected: {:?}",
region_route.leader_peer.as_ref().unwrap(),
))),
Status::Done,
Status::done(),
))
} else if self.check_candidate_region_on_peer(&region_route, to_peer) {
Ok((Box::new(UpdateMetadata::Downgrade), Status::executing(true)))

View File

@@ -520,7 +520,7 @@ pub(crate) fn assert_no_persist(status: Status) {
/// Asserts the [Status] should be [Status::Done].
pub(crate) fn assert_done(status: Status) {
assert_matches!(status, Status::Done)
assert!(status.is_done());
}
/// Asserts the [State] should be [OpenCandidateRegion].

View File

@@ -58,7 +58,7 @@ impl State for UpdateMetadata {
if let Err(err) = ctx.invalidate_table_cache().await {
warn!("Failed to broadcast the invalidate table cache message during the upgrade candidate, error: {err:?}");
};
Ok((Box::new(RegionMigrationEnd), Status::Done))
Ok((Box::new(RegionMigrationEnd), Status::done()))
}
UpdateMetadata::Rollback => {
self.rollback_downgraded_region(ctx).await?;

View File

@@ -275,7 +275,7 @@ async fn test_on_datanode_drop_regions() {
});
let status = procedure.on_datanode_drop_regions().await.unwrap();
assert!(matches!(status, Status::Done));
assert!(status.is_done());
handle.await.unwrap();