From e316797ffffbfc5ccf96d515e12b259beb4ac9b1 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Wed, 22 Apr 2026 21:38:36 -0700 Subject: [PATCH] feat(cli): add retry and import state fundations (#8007) * feat(cli): add retry and import state fiundations Signed-off-by: jeremyhi * fix: by AI comments Signed-off-by: jeremyhi * fix: by AI comments Signed-off-by: jeremyhi * fix: by human comments Signed-off-by: jeremyhi --------- Signed-off-by: jeremyhi --- Cargo.lock | 2 + Cargo.toml | 1 + src/cli/Cargo.toml | 2 + src/cli/src/data.rs | 1 + src/cli/src/data/import_v2.rs | 1 + src/cli/src/data/import_v2/error.rs | 35 ++ src/cli/src/data/import_v2/state.rs | 533 ++++++++++++++++++++++++++++ src/cli/src/data/retry.rs | 113 ++++++ 8 files changed, 688 insertions(+) create mode 100644 src/cli/src/data/import_v2/state.rs create mode 100644 src/cli/src/data/retry.rs diff --git a/Cargo.lock b/Cargo.lock index a40d5b7cdd..a006c7403c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1899,6 +1899,7 @@ dependencies = [ "async-stream", "async-trait", "auth", + "backon", "base64 0.22.1", "cache", "catalog", @@ -1923,6 +1924,7 @@ dependencies = [ "common-wal", "datatypes", "etcd-client", + "fs2", "futures", "humantime", "meta-client", diff --git a/Cargo.toml b/Cargo.toml index 153cc0dbe5..f8ad234c8e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -151,6 +151,7 @@ etcd-client = { version = "0.17", features = [ "tls", "tls-roots", ] } +fs2 = "0.4" fst = "0.4.7" futures = "0.3" futures-util = "0.3" diff --git a/src/cli/Cargo.toml b/src/cli/Cargo.toml index 1eb2736007..172a691f6e 100644 --- a/src/cli/Cargo.toml +++ b/src/cli/Cargo.toml @@ -19,6 +19,7 @@ workspace = true async-stream.workspace = true async-trait.workspace = true auth.workspace = true +backon.workspace = true base64.workspace = true cache.workspace = true catalog.workspace = true @@ -44,6 +45,7 @@ common-version.workspace = true common-wal.workspace = true datatypes.workspace = true etcd-client.workspace = true +fs2.workspace = true futures.workspace = true humantime.workspace = true meta-client.workspace = true diff --git a/src/cli/src/data.rs b/src/cli/src/data.rs index 114886542e..ac54138490 100644 --- a/src/cli/src/data.rs +++ b/src/cli/src/data.rs @@ -17,6 +17,7 @@ pub mod export_v2; mod import; pub mod import_v2; pub(crate) mod path; +pub(crate) mod retry; pub mod snapshot_storage; pub(crate) mod sql; mod storage_export; diff --git a/src/cli/src/data/import_v2.rs b/src/cli/src/data/import_v2.rs index 772e18cc93..2b8a935607 100644 --- a/src/cli/src/data/import_v2.rs +++ b/src/cli/src/data/import_v2.rs @@ -37,5 +37,6 @@ mod command; pub mod error; pub mod executor; +pub(crate) mod state; pub use command::ImportV2Command; diff --git a/src/cli/src/data/import_v2/error.rs b/src/cli/src/data/import_v2/error.rs index e2b6f90d1f..3bb712acd5 100644 --- a/src/cli/src/data/import_v2/error.rs +++ b/src/cli/src/data/import_v2/error.rs @@ -104,6 +104,37 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to parse import state file"))] + ImportStateParse { + #[snafu(source)] + error: serde_json::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Import state I/O failed at '{}': {}", path, error))] + ImportStateIo { + path: String, + #[snafu(source)] + error: std::io::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Import state is already locked at '{}'", path))] + ImportStateLocked { + path: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Import state references unknown chunk {}", chunk_id))] + ImportStateUnknownChunk { + chunk_id: u32, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -117,10 +148,14 @@ impl ErrorExt for Error { | Error::IncompleteSnapshot { .. } | Error::EmptyChunkManifest { .. } | Error::MissingChunkData { .. } => StatusCode::InvalidArguments, + Error::ImportStateUnknownChunk { .. } => StatusCode::Unexpected, Error::Database { error, .. } => error.status_code(), Error::SnapshotStorage { error, .. } | Error::ChunkImportFailed { error, .. } => { error.status_code() } + Error::ImportStateParse { .. } => StatusCode::Internal, + Error::ImportStateIo { .. } => StatusCode::StorageUnavailable, + Error::ImportStateLocked { .. } => StatusCode::IllegalState, } } diff --git a/src/cli/src/data/import_v2/state.rs b/src/cli/src/data/import_v2/state.rs new file mode 100644 index 0000000000..56b35a8a75 --- /dev/null +++ b/src/cli/src/data/import_v2/state.rs @@ -0,0 +1,533 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![allow(dead_code)] + +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU64, Ordering}; + +use chrono::{DateTime, Utc}; +use fs2::FileExt; +use serde::{Deserialize, Serialize}; +use snafu::{IntoError, OptionExt, ResultExt}; +use tokio::io::AsyncWriteExt; + +use crate::data::import_v2::error::{ + ImportStateIoSnafu, ImportStateLockedSnafu, ImportStateParseSnafu, + ImportStateUnknownChunkSnafu, Result, +}; +use crate::data::path::encode_path_segment; + +const IMPORT_STATE_ROOT: &str = ".greptime"; +const IMPORT_STATE_DIR: &str = "import_state"; +static IMPORT_STATE_TMP_ID: AtomicU64 = AtomicU64::new(0); + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub(crate) enum ImportChunkStatus { + Pending, + InProgress, + Completed, + Failed, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub(crate) struct ImportChunkState { + pub(crate) id: u32, + pub(crate) status: ImportChunkStatus, + #[serde(skip_serializing_if = "Option::is_none")] + pub(crate) error: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub(crate) struct ImportState { + pub(crate) snapshot_id: String, + pub(crate) target_addr: String, + pub(crate) updated_at: DateTime, + // Chunk counts are expected to stay below ~1000, so linear scans are acceptable here. + pub(crate) chunks: Vec, +} + +impl ImportState { + pub(crate) fn new( + snapshot_id: impl Into, + target_addr: impl Into, + chunk_ids: I, + ) -> Self + where + I: IntoIterator, + { + Self { + snapshot_id: snapshot_id.into(), + target_addr: target_addr.into(), + updated_at: Utc::now(), + chunks: chunk_ids + .into_iter() + .map(|id| ImportChunkState { + id, + status: ImportChunkStatus::Pending, + error: None, + }) + .collect(), + } + } + + pub(crate) fn chunk_status(&self, chunk_id: u32) -> Option { + self.chunks + .iter() + .find(|chunk| chunk.id == chunk_id) + .map(|chunk| chunk.status.clone()) + } + + pub(crate) fn set_chunk_status( + &mut self, + chunk_id: u32, + status: ImportChunkStatus, + error: Option, + ) -> Result<()> { + let chunk = self + .chunks + .iter_mut() + .find(|chunk| chunk.id == chunk_id) + .context(ImportStateUnknownChunkSnafu { chunk_id })?; + chunk.status = status; + chunk.error = error; + self.updated_at = Utc::now(); + Ok(()) + } +} + +#[derive(Debug)] +pub(crate) struct ImportStateLockGuard { + file: std::fs::File, +} + +impl Drop for ImportStateLockGuard { + fn drop(&mut self) { + let _ = self.file.unlock(); + } +} + +pub(crate) fn default_state_path(snapshot_id: &str, target_addr: &str) -> Option { + let home = default_home_dir_with(|key| std::env::var_os(key)); + let cwd = std::env::current_dir().ok(); + default_state_path_with(home.as_deref(), cwd.as_deref(), snapshot_id, target_addr) +} + +fn default_home_dir_with(get: F) -> Option +where + F: for<'a> Fn(&'a str) -> Option, +{ + get("HOME") + .or_else(|| get("USERPROFILE")) + .map(PathBuf::from) + .or_else(|| { + let drive = get("HOMEDRIVE")?; + let path = get("HOMEPATH")?; + Some(PathBuf::from(drive).join(path)) + }) +} + +fn default_state_path_with( + home: Option<&Path>, + cwd: Option<&Path>, + snapshot_id: &str, + target_addr: &str, +) -> Option { + let file_name = import_state_file_name(snapshot_id, target_addr); + match (home, cwd) { + (Some(home), _) => Some( + home.join(IMPORT_STATE_ROOT) + .join(IMPORT_STATE_DIR) + .join(file_name), + ), + (None, Some(cwd)) => Some(cwd.join(file_name)), + (None, None) => None, + } +} + +fn import_state_file_name(snapshot_id: &str, target_addr: &str) -> String { + format!( + ".import_state_{}_{}.json", + encode_path_segment(snapshot_id), + encode_path_segment(target_addr) + ) +} + +pub(crate) async fn load_import_state(path: &Path) -> Result> { + match tokio::fs::read(path).await { + Ok(bytes) => { + let mut state: ImportState = + serde_json::from_slice(&bytes).context(ImportStateParseSnafu)?; + normalize_import_state_for_resume(&mut state); + Ok(Some(state)) + } + Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None), + Err(source) => Err(source).context(ImportStateIoSnafu { + path: path.display().to_string(), + }), + } +} + +/// Caller must hold the lock acquired via `try_acquire_import_state_lock`. +pub(crate) async fn save_import_state(path: &Path, state: &ImportState) -> Result<()> { + if let Some(parent) = path.parent() { + tokio::fs::create_dir_all(parent) + .await + .context(ImportStateIoSnafu { + path: parent.display().to_string(), + })?; + } + + let bytes = + serde_json::to_vec_pretty(state).expect("ImportState should always be serializable"); + let tmp_path = unique_tmp_path(path); + let mut file = tokio::fs::File::create(&tmp_path) + .await + .context(ImportStateIoSnafu { + path: tmp_path.display().to_string(), + })?; + file.write_all(&bytes).await.context(ImportStateIoSnafu { + path: tmp_path.display().to_string(), + })?; + file.sync_all().await.context(ImportStateIoSnafu { + path: tmp_path.display().to_string(), + })?; + // Close before rename; Windows forbids renaming an open file. + drop(file); + + tokio::fs::rename(&tmp_path, path) + .await + .context(ImportStateIoSnafu { + path: path.display().to_string(), + })?; + sync_parent_dir(path).await?; + Ok(()) +} + +pub(crate) fn try_acquire_import_state_lock(path: &Path) -> Result { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent).context(ImportStateIoSnafu { + path: parent.display().to_string(), + })?; + } + + let lock_path = import_state_lock_path(path); + let file = std::fs::OpenOptions::new() + .create(true) + .read(true) + .write(true) + .truncate(false) + .open(&lock_path) + .context(ImportStateIoSnafu { + path: lock_path.display().to_string(), + })?; + file.try_lock_exclusive().map_err(|error| { + if error.kind() == std::io::ErrorKind::WouldBlock { + ImportStateLockedSnafu { + path: lock_path.display().to_string(), + } + .build() + } else { + ImportStateIoSnafu { + path: lock_path.display().to_string(), + } + .into_error(error) + } + })?; + + Ok(ImportStateLockGuard { file }) +} + +fn unique_tmp_path(path: &Path) -> PathBuf { + let pid = std::process::id(); + let seq = IMPORT_STATE_TMP_ID.fetch_add(1, Ordering::Relaxed); + let file_name = path.file_name().unwrap_or_default().to_string_lossy(); + path.with_file_name(format!("{file_name}.{pid}.{seq}.tmp")) +} + +fn import_state_lock_path(path: &Path) -> PathBuf { + let file_name = path.file_name().unwrap_or_default().to_string_lossy(); + path.with_file_name(format!("{file_name}.lock")) +} + +fn normalize_import_state_for_resume(state: &mut ImportState) { + for chunk in &mut state.chunks { + if chunk.status == ImportChunkStatus::InProgress { + chunk.status = ImportChunkStatus::Pending; + chunk.error = None; + } + } +} + +pub(crate) async fn delete_import_state(path: &Path) -> Result<()> { + match tokio::fs::remove_file(path).await { + Ok(()) => { + sync_parent_dir(path).await?; + Ok(()) + } + Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(source) => Err(source).context(ImportStateIoSnafu { + path: path.display().to_string(), + }), + } +} + +#[cfg(unix)] +async fn sync_parent_dir(path: &Path) -> Result<()> { + let Some(parent) = path.parent() else { + return Ok(()); + }; + + let dir = tokio::fs::File::open(parent) + .await + .context(ImportStateIoSnafu { + path: parent.display().to_string(), + })?; + dir.sync_all().await.context(ImportStateIoSnafu { + path: parent.display().to_string(), + })?; + Ok(()) +} + +#[cfg(not(unix))] +async fn sync_parent_dir(_path: &Path) -> Result<()> { + Ok(()) +} + +#[cfg(test)] +mod tests { + use chrono::Utc; + use tempfile::tempdir; + + use super::*; + + #[test] + fn test_import_state_new_initializes_pending_chunks() { + let state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1, 2]); + + assert_eq!(state.snapshot_id, "snapshot-1"); + assert_eq!(state.target_addr, "127.0.0.1:4000"); + assert_eq!(state.chunks.len(), 2); + assert_eq!(state.chunks[0].status, ImportChunkStatus::Pending); + assert_eq!(state.chunks[1].status, ImportChunkStatus::Pending); + } + + #[test] + fn test_set_chunk_status_updates_timestamp_and_error() { + let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1]); + let before = state.updated_at; + state.updated_at = Utc::now() - chrono::Duration::seconds(10); + + state + .set_chunk_status(1, ImportChunkStatus::Failed, Some("timeout".to_string())) + .unwrap(); + assert_eq!(state.chunk_status(1), Some(ImportChunkStatus::Failed)); + assert_eq!(state.chunks[0].error.as_deref(), Some("timeout")); + assert!(state.updated_at > before); + } + + #[test] + fn test_set_chunk_status_rejects_unknown_chunk_id() { + let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1]); + + let error = state + .set_chunk_status(99, ImportChunkStatus::Completed, None) + .unwrap_err(); + + assert!(matches!( + error, + crate::data::import_v2::error::Error::ImportStateUnknownChunk { chunk_id, .. } if chunk_id == 99 + )); + } + + #[tokio::test] + async fn test_save_and_load_import_state_round_trip() { + let dir = tempdir().unwrap(); + let path = dir.path().join("import_state.json"); + let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1, 2]); + state + .set_chunk_status(2, ImportChunkStatus::Completed, None) + .unwrap(); + + save_import_state(&path, &state).await.unwrap(); + let loaded = load_import_state(&path).await.unwrap().unwrap(); + + assert_eq!(loaded.snapshot_id, state.snapshot_id); + assert_eq!(loaded.target_addr, state.target_addr); + assert_eq!(loaded.chunks, state.chunks); + } + + #[tokio::test] + async fn test_save_import_state_overwrites_existing_file() { + let dir = tempdir().unwrap(); + let path = dir.path().join("import_state.json"); + let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1]); + save_import_state(&path, &state).await.unwrap(); + + state + .set_chunk_status(1, ImportChunkStatus::Completed, None) + .unwrap(); + save_import_state(&path, &state).await.unwrap(); + + let loaded = load_import_state(&path).await.unwrap().unwrap(); + assert_eq!(loaded.chunk_status(1), Some(ImportChunkStatus::Completed)); + } + + #[test] + fn test_load_import_state_resets_in_progress_to_pending() { + let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1, 2]); + state + .set_chunk_status( + 2, + ImportChunkStatus::InProgress, + Some("running".to_string()), + ) + .unwrap(); + + normalize_import_state_for_resume(&mut state); + + assert_eq!(state.chunk_status(1), Some(ImportChunkStatus::Pending)); + assert_eq!(state.chunk_status(2), Some(ImportChunkStatus::Pending)); + assert_eq!(state.chunks[1].error, None); + } + + #[test] + fn test_unique_tmp_path_generates_distinct_paths() { + let path = Path::new("/tmp/import_state.json"); + + let first = unique_tmp_path(path); + let second = unique_tmp_path(path); + + assert_ne!(first, second); + assert!(first.starts_with("/tmp")); + assert!(second.starts_with("/tmp")); + assert!( + first + .file_name() + .unwrap() + .to_string_lossy() + .ends_with(".tmp") + ); + assert!( + second + .file_name() + .unwrap() + .to_string_lossy() + .ends_with(".tmp") + ); + } + + #[test] + fn test_try_acquire_import_state_lock_rejects_second_holder() { + let dir = tempdir().unwrap(); + let path = dir.path().join("import_state.json"); + + let _first = try_acquire_import_state_lock(&path).unwrap(); + let error = try_acquire_import_state_lock(&path).unwrap_err(); + + assert!(matches!( + error, + crate::data::import_v2::error::Error::ImportStateLocked { .. } + )); + } + + #[tokio::test] + async fn test_delete_import_state_ignores_missing_file() { + let dir = tempdir().unwrap(); + let path = dir.path().join("missing.json"); + + delete_import_state(&path).await.unwrap(); + } + + #[test] + fn test_default_state_path_prefers_home_and_encodes_snapshot_id() { + let home = tempdir().unwrap(); + let cwd = tempdir().unwrap(); + + let path = default_state_path_with( + Some(home.path()), + Some(cwd.path()), + "../snapshot", + "127.0.0.1:4000", + ) + .unwrap(); + + assert_eq!( + path, + home.path() + .join(IMPORT_STATE_ROOT) + .join(IMPORT_STATE_DIR) + .join(".import_state_%2E%2E%2Fsnapshot_127%2E0%2E0%2E1%3A4000.json") + ); + } + + #[test] + fn test_default_state_path_falls_back_to_cwd_when_home_missing() { + let cwd = tempdir().unwrap(); + + let path = + default_state_path_with(None, Some(cwd.path()), "snapshot-1", "target-a").unwrap(); + + assert_eq!( + path, + cwd.path().join(".import_state_snapshot-1_target-a.json") + ); + } + + #[test] + fn test_default_state_path_isolated_by_target_addr() { + let cwd = tempdir().unwrap(); + + let first = default_state_path_with(None, Some(cwd.path()), "snapshot-1", "127.0.0.1:4000") + .unwrap(); + let second = + default_state_path_with(None, Some(cwd.path()), "snapshot-1", "127.0.0.1:4001") + .unwrap(); + + assert_ne!(first, second); + } + + #[test] + fn test_default_home_dir_prefers_home() { + let detected = default_home_dir_with(|key| match key { + "HOME" => Some(std::ffi::OsString::from("/tmp/home")), + "USERPROFILE" => Some(std::ffi::OsString::from("/tmp/userprofile")), + _ => None, + }); + + assert_eq!(detected, Some(PathBuf::from("/tmp/home"))); + } + + #[test] + fn test_default_home_dir_falls_back_to_userprofile() { + let detected = default_home_dir_with(|key| match key { + "USERPROFILE" => Some(std::ffi::OsString::from("/tmp/userprofile")), + _ => None, + }); + + assert_eq!(detected, Some(PathBuf::from("/tmp/userprofile"))); + } + + #[test] + fn test_default_home_dir_falls_back_to_home_drive_and_path() { + let detected = default_home_dir_with(|key| match key { + "HOMEDRIVE" => Some(std::ffi::OsString::from("/tmp")), + "HOMEPATH" => Some(std::ffi::OsString::from("windows-home")), + _ => None, + }); + + assert_eq!(detected, Some(PathBuf::from("/tmp").join("windows-home"))); + } +} diff --git a/src/cli/src/data/retry.rs b/src/cli/src/data/retry.rs new file mode 100644 index 0000000000..f0ac6386d3 --- /dev/null +++ b/src/cli/src/data/retry.rs @@ -0,0 +1,113 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![allow(dead_code)] + +use std::time::Duration; + +use backon::ExponentialBuilder; + +pub(crate) fn default_retry_policy() -> ExponentialBuilder { + ExponentialBuilder::default() + .with_min_delay(Duration::from_secs(1)) + .with_max_delay(Duration::from_secs(300)) + .with_factor(2.0) + // This is the number of retries after the initial attempt. + .with_max_times(3) + .with_jitter() +} + +#[cfg(test)] +mod tests { + use std::future::ready; + use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; + + use backon::Retryable; + + use super::*; + + #[tokio::test] + async fn test_retry_policy_retries_retryable_error_until_success() { + let attempts = Arc::new(AtomicUsize::new(0)); + + let result = ({ + let attempts = attempts.clone(); + move || { + let attempts = attempts.clone(); + async move { + let current = attempts.fetch_add(1, Ordering::SeqCst); + if current < 2 { + Err("retryable") + } else { + Ok("done") + } + } + } + }) + .retry(default_retry_policy()) + .when(|error| *error == "retryable") + .sleep(|_| ready(())) + .await; + + assert_eq!(result, Ok("done")); + assert_eq!(attempts.load(Ordering::SeqCst), 3); + } + + #[tokio::test] + async fn test_retry_policy_stops_on_non_retryable_error() { + let attempts = Arc::new(AtomicUsize::new(0)); + + let result: std::result::Result<(), &str> = ({ + let attempts = attempts.clone(); + move || { + let attempts = attempts.clone(); + async move { + attempts.fetch_add(1, Ordering::SeqCst); + Err("fatal") + } + } + }) + .retry(default_retry_policy()) + .when(|error| *error == "retryable") + .sleep(|_| ready(())) + .await; + + assert_eq!(result, Err("fatal")); + assert_eq!(attempts.load(Ordering::SeqCst), 1); + } + + #[tokio::test] + async fn test_retry_policy_returns_last_error_after_reaching_limit() { + let attempts = Arc::new(AtomicUsize::new(0)); + + let result: std::result::Result<(), usize> = ({ + let attempts = attempts.clone(); + move || { + let attempts = attempts.clone(); + async move { + let current = attempts.fetch_add(1, Ordering::SeqCst); + Err(current) + } + } + }) + .retry(default_retry_policy().with_max_times(2)) + .when(|_| true) + .sleep(|_| ready(())) + .await; + + assert_eq!(result, Err(2)); + assert_eq!(attempts.load(Ordering::SeqCst), 3); + } +}