This commit is contained in:
Christian Schwarz
2024-02-05 11:19:19 +00:00
parent cb4908dedb
commit e603337f39
5 changed files with 42 additions and 27 deletions

View File

@@ -1,20 +1,17 @@
use std::{
collections::VecDeque,
num::NonZeroUsize,
sync::{Arc, RwLock},
thread::JoinHandle,
};
use std::sync::Arc;
use tokio::sync::{mpsc, OwnedSemaphorePermit};
use tokio_util::sync::CancellationToken;
use tracing::{debug, instrument};
use crate::backoff;
pub struct Client<T> {
cmds_tx: mpsc::UnboundedSender<Command>,
items_rx: async_channel::Receiver<CreatedItem<T>>,
}
pub trait Launcher<T> {
fn what() -> &'static str;
fn create(&self) -> anyhow::Result<T>;
}
@@ -42,7 +39,9 @@ impl<T> Client<T> {
}
pub fn set_slot_count_nowait(&self, count: usize) {
self.cmds_tx.send(Command::SetSlotCount(count));
self.cmds_tx
.send(Command::SetSlotCount(count))
.expect("while cmds_tx is open, the pool task doesn't exit");
}
}
@@ -86,15 +85,47 @@ where
async fn task(mut self) {
let initial = 0;
let mut configured = initial;
let mut pending_items = Arc::new(tokio::sync::Semaphore::new(initial));
let pending_items = Arc::new(tokio::sync::Semaphore::new(initial));
let mut need_forget = 0;
let mut last_launcher_failure_at = None;
loop {
debug!(
configured,
need_forget,
available = pending_items.available_permits(),
last_launcher_failure_secs_ago =
last_launcher_failure_at.map(|at| at.elapsed().as_secs_f64()),
"iteration"
);
let try_launch_once = || async {
let permit = Arc::clone(&pending_items)
.acquire_owned()
.expect("we never close this semaphore");
if need_forget > 0 {
debug!("fogetting permit to reduce semaphore count");
need_forget -= 1;
permit.forget();
continue;
}
debug!("creating item");
let item = match self.launcher.create() {
Ok(item) => item,
Err(e) => {
error!(
"launcher failed to create item: {}",
report_compact_sources(&e)
);
}
};
};
let try_launch_retrying = backoff::retry(
try_launch_once,
|_| false,
0,
u32::MAX,
L::what(),
CancellationToken::new(),
);
let cmd = tokio::select! {
res = self.cmds_rx.recv() => {
match res {
@@ -102,19 +133,7 @@ where
None => return, // dropping tx acts as cancellation
}
}
permit = Arc::clone(&pending_items).acquire_owned() => {
let permit = permit.expect("we never close this semaphore");
if need_forget > 0 {
debug!("fogetting permit to reduce semaphore count");
need_forget -= 1;
permit.forget();
continue;
}
debug!("creating item");
let item = match self.launcher.create() {
Ok(item) => item,
Err(e) => todo!(),
};
item = try_launch => {
match self.items_tx.send(CreatedItem { permit, item }).await {
Ok(()) => continue,
Err(_) => {

View File

@@ -21,7 +21,6 @@ use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::*;
use remote_storage::GenericRemoteStorage;
use utils::crashsafe;
use crate::config::PageServerConf;

View File

@@ -7,7 +7,7 @@ use crate::{
use anyhow::Context;
use bytes::Bytes;
use nix::poll::{PollFd, PollFlags};
use pageserver_api::{reltag::RelTag, shard::TenantShardId};
use pageserver_api::reltag::RelTag;
use postgres_ffi::BLCKSZ;
use std::os::fd::AsRawFd;
#[cfg(feature = "testing")]

View File

@@ -15,7 +15,6 @@ use std::ops::Deref;
use std::process::Child;
use pageserver_api::shard::TenantShardId;
/// Wrapper type around `std::process::Child` which guarantees that the child
/// will be killed and waited-for by this process before being dropped.

View File

@@ -1,5 +1,3 @@
use std::sync::Arc;
use anyhow::Context;
use utils::pre_spawned_pool;