Mirror of https://github.com/neondatabase/neon.git

Compare commits: problame/p ... jcsp/remov (2 commits)

| Author | SHA1 | Date |
|---|---|---|
| | dea475f1ba | |
| | 683ec2417c | |

Cargo.lock (generated): 12 changes
@@ -2610,17 +2610,6 @@ dependencies = [
 "minimal-lexical",
]

[[package]]
name = "nostarve_queue"
version = "0.1.0"
dependencies = [
 "futures",
 "rand 0.8.5",
 "scopeguard",
 "tokio",
 "tracing",
]

[[package]]
name = "notify"
version = "5.2.0"

@@ -2962,7 +2951,6 @@ dependencies = [
 "itertools",
 "metrics",
 "nix 0.26.2",
 "nostarve_queue",
 "num-traits",
 "num_cpus",
 "once_cell",
@@ -27,7 +27,6 @@ members = [
    "libs/postgres_ffi/wal_craft",
    "libs/vm_monitor",
    "libs/walproposer",
    "libs/nostarve_queue",
]

[workspace.package]

@@ -38,7 +37,6 @@ license = "Apache-2.0"
[workspace.dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-channel = "1.9.0"
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
azure_core = "0.16"
azure_identity = "0.16"

@@ -193,7 +191,6 @@ tracing-utils = { version = "0.1", path = "./libs/tracing-utils/" }
utils = { version = "0.1", path = "./libs/utils/" }
vm_monitor = { version = "0.1", path = "./libs/vm_monitor/" }
walproposer = { version = "0.1", path = "./libs/walproposer/" }
nostarve_queue = { path = "./libs/nostarve_queue" }

## Common library dependency
workspace_hack = { version = "0.1", path = "./workspace_hack/" }
@@ -21,7 +21,7 @@ use pageserver_api::models::{
use pageserver_api::shard::TenantShardId;
use postgres_backend::AuthType;
use postgres_connection::{parse_host_port, PgConnectionConfig};
use reqwest::blocking::{Client, ClientBuilder, RequestBuilder, Response};
use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};
use thiserror::Error;
use utils::auth::{Claims, Scope};

@@ -99,7 +99,7 @@ impl PageServerNode {
            pg_connection_config: PgConnectionConfig::new_host_port(host, port),
            conf: conf.clone(),
            env: env.clone(),
            http_client: ClientBuilder::new().timeout(None).build().unwrap(),
            http_client: Client::new(),
            http_base_url: format!("http://{}/v1", conf.listen_http_addr),
        }
    }
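Context for the one-line http_client change above, as a hedged sketch rather than anything taken from the diff itself: reqwest's blocking Client::new() keeps the builder defaults, which per the reqwest documentation include a 30-second request timeout, while ClientBuilder::new().timeout(None) builds a client whose requests never time out, which matters for long-running pageserver management calls. Illustrative only:

```rust
// Sketch: the two client constructions from the hunk above, side by side.
// Assumes reqwest's "blocking" feature; the 30-second default is taken from
// reqwest's ClientBuilder::timeout documentation, not from this diff.
use reqwest::blocking::{Client, ClientBuilder};

fn clients() -> reqwest::Result<(Client, Client)> {
    // Default client: requests are cut off after reqwest's default timeout.
    let with_default_timeout = Client::new();
    // Builder variant from the diff: passing `None` disables the request
    // timeout entirely, so slow management API calls are allowed to finish.
    let without_timeout = ClientBuilder::new().timeout(None).build()?;
    Ok((with_default_timeout, without_timeout))
}
```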
@@ -1,14 +0,0 @@
[package]
name = "nostarve_queue"
version = "0.1.0"
edition.workspace = true
license.workspace = true

[dependencies]
scopeguard.workspace = true
tracing.workspace = true

[dev-dependencies]
futures.workspace = true
rand.workspace = true
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "time"] }
@@ -1,316 +0,0 @@
//! Synchronization primitive to prevent starvation among concurrent tasks that do the same work.

use std::{
    collections::VecDeque,
    fmt,
    future::poll_fn,
    sync::Mutex,
    task::{Poll, Waker},
};

pub struct Queue<T> {
    inner: Mutex<Inner<T>>,
}

struct Inner<T> {
    waiters: VecDeque<usize>,
    free: VecDeque<usize>,
    slots: Vec<Option<(Option<Waker>, Option<T>)>>,
}

#[derive(Clone, Copy)]
pub struct Position<'q, T> {
    idx: usize,
    queue: &'q Queue<T>,
}

impl<T> fmt::Debug for Position<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Position").field("idx", &self.idx).finish()
    }
}

impl<T> Inner<T> {
    #[cfg(not(test))]
    #[inline]
    fn integrity_check(&self) {}

    #[cfg(test)]
    fn integrity_check(&self) {
        use std::collections::HashSet;
        let waiters = self.waiters.iter().copied().collect::<HashSet<_>>();
        let free = self.free.iter().copied().collect::<HashSet<_>>();
        for (slot_idx, slot) in self.slots.iter().enumerate() {
            match slot {
                None => {
                    assert!(!waiters.contains(&slot_idx));
                    assert!(free.contains(&slot_idx));
                }
                Some((None, None)) => {
                    assert!(waiters.contains(&slot_idx));
                    assert!(!free.contains(&slot_idx));
                }
                Some((Some(_), Some(_))) => {
                    assert!(!waiters.contains(&slot_idx));
                    assert!(!free.contains(&slot_idx));
                }
                Some((Some(_), None)) => {
                    assert!(waiters.contains(&slot_idx));
                    assert!(!free.contains(&slot_idx));
                }
                Some((None, Some(_))) => {
                    assert!(!waiters.contains(&slot_idx));
                    assert!(!free.contains(&slot_idx));
                }
            }
        }
    }
}

impl<T> Queue<T> {
    pub fn new(size: usize) -> Self {
        Queue {
            inner: Mutex::new(Inner {
                waiters: VecDeque::new(),
                free: (0..size).collect(),
                slots: {
                    let mut v = Vec::with_capacity(size);
                    v.resize_with(size, || None);
                    v
                },
            }),
        }
    }
    pub fn begin(&self) -> Result<Position<T>, ()> {
        #[cfg(test)]
        tracing::trace!("get in line locking inner");
        let mut inner = self.inner.lock().unwrap();
        inner.integrity_check();
        let my_waitslot_idx = inner
            .free
            .pop_front()
            .expect("can't happen, len(slots) = len(waiters");
        inner.waiters.push_back(my_waitslot_idx);
        let prev = inner.slots[my_waitslot_idx].replace((None, None));
        assert!(prev.is_none());
        inner.integrity_check();
        Ok(Position {
            idx: my_waitslot_idx,
            queue: &self,
        })
    }
}

impl<'q, T> Position<'q, T> {
    pub fn complete_and_wait(self, datum: T) -> impl std::future::Future<Output = T> + 'q {
        #[cfg(test)]
        tracing::trace!("found victim locking waiters");
        let mut inner = self.queue.inner.lock().unwrap();
        inner.integrity_check();
        let winner_idx = inner.waiters.pop_front().expect("we put ourselves in");
        #[cfg(test)]
        tracing::trace!(winner_idx, "putting victim into next waiters slot");
        let winner_slot = inner.slots[winner_idx].as_mut().unwrap();
        let prev = winner_slot.1.replace(datum);
        assert!(
            prev.is_none(),
            "ensure we didn't mess up this simple ring buffer structure"
        );
        if let Some(waker) = winner_slot.0.take() {
            #[cfg(test)]
            tracing::trace!(winner_idx, "waking up winner");
            waker.wake()
        }
        inner.integrity_check();
        drop(inner); // the poll_fn locks it again

        let mut poll_num = 0;
        let mut drop_guard = Some(scopeguard::guard((), |()| {
            panic!("must not drop this future until Ready");
        }));

        // take the victim that was found by someone else
        poll_fn(move |cx| {
            let my_waitslot_idx = self.idx;
            poll_num += 1;
            #[cfg(test)]
            tracing::trace!(poll_num, "poll_fn locking waiters");
            let mut inner = self.queue.inner.lock().unwrap();
            inner.integrity_check();
            let my_waitslot = inner.slots[self.idx].as_mut().unwrap();
            // assert!(
            //     poll_num <= 2,
            //     "once we place the waker in the slot, next wakeup should have a result: {}",
            //     my_waitslot.1.is_some()
            // );
            if let Some(res) = my_waitslot.1.take() {
                #[cfg(test)]
                tracing::trace!(poll_num, "have cache slot");
                // above .take() resets the waiters slot to None
                debug_assert!(my_waitslot.0.is_none());
                debug_assert!(my_waitslot.1.is_none());
                inner.slots[my_waitslot_idx] = None;
                inner.free.push_back(my_waitslot_idx);
                let _ = scopeguard::ScopeGuard::into_inner(drop_guard.take().unwrap());
                inner.integrity_check();
                return Poll::Ready(res);
            }
            // assert_eq!(poll_num, 1);
            if !my_waitslot
                .0
                .as_ref()
                .map(|existing| cx.waker().will_wake(existing))
                .unwrap_or(false)
            {
                let prev = my_waitslot.0.replace(cx.waker().clone());
                #[cfg(test)]
                tracing::trace!(poll_num, prev_is_some = prev.is_some(), "updating waker");
            }
            inner.integrity_check();
            #[cfg(test)]
            tracing::trace!(poll_num, "waiting to be woken up");
            Poll::Pending
        })
    }
}

#[cfg(test)]
mod test {
    use std::{
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
        task::Poll,
        time::Duration,
    };

    use rand::RngCore;

    #[tokio::test]
    async fn in_order_completion_and_wait() {
        let queue = super::Queue::new(2);

        let q1 = queue.begin().unwrap();
        let q2 = queue.begin().unwrap();

        assert_eq!(q1.complete_and_wait(23).await, 23);
        assert_eq!(q2.complete_and_wait(42).await, 42);
    }

    #[tokio::test]
    async fn out_of_order_completion_and_wait() {
        let queue = super::Queue::new(2);

        let q1 = queue.begin().unwrap();
        let q2 = queue.begin().unwrap();

        let mut q2compfut = q2.complete_and_wait(23);

        match futures::poll!(&mut q2compfut) {
            Poll::Pending => {}
            Poll::Ready(_) => panic!("should not be ready yet, it's queued after q1"),
        }

        let q1res = q1.complete_and_wait(42).await;
        assert_eq!(q1res, 23);

        let q2res = q2compfut.await;
        assert_eq!(q2res, 42);
    }

    #[tokio::test]
    async fn in_order_completion_out_of_order_wait() {
        let queue = super::Queue::new(2);

        let q1 = queue.begin().unwrap();
        let q2 = queue.begin().unwrap();

        let mut q1compfut = q1.complete_and_wait(23);

        let mut q2compfut = q2.complete_and_wait(42);

        match futures::poll!(&mut q2compfut) {
            Poll::Pending => {
                unreachable!("q2 should be ready, it wasn't first but q1 is serviced already")
            }
            Poll::Ready(x) => assert_eq!(x, 42),
        }

        assert_eq!(futures::poll!(&mut q1compfut), Poll::Ready(23));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn stress() {
        let ntasks = 8;
        let queue_size = 8;
        let queue = Arc::new(super::Queue::new(queue_size));

        let stop = Arc::new(AtomicBool::new(false));

        let mut tasks = vec![];
        for i in 0..ntasks {
            let jh = tokio::spawn({
                let queue = Arc::clone(&queue);
                let stop = Arc::clone(&stop);
                async move {
                    while !stop.load(Ordering::Relaxed) {
                        let q = queue.begin().unwrap();
                        for _ in 0..(rand::thread_rng().next_u32() % 10_000) {
                            std::hint::spin_loop();
                        }
                        q.complete_and_wait(i).await;
                        tokio::task::yield_now().await;
                    }
                }
            });
            tasks.push(jh);
        }

        tokio::time::sleep(Duration::from_secs(10)).await;

        stop.store(true, Ordering::Relaxed);

        for t in tasks {
            t.await.unwrap();
        }
    }

    #[test]
    fn stress_two_runtimes_shared_queue() {
        std::thread::scope(|s| {
            let ntasks = 8;
            let queue_size = 8;
            let queue = Arc::new(super::Queue::new(queue_size));

            let stop = Arc::new(AtomicBool::new(false));

            for i in 0..ntasks {
                s.spawn({
                    let queue = Arc::clone(&queue);
                    let stop = Arc::clone(&stop);
                    move || {
                        let rt = tokio::runtime::Builder::new_current_thread()
                            .enable_all()
                            .build()
                            .unwrap();
                        rt.block_on(async move {
                            while !stop.load(Ordering::Relaxed) {
                                let q = queue.begin().unwrap();
                                for _ in 0..(rand::thread_rng().next_u32() % 10_000) {
                                    std::hint::spin_loop();
                                }
                                q.complete_and_wait(i).await;
                                tokio::task::yield_now().await;
                            }
                        });
                    }
                });
            }

            std::thread::sleep(Duration::from_secs(10));

            stop.store(true, Ordering::Relaxed);
        });
    }
}
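The calling convention of the deleted crate is easy to lose in the diff above: a task calls begin() before starting the shared work, and once it has a result it calls complete_and_wait(result), which donates that result to the task at the head of the queue and then waits for whatever result is donated to it in turn (its own, if it happens to be at the head). In the pageserver the queue capacity was tied to the pinned-slots semaphore size, as the expect message further down in this diff notes. A minimal usage sketch, assuming the crate exactly as shown in the deleted file above; the task names and printouts are illustrative:

```rust
use std::sync::Arc;

#[tokio::main]
async fn main() {
    // Capacity must cover the maximum number of concurrent begin() calls.
    let queue = Arc::new(nostarve_queue::Queue::new(2));

    let mut handles = Vec::new();
    for name in ["A", "B"] {
        let queue = Arc::clone(&queue);
        handles.push(tokio::spawn(async move {
            // Register before starting the shared search ...
            let position = queue.begin().unwrap();
            // ... do the actual work here (e.g. scan for a victim slot) ...
            let result = format!("victim found by {name}");
            // ... then donate the result to the longest waiter and wait for
            // whichever result gets handed to us (FIFO order).
            let handed_to_us = position.complete_and_wait(result).await;
            println!("{name} proceeds with: {handed_to_us}");
        }));
    }
    for h in handles {
        h.await.unwrap();
    }
}
```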
@@ -37,7 +37,6 @@ humantime-serde.workspace = true
hyper.workspace = true
itertools.workspace = true
nix.workspace = true
nostarve_queue.workspace = true
# hack to get the number of worker threads tokio uses
num_cpus = { version = "1.15" }
num-traits.workspace = true

@@ -280,7 +280,6 @@ impl From<crate::tenant::delete::DeleteTenantError> for ApiError {
        use crate::tenant::delete::DeleteTenantError::*;
        match value {
            Get(g) => ApiError::from(g),
            e @ AlreadyInProgress => ApiError::Conflict(e.to_string()),
            Timeline(t) => ApiError::from(t),
            NotAttached => ApiError::NotFound(anyhow::anyhow!("Tenant is not attached").into()),
            SlotError(e) => e.into(),

@@ -314,6 +314,7 @@ static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
#[strum(serialize_all = "kebab_case")]
pub(crate) enum PageCacheErrorKind {
    AcquirePinnedSlotTimeout,
    EvictIterLimit,
}

pub(crate) fn page_cache_errors_inc(error_kind: PageCacheErrorKind) {

@@ -83,7 +83,6 @@ use std::{

use anyhow::Context;
use once_cell::sync::OnceCell;
use tracing::instrument;
use utils::{
    id::{TenantId, TimelineId},
    lsn::Lsn,

@@ -253,9 +252,6 @@ pub struct PageCache {
    next_evict_slot: AtomicUsize,

    size_metrics: &'static PageCacheSizeMetrics,

    find_victim_waiters:
        nostarve_queue::Queue<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
}

struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);

@@ -434,9 +430,8 @@ impl PageCache {
    ///
    /// Store an image of the given page in the cache.
    ///
    #[cfg_attr(test, instrument(skip_all, level = "trace", fields(%key, %lsn)))]
    pub async fn memorize_materialized_page(
        &'static self,
        &self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        key: Key,

@@ -527,9 +522,8 @@ impl PageCache {

    // Section 1.2: Public interface functions for working with immutable file pages.

    #[cfg_attr(test, instrument(skip_all, level = "trace", fields(?file_id, ?blkno)))]
    pub async fn read_immutable_buf(
        &'static self,
        &self,
        file_id: FileId,
        blkno: u32,
        ctx: &RequestContext,

@@ -635,7 +629,7 @@ impl PageCache {
    /// ```
    ///
    async fn lock_for_read(
        &'static self,
        &self,
        cache_key: &mut CacheKey,
        ctx: &RequestContext,
    ) -> anyhow::Result<ReadBufResult> {

@@ -857,15 +851,10 @@ impl PageCache {
    ///
    /// On return, the slot is empty and write-locked.
    async fn find_victim(
        &'static self,
        &self,
        _permit_witness: &PinnedSlotsPermit,
    ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
        let nostarve_position = self.find_victim_waiters.begin()
            .expect("we initialize the nostarve queue to the same size as the slots semaphore, and the caller is presenting a permit");

        let span = tracing::info_span!("find_victim", ?nostarve_position);
        let _enter = span.enter();

        let iter_limit = self.slots.len() * 10;
        let mut iters = 0;
        loop {
            iters += 1;

@@ -877,8 +866,41 @@ impl PageCache {
            let mut inner = match slot.inner.try_write() {
                Ok(inner) => inner,
                Err(_err) => {
                    if iters > self.slots.len() * (MAX_USAGE_COUNT as usize) {
                        unreachable!("find_victim_waiters prevents starvation");
                    if iters > iter_limit {
                        // NB: Even with the permits, there's no hard guarantee that we will find a slot with
                        // any particular number of iterations: other threads might race ahead and acquire and
                        // release pins just as we're scanning the array.
                        //
                        // Imagine that nslots is 2, and as starting point, usage_count==1 on all
                        // slots. There are two threads running concurrently, A and B. A has just
                        // acquired the permit from the semaphore.
                        //
                        // A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
                        // B: Acquire permit.
                        // B: Look at slot 2, decrement its usage_count to zero and continue the search
                        // B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
                        // B: Release pin and permit again
                        // B: Acquire permit.
                        // B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
                        // B: Release pin and permit again
                        //
                        // Now we're back in the starting situation that both slots have
                        // usage_count 1, but A has now been through one iteration of the
                        // find_victim() loop. This can repeat indefinitely and on each
                        // iteration, A's iteration count increases by one.
                        //
                        // So, even though the semaphore for the permits is fair, the victim search
                        // itself happens in parallel and is not fair.
                        // Hence even with a permit, a task can theoretically be starved.
                        // To avoid this, we'd need tokio to give priority to tasks that are holding
                        // permits for longer.
                        // Note that just yielding to tokio during iteration without such
                        // priority boosting is likely counter-productive. We'd just give more opportunities
                        // for B to bump usage count, further starving A.
                        crate::metrics::page_cache_errors_inc(
                            crate::metrics::PageCacheErrorKind::EvictIterLimit,
                        );
                        anyhow::bail!("exceeded evict iter limit");
                    }
                    continue;
                }

@@ -889,8 +911,7 @@ impl PageCache {
                inner.key = None;
            }
            crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);

            return Ok(nostarve_position.complete_and_wait((slot_idx, inner)).await);
            return Ok((slot_idx, inner));
        }
    }

@@ -934,7 +955,6 @@ impl PageCache {
            next_evict_slot: AtomicUsize::new(0),
            size_metrics,
            pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
            find_victim_waiters: ::nostarve_queue::Queue::new(num_pages),
        }
    }
}
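The long comment restored in the hunk above argues that a usage-count clock sweep cannot guarantee an iteration bound under concurrency, hence the iteration limit and the EvictIterLimit error metric. To make the mechanism concrete, here is a stripped-down, hypothetical sketch of such a clock sweep over slots with usage counts; the names and types are simplified stand-ins, not the pageserver's actual PageCache:

```rust
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
use std::sync::{RwLock, RwLockWriteGuard};

struct Slot {
    usage_count: AtomicU8,
    inner: RwLock<Option<u64>>, // stand-in for the real slot contents
}

struct Cache {
    slots: Vec<Slot>,
    next_evict_slot: AtomicUsize,
}

impl Cache {
    /// Clock sweep: advance a shared hand over the slots; a slot whose usage
    /// count has already decayed to zero and whose lock is free becomes the
    /// victim, otherwise decrement its count and keep going.
    fn find_victim(&self) -> Option<(usize, RwLockWriteGuard<'_, Option<u64>>)> {
        let iter_limit = self.slots.len() * 10;
        let mut iters = 0;
        loop {
            iters += 1;
            if iters > iter_limit {
                // As the restored comment explains, concurrent sweepers can keep
                // re-inflating usage counts, so there is no bound on how long
                // this loop may run; give up instead of spinning forever.
                return None;
            }
            let idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len();
            let slot = &self.slots[idx];
            // Decrement a non-zero usage count; if it was already zero, try to
            // claim the slot by taking its write lock without blocking.
            let was_zero = slot
                .usage_count
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| c.checked_sub(1))
                .is_err();
            if was_zero {
                if let Ok(guard) = slot.inner.try_write() {
                    return Some((idx, guard));
                }
            }
        }
    }
}

fn main() {
    let cache = Cache {
        slots: (0..4)
            .map(|_| Slot {
                usage_count: AtomicU8::new(0),
                inner: RwLock::new(None),
            })
            .collect(),
        next_evict_slot: AtomicUsize::new(0),
    };
    let (idx, _guard) = cache.find_victim().expect("all counts start at zero");
    println!("evicting slot {idx}");
}
```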
@@ -257,8 +257,6 @@ pub struct Tenant {

    eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,

    pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,

    // Cancellation token fires when we have entered shutdown(). This is a parent of
    // Timelines' cancellation token.
    pub(crate) cancel: CancellationToken,

@@ -634,9 +632,9 @@ impl Tenant {
                }
            };

            info!("pending_deletion {}", pending_deletion.is_some());
            info!("pending_deletion {}", pending_deletion);

            if let Some(deletion) = pending_deletion {
            if pending_deletion {
                // as we are no longer loading, signal completion by dropping
                // the completion while we resume deletion
                drop(_completion);

@@ -653,7 +651,6 @@ impl Tenant {
                }

                match DeleteTenantFlow::resume_from_attach(
                    deletion,
                    &tenant_clone,
                    preload,
                    tenants,

@@ -2372,7 +2369,6 @@ impl Tenant {
            cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
            cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
            eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
            delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
            cancel: CancellationToken::default(),
            gate: Gate::new(format!("Tenant<{tenant_id}>")),
        }

@@ -4,7 +4,6 @@ use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::models::TenantState;
use remote_storage::{GenericRemoteStorage, RemotePath};
use tokio::sync::OwnedMutexGuard;
use tokio_util::sync::CancellationToken;
use tracing::{error, instrument, warn, Instrument, Span};

@@ -39,9 +38,6 @@ pub(crate) enum DeleteTenantError {
    #[error("Invalid state {0}. Expected Active or Broken")]
    InvalidState(TenantState),

    #[error("Tenant deletion is already in progress")]
    AlreadyInProgress,

    #[error("Tenant map slot error {0}")]
    SlotError(#[from] TenantSlotError),

@@ -55,8 +51,6 @@ pub(crate) enum DeleteTenantError {
    Other(#[from] anyhow::Error),
}

type DeletionGuard = tokio::sync::OwnedMutexGuard<DeleteTenantFlow>;

fn remote_tenant_delete_mark_path(
    conf: &PageServerConf,
    tenant_id: &TenantId,

@@ -287,14 +281,14 @@ impl DeleteTenantFlow {
    ) -> Result<(), DeleteTenantError> {
        span::debug_assert_current_span_has_tenant_id();

        let mut guard = Self::prepare(&tenant).await?;
        Self::prepare(&tenant).await?;

        if let Err(e) = Self::run_inner(&mut guard, conf, remote_storage.as_ref(), &tenant).await {
        if let Err(e) = Self::run_inner(conf, remote_storage.as_ref(), &tenant).await {
            tenant.set_broken(format!("{e:#}")).await;
            return Err(e);
        }

        Self::schedule_background(guard, conf, remote_storage, tenants, tenant);
        Self::schedule_background(conf, remote_storage, tenants, tenant);

        Ok(())
    }

@@ -304,13 +298,10 @@ impl DeleteTenantFlow {
    // will result in an error, but here we need to be able to retry shutdown when tenant deletion is retried.
    // So the solution is to set tenant state to broken.
    async fn run_inner(
        guard: &mut OwnedMutexGuard<Self>,
        conf: &'static PageServerConf,
        remote_storage: Option<&GenericRemoteStorage>,
        tenant: &Tenant,
    ) -> Result<(), DeleteTenantError> {
        guard.mark_in_progress()?;

        fail::fail_point!("tenant-delete-before-create-remote-mark", |_| {
            Err(anyhow::anyhow!(
                "failpoint: tenant-delete-before-create-remote-mark"

@@ -345,46 +336,25 @@ impl DeleteTenantFlow {
        Ok(())
    }

    fn mark_in_progress(&mut self) -> anyhow::Result<()> {
        match self {
            Self::Finished => anyhow::bail!("Bug. Is in finished state"),
            Self::InProgress { .. } => { /* We're in a retry */ }
            Self::NotStarted => { /* Fresh start */ }
        }

        *self = Self::InProgress;

        Ok(())
    }

    pub(crate) async fn should_resume_deletion(
        conf: &'static PageServerConf,
        remote_mark_exists: bool,
        tenant: &Tenant,
    ) -> Result<Option<DeletionGuard>, DeleteTenantError> {
        let acquire = |t: &Tenant| {
            Some(
                Arc::clone(&t.delete_progress)
                    .try_lock_owned()
                    .expect("we're the only owner during init"),
            )
        };

    ) -> Result<bool, DeleteTenantError> {
        if remote_mark_exists {
            return Ok(acquire(tenant));
            return Ok(true);
        }

        let tenant_id = tenant.tenant_id;
        // Check local mark first, if its there there is no need to go to s3 to check whether remote one exists.
        if conf.tenant_deleted_mark_file_path(&tenant_id).exists() {
            Ok(acquire(tenant))
            Ok(true)
        } else {
            Ok(None)
            Ok(false)
        }
    }

    pub(crate) async fn resume_from_attach(
        guard: DeletionGuard,
        tenant: &Arc<Tenant>,
        preload: Option<TenantPreload>,
        tenants: &'static std::sync::RwLock<TenantsMap>,

@@ -403,19 +373,10 @@ impl DeleteTenantFlow {
            .await
            .context("attach")?;

        Self::background(
            guard,
            tenant.conf,
            tenant.remote_storage.clone(),
            tenants,
            tenant,
        )
        .await
        Self::background(tenant.conf, tenant.remote_storage.clone(), tenants, tenant).await
    }

    async fn prepare(
        tenant: &Arc<Tenant>,
    ) -> Result<tokio::sync::OwnedMutexGuard<Self>, DeleteTenantError> {
    async fn prepare(tenant: &Arc<Tenant>) -> Result<(), DeleteTenantError> {
        // FIXME: unsure about active only. Our init jobs may not be cancellable properly,
        // so at least for now allow deletions only for active tenants. TODO recheck
        // Broken and Stopping is needed for retries.

@@ -426,10 +387,6 @@ impl DeleteTenantFlow {
            return Err(DeleteTenantError::InvalidState(tenant.current_state()));
        }

        let guard = Arc::clone(&tenant.delete_progress)
            .try_lock_owned()
            .map_err(|_| DeleteTenantError::AlreadyInProgress)?;

        fail::fail_point!("tenant-delete-before-shutdown", |_| {
            Err(anyhow::anyhow!("failpoint: tenant-delete-before-shutdown"))?
        });

@@ -449,11 +406,10 @@ impl DeleteTenantFlow {
            )));
        }

        Ok(guard)
        Ok(())
    }

    fn schedule_background(
        guard: OwnedMutexGuard<Self>,
        conf: &'static PageServerConf,
        remote_storage: Option<GenericRemoteStorage>,
        tenants: &'static std::sync::RwLock<TenantsMap>,

@@ -469,9 +425,7 @@ impl DeleteTenantFlow {
            "tenant_delete",
            false,
            async move {
                if let Err(err) =
                    Self::background(guard, conf, remote_storage, tenants, &tenant).await
                {
                if let Err(err) = Self::background(conf, remote_storage, tenants, &tenant).await {
                    error!("Error: {err:#}");
                    tenant.set_broken(format!("{err:#}")).await;
                };

@@ -486,7 +440,6 @@ impl DeleteTenantFlow {
    }

    async fn background(
        mut guard: OwnedMutexGuard<Self>,
        conf: &PageServerConf,
        remote_storage: Option<GenericRemoteStorage>,
        tenants: &'static std::sync::RwLock<TenantsMap>,

@@ -550,8 +503,6 @@ impl DeleteTenantFlow {
            .set(locked.len() as u64);
        }

        *guard = Self::Finished;

        Ok(())
    }
}
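Judging by the hunk line counts, the hunks above drop an owned-mutex guard that was threaded from prepare() through run_inner(), schedule_background() and background(), and change should_resume_deletion() to return a bool instead of an Option<DeletionGuard>. For readers unfamiliar with the pattern on the longer (removed) side, this is a hedged, self-contained sketch of how such a guard serializes tenant deletions; the type and function names are simplified stand-ins, not the pageserver's real ones:

```rust
use std::sync::Arc;

use tokio::sync::{Mutex, OwnedMutexGuard};

// Simplified stand-ins for DeleteTenantFlow / Tenant from the diff.
#[derive(Default)]
enum DeleteFlow {
    #[default]
    NotStarted,
    InProgress,
    Finished,
}

struct Tenant {
    delete_progress: Arc<Mutex<DeleteFlow>>,
}

/// Try to become the one and only deletion in flight for this tenant.
/// The returned owned guard is then passed along the whole deletion flow,
/// so the mutex stays locked until the background part finishes.
fn prepare_delete(tenant: &Tenant) -> Result<OwnedMutexGuard<DeleteFlow>, &'static str> {
    let mut guard = Arc::clone(&tenant.delete_progress)
        .try_lock_owned()
        .map_err(|_| "tenant deletion is already in progress")?;
    *guard = DeleteFlow::InProgress;
    Ok(guard)
}

#[tokio::main]
async fn main() {
    let tenant = Tenant {
        delete_progress: Arc::new(Mutex::new(DeleteFlow::default())),
    };

    let mut guard = prepare_delete(&tenant).expect("first deletion wins");
    // A concurrent attempt fails instead of starting a second deletion.
    assert!(prepare_delete(&tenant).is_err());

    // ... run the actual deletion steps while holding the guard ...
    *guard = DeleteFlow::Finished;
    drop(guard); // releasing the guard lets a later retry proceed
}
```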
@@ -336,10 +336,15 @@ def test_live_reconfig_get_evictions_low_residence_duration_metric_threshold(
):
    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)

    env = neon_env_builder.init_start()
    env = neon_env_builder.init_start(
        initial_tenant_conf={
            # disable compaction so that it will not download the layer for repartitioning
            "compaction_period": "0s"
        }
    )
    assert isinstance(env.pageserver_remote_storage, LocalFsStorage)

    (tenant_id, timeline_id) = env.neon_cli.create_tenant()
    (tenant_id, timeline_id) = env.initial_tenant, env.initial_timeline
    ps_http = env.pageserver.http_client()

    def get_metric():