Compare commits


7 Commits

Author SHA1 Message Date
Christian Schwarz
1863a04fb0 WIP 2023-10-05 18:21:18 +02:00
Christian Schwarz
f83a71ca6a WIP 2023-10-05 18:13:54 +02:00
Christian Schwarz
74a634c9fa WIP 2023-10-05 18:06:26 +02:00
Christian Schwarz
fc3f8a65b3 WIP: provide permits in requestcontext 2023-10-05 18:02:22 +02:00
Christian Schwarz
9f03dd24c2 page_cache: find_victim: prevent starvation 2023-10-05 16:54:02 +02:00
Christian Schwarz
dc96a7604a page_cache: ensure forward progress on cache miss 2023-10-05 16:51:08 +02:00
Christian Schwarz
d7c94e67ce inline lock_for_write and try_lock_for_write into memorize_materialized_page
Motivation
==========

It's the only user, and the name of `_for_write` is wrong as of

    commit 7a63685cde
    Author: Christian Schwarz <christian@neon.tech>
    Date:   Fri Aug 18 19:31:03 2023 +0200

        simplify page-caching of EphemeralFile (#4994)

Notes
=====

This also allows us to get rid of the WriteBufResult type.

Also rename `search_mapping_for_write` to `search_mapping_exact`.
It makes more sense that way because there is no `_for_write`-locking
anymore.
2023-10-05 16:01:29 +02:00
13 changed files with 335 additions and 300 deletions

Cargo.lock (generated)
View File

@@ -158,6 +158,17 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "async-channel"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]]
name = "async-compression"
version = "0.4.0"
@@ -1031,6 +1042,15 @@ dependencies = [
"zstd",
]
[[package]]
name = "concurrent-queue"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "const_format"
version = "0.2.30"
@@ -1452,6 +1472,12 @@ dependencies = [
"libc",
]
[[package]]
name = "event-listener"
version = "2.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "fail"
version = "0.5.1"
@@ -2674,6 +2700,7 @@ name = "pageserver"
version = "0.1.0"
dependencies = [
"anyhow",
"async-channel",
"async-compression",
"async-stream",
"async-trait",

View File

@@ -35,6 +35,7 @@ license = "Apache-2.0"
## All dependency versions, used in the project
[workspace.dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
async-channel = "1.9.0"
async-compression = { version = "0.4.0", features = ["tokio", "gzip"] }
flate2 = "1.0.26"
async-stream = "0.3"

View File

@@ -12,6 +12,7 @@ testing = ["fail/failpoints"]
[dependencies]
anyhow.workspace = true
async-channel.workspace = true
async-compression.workspace = true
async-stream.workspace = true
async-trait.workspace = true

View File

@@ -86,15 +86,18 @@
//! [`RequestContext`] argument. Functions in the middle of the call chain
//! only need to pass it on.
use std::sync::{Arc, Mutex, MutexGuard};
use crate::task_mgr::TaskKind;
// The main structure of this module, see module-level comment.
#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct RequestContext {
task_kind: TaskKind,
download_behavior: DownloadBehavior,
access_stats_behavior: AccessStatsBehavior,
page_content_kind: PageContentKind,
page_cache_permit: Option<Arc<crate::page_cache::PinnedSlotsPermit>>,
}
/// The kind of access to the page cache.
@@ -150,6 +153,7 @@ impl RequestContextBuilder {
download_behavior: DownloadBehavior::Download,
access_stats_behavior: AccessStatsBehavior::Update,
page_content_kind: PageContentKind::Unknown,
page_cache_permit: None,
},
}
}
@@ -163,6 +167,7 @@ impl RequestContextBuilder {
download_behavior: original.download_behavior,
access_stats_behavior: original.access_stats_behavior,
page_content_kind: original.page_content_kind,
page_cache_permit: original.page_cache_permit.clone(),
},
}
}
@@ -186,6 +191,11 @@ impl RequestContextBuilder {
self
}
pub(crate) fn page_cache_permit(mut self, p: Arc<crate::page_cache::PinnedSlotsPermit>) -> Self {
self.inner.page_cache_permit = Some(p);
self
}
pub fn build(self) -> RequestContext {
self.inner
}
@@ -286,4 +296,8 @@ impl RequestContext {
pub(crate) fn page_content_kind(&self) -> PageContentKind {
self.page_content_kind
}
pub(crate) fn permit(&self) -> Option<&crate::page_cache::PinnedSlotsPermit> {
self.page_cache_permit.as_ref().map(|p| &**p)
}
}
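
The extend/build pattern above is what lets a caller thread a pre-acquired page cache permit through an entire call chain without changing every signature. A runnable miniature of the pattern, using stand-in types rather than the real pageserver API:

    // Stand-in types (hypothetical), not the real RequestContext API.
    use std::sync::Arc;

    struct Permit; // stand-in for page_cache::PinnedSlotsPermit

    #[derive(Clone)]
    struct Ctx {
        permit: Option<Arc<Permit>>,
    }

    struct CtxBuilder {
        inner: Ctx,
    }

    impl CtxBuilder {
        // Start from a copy of the parent context, as RequestContextBuilder::extend does.
        fn extend(original: &Ctx) -> Self {
            CtxBuilder { inner: original.clone() }
        }
        fn page_cache_permit(mut self, p: Arc<Permit>) -> Self {
            self.inner.permit = Some(p);
            self
        }
        fn build(self) -> Ctx {
            self.inner
        }
    }

    fn main() {
        let parent = Ctx { permit: None };
        let child = CtxBuilder::extend(&parent)
            .page_cache_permit(Arc::new(Permit))
            .build();
        assert!(child.permit.is_some()); // callees can now reuse the permit
    }

The one call site in this diff is the eviction task further down, which acquires a single permit up front and extends its context with it before calling collect_keyspace.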

View File

@@ -314,7 +314,6 @@ static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
#[strum(serialize_all = "kebab_case")]
pub(crate) enum PageCacheErrorKind {
AcquirePinnedSlotTimeout,
EvictIterLimit,
}
pub(crate) fn page_cache_errors_inc(error_kind: PageCacheErrorKind) {

View File

@@ -66,8 +66,7 @@
//! inserted to the mapping, but you must hold the write-lock on the slot until
//! the contents are valid. If you need to release the lock without initializing
//! the contents, you must remove the mapping first. We make that easy for the
//! callers with PageWriteGuard: when lock_for_write() returns an uninitialized
//! page, the caller must explicitly call guard.mark_valid() after it has
//! callers with PageWriteGuard: the caller must explicitly call guard.mark_valid() after it has
//! initialized it. If the guard is dropped without calling mark_valid(), the
//! mapping is automatically removed and the slot is marked free.
//!
@@ -79,6 +78,7 @@ use std::{
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
Arc, Weak,
},
task::Poll,
time::Duration,
};
@@ -215,16 +215,21 @@ impl Slot {
impl SlotInner {
/// If there is already a reader, drop our permit and share its permit, just like we share read access.
fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
let mut guard = self.permit.lock().unwrap();
if let Some(existing_permit) = guard.upgrade() {
drop(guard);
drop(permit);
existing_permit
} else {
let permit = Arc::new(permit);
*guard = Arc::downgrade(&permit);
permit
fn coalesce_readers_permit<'c>(&self, permit: PermitKind<'c>) -> PermitKindReadGuard<'c> {
match permit {
PermitKind::CtxProvided(permit) => PermitKindReadGuard::CtxProvided(permit),
PermitKind::Acquired(permit) => {
let mut guard = self.permit.lock().unwrap();
if let Some(existing_permit) = guard.upgrade() {
drop(guard);
drop(permit);
existing_permit
} else {
let permit = Arc::new(permit);
*guard = Arc::downgrade(&permit);
permit
}
}
}
}
}
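
The coalescing above is a small Weak-in-a-Mutex pattern: the first reader stashes a Weak reference to its Arc'ed permit, and later readers try to upgrade it, so any number of concurrent readers of one slot pin only a single semaphore permit. A runnable sketch of just that pattern, with a stand-in Permit type:

    use std::sync::{Arc, Mutex, Weak};

    struct Permit; // stand-in for PinnedSlotsPermit

    fn coalesce(shared: &Mutex<Weak<Permit>>, mine: Permit) -> Arc<Permit> {
        let mut slot = shared.lock().unwrap();
        if let Some(existing) = slot.upgrade() {
            drop(mine); // hand our permit back; piggyback on the existing reader's
            existing
        } else {
            let mine = Arc::new(mine);
            *slot = Arc::downgrade(&mine); // leave a Weak behind for later readers
            mine
        }
    }

    fn main() {
        let shared = Mutex::new(Weak::new());
        let a = coalesce(&shared, Permit);
        let b = coalesce(&shared, Permit); // second reader shares a's permit
        assert!(Arc::ptr_eq(&a, &b));
    }
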
@@ -252,21 +257,36 @@ pub struct PageCache {
/// This is interpreted modulo the page cache size.
next_evict_slot: AtomicUsize,
find_victim_sender:
async_channel::Sender<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
find_victim_waiters:
async_channel::Receiver<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
size_metrics: &'static PageCacheSizeMetrics,
}
struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
pub(crate) struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
enum PermitKind<'c> {
CtxProvided(&'c PinnedSlotsPermit),
Acquired(PinnedSlotsPermit),
}
enum PermitKindReadGuard<'c> {
CtxProvided(&'c PinnedSlotsPermit),
Coalesced(Arc<PinnedSlotsPermit>),
}
///
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
/// until the guard is dropped.
///
pub struct PageReadGuard<'i> {
_permit: Arc<PinnedSlotsPermit>,
pub struct PageReadGuard<'c, 'i> {
_permit: PermitKindReadGuard<'c>,
slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
}
impl std::ops::Deref for PageReadGuard<'_> {
impl std::ops::Deref for PageReadGuard<'_, '_> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
@@ -274,7 +294,7 @@ impl std::ops::Deref for PageReadGuard<'_> {
}
}
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_, '_> {
fn as_ref(&self) -> &[u8; PAGE_SZ] {
self.slot_guard.buf
}
@@ -286,78 +306,89 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
///
/// Counterintuitively, this is used even for a read, if the requested page is not
/// currently found in the page cache. In that case, the caller of lock_for_read()
/// is expected to fill in the page contents and call mark_valid(). Similarly
/// lock_for_write() can return an invalid buffer that the caller is expected
/// to initialize.
///
pub struct PageWriteGuard<'i> {
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
_permit: PinnedSlotsPermit,
// Are the page contents currently valid?
// Used to mark pages as invalid that are assigned but not yet filled with data.
valid: bool,
/// is expected to fill in the page contents and call mark_valid().
pub struct PageWriteGuard<'c, 'i> {
state: PageWriteGuardState<'c, 'i>,
}
impl std::ops::DerefMut for PageWriteGuard<'_> {
enum PageWriteGuardState<'c, 'i> {
Invalid {
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
_permit: PermitKindReadGuard<'c>,
},
Downgraded,
}
impl std::ops::DerefMut for PageWriteGuard<'_, '_> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.inner.buf
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
PageWriteGuardState::Downgraded => unreachable!(),
}
}
}
impl std::ops::Deref for PageWriteGuard<'_> {
impl std::ops::Deref for PageWriteGuard<'_, '_> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
self.inner.buf
match &self.state {
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
PageWriteGuardState::Downgraded => unreachable!(),
}
}
}
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_, '_> {
fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
self.inner.buf
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
PageWriteGuardState::Downgraded => unreachable!(),
}
}
}
impl PageWriteGuard<'_> {
impl<'c, 'a> PageWriteGuard<'c, 'a> {
/// Mark that the buffer contents are now valid.
pub fn mark_valid(&mut self) {
assert!(self.inner.key.is_some());
assert!(
!self.valid,
"mark_valid called on a buffer that was already valid"
);
self.valid = true;
#[must_use]
pub fn mark_valid(mut self) -> PageReadGuard<'c, 'a> {
let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
match prev {
PageWriteGuardState::Invalid { inner, _permit } => {
assert!(inner.key.is_some());
PageReadGuard {
_permit,
slot_guard: inner.downgrade(),
}
}
PageWriteGuardState::Downgraded => unreachable!(),
}
}
}
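
mark_valid() now hands the caller a read guard by way of tokio's RwLockWriteGuard::downgrade, which atomically turns a write guard into a read guard with no window for another writer to sneak in. A minimal sketch of that primitive alone (assuming tokio 1.x with the rt and macros features):

    #[tokio::main]
    async fn main() {
        let page = tokio::sync::RwLock::new([0u8; 8]);
        let mut write = page.write().await;
        *write = *b"CONTENTS"; // initialize while holding the write lock
        let read = write.downgrade(); // atomically become a reader
        assert_eq!(&*read, b"CONTENTS"); // readers admitted, writers still excluded
    }
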
impl Drop for PageWriteGuard<'_> {
impl Drop for PageWriteGuard<'_, '_> {
///
/// If the buffer was allocated for a page that was not already in the
/// cache, but the lock_for_read/write() caller dropped the buffer without
/// initializing it, remove the mapping from the page cache.
///
fn drop(&mut self) {
assert!(self.inner.key.is_some());
if !self.valid {
let self_key = self.inner.key.as_ref().unwrap();
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
self.inner.key = None;
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => {
assert!(inner.key.is_some());
let self_key = inner.key.as_ref().unwrap();
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
inner.key = None;
}
PageWriteGuardState::Downgraded => {}
}
}
}
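
The Invalid/Downgraded state is what lets this single Drop impl tell "the caller bailed out" apart from "mark_valid() already downgraded the guard". A runnable miniature of that trick with stand-in types (the real guard additionally carries the slot contents and the permit, and mark_valid() returns the downgraded read guard):

    enum State {
        Invalid,
        Downgraded,
    }

    struct WriteGuard {
        state: State,
    }

    impl WriteGuard {
        fn mark_valid(mut self) {
            // Flip the state, so the Drop impl below skips the cleanup.
            self.state = State::Downgraded;
        }
    }

    impl Drop for WriteGuard {
        fn drop(&mut self) {
            match self.state {
                State::Invalid => println!("removing half-initialized mapping"),
                State::Downgraded => {} // contents were published; nothing to undo
            }
        }
    }

    fn main() {
        let g = WriteGuard { state: State::Invalid };
        drop(g); // caller bailed out: the mapping must be removed

        let g = WriteGuard { state: State::Invalid };
        g.mark_valid(); // consumed; drop runs in the Downgraded state
    }
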
/// lock_for_read() return value
pub enum ReadBufResult<'a> {
Found(PageReadGuard<'a>),
NotFound(PageWriteGuard<'a>),
}
/// lock_for_write() return value
pub enum WriteBufResult<'a> {
Found(PageWriteGuard<'a>),
NotFound(PageWriteGuard<'a>),
pub enum ReadBufResult<'c, 'a> {
Found(PageReadGuard<'c, 'a>),
NotFound(PageWriteGuard<'c, 'a>),
}
impl PageCache {
@@ -379,10 +410,9 @@ impl PageCache {
lsn: Lsn,
ctx: &RequestContext,
) -> Option<(Lsn, PageReadGuard)> {
let Ok(permit) = self.try_get_pinned_slot_permit().await else {
let Ok(permit) = self.try_get_pinned_slot_permit(ctx).await else {
return None;
};
crate::metrics::PAGE_CACHE
.for_ctx(ctx)
.read_accesses_materialized_page
@@ -430,12 +460,13 @@ impl PageCache {
/// Store an image of the given page in the cache.
///
pub async fn memorize_materialized_page(
&self,
&'static self,
tenant_id: TenantId,
timeline_id: TimelineId,
key: Key,
lsn: Lsn,
img: &[u8],
ctx: &RequestContext,
) -> anyhow::Result<()> {
let cache_key = CacheKey::MaterializedPage {
hash_key: MaterializedPageHashKey {
@@ -446,30 +477,87 @@ impl PageCache {
lsn,
};
match self.lock_for_write(&cache_key).await? {
WriteBufResult::Found(write_guard) => {
// We already had it in cache. Another thread must've put it there
// concurrently. Check that it had the same contents that we
// replayed.
assert!(*write_guard == img);
let mut permit = Some(self.try_get_pinned_slot_permit(ctx).await?);
loop {
// First check if the key already exists in the cache.
if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we already released the mapping
// lock, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.write().await;
if inner.key.as_ref() == Some(&cache_key) {
slot.inc_usage_count();
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
debug_assert_eq!(inner.buf.len(), img.len());
// We already had it in cache. Another thread must've put it there
// concurrently. Check that it had the same contents that we
// replayed.
assert!(inner.buf == img);
return Ok(());
}
}
WriteBufResult::NotFound(mut write_guard) => {
write_guard.copy_from_slice(img);
write_guard.mark_valid();
}
}
debug_assert!(permit.is_some());
Ok(())
// Not found. Find a victim buffer
let (slot_idx, mut inner) = self
.find_victim(permit.as_ref().unwrap())
.await
.context("Failed to find evict victim")?;
// Insert mapping for this. At this point, we may find that another
// thread did the same thing concurrently. In that case, we evicted
// our victim buffer unnecessarily. Put it into the free list and
// continue with the slot that the other thread chose.
if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) {
// TODO: put to free list
// We now just loop back to start from beginning. This is not
// optimal, we'll perform the lookup in the mapping again, which
// is not really necessary because we already got
// 'existing_slot_idx'. But this shouldn't happen often enough
// to matter much.
continue;
}
// Make the slot ready
let slot = &self.slots[slot_idx];
inner.key = Some(cache_key.clone());
slot.set_usage_count(1);
// Create a write guard for the slot so we go through the expected motions.
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
let mut write_guard = PageWriteGuard {
state: PageWriteGuardState::Invalid {
_permit: permit.take().unwrap(),
inner,
},
};
write_guard.copy_from_slice(img);
let _ = write_guard.mark_valid();
return Ok(());
}
}
// Section 1.2: Public interface functions for working with immutable file pages.
pub async fn read_immutable_buf(
&self,
pub async fn read_immutable_buf<'c>(
&'static self,
file_id: FileId,
blkno: u32,
ctx: &RequestContext,
) -> anyhow::Result<ReadBufResult> {
ctx: &'c RequestContext,
) -> anyhow::Result<ReadBufResult<'c, 'static>> {
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
self.lock_for_read(&mut cache_key, ctx).await
@@ -483,7 +571,22 @@ impl PageCache {
// "mappings" after this section. But the routines in this section should
// not require changes.
async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
pub(crate) async fn get_permit(&self) -> Arc<PinnedSlotsPermit> {
Arc::new(PinnedSlotsPermit(
Arc::clone(&self.pinned_slots)
.acquire_owned()
.await
.expect("the semaphore is never closed"),
))
}
async fn try_get_pinned_slot_permit<'c>(
&self,
ctx: &'c RequestContext,
) -> anyhow::Result<PermitKind<'c>> {
if let Some(permit) = ctx.permit() {
return Ok(PermitKind::CtxProvided(permit));
};
let timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer();
match tokio::time::timeout(
// Choose small timeout, neon_smgr does its own retries.
@@ -493,9 +596,9 @@ impl PageCache {
)
.await
{
Ok(res) => Ok(PinnedSlotsPermit(
Ok(res) => Ok(PermitKind::Acquired(PinnedSlotsPermit(
res.expect("this semaphore is never closed"),
)),
))),
Err(_timeout) => {
timer.stop_and_discard();
crate::metrics::page_cache_errors_inc(
@@ -515,10 +618,10 @@ impl PageCache {
///
/// If no page is found, returns None and *cache_key is left unmodified.
///
async fn try_lock_for_read(
async fn try_lock_for_read<'c>(
&self,
cache_key: &mut CacheKey,
permit: &mut Option<PinnedSlotsPermit>,
permit: &mut Option<PermitKind<'c>>,
) -> Option<PageReadGuard> {
let cache_key_orig = cache_key.clone();
if let Some(slot_idx) = self.search_mapping(cache_key) {
@@ -571,11 +674,11 @@ impl PageCache {
/// ```
///
async fn lock_for_read(
&self,
&'static self,
cache_key: &mut CacheKey,
ctx: &RequestContext,
) -> anyhow::Result<ReadBufResult> {
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
let mut permit = Some(self.try_get_pinned_slot_permit(ctx).await?);
let (read_access, hit) = match cache_key {
CacheKey::MaterializedPage { .. } => {
@@ -638,99 +741,10 @@ impl PageCache {
);
return Ok(ReadBufResult::NotFound(PageWriteGuard {
_permit: permit.take().unwrap(),
inner,
valid: false,
}));
return Ok(ReadBufResult::NotFound(PageWriteGuard {
state: PageWriteGuardState::Invalid {
_permit: permit.take().unwrap(),
inner,
},
}));
}
}
/// Look up a page in the cache and lock it in write mode. If it's not
/// found, returns None.
///
/// When locking a page for writing, the search criteria is always "exact".
async fn try_lock_for_write(
&self,
cache_key: &CacheKey,
permit: &mut Option<PinnedSlotsPermit>,
) -> Option<PageWriteGuard> {
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we already released the mapping
// lock, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.write().await;
if inner.key.as_ref() == Some(cache_key) {
slot.inc_usage_count();
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
return Some(PageWriteGuard {
_permit: permit.take().unwrap(),
inner,
valid: true,
});
}
}
None
}
/// Return a write-locked buffer for given block.
///
/// Similar to lock_for_read(), but the returned buffer is write-locked and
/// may be modified by the caller even if it's already found in the cache.
async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
loop {
// First check if the key already exists in the cache.
if let Some(write_guard) = self.try_lock_for_write(cache_key, &mut permit).await {
debug_assert!(permit.is_none());
return Ok(WriteBufResult::Found(write_guard));
}
debug_assert!(permit.is_some());
// Not found. Find a victim buffer
let (slot_idx, mut inner) = self
.find_victim(permit.as_ref().unwrap())
.await
.context("Failed to find evict victim")?;
// Insert mapping for this. At this point, we may find that another
// thread did the same thing concurrently. In that case, we evicted
// our victim buffer unnecessarily. Put it into the free list and
// continue with the slot that the other thread chose.
if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
// TODO: put to free list
// We now just loop back to start from beginning. This is not
// optimal, we'll perform the lookup in the mapping again, which
// is not really necessary because we already got
// 'existing_slot_idx'. But this shouldn't happen often enough
// to matter much.
continue;
}
// Make the slot ready
let slot = &self.slots[slot_idx];
inner.key = Some(cache_key.clone());
slot.set_usage_count(1);
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
return Ok(WriteBufResult::NotFound(PageWriteGuard {
_permit: permit.take().unwrap(),
inner,
valid: false,
}));
}
}
@@ -775,7 +789,7 @@ impl PageCache {
///
/// Like 'search_mapping', but performs an "exact" search. Used for
/// allocating a new buffer.
fn search_mapping_for_write(&self, key: &CacheKey) -> Option<usize> {
fn search_mapping_exact(&self, key: &CacheKey) -> Option<usize> {
match key {
CacheKey::MaterializedPage { hash_key, lsn } => {
let map = self.materialized_page_map.read().unwrap();
@@ -882,10 +896,12 @@ impl PageCache {
///
/// On return, the slot is empty and write-locked.
async fn find_victim(
&self,
&'static self,
_permit_witness: &PinnedSlotsPermit,
) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
let iter_limit = self.slots.len() * 10;
// Get in line.
let receiver = self.find_victim_waiters.recv();
let mut iters = 0;
loop {
iters += 1;
@@ -897,41 +913,8 @@ impl PageCache {
let mut inner = match slot.inner.try_write() {
Ok(inner) => inner,
Err(_err) => {
if iters > iter_limit {
// NB: Even with the permits, there's no hard guarantee that we will find a slot with
// any particular number of iterations: other threads might race ahead and acquire and
// release pins just as we're scanning the array.
//
// Imagine that nslots is 2, and as starting point, usage_count==1 on all
// slots. There are two threads running concurrently, A and B. A has just
// acquired the permit from the semaphore.
//
// A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
// B: Acquire permit.
// B: Look at slot 2, decrement its usage_count to zero and continue the search
// B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
// B: Release pin and permit again
// B: Acquire permit.
// B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
// B: Release pin and permit again
//
// Now we're back in the starting situation that both slots have
// usage_count 1, but A has now been through one iteration of the
// find_victim() loop. This can repeat indefinitely and on each
// iteration, A's iteration count increases by one.
//
// So, even though the semaphore for the permits is fair, the victim search
// itself happens in parallel and is not fair.
// Hence even with a permit, a task can theoretically be starved.
// To avoid this, we'd need tokio to give priority to tasks that are holding
// permits for longer.
// Note that just yielding to tokio during iteration without such
// priority boosting is likely counter-productive. We'd just give more opportunities
// for B to bump usage count, further starving A.
crate::metrics::page_cache_errors_inc(
crate::metrics::PageCacheErrorKind::EvictIterLimit,
);
anyhow::bail!("exceeded evict iter limit");
if iters > self.slots.len() * (MAX_USAGE_COUNT as usize) {
unreachable!("find_victim_waiters prevents starvation");
}
continue;
}
@@ -942,7 +925,16 @@ impl PageCache {
inner.key = None;
}
crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
return Ok((slot_idx, inner));
self.find_victim_sender
.try_send((slot_idx, inner))
.expect("we always get in line first");
match futures::poll!(receiver) {
Poll::Ready(Ok(res)) => return Ok(res),
Poll::Ready(Err(_closed)) => unreachable!("we never close"),
Poll::Pending => {
unreachable!("we just sent to the channel and got in line earlier")
}
}
}
}
}
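
The channel replaces the old iteration limit with a FIFO hand-off: every searcher first "gets in line" by starting a recv(), and a searcher that frees a slot pushes it into the channel, so the longest-waiting task is served first (the patch relies on async-channel waking waiters in the order they arrived). A toy model of the hand-off, not the pageserver code, assuming async-channel 1.x on a tokio runtime:

    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        let (tx, rx) = async_channel::bounded::<usize>(8);

        for waiter in 0..3u32 {
            let rx = rx.clone();
            tokio::spawn(async move {
                // "Get in line": recv() registers us as a waiter.
                let slot = rx.recv().await.unwrap();
                println!("waiter {waiter} was handed victim slot {slot}");
            });
            // Let each task start waiting, so the line has a fixed order.
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        // A searcher that frees slots hands them out, oldest waiter first.
        for slot in [7usize, 8, 9] {
            tx.send(slot).await.unwrap();
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
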
@@ -979,6 +971,7 @@ impl PageCache {
})
.collect();
let (find_victim_sender, find_victim_waiters) = async_channel::bounded(num_pages);
Self {
materialized_page_map: Default::default(),
immutable_page_map: Default::default(),
@@ -986,6 +979,8 @@ impl PageCache {
next_evict_slot: AtomicUsize::new(0),
size_metrics,
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
find_victim_sender,
find_victim_waiters,
}
}
}

View File

@@ -20,10 +20,10 @@ use std::io::{Error, ErrorKind};
impl<'a> BlockCursor<'a> {
/// Read a blob into a new buffer.
pub async fn read_blob(
pub async fn read_blob<'c>(
&self,
offset: u64,
ctx: &RequestContext,
ctx: &'c RequestContext,
) -> Result<Vec<u8>, std::io::Error> {
let mut buf = Vec::new();
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
@@ -31,11 +31,11 @@ impl<'a> BlockCursor<'a> {
}
/// Read blob into the given buffer. Any previous contents in the buffer
/// are overwritten.
pub async fn read_blob_into_buf(
pub async fn read_blob_into_buf<'c>(
&self,
offset: u64,
dstbuf: &mut Vec<u8>,
ctx: &RequestContext,
ctx: &'c RequestContext,
) -> Result<(), std::io::Error> {
let mut blknum = (offset / PAGE_SZ as u64) as u32;
let mut off = (offset % PAGE_SZ as u64) as usize;

View File

@@ -34,27 +34,27 @@ where
}
/// Reference to an in-memory copy of an immutable on-disk block.
pub enum BlockLease<'a> {
PageReadGuard(PageReadGuard<'static>),
pub enum BlockLease<'c, 'a> {
PageReadGuard(PageReadGuard<'c, 'static>),
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
#[cfg(test)]
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
}
impl From<PageReadGuard<'static>> for BlockLease<'static> {
fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
impl<'c, 'a> From<PageReadGuard<'c, 'a>> for BlockLease<'c, 'a> {
fn from(value: PageReadGuard<'c, 'a>) -> BlockLease<'c, 'a> {
BlockLease::PageReadGuard(value)
}
}
#[cfg(test)]
impl<'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'a> {
impl<'c, 'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'c, 'a> {
fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
BlockLease::Arc(value)
}
}
impl<'a> Deref for BlockLease<'a> {
impl<'c, 'a> Deref for BlockLease<'c, 'a> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
@@ -83,11 +83,11 @@ pub(crate) enum BlockReaderRef<'a> {
impl<'a> BlockReaderRef<'a> {
#[inline(always)]
async fn read_blk(
async fn read_blk<'c>(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<BlockLease, std::io::Error> {
ctx: &'c RequestContext,
) -> Result<BlockLease<'c, '_>, std::io::Error> {
use BlockReaderRef::*;
match self {
FileBlockReader(r) => r.read_blk(blknum, ctx).await,
@@ -141,11 +141,11 @@ impl<'a> BlockCursor<'a> {
/// access to the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
#[inline(always)]
pub async fn read_blk(
pub async fn read_blk<'c>(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<BlockLease, std::io::Error> {
ctx: &'c RequestContext,
) -> Result<BlockLease<'c, '_>, std::io::Error> {
self.reader.read_blk(blknum, ctx).await
}
}
@@ -180,32 +180,27 @@ impl FileBlockReader {
/// Returns a "lease" object that can be used to
/// access to the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
pub async fn read_blk(
pub async fn read_blk<'c>(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<BlockLease, std::io::Error> {
ctx: &'c RequestContext,
) -> Result<BlockLease<'c, 'static>, std::io::Error> {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => break Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
write_guard.mark_valid();
// Swap for read lock
continue;
}
};
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
Ok(write_guard.mark_valid().into())
}
}
}
}

View File

@@ -64,44 +64,40 @@ impl EphemeralFile {
self.len
}
pub(crate) async fn read_blk(
pub(crate) async fn read_blk<'c>(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<BlockLease, io::Error> {
ctx: &'c RequestContext,
) -> Result<BlockLease<'c, '_>, io::Error> {
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
if flushed_blknums.contains(&(blknum as u64)) {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum, self.file.path, e,
),
)
})? {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
.await?;
write_guard.mark_valid();
// Swap for read lock
continue;
}
};
}
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum, self.file.path, e,
),
)
})? {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
.await?;
let read_guard = write_guard.mark_valid();
return Ok(BlockLease::PageReadGuard(read_guard));
}
};
} else {
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
@@ -171,7 +167,7 @@ impl EphemeralFile {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
write_guard.mark_valid();
let _ = write_guard.mark_valid();
// pre-warm successful
}
Err(e) => {

View File

@@ -549,7 +549,7 @@ impl DeltaLayer {
/// Loads all keys stored in the layer. Returns key, lsn, value size and value reference.
///
/// The value can be obtained via the [`ValueRef::load`] function.
pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> Result<Vec<DeltaEntry<'_>>> {
pub(crate) async fn load_keys<'c>(&self, ctx: &RequestContext) -> Result<Vec<DeltaEntry<'_>>> {
let inner = self
.load(LayerAccessKind::KeyIter, ctx)
.await
@@ -1038,9 +1038,9 @@ pub struct ValueRef<'a> {
reader: BlockCursor<'a>,
}
impl<'a> ValueRef<'a> {
impl<'c, 'a> ValueRef<'a> {
/// Loads the value from disk
pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
pub async fn load(&self, ctx: &'c RequestContext) -> Result<Value> {
// theoretically we *could* record an access time for each, but it does not really matter
let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
let val = Value::des(&buf)?;
@@ -1051,11 +1051,11 @@ impl<'a> ValueRef<'a> {
pub(crate) struct Adapter<T>(T);
impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
pub(crate) async fn read_blk(
pub(crate) async fn read_blk<'c>(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<BlockLease, std::io::Error> {
ctx: &'c RequestContext,
) -> Result<BlockLease<'c, '_>, std::io::Error> {
self.0.as_ref().file.read_blk(blknum, ctx).await
}
}

View File

@@ -158,7 +158,7 @@ pub struct Timeline {
/// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
/// Never changes for the lifetime of this [`Timeline`] object.
///
///
/// This duplicates the generation stored in LocationConf, but that structure is mutable:
/// this copy enforces the invariant that generation doesn't change during a Tenant's lifetime.
generation: Generation,
@@ -505,7 +505,7 @@ impl Timeline {
timer.stop_and_record();
let start = Instant::now();
let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
let res = self.reconstruct_value(key, lsn, reconstruct_state, ctx).await;
let elapsed = start.elapsed();
crate::metrics::RECONSTRUCT_TIME
.for_result(&res)
@@ -4279,6 +4279,7 @@ impl Timeline {
key: Key,
request_lsn: Lsn,
mut data: ValueReconstructState,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
// Perform WAL redo if needed
data.records.reverse();
@@ -4342,6 +4343,7 @@ impl Timeline {
key,
last_rec_lsn,
&img,
ctx,
)
.await
.context("Materialized page memoization failed")

View File

@@ -16,7 +16,7 @@
use std::{
collections::HashMap,
ops::ControlFlow,
sync::Arc,
sync::{Arc, Mutex},
time::{Duration, SystemTime},
};
@@ -25,7 +25,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
use crate::{
context::{DownloadBehavior, RequestContext},
context::{DownloadBehavior, RequestContext, RequestContextBuilder},
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
@@ -397,9 +397,14 @@ impl Timeline {
}
}
let permit = crate::page_cache::get().get_permit().await;
let ctx = RequestContextBuilder::extend(ctx)
.page_cache_permit(permit)
.build();
// imitate repartitioning on first compaction
if let Err(e) = self
.collect_keyspace(lsn, ctx)
.collect_keyspace(lsn, &ctx)
.instrument(info_span!("collect_keyspace"))
.await
{
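
The fix here is to acquire one owned permit before the imitated accesses start and carry it in the context, so try_get_pinned_slot_permit reuses it instead of competing for fresh permits under its short timeout. A toy illustration of the owned-permit sharing (tokio semaphore, standing in for get_permit()):

    use std::sync::Arc;

    #[tokio::main]
    async fn main() {
        let slots = Arc::new(tokio::sync::Semaphore::new(2));
        let permit = Arc::new(
            Arc::clone(&slots)
                .acquire_owned()
                .await
                .expect("the semaphore is never closed"),
        );
        let for_ctx = Arc::clone(&permit); // clones travel inside the context
        assert_eq!(slots.available_permits(), 1);
        drop(permit);
        drop(for_ctx); // last clone gone: the permit returns to the semaphore
        assert_eq!(slots.available_permits(), 2);
    }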

View File

@@ -544,7 +544,7 @@ impl VirtualFile {
pub(crate) async fn read_blk(
&self,
blknum: u32,
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
) -> Result<crate::tenant::block_io::BlockLease<'_, '_>, std::io::Error> {
use crate::page_cache::PAGE_SZ;
let mut buf = [0; PAGE_SZ];
self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64))