revert recent VirtualFile asyncification changes (#5291)

Motivation
==========

We observed two "indigestion" events on staging, each shortly after
restarting `pageserver-0.eu-west-1.aws.neon.build`. It has ~8k tenants.

The indigestion manifests as `Timeline::get` calls failing with
`exceeded evict iter limit` .
The error is from `page_cache.rs`; it was unable to find a free page and
hence failed with the error.

The indigestion events started occuring after we started deploying
builds that contained the following commits:

```
[~/src/neon]: git log --oneline c0ed362790caa368aa65ba57d352a2f1562fd6bf..15eaf78083ecff62b7669
091da1a1c8b4f60ebf8
15eaf7808 Disallow block_in_place and Handle::block_on (#5101)
a18d6d9ae Make File opening in VirtualFile async-compatible (#5280)
76cc87398 Use tokio locks in VirtualFile and turn with_file into macro (#5247)
```

The second and third commit are interesting.
They add .await points to the VirtualFile code.

Background
==========

On the read path, which is the dominant user of page cache & VirtualFile
during pageserver restart, `Timeline::get` `page_cache` and VirtualFile
interact as follows:

1. Timeline::get tries to read from a layer
2. This read goes through the page cache.
3. If we have a page miss (which is known to be common after restart),
page_cache uses `find_victim` to find an empty slot, and once it has
found a slot, it gives exclusive ownership of it to the caller through a
`PageWriteGuard`.
4. The caller is supposed to fill the write guard with data from the
underlying backing store, i.e., the layer `VirtualFile`.
5. So, we call into `VirtualFile::read_at`` to fill the write guard.

The `find_victim` method finds an empty slot using a basic
implementation of clock page replacement algorithm.
Slots that are currently in use (`PageReadGuard` / `PageWriteGuard`)
cannot become victims.
If there have been too many iterations, `find_victim` gives up with
error `exceeded evict iter limit`.

Root Cause For Indigestion
==========================

The second and third commit quoted in the "Motivation" section
introduced `.await` points in the VirtualFile code.
These enable tokio to preempt us and schedule another future __while__
we hold the `PageWriteGuard` and are calling `VirtualFile::read_at`.
This was not possible before these commits, because there simply were no
await points that weren't Poll::Ready immediately.
With the offending commits, there is now actual usage of
`tokio::sync::RwLock` to protect the VirtualFile file descriptor cache.
And we __know__ from other experiments that, during the post-restart
"rush", the VirtualFile fd cache __is__ too small, i.e., all slots are
taken by _ongoing_ VirtualFile operations and cannot be victims.
So, assume that VirtualFile's `find_victim_slot`'s
`RwLock::write().await` calls _will_ yield control to the executor.

The above can lead to the pathological situation if we have N runnable
tokio tasks, each wanting to do `Timeline::get`, but only M slots, N >>
M.
Suppose M of the N tasks win a PageWriteGuard and get preempted at some
.await point inside `VirtualFile::read_at`.
Now suppose tokio schedules the remaining N-M tasks for fairness, then
schedules the first M tasks again.
Each of the N-M tasks will run `find_victim()` until it hits the
`exceeded evict iter limit`.
Why? Because the first M tasks took all the slots and are still holding
them tight through their `PageWriteGuard`.

The result is massive wastage of CPU time in `find_victim()`.
The effort to find a page is futile, but each of the N-M tasks still
attempts it.

This delays the time when tokio gets around to schedule the first M
tasks again.
Eventually, tokio will schedule them, they will make progress, fill the
`PageWriteGuard`, release it.
But in the meantime, the N-M tasks have already bailed with error
`exceeded evict iter limit`.

Eventually, higher level mechanisms will retry for the N-M tasks, and
this time, there won't be as many concurrent tasks wanting to do
`Timeline::get`.
So, it will shake out.

But, it's a massive indigestion until then.

This PR
=======

This PR reverts the offending commits until we find a proper fix.

```
    Revert "Use tokio locks in VirtualFile and turn with_file into macro (#5247)"
    
    This reverts commit 76cc87398c.


    Revert "Make File opening in VirtualFile async-compatible (#5280)"
    
    This reverts commit a18d6d9ae3.
```
This commit is contained in:
Christian Schwarz
2023-09-12 17:38:31 +02:00
committed by GitHub
parent 83e7e5dbbd
commit ab1f37e908

View File

@@ -18,8 +18,7 @@ use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
use std::sync::{RwLock, RwLockWriteGuard};
///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
@@ -111,7 +110,7 @@ impl OpenFiles {
///
/// On return, we hold a lock on the slot, and its 'tag' has been updated
/// recently_used has been set. It's all ready for reuse.
async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
//
// Run the clock algorithm to find a slot to replace.
//
@@ -143,7 +142,7 @@ impl OpenFiles {
}
retries += 1;
} else {
slot_guard = slot.inner.write().await;
slot_guard = slot.inner.write().unwrap();
index = next;
break;
}
@@ -154,7 +153,7 @@ impl OpenFiles {
// old file.
//
if let Some(old_file) = slot_guard.file.take() {
// the normal path of dropping VirtualFile uses `Close`, use `CloseByReplace` here to
// the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
// distinguish the two.
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::CloseByReplace)
@@ -209,29 +208,6 @@ impl CrashsafeOverwriteError {
}
}
/// Observe duration for the given storage I/O operation
///
/// Unlike `observe_closure_duration`, this supports async,
/// where "support" means that we measure wall clock time.
macro_rules! observe_duration {
($op:expr, $($body:tt)*) => {{
let instant = Instant::now();
let result = $($body)*;
let elapsed = instant.elapsed().as_secs_f64();
STORAGE_IO_TIME_METRIC
.get($op)
.observe(elapsed);
result
}}
}
macro_rules! with_file {
($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
let $ident = $this.lock_file().await?;
observe_duration!($op, $($body)*)
}};
}
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open(path: &Path) -> Result<VirtualFile, std::io::Error> {
@@ -268,9 +244,11 @@ impl VirtualFile {
tenant_id = "*".to_string();
timeline_id = "*".to_string();
}
let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
let (handle, mut slot_guard) = get_open_files().find_victim_slot();
let file = observe_duration!(StorageIoOperation::Open, open_options.open(path))?;
let file = STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Open)
.observe_closure_duration(|| open_options.open(path))?;
// Strip all options other than read and write.
//
@@ -353,24 +331,22 @@ impl VirtualFile {
/// Call File::sync_all() on the underlying File.
pub async fn sync_all(&self) -> Result<(), Error> {
with_file!(self, StorageIoOperation::Fsync, |file| file
.as_ref()
.sync_all())
self.with_file(StorageIoOperation::Fsync, |file| file.sync_all())
.await?
}
pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
with_file!(self, StorageIoOperation::Metadata, |file| file
.as_ref()
.metadata())
self.with_file(StorageIoOperation::Metadata, |file| file.metadata())
.await?
}
/// Helper function internal to `VirtualFile` that looks up the underlying File,
/// opens it and evicts some other File if necessary. The passed parameter is
/// assumed to be a function available for the physical `File`.
///
/// We are doing it via a macro as Rust doesn't support async closures that
/// take on parameters with lifetimes.
async fn lock_file(&self) -> Result<FileGuard<'_>, Error> {
/// Helper function that looks up the underlying File for this VirtualFile,
/// opening it and evicting some other File if necessary. It calls 'func'
/// with the physical File.
async fn with_file<F, R>(&self, op: StorageIoOperation, mut func: F) -> Result<R, Error>
where
F: FnMut(&File) -> R,
{
let open_files = get_open_files();
let mut handle_guard = {
@@ -380,23 +356,27 @@ impl VirtualFile {
// We only need to hold the handle lock while we read the current handle. If
// another thread closes the file and recycles the slot for a different file,
// we will notice that the handle we read is no longer valid and retry.
let mut handle = *self.handle.read().await;
let mut handle = *self.handle.read().unwrap();
loop {
// Check if the slot contains our File
{
let slot = &open_files.slots[handle.index];
let slot_guard = slot.inner.read().await;
if slot_guard.tag == handle.tag && slot_guard.file.is_some() {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(FileGuard { slot_guard });
let slot_guard = slot.inner.read().unwrap();
if slot_guard.tag == handle.tag {
if let Some(file) = &slot_guard.file {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(STORAGE_IO_TIME_METRIC
.get(op)
.observe_closure_duration(|| func(file)));
}
}
}
// The slot didn't contain our File. We will have to open it ourselves,
// but before that, grab a write lock on handle in the VirtualFile, so
// that no other thread will try to concurrently open the same file.
let handle_guard = self.handle.write().await;
let handle_guard = self.handle.write().unwrap();
// If another thread changed the handle while we were not holding the lock,
// then the handle might now be valid again. Loop back to retry.
@@ -410,10 +390,17 @@ impl VirtualFile {
// We need to open the file ourselves. The handle in the VirtualFile is
// now locked in write-mode. Find a free slot to put it in.
let (handle, mut slot_guard) = open_files.find_victim_slot().await;
let (handle, mut slot_guard) = open_files.find_victim_slot();
// Open the physical file
let file = observe_duration!(StorageIoOperation::Open, self.open_options.open(&self.path))?;
let file = STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Open)
.observe_closure_duration(|| self.open_options.open(&self.path))?;
// Perform the requested operation on it
let result = STORAGE_IO_TIME_METRIC
.get(op)
.observe_closure_duration(|| func(&file));
// Store the File in the slot and update the handle in the VirtualFile
// to point to it.
@@ -421,9 +408,7 @@ impl VirtualFile {
*handle_guard = handle;
return Ok(FileGuard {
slot_guard: slot_guard.downgrade(),
});
Ok(result)
}
pub fn remove(self) {
@@ -438,9 +423,11 @@ impl VirtualFile {
self.pos = offset;
}
SeekFrom::End(offset) => {
self.pos = with_file!(self, StorageIoOperation::Seek, |file| file
.as_ref()
.seek(SeekFrom::End(offset)))?
self.pos = self
.with_file(StorageIoOperation::Seek, |mut file| {
file.seek(SeekFrom::End(offset))
})
.await??
}
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
@@ -528,9 +515,9 @@ impl VirtualFile {
}
pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
let result = with_file!(self, StorageIoOperation::Read, |file| file
.as_ref()
.read_at(buf, offset));
let result = self
.with_file(StorageIoOperation::Read, |file| file.read_at(buf, offset))
.await?;
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
@@ -540,9 +527,9 @@ impl VirtualFile {
}
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = with_file!(self, StorageIoOperation::Write, |file| file
.as_ref()
.write_at(buf, offset));
let result = self
.with_file(StorageIoOperation::Write, |file| file.write_at(buf, offset))
.await?;
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["write", &self.tenant_id, &self.timeline_id])
@@ -552,18 +539,6 @@ impl VirtualFile {
}
}
struct FileGuard<'a> {
slot_guard: RwLockReadGuard<'a, SlotInner>,
}
impl<'a> AsRef<File> for FileGuard<'a> {
fn as_ref(&self) -> &File {
// This unwrap is safe because we only create `FileGuard`s
// if we know that the file is Some.
self.slot_guard.file.as_ref().unwrap()
}
}
#[cfg(test)]
impl VirtualFile {
pub(crate) async fn read_blk(
@@ -596,39 +571,20 @@ impl VirtualFile {
impl Drop for VirtualFile {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut();
let handle = self.handle.get_mut().unwrap();
fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) {
if slot_guard.tag == tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also the `CloseByReplace` operation for closes done on eviction for
// comparison.
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Close)
.observe_closure_duration(|| drop(slot_guard.file.take()));
}
}
// We don't have async drop so we cannot directly await the lock here.
// Instead, first do a best-effort attempt at closing the underlying
// file descriptor by using `try_write`, and if that fails, spawn
// a tokio task to do it asynchronously: we just want it to be
// cleaned up eventually.
// Most of the time, the `try_lock` should succeed though,
// as we have `&mut self` access. In other words, if the slot
// is still occupied by our file, there should be no access from
// other I/O operations; the only other possible place to lock
// the slot is the lock algorithm looking for free slots.
// We could check with a read-lock first, to avoid waiting on an
// unrelated I/O.
let slot = &get_open_files().slots[handle.index];
if let Ok(slot_guard) = slot.inner.try_write() {
clean_slot(slot, slot_guard, handle.tag);
} else {
let tag = handle.tag;
tokio::spawn(async move {
let slot_guard = slot.inner.write().await;
clean_slot(slot, slot_guard, tag);
});
};
let mut slot_guard = slot.inner.write().unwrap();
if slot_guard.tag == handle.tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also operation "close-by-replace" for closes done on eviction for
// comparison.
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Close)
.observe_closure_duration(|| drop(slot_guard.file.take()));
}
}
}