mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-02 04:50:38 +00:00
page_cache: find_victim: prevent starvation
This commit is contained in:
committed by
Christian Schwarz
parent
dc96a7604a
commit
9f03dd24c2
27
Cargo.lock
generated
27
Cargo.lock
generated
@@ -158,6 +158,17 @@ dependencies = [
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-channel"
|
||||
version = "1.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
|
||||
dependencies = [
|
||||
"concurrent-queue",
|
||||
"event-listener",
|
||||
"futures-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-compression"
|
||||
version = "0.4.0"
|
||||
@@ -1031,6 +1042,15 @@ dependencies = [
|
||||
"zstd",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "concurrent-queue"
|
||||
version = "2.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "const_format"
|
||||
version = "0.2.30"
|
||||
@@ -1452,6 +1472,12 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "event-listener"
|
||||
version = "2.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
|
||||
|
||||
[[package]]
|
||||
name = "fail"
|
||||
version = "0.5.1"
|
||||
@@ -2674,6 +2700,7 @@ name = "pageserver"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-channel",
|
||||
"async-compression",
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
|
||||
@@ -35,6 +35,7 @@ license = "Apache-2.0"
|
||||
## All dependency versions, used in the project
|
||||
[workspace.dependencies]
|
||||
anyhow = { version = "1.0", features = ["backtrace"] }
|
||||
async-channel = "1.9.0"
|
||||
async-compression = { version = "0.4.0", features = ["tokio", "gzip"] }
|
||||
flate2 = "1.0.26"
|
||||
async-stream = "0.3"
|
||||
|
||||
@@ -12,6 +12,7 @@ testing = ["fail/failpoints"]
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
async-channel.workspace = true
|
||||
async-compression.workspace = true
|
||||
async-stream.workspace = true
|
||||
async-trait.workspace = true
|
||||
|
||||
@@ -314,7 +314,6 @@ static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
#[strum(serialize_all = "kebab_case")]
|
||||
pub(crate) enum PageCacheErrorKind {
|
||||
AcquirePinnedSlotTimeout,
|
||||
EvictIterLimit,
|
||||
}
|
||||
|
||||
pub(crate) fn page_cache_errors_inc(error_kind: PageCacheErrorKind) {
|
||||
|
||||
@@ -78,6 +78,7 @@ use std::{
|
||||
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
|
||||
Arc, Weak,
|
||||
},
|
||||
task::Poll,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
@@ -251,6 +252,11 @@ pub struct PageCache {
|
||||
/// This is interpreted modulo the page cache size.
|
||||
next_evict_slot: AtomicUsize,
|
||||
|
||||
find_victim_sender:
|
||||
async_channel::Sender<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
|
||||
find_victim_waiters:
|
||||
async_channel::Receiver<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
|
||||
|
||||
size_metrics: &'static PageCacheSizeMetrics,
|
||||
}
|
||||
|
||||
@@ -440,7 +446,7 @@ impl PageCache {
|
||||
/// Store an image of the given page in the cache.
|
||||
///
|
||||
pub async fn memorize_materialized_page(
|
||||
&self,
|
||||
&'static self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
key: Key,
|
||||
@@ -532,7 +538,7 @@ impl PageCache {
|
||||
// Section 1.2: Public interface functions for working with immutable file pages.
|
||||
|
||||
pub async fn read_immutable_buf(
|
||||
&self,
|
||||
&'static self,
|
||||
file_id: FileId,
|
||||
blkno: u32,
|
||||
ctx: &RequestContext,
|
||||
@@ -638,7 +644,7 @@ impl PageCache {
|
||||
/// ```
|
||||
///
|
||||
async fn lock_for_read(
|
||||
&self,
|
||||
&'static self,
|
||||
cache_key: &mut CacheKey,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ReadBufResult> {
|
||||
@@ -860,10 +866,12 @@ impl PageCache {
|
||||
///
|
||||
/// On return, the slot is empty and write-locked.
|
||||
async fn find_victim(
|
||||
&self,
|
||||
&'static self,
|
||||
_permit_witness: &PinnedSlotsPermit,
|
||||
) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
|
||||
let iter_limit = self.slots.len() * 10;
|
||||
// Get in line.
|
||||
let receiver = self.find_victim_waiters.recv();
|
||||
|
||||
let mut iters = 0;
|
||||
loop {
|
||||
iters += 1;
|
||||
@@ -875,41 +883,8 @@ impl PageCache {
|
||||
let mut inner = match slot.inner.try_write() {
|
||||
Ok(inner) => inner,
|
||||
Err(_err) => {
|
||||
if iters > iter_limit {
|
||||
// NB: Even with the permits, there's no hard guarantee that we will find a slot with
|
||||
// any particular number of iterations: other threads might race ahead and acquire and
|
||||
// release pins just as we're scanning the array.
|
||||
//
|
||||
// Imagine that nslots is 2, and as starting point, usage_count==1 on all
|
||||
// slots. There are two threads running concurrently, A and B. A has just
|
||||
// acquired the permit from the semaphore.
|
||||
//
|
||||
// A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
|
||||
// B: Acquire permit.
|
||||
// B: Look at slot 2, decrement its usage_count to zero and continue the search
|
||||
// B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
|
||||
// B: Release pin and permit again
|
||||
// B: Acquire permit.
|
||||
// B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
|
||||
// B: Release pin and permit again
|
||||
//
|
||||
// Now we're back in the starting situation that both slots have
|
||||
// usage_count 1, but A has now been through one iteration of the
|
||||
// find_victim() loop. This can repeat indefinitely and on each
|
||||
// iteration, A's iteration count increases by one.
|
||||
//
|
||||
// So, even though the semaphore for the permits is fair, the victim search
|
||||
// itself happens in parallel and is not fair.
|
||||
// Hence even with a permit, a task can theoretically be starved.
|
||||
// To avoid this, we'd need tokio to give priority to tasks that are holding
|
||||
// permits for longer.
|
||||
// Note that just yielding to tokio during iteration without such
|
||||
// priority boosting is likely counter-productive. We'd just give more opportunities
|
||||
// for B to bump usage count, further starving A.
|
||||
crate::metrics::page_cache_errors_inc(
|
||||
crate::metrics::PageCacheErrorKind::EvictIterLimit,
|
||||
);
|
||||
anyhow::bail!("exceeded evict iter limit");
|
||||
if iters > self.slots.len() * (MAX_USAGE_COUNT as usize) {
|
||||
unreachable!("find_victim_waiters prevents starvation");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@@ -920,7 +895,16 @@ impl PageCache {
|
||||
inner.key = None;
|
||||
}
|
||||
crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
|
||||
return Ok((slot_idx, inner));
|
||||
self.find_victim_sender
|
||||
.try_send((slot_idx, inner))
|
||||
.expect("we always get in line first");
|
||||
match futures::poll!(receiver) {
|
||||
Poll::Ready(Ok(res)) => return Ok(res),
|
||||
Poll::Ready(Err(_closed)) => unreachable!("we never close"),
|
||||
Poll::Pending => {
|
||||
unreachable!("we just sent to the channel and got in line earlier")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -957,6 +941,7 @@ impl PageCache {
|
||||
})
|
||||
.collect();
|
||||
|
||||
let (find_victim_sender, find_victim_waiters) = async_channel::bounded(num_pages);
|
||||
Self {
|
||||
materialized_page_map: Default::default(),
|
||||
immutable_page_map: Default::default(),
|
||||
@@ -964,6 +949,8 @@ impl PageCache {
|
||||
next_evict_slot: AtomicUsize::new(0),
|
||||
size_metrics,
|
||||
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
|
||||
find_victim_sender,
|
||||
find_victim_waiters,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user