Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-07 22:02:56 +00:00

feat(procedure): Max retry time (#1095)

* feat: procedure config
* fix: modify config
* feat: add retry logic
* feat: add error
* feat: add it
* feat: add it
* feat: add it
* feat: rm retry from runner
* feat: use backon
* feat: add retry_interval
* feat: add retry_interval
* fix: conflict
* fix: cr
* feat: add retry error and id
* feat: rename
* refactor: execute
* feat: use config dir
* fix: cr
* fix: cr
* fix: fmt
* fix: fmt
* fix: pr
* fix: it
* fix: rm unless cmd params
* feat: add toml
* fix: ut
* feat: add rolling back
* fix: cr
* fix: cr
* fix: cr
* fix: ci
* fix: ci
* fix: ci
* chore: Apply suggestions from code review

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>

Cargo.lock (generated)
@@ -1592,6 +1592,7 @@ name = "common-procedure"
version = "0.1.1"
dependencies = [
"async-trait",
"backon 0.4.0",
"common-error",
"common-runtime",
"common-telemetry",

@@ -44,5 +44,7 @@ max_purge_tasks = 32

# Procedure storage options, see `standalone.example.toml`.
# [procedure.store]
# type = 'File'
# data_dir = '/tmp/greptimedb/procedure/'
# type = "File"
# data_dir = "/tmp/greptimedb/procedure/"
# max_retry_times = 3
# retry_delay = "500ms"

@@ -114,3 +114,7 @@ max_purge_tasks = 32
# type = "File"
# # Procedure data path.
# data_dir = "/tmp/greptimedb/procedure/"
# # Procedure max retry time.
# max_retry_times = 3
# # Initial retry delay of procedures, increases exponentially
# retry_delay = "500ms"

@@ -150,7 +150,6 @@ impl TryFrom<StartCommand> for DatanodeOptions {
if let Some(wal_dir) = cmd.wal_dir {
opts.wal.dir = wal_dir;
}

if let Some(procedure_dir) = cmd.procedure_dir {
opts.procedure = Some(ProcedureConfig::from_file_path(procedure_dir));
}

@@ -14,6 +14,7 @@ object-store = { path = "../../object-store" }
serde.workspace = true
serde_json = "1.0"
smallvec = "1"
backon = "0.4.0"
snafu.workspace = true
tokio.workspace = true
uuid.workspace = true

@@ -97,6 +97,16 @@ pub enum Error {
source: Arc<Error>,
backtrace: Backtrace,
},

#[snafu(display(
"Procedure retry exceeded max times, procedure_id: {}, source:{}",
procedure_id,
source
))]
RetryTimesExceeded {
source: Arc<Error>,
procedure_id: ProcedureId,
},
}

pub type Result<T> = std::result::Result<T, Error>;

@@ -111,6 +121,7 @@ impl ErrorExt for Error {
| Error::ListState { .. }
| Error::ReadState { .. }
| Error::FromJson { .. }
| Error::RetryTimesExceeded { .. }
| Error::RetryLater { .. }
| Error::WaitWatcher { .. } => StatusCode::Internal,
Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {

@@ -17,8 +17,10 @@ mod runner;

use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;

use async_trait::async_trait;
use backon::ExponentialBuilder;
use common_telemetry::logging;
use object_store::ObjectStore;
use snafu::ensure;

@@ -291,12 +293,16 @@ impl ManagerContext {
pub struct ManagerConfig {
/// Object store
pub object_store: ObjectStore,
pub max_retry_times: usize,
pub retry_delay: Duration,
}

/// A [ProcedureManager] that maintains procedure states locally.
pub struct LocalManager {
manager_ctx: Arc<ManagerContext>,
state_store: StateStoreRef,
max_retry_times: usize,
retry_delay: Duration,
}

impl LocalManager {

@@ -305,6 +311,8 @@ impl LocalManager {
LocalManager {
manager_ctx: Arc::new(ManagerContext::new()),
state_store: Arc::new(ObjectStateStore::new(config.object_store)),
max_retry_times: config.max_retry_times,
retry_delay: config.retry_delay,
}
}

@@ -321,7 +329,11 @@ impl LocalManager {
procedure,
manager_ctx: self.manager_ctx.clone(),
step,
exponential_builder: ExponentialBuilder::default()
.with_min_delay(self.retry_delay)
.with_max_times(self.max_retry_times),
store: ProcedureStore::new(self.state_store.clone()),
rolling_back: false,
};

let watcher = meta.state_receiver.clone();

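For context, backon's ExponentialBuilder (which the manager now configures from retry_delay and max_retry_times) builds an iterator of wait durations. A minimal sketch, assuming backon 0.4 with its default growth factor of 2 and no jitter, of the delay sequence the default settings (500ms, 3 retries) would produce:

use std::time::Duration;

use backon::{BackoffBuilder, ExponentialBuilder};

fn main() {
    // Mirrors the builder set up in LocalManager when submitting a procedure.
    let retry = ExponentialBuilder::default()
        .with_min_delay(Duration::from_millis(500))
        .with_max_times(3)
        .build();

    // With the default factor of 2 this prints roughly 500ms, 1s, 2s;
    // once the iterator is exhausted the runner gives up and reports
    // RetryTimesExceeded.
    for (i, delay) in retry.enumerate() {
        println!("retry #{}: wait {:?}", i + 1, delay);
    }
}
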
@@ -543,6 +555,8 @@ mod tests {
let dir = create_temp_dir("register");
let config = ManagerConfig {
object_store: test_util::new_object_store(&dir),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
};
let manager = LocalManager::new(config);

@@ -562,6 +576,8 @@ mod tests {
let object_store = test_util::new_object_store(&dir);
let config = ManagerConfig {
object_store: object_store.clone(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
};
let manager = LocalManager::new(config);

@@ -606,6 +622,8 @@ mod tests {
let dir = create_temp_dir("submit");
let config = ManagerConfig {
object_store: test_util::new_object_store(&dir),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
};
let manager = LocalManager::new(config);

@@ -652,6 +670,8 @@ mod tests {
let dir = create_temp_dir("on_err");
let config = ManagerConfig {
object_store: test_util::new_object_store(&dir),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
};
let manager = LocalManager::new(config);

@@ -15,15 +15,15 @@
use std::sync::Arc;
use std::time::Duration;

use backon::{BackoffBuilder, ExponentialBuilder};
use common_telemetry::logging;
use tokio::time;

use crate::error::{ProcedurePanicSnafu, Result};
use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
use crate::store::ProcedureStore;
use crate::{BoxedProcedure, Context, ProcedureId, ProcedureState, ProcedureWithId, Status};

const ERR_WAIT_DURATION: Duration = Duration::from_secs(30);
use crate::ProcedureState::Retrying;
use crate::{BoxedProcedure, Context, Error, ProcedureId, ProcedureState, ProcedureWithId, Status};

#[derive(Debug)]
enum ExecResult {

@@ -108,7 +108,9 @@ pub(crate) struct Runner {
pub(crate) procedure: BoxedProcedure,
pub(crate) manager_ctx: Arc<ManagerContext>,
pub(crate) step: u32,
pub(crate) exponential_builder: ExponentialBuilder,
pub(crate) store: ProcedureStore,
pub(crate) rolling_back: bool,
}

impl Runner {

@@ -164,18 +166,56 @@ impl Runner {
provider: self.manager_ctx.clone(),
};

self.rolling_back = false;
self.execute_once_with_retry(&ctx).await;
}

async fn execute_once_with_retry(&mut self, ctx: &Context) {
let mut retry = self.exponential_builder.build();
let mut retry_times = 0;
loop {
match self.execute_once(&ctx).await {
ExecResult::Continue => (),
match self.execute_once(ctx).await {
ExecResult::Done | ExecResult::Failed => return,
ExecResult::Continue => (),
ExecResult::RetryLater => {
self.wait_on_err().await;
retry_times += 1;
if let Some(d) = retry.next() {
self.wait_on_err(d, retry_times).await;
} else {
assert!(self.meta.state().is_retrying());
if let Retrying { error } = self.meta.state() {
self.meta.set_state(ProcedureState::failed(Arc::new(
Error::RetryTimesExceeded {
source: error,
procedure_id: self.meta.id,
},
)))
}
return;
}
}
}
}
}

async fn rollback(&mut self, error: Arc<Error>) -> ExecResult {
if let Err(e) = self.rollback_procedure().await {
self.rolling_back = true;
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return ExecResult::RetryLater;
}
self.meta.set_state(ProcedureState::failed(error));
ExecResult::Failed
}

async fn execute_once(&mut self, ctx: &Context) -> ExecResult {
// if rolling_back, there is no need to execute again.
if self.rolling_back {
// We can definitely get the previous error here.
let state = self.meta.state();
let err = state.error().unwrap();
return self.rollback(err.clone()).await;
}
match self.procedure.execute(ctx).await {
Ok(status) => {
logging::debug!(

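The new loop is easier to follow outside diff form. A simplified, self-contained sketch of the same pattern, where the operation and result type are placeholders rather than GreptimeDB APIs: on every RetryLater the runner draws the next delay from the backoff iterator, sleeps for it, and gives up once the iterator runs dry.

use std::time::Duration;

use backon::{BackoffBuilder, ExponentialBuilder};

// Placeholder outcome type standing in for the runner's ExecResult.
enum Exec {
    Done,
    RetryLater,
}

// Placeholder operation: pretend it only succeeds on the third attempt.
async fn execute_once(attempt: u32) -> Exec {
    if attempt < 3 {
        Exec::RetryLater
    } else {
        Exec::Done
    }
}

#[tokio::main]
async fn main() {
    let mut backoff = ExponentialBuilder::default()
        .with_min_delay(Duration::from_millis(1))
        .with_max_times(3)
        .build();

    let mut attempt = 0;
    loop {
        match execute_once(attempt).await {
            Exec::Done => break,
            Exec::RetryLater => {
                attempt += 1;
                match backoff.next() {
                    // Still within the retry budget: wait, then loop again.
                    Some(delay) => tokio::time::sleep(delay).await,
                    // Budget exhausted: the real runner marks the procedure
                    // failed with RetryTimesExceeded at this point.
                    None => {
                        eprintln!("gave up after {attempt} attempts");
                        break;
                    }
                }
            }
        }
    }
}
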
@@ -186,8 +226,11 @@ impl Runner {
status.need_persist(),
);

if status.need_persist() && self.persist_procedure().await.is_err() {
return ExecResult::RetryLater;
if status.need_persist() {
if let Err(err) = self.persist_procedure().await {
self.meta.set_state(ProcedureState::retrying(Arc::new(err)));
return ExecResult::RetryLater;
}
}

match status {

@@ -196,7 +239,8 @@ impl Runner {
self.on_suspended(subprocedures).await;
}
Status::Done => {
if self.commit_procedure().await.is_err() {
if let Err(e) = self.commit_procedure().await {
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return ExecResult::RetryLater;
}

@@ -217,17 +261,12 @@ impl Runner {
);

if e.is_retry_later() {
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return ExecResult::RetryLater;
}

self.meta.set_state(ProcedureState::failed(Arc::new(e)));

// Write rollback key so we can skip this procedure while recovering procedures.
if self.rollback_procedure().await.is_err() {
return ExecResult::RetryLater;
}

ExecResult::Failed
self.rollback(Arc::new(e)).await
}
}
}

@@ -261,7 +300,9 @@ impl Runner {
procedure,
manager_ctx: self.manager_ctx.clone(),
step,
exponential_builder: self.exponential_builder.clone(),
store: self.store.clone(),
rolling_back: false,
};

// Insert the procedure. We already check the procedure existence before inserting

@@ -285,8 +326,16 @@ impl Runner {
});
}

async fn wait_on_err(&self) {
time::sleep(ERR_WAIT_DURATION).await;
/// Extend the retry time to wait for the next retry.
async fn wait_on_err(&self, d: Duration, i: u64) {
logging::info!(
"Procedure {}-{} retry for the {} times after {} millis",
self.procedure.type_name(),
self.meta.id,
i,
d.as_millis(),
);
time::sleep(d).await;
}

async fn on_suspended(&self, subprocedures: Vec<ProcedureWithId>) {

@@ -416,7 +465,9 @@ mod tests {
procedure,
manager_ctx: Arc::new(ManagerContext::new()),
step: 0,
exponential_builder: ExponentialBuilder::default(),
store,
rolling_back: false,
}
}

@@ -744,7 +795,7 @@ mod tests {

let res = runner.execute_once(&ctx).await;
assert!(res.is_retry_later(), "{res:?}");
assert!(meta.state().is_running());
assert!(meta.state().is_retrying());

let res = runner.execute_once(&ctx).await;
assert!(res.is_done(), "{res:?}");

@@ -752,6 +803,36 @@ mod tests {
check_files(&object_store, ctx.procedure_id, &["0000000000.commit"]).await;
}

#[tokio::test]
async fn test_execute_exceed_max_retry_later() {
let exec_fn =
|_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();

let exceed_max_retry_later = ProcedureAdapter {
data: "exceed_max_retry_later".to_string(),
lock_key: LockKey::single("catalog.schema.table"),
exec_fn,
};

let dir = create_temp_dir("exceed_max_retry_later");
let meta = exceed_max_retry_later.new_meta(ROOT_ID);
let object_store = test_util::new_object_store(&dir);
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(
meta.clone(),
Box::new(exceed_max_retry_later),
procedure_store,
);
runner.exponential_builder = ExponentialBuilder::default()
.with_min_delay(Duration::from_millis(1))
.with_max_times(3);

// Run the runner and execute the procedure.
runner.execute_procedure_in_loop().await;
let err = meta.state().error().unwrap().to_string();
assert!(err.contains("Procedure retry exceeded max times"));
}

#[tokio::test]
async fn test_child_error() {
let mut times = 0;

@@ -819,7 +900,7 @@ mod tests {
// Replace the manager ctx.
runner.manager_ctx = manager_ctx;

// Run the runer and execute the procedure.
// Run the runner and execute the procedure.
runner.run().await;
let err = meta.state().error().unwrap().to_string();
assert!(err.contains("subprocedure failed"), "{err}");

@@ -206,6 +206,8 @@ pub enum ProcedureState {
Running,
/// The procedure is finished.
Done,
/// The procedure is failed and can be retried.
Retrying { error: Arc<Error> },
/// The procedure is failed and cannot proceed anymore.
Failed { error: Arc<Error> },
}

@@ -216,6 +218,11 @@ impl ProcedureState {
ProcedureState::Failed { error }
}

/// Returns a [ProcedureState] with retrying state.
pub fn retrying(error: Arc<Error>) -> ProcedureState {
ProcedureState::Retrying { error }
}

/// Returns true if the procedure state is running.
pub fn is_running(&self) -> bool {
matches!(self, ProcedureState::Running)

@@ -231,10 +238,16 @@ impl ProcedureState {
matches!(self, ProcedureState::Failed { .. })
}

/// Returns true if the procedure state is retrying.
pub fn is_retrying(&self) -> bool {
matches!(self, ProcedureState::Retrying { .. })
}

/// Returns the error.
pub fn error(&self) -> Option<&Arc<Error>> {
match self {
ProcedureState::Failed { error } => Some(error),
ProcedureState::Retrying { error } => Some(error),
_ => None,
}
}

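A small illustration of how callers can now tell a transient failure from a terminal one with the helpers above. The enum here is a trimmed stand-in for the real ProcedureState, which wraps Arc<common_procedure::Error> rather than a string:

use std::sync::Arc;

// Trimmed stand-in for common_procedure::ProcedureState.
enum ProcedureState {
    Running,
    Done,
    Retrying { error: Arc<String> },
    Failed { error: Arc<String> },
}

impl ProcedureState {
    fn is_retrying(&self) -> bool {
        matches!(self, ProcedureState::Retrying { .. })
    }

    fn error(&self) -> Option<&Arc<String>> {
        match self {
            ProcedureState::Failed { error } | ProcedureState::Retrying { error } => Some(error),
            ProcedureState::Running | ProcedureState::Done => None,
        }
    }
}

fn main() {
    let state = ProcedureState::Retrying {
        error: Arc::new("flush failed".to_string()),
    };
    // A retrying procedure still exposes its last error, but it is not final:
    // the runner will execute it again after the backoff delay.
    assert!(state.is_retrying());
    assert_eq!(state.error().map(|e| e.as_str()), Some("flush failed"));
}
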
@@ -33,6 +33,9 @@ pub async fn wait(watcher: &mut Watcher) -> Result<()> {
ProcedureState::Failed { error } => {
return Err(error.clone()).context(ProcedureExecSnafu);
}
ProcedureState::Retrying { error } => {
return Err(error.clone()).context(ProcedureExecSnafu);
}
}
}
}

@@ -149,11 +149,22 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
pub struct ProcedureConfig {
/// Storage config for procedure manager.
pub store: ObjectStoreConfig,
/// Max retry times of procedure.
pub max_retry_times: usize,
/// Initial retry delay of procedures, increases exponentially.
#[serde(with = "humantime_serde")]
pub retry_delay: Duration,
}

impl Default for ProcedureConfig {
fn default() -> ProcedureConfig {
ProcedureConfig::from_file_path("/tmp/greptimedb/procedure/".to_string())
ProcedureConfig {
store: ObjectStoreConfig::File(FileConfig {
data_dir: "/tmp/greptimedb/procedure/".to_string(),
}),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
}
}
}

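Because retry_delay is (de)serialized through humantime_serde, the human-readable "500ms" values in the example TOML files map straight onto a Duration. A minimal sketch of that round trip; the standalone struct and the use of the toml crate here are illustrative assumptions, not the actual DatanodeOptions plumbing:

use std::time::Duration;

use serde::Deserialize;

// Trimmed stand-in for the retry-related ProcedureConfig fields.
#[derive(Debug, Deserialize)]
struct RetryOptions {
    max_retry_times: usize,
    #[serde(with = "humantime_serde")]
    retry_delay: Duration,
}

fn main() {
    let src = r#"
        max_retry_times = 3
        retry_delay = "500ms"
    "#;
    let opts: RetryOptions = toml::from_str(src).unwrap();
    assert_eq!(opts.max_retry_times, 3);
    assert_eq!(opts.retry_delay, Duration::from_millis(500));
}
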
@@ -161,6 +172,7 @@ impl ProcedureConfig {
pub fn from_file_path(path: String) -> ProcedureConfig {
ProcedureConfig {
store: ObjectStoreConfig::File(FileConfig { data_dir: path }),
..Default::default()
}
}
}

@@ -460,7 +460,11 @@ pub(crate) async fn create_procedure_manager(
);

let object_store = new_object_store(&procedure_config.store).await?;
let manager_config = ManagerConfig { object_store };
let manager_config = ManagerConfig {
object_store,
max_retry_times: procedure_config.max_retry_times,
retry_delay: procedure_config.retry_delay,
};

Ok(Some(Arc::new(LocalManager::new(manager_config))))
}

@@ -13,6 +13,7 @@
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_query::Output;

@@ -64,6 +65,8 @@ impl MockInstance {
store: ObjectStoreConfig::File(FileConfig {
data_dir: procedure_dir.path().to_str().unwrap().to_string(),
}),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
});

let instance = Instance::with_mock_meta_client(&opts).await.unwrap();

@@ -197,7 +197,7 @@ impl CreateTableProcedure {
};

match sub_state {
ProcedureState::Running => Ok(Status::Suspended {
ProcedureState::Running | ProcedureState::Retrying { .. } => Ok(Status::Suspended {
subprocedures: Vec::new(),
persist: false,
}),

@@ -13,6 +13,7 @@
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use catalog::local::MemoryCatalogManager;
use catalog::CatalogManagerRef;

@@ -59,7 +60,11 @@ impl TestEnv {
let accessor = Fs::default().root(&procedure_dir).build().unwrap();
let object_store = ObjectStore::new(accessor).finish();

let procedure_manager = Arc::new(LocalManager::new(ManagerConfig { object_store }));
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig {
object_store,
max_retry_times: 3,
retry_delay: Duration::from_secs(500),
}));

let catalog_manager = Arc::new(MemoryCatalogManager::default());