diff --git a/Cargo.lock b/Cargo.lock index 8fb692d2c7..1cb5079934 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2122,6 +2122,7 @@ dependencies = [ "futures-util", "humantime-serde", "object-store", + "rand", "serde", "serde_json", "smallvec", diff --git a/src/common/procedure/Cargo.toml b/src/common/procedure/Cargo.toml index d5b4767774..df06ff505a 100644 --- a/src/common/procedure/Cargo.toml +++ b/src/common/procedure/Cargo.toml @@ -22,6 +22,7 @@ common-telemetry.workspace = true futures.workspace = true humantime-serde.workspace = true object-store.workspace = true +rand.workspace = true serde.workspace = true serde_json.workspace = true smallvec.workspace = true diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index 0e0ac2020a..1d1439e24c 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Add; use std::sync::Arc; use std::time::Duration; use backon::{BackoffBuilder, ExponentialBuilder}; use common_telemetry::{debug, error, info}; +use rand::Rng; use tokio::time; use super::rwlock::OwnedKeyRwLockGuard; @@ -198,6 +200,11 @@ impl Runner { ProcedureState::Retrying { error } => { retry_times += 1; if let Some(d) = retry.next() { + let millis = d.as_millis() as u64; + // Add random noise to the retry delay to avoid retry storms. + let noise = rand::thread_rng().gen_range(0..(millis / 4) + 1); + let d = d.add(Duration::from_millis(noise)); + self.wait_on_err(d, retry_times).await; } else { self.meta