diff --git a/libs/utils/src/pre_spawned_pool.rs b/libs/utils/src/pre_spawned_pool.rs index 2fbe4e1a0a..b92867ed0a 100644 --- a/libs/utils/src/pre_spawned_pool.rs +++ b/libs/utils/src/pre_spawned_pool.rs @@ -1,20 +1,17 @@ -use std::{ - collections::VecDeque, - num::NonZeroUsize, - sync::{Arc, RwLock}, - thread::JoinHandle, -}; +use std::sync::Arc; use tokio::sync::{mpsc, OwnedSemaphorePermit}; -use tokio_util::sync::CancellationToken; use tracing::{debug, instrument}; +use crate::backoff; + pub struct Client { cmds_tx: mpsc::UnboundedSender, items_rx: async_channel::Receiver>, } pub trait Launcher { + fn what() -> &'static str; fn create(&self) -> anyhow::Result; } @@ -42,7 +39,9 @@ impl Client { } pub fn set_slot_count_nowait(&self, count: usize) { - self.cmds_tx.send(Command::SetSlotCount(count)); + self.cmds_tx + .send(Command::SetSlotCount(count)) + .expect("while cmds_tx is open, the pool task doesn't exit"); } } @@ -86,15 +85,47 @@ where async fn task(mut self) { let initial = 0; let mut configured = initial; - let mut pending_items = Arc::new(tokio::sync::Semaphore::new(initial)); + let pending_items = Arc::new(tokio::sync::Semaphore::new(initial)); let mut need_forget = 0; + let mut last_launcher_failure_at = None; loop { debug!( configured, need_forget, available = pending_items.available_permits(), + last_launcher_failure_secs_ago = + last_launcher_failure_at.map(|at| at.elapsed().as_secs_f64()), "iteration" ); + let try_launch_once = || async { + let permit = Arc::clone(&pending_items) + .acquire_owned() + .expect("we never close this semaphore"); + if need_forget > 0 { + debug!("fogetting permit to reduce semaphore count"); + need_forget -= 1; + permit.forget(); + continue; + } + debug!("creating item"); + let item = match self.launcher.create() { + Ok(item) => item, + Err(e) => { + error!( + "launcher failed to create item: {}", + report_compact_sources(&e) + ); + } + }; + }; + let try_launch_retrying = backoff::retry( + try_launch_once, + |_| false, + 0, + u32::MAX, + L::what(), + CancellationToken::new(), + ); let cmd = tokio::select! { res = self.cmds_rx.recv() => { match res { @@ -102,19 +133,7 @@ where None => return, // dropping tx acts as cancellation } } - permit = Arc::clone(&pending_items).acquire_owned() => { - let permit = permit.expect("we never close this semaphore"); - if need_forget > 0 { - debug!("fogetting permit to reduce semaphore count"); - need_forget -= 1; - permit.forget(); - continue; - } - debug!("creating item"); - let item = match self.launcher.create() { - Ok(item) => item, - Err(e) => todo!(), - }; + item = try_launch => { match self.items_tx.send(CreatedItem { permit, item }).await { Ok(()) => continue, Err(_) => { diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 1756cbb35d..1307c54029 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -21,7 +21,6 @@ use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::*; -use remote_storage::GenericRemoteStorage; use utils::crashsafe; use crate::config::PageServerConf; diff --git a/pageserver/src/walredo/process.rs b/pageserver/src/walredo/process.rs index c0b6380374..eaa759378c 100644 --- a/pageserver/src/walredo/process.rs +++ b/pageserver/src/walredo/process.rs @@ -7,7 +7,7 @@ use crate::{ use anyhow::Context; use bytes::Bytes; use nix::poll::{PollFd, PollFlags}; -use pageserver_api::{reltag::RelTag, shard::TenantShardId}; +use pageserver_api::reltag::RelTag; use postgres_ffi::BLCKSZ; use std::os::fd::AsRawFd; #[cfg(feature = "testing")] diff --git a/pageserver/src/walredo/process/no_leak_child.rs b/pageserver/src/walredo/process/no_leak_child.rs index 5973d98c84..d7b6a89ee9 100644 --- a/pageserver/src/walredo/process/no_leak_child.rs +++ b/pageserver/src/walredo/process/no_leak_child.rs @@ -15,7 +15,6 @@ use std::ops::Deref; use std::process::Child; -use pageserver_api::shard::TenantShardId; /// Wrapper type around `std::process::Child` which guarantees that the child /// will be killed and waited-for by this process before being dropped. diff --git a/pageserver/src/walredo/process_pool.rs b/pageserver/src/walredo/process_pool.rs index 2bb6232aa3..b310eaf5d5 100644 --- a/pageserver/src/walredo/process_pool.rs +++ b/pageserver/src/walredo/process_pool.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use anyhow::Context; use utils::pre_spawned_pool;