Compare commits

...

8 Commits

Author SHA1 Message Date
Christian Schwarz
3da5015b62 less permits than slots (50%) 2023-10-05 08:00:45 +00:00
Christian Schwarz
2f27412d7d starvation prevention v2 while allowing concurrent find_victims 2023-10-05 08:00:45 +00:00
Christian Schwarz
b147bde907 idea: use downgrade() to ensure forward progress 2023-10-05 08:00:45 +00:00
Christian Schwarz
aba5d1b31f Revert "Revert "page_cache: find_victim: don't spin while there's no chance for a slot (#5319)""
This reverts commit b3d889475839a5f651209dd7c0de203967d905ef.
2023-10-05 08:00:45 +00:00
Christian Schwarz
aa8363f033 Revert "page_cache: find_victim: don't spin while there's no chance for a slot (#5319)"
To repro the original condition.

This reverts commit c07eef8ea5.
2023-10-05 08:00:24 +00:00
Christian Schwarz
d6b1f028c9 Revert "revert recent VirtualFile asyncification changes (#5291)"
This reverts commit ab1f37e908.
2023-10-04 17:55:12 +00:00
Christian Schwarz
49e63aa63f REPRO: rebase fallout & add some instructions 2023-10-04 17:55:12 +00:00
Christian Schwarz
97f61721cc REPRO the problem: , uses 430GB of space; 4 seconds load time; constant 20kIOPS after ~20s 2023-10-04 17:55:12 +00:00
9 changed files with 330 additions and 219 deletions

27
Cargo.lock generated
View File

@@ -158,6 +158,17 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "async-channel"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]]
name = "async-compression"
version = "0.4.0"
@@ -1015,6 +1026,15 @@ dependencies = [
"zstd",
]
[[package]]
name = "concurrent-queue"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "const_format"
version = "0.2.30"
@@ -1435,6 +1455,12 @@ dependencies = [
"libc",
]
[[package]]
name = "event-listener"
version = "2.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "fail"
version = "0.5.1"
@@ -2656,6 +2682,7 @@ name = "pageserver"
version = "0.1.0"
dependencies = [
"anyhow",
"async-channel",
"async-compression",
"async-stream",
"async-trait",

View File

@@ -81,6 +81,7 @@ enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
tempfile.workspace = true
async-channel = "1.9.0"
[dev-dependencies]
criterion.workspace = true

View File

@@ -79,6 +79,7 @@ use std::{
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
Arc, Weak,
},
task::Poll,
time::Duration,
};
@@ -252,6 +253,11 @@ pub struct PageCache {
/// This is interpreted modulo the page cache size.
next_evict_slot: AtomicUsize,
find_victim_sender:
async_channel::Sender<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
find_victim_waiters:
async_channel::Receiver<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
size_metrics: &'static PageCacheSizeMetrics,
}
@@ -291,18 +297,23 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
/// to initialize.
///
pub struct PageWriteGuard<'i> {
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
state: PageWriteGuardState<'i>,
}
_permit: PinnedSlotsPermit,
// Are the page contents currently valid?
// Used to mark pages as invalid that are assigned but not yet filled with data.
valid: bool,
enum PageWriteGuardState<'i> {
Invalid {
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
_permit: PinnedSlotsPermit,
},
Downgraded,
}
impl std::ops::DerefMut for PageWriteGuard<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.inner.buf
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => &mut inner.buf,
PageWriteGuardState::Downgraded => unreachable!(),
}
}
}
@@ -310,25 +321,37 @@ impl std::ops::Deref for PageWriteGuard<'_> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
self.inner.buf
match &self.state {
PageWriteGuardState::Invalid { inner, _permit } => &inner.buf,
PageWriteGuardState::Downgraded => unreachable!(),
}
}
}
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
self.inner.buf
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => &mut inner.buf,
PageWriteGuardState::Downgraded => todo!(),
}
}
}
impl PageWriteGuard<'_> {
impl<'a> PageWriteGuard<'a> {
/// Mark that the buffer contents are now valid.
pub fn mark_valid(&mut self) {
assert!(self.inner.key.is_some());
assert!(
!self.valid,
"mark_valid called on a buffer that was already valid"
);
self.valid = true;
#[must_use]
pub fn mark_valid(mut self) -> PageReadGuard<'a> {
let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
match prev {
PageWriteGuardState::Invalid { inner, _permit } => {
assert!(inner.key.is_some());
PageReadGuard {
_permit: Arc::new(_permit),
slot_guard: inner.downgrade(),
}
}
PageWriteGuardState::Downgraded => unreachable!(),
}
}
}
@@ -339,11 +362,13 @@ impl Drop for PageWriteGuard<'_> {
/// initializing it, remove the mapping from the page cache.
///
fn drop(&mut self) {
assert!(self.inner.key.is_some());
if !self.valid {
let self_key = self.inner.key.as_ref().unwrap();
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
self.inner.key = None;
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => {
let self_key = inner.key.as_ref().unwrap();
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
inner.key = None;
}
PageWriteGuardState::Downgraded => {}
}
}
}
@@ -356,7 +381,7 @@ pub enum ReadBufResult<'a> {
/// lock_for_write() return value
pub enum WriteBufResult<'a> {
Found(PageWriteGuard<'a>),
Found(PageReadGuard<'a>),
NotFound(PageWriteGuard<'a>),
}
@@ -430,7 +455,7 @@ impl PageCache {
/// Store an image of the given page in the cache.
///
pub async fn memorize_materialized_page(
&self,
&'static self,
tenant_id: TenantId,
timeline_id: TimelineId,
key: Key,
@@ -447,15 +472,15 @@ impl PageCache {
};
match self.lock_for_write(&cache_key).await? {
WriteBufResult::Found(write_guard) => {
WriteBufResult::Found(read_guard) => {
// We already had it in cache. Another thread must've put it there
// concurrently. Check that it had the same contents that we
// replayed.
assert!(*write_guard == img);
assert!(*read_guard == img);
}
WriteBufResult::NotFound(mut write_guard) => {
write_guard.copy_from_slice(img);
write_guard.mark_valid();
let _ = write_guard.mark_valid();
}
}
@@ -465,7 +490,7 @@ impl PageCache {
// Section 1.2: Public interface functions for working with immutable file pages.
pub async fn read_immutable_buf(
&self,
&'static self,
file_id: FileId,
blkno: u32,
ctx: &RequestContext,
@@ -571,7 +596,7 @@ impl PageCache {
/// ```
///
async fn lock_for_read(
&self,
&'static self,
cache_key: &mut CacheKey,
ctx: &RequestContext,
) -> anyhow::Result<ReadBufResult> {
@@ -638,41 +663,31 @@ impl PageCache {
);
return Ok(ReadBufResult::NotFound(PageWriteGuard {
_permit: permit.take().unwrap(),
inner,
valid: false,
state: PageWriteGuardState::Invalid {
_permit: permit.take().unwrap(),
inner,
},
}));
}
}
/// Look up a page in the cache and lock it in write mode. If it's not
/// found, returns None.
///
/// When locking a page for writing, the search criteria is always "exact".
// FIXME: the name is wrong.
async fn try_lock_for_write(
&self,
cache_key: &CacheKey,
permit: &mut Option<PinnedSlotsPermit>,
) -> Option<PageWriteGuard> {
) -> Option<PageReadGuard> {
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we don't released the mapping
// lock already, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.write().await;
let inner = slot.inner.read().await;
if inner.key.as_ref() == Some(cache_key) {
slot.inc_usage_count();
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
return Some(PageWriteGuard {
_permit: permit.take().unwrap(),
inner,
valid: true,
return Some(PageReadGuard {
_permit: inner.coalesce_readers_permit(permit.take().unwrap()),
slot_guard: inner,
});
}
}
@@ -683,7 +698,7 @@ impl PageCache {
///
/// Similar to lock_for_read(), but the returned buffer is write-locked and
/// may be modified by the caller even if it's already found in the cache.
async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
async fn lock_for_write(&'static self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
loop {
// First check if the key already exists in the cache.
@@ -728,9 +743,10 @@ impl PageCache {
);
return Ok(WriteBufResult::NotFound(PageWriteGuard {
_permit: permit.take().unwrap(),
inner,
valid: false,
state: PageWriteGuardState::Invalid {
_permit: permit.take().unwrap(),
inner,
},
}));
}
}
@@ -882,10 +898,12 @@ impl PageCache {
///
/// On return, the slot is empty and write-locked.
async fn find_victim(
&self,
&'static self,
_permit_witness: &PinnedSlotsPermit,
) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
let iter_limit = self.slots.len() * 10;
// Get in line.
let receiver = self.find_victim_waiters.recv();
let mut iters = 0;
loop {
iters += 1;
@@ -897,41 +915,8 @@ impl PageCache {
let mut inner = match slot.inner.try_write() {
Ok(inner) => inner,
Err(_err) => {
if iters > iter_limit {
// NB: Even with the permits, there's no hard guarantee that we will find a slot with
// any particular number of iterations: other threads might race ahead and acquire and
// release pins just as we're scanning the array.
//
// Imagine that nslots is 2, and as starting point, usage_count==1 on all
// slots. There are two threads running concurrently, A and B. A has just
// acquired the permit from the semaphore.
//
// A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
// B: Acquire permit.
// B: Look at slot 2, decrement its usage_count to zero and continue the search
// B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
// B: Release pin and permit again
// B: Acquire permit.
// B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
// B: Release pin and permit again
//
// Now we're back in the starting situation that both slots have
// usage_count 1, but A has now been through one iteration of the
// find_victim() loop. This can repeat indefinitely and on each
// iteration, A's iteration count increases by one.
//
// So, even though the semaphore for the permits is fair, the victim search
// itself happens in parallel and is not fair.
// Hence even with a permit, a task can theoretically be starved.
// To avoid this, we'd need tokio to give priority to tasks that are holding
// permits for longer.
// Note that just yielding to tokio during iteration without such
// priority boosting is likely counter-productive. We'd just give more opportunities
// for B to bump usage count, further starving A.
crate::metrics::page_cache_errors_inc(
crate::metrics::PageCacheErrorKind::EvictIterLimit,
);
anyhow::bail!("exceeded evict iter limit");
if iters > self.slots.len() * (MAX_USAGE_COUNT as usize) {
unreachable!("find_victim_waiters prevents starvation");
}
continue;
}
@@ -942,7 +927,14 @@ impl PageCache {
inner.key = None;
}
crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
return Ok((slot_idx, inner));
self.find_victim_sender
.try_send((slot_idx, inner))
.expect("we always get in line first");
match futures::poll!(receiver) {
Poll::Ready(Ok(res)) => return Ok(res),
Poll::Ready(Err(_closed)) => unreachable!("we never close"),
Poll::Pending => unreachable!("we just sent to the channel and got in line earlier"),
}
}
}
}
@@ -979,13 +971,17 @@ impl PageCache {
})
.collect();
let num_permits = num_pages/2;
let (find_victim_sender, find_victim_waiters) = async_channel::bounded(num_permits);
Self {
materialized_page_map: Default::default(),
immutable_page_map: Default::default(),
slots,
next_evict_slot: AtomicUsize::new(0),
size_metrics,
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_permits)),
find_victim_sender,
find_victim_waiters,
}
}
}

View File

@@ -186,27 +186,22 @@ impl FileBlockReader {
ctx: &RequestContext,
) -> Result<BlockLease, std::io::Error> {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => break Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
write_guard.mark_valid();
// Swap for read lock
continue;
}
};
}
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => return Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
return Ok(write_guard.mark_valid().into());
}
};
}
}

View File

@@ -70,38 +70,34 @@ impl EphemeralFile {
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
if flushed_blknums.contains(&(blknum as u64)) {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum,
self.file.path.display(),
e,
),
)
})? {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
.await?;
write_guard.mark_valid();
// Swap for read lock
continue;
}
};
}
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum,
self.file.path.display(),
e,
),
)
})? {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
.await?;
let read_guard = write_guard.mark_valid();
return Ok(BlockLease::PageReadGuard(read_guard));
}
};
} else {
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
@@ -171,7 +167,7 @@ impl EphemeralFile {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
write_guard.mark_valid();
let _ = write_guard.mark_valid();
// pre-warm successful
}
Err(e) => {

View File

@@ -864,11 +864,11 @@ impl DeltaLayerInner {
expected_summary.index_start_blk = actual_summary.index_start_blk;
expected_summary.index_root_blk = actual_summary.index_root_blk;
if actual_summary != expected_summary {
bail!(
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
actual_summary,
expected_summary
);
// bail!(
// "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
// actual_summary,
// expected_summary
// );
}
}

View File

@@ -457,11 +457,11 @@ impl ImageLayerInner {
expected_summary.index_root_blk = actual_summary.index_root_blk;
if actual_summary != expected_summary {
bail!(
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
actual_summary,
expected_summary
);
// bail!(
// "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
// actual_summary,
// expected_summary
// );
}
}

View File

@@ -18,7 +18,8 @@ use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{RwLock, RwLockWriteGuard};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
@@ -110,7 +111,7 @@ impl OpenFiles {
///
/// On return, we hold a lock on the slot, and its 'tag' has been updated
/// recently_used has been set. It's all ready for reuse.
fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
//
// Run the clock algorithm to find a slot to replace.
//
@@ -142,7 +143,7 @@ impl OpenFiles {
}
retries += 1;
} else {
slot_guard = slot.inner.write().unwrap();
slot_guard = slot.inner.write().await;
index = next;
break;
}
@@ -153,7 +154,7 @@ impl OpenFiles {
// old file.
//
if let Some(old_file) = slot_guard.file.take() {
// the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
// the normal path of dropping VirtualFile uses `Close`, use `CloseByReplace` here to
// distinguish the two.
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::CloseByReplace)
@@ -208,6 +209,29 @@ impl CrashsafeOverwriteError {
}
}
/// Observe duration for the given storage I/O operation
///
/// Unlike `observe_closure_duration`, this supports async,
/// where "support" means that we measure wall clock time.
macro_rules! observe_duration {
($op:expr, $($body:tt)*) => {{
let instant = Instant::now();
let result = $($body)*;
let elapsed = instant.elapsed().as_secs_f64();
STORAGE_IO_TIME_METRIC
.get($op)
.observe(elapsed);
result
}}
}
macro_rules! with_file {
($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
let $ident = $this.lock_file().await?;
observe_duration!($op, $($body)*)
}};
}
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open(path: &Path) -> Result<VirtualFile, std::io::Error> {
@@ -244,11 +268,9 @@ impl VirtualFile {
tenant_id = "*".to_string();
timeline_id = "*".to_string();
}
let (handle, mut slot_guard) = get_open_files().find_victim_slot();
let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
let file = STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Open)
.observe_closure_duration(|| open_options.open(path))?;
let file = observe_duration!(StorageIoOperation::Open, open_options.open(path))?;
// Strip all options other than read and write.
//
@@ -331,22 +353,24 @@ impl VirtualFile {
/// Call File::sync_all() on the underlying File.
pub async fn sync_all(&self) -> Result<(), Error> {
self.with_file(StorageIoOperation::Fsync, |file| file.sync_all())
.await?
with_file!(self, StorageIoOperation::Fsync, |file| file
.as_ref()
.sync_all())
}
pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
self.with_file(StorageIoOperation::Metadata, |file| file.metadata())
.await?
with_file!(self, StorageIoOperation::Metadata, |file| file
.as_ref()
.metadata())
}
/// Helper function that looks up the underlying File for this VirtualFile,
/// opening it and evicting some other File if necessary. It calls 'func'
/// with the physical File.
async fn with_file<F, R>(&self, op: StorageIoOperation, mut func: F) -> Result<R, Error>
where
F: FnMut(&File) -> R,
{
/// Helper function internal to `VirtualFile` that looks up the underlying File,
/// opens it and evicts some other File if necessary. The passed parameter is
/// assumed to be a function available for the physical `File`.
///
/// We are doing it via a macro as Rust doesn't support async closures that
/// take on parameters with lifetimes.
async fn lock_file(&self) -> Result<FileGuard<'_>, Error> {
let open_files = get_open_files();
let mut handle_guard = {
@@ -356,27 +380,23 @@ impl VirtualFile {
// We only need to hold the handle lock while we read the current handle. If
// another thread closes the file and recycles the slot for a different file,
// we will notice that the handle we read is no longer valid and retry.
let mut handle = *self.handle.read().unwrap();
let mut handle = *self.handle.read().await;
loop {
// Check if the slot contains our File
{
let slot = &open_files.slots[handle.index];
let slot_guard = slot.inner.read().unwrap();
if slot_guard.tag == handle.tag {
if let Some(file) = &slot_guard.file {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(STORAGE_IO_TIME_METRIC
.get(op)
.observe_closure_duration(|| func(file)));
}
let slot_guard = slot.inner.read().await;
if slot_guard.tag == handle.tag && slot_guard.file.is_some() {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(FileGuard { slot_guard });
}
}
// The slot didn't contain our File. We will have to open it ourselves,
// but before that, grab a write lock on handle in the VirtualFile, so
// that no other thread will try to concurrently open the same file.
let handle_guard = self.handle.write().unwrap();
let handle_guard = self.handle.write().await;
// If another thread changed the handle while we were not holding the lock,
// then the handle might now be valid again. Loop back to retry.
@@ -390,17 +410,10 @@ impl VirtualFile {
// We need to open the file ourselves. The handle in the VirtualFile is
// now locked in write-mode. Find a free slot to put it in.
let (handle, mut slot_guard) = open_files.find_victim_slot();
let (handle, mut slot_guard) = open_files.find_victim_slot().await;
// Open the physical file
let file = STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Open)
.observe_closure_duration(|| self.open_options.open(&self.path))?;
// Perform the requested operation on it
let result = STORAGE_IO_TIME_METRIC
.get(op)
.observe_closure_duration(|| func(&file));
let file = observe_duration!(StorageIoOperation::Open, self.open_options.open(&self.path))?;
// Store the File in the slot and update the handle in the VirtualFile
// to point to it.
@@ -408,7 +421,9 @@ impl VirtualFile {
*handle_guard = handle;
Ok(result)
return Ok(FileGuard {
slot_guard: slot_guard.downgrade(),
});
}
pub fn remove(self) {
@@ -423,11 +438,9 @@ impl VirtualFile {
self.pos = offset;
}
SeekFrom::End(offset) => {
self.pos = self
.with_file(StorageIoOperation::Seek, |mut file| {
file.seek(SeekFrom::End(offset))
})
.await??
self.pos = with_file!(self, StorageIoOperation::Seek, |file| file
.as_ref()
.seek(SeekFrom::End(offset)))?
}
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
@@ -515,9 +528,9 @@ impl VirtualFile {
}
pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
let result = self
.with_file(StorageIoOperation::Read, |file| file.read_at(buf, offset))
.await?;
let result = with_file!(self, StorageIoOperation::Read, |file| file
.as_ref()
.read_at(buf, offset));
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
@@ -527,9 +540,9 @@ impl VirtualFile {
}
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = self
.with_file(StorageIoOperation::Write, |file| file.write_at(buf, offset))
.await?;
let result = with_file!(self, StorageIoOperation::Write, |file| file
.as_ref()
.write_at(buf, offset));
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["write", &self.tenant_id, &self.timeline_id])
@@ -539,6 +552,18 @@ impl VirtualFile {
}
}
struct FileGuard<'a> {
slot_guard: RwLockReadGuard<'a, SlotInner>,
}
impl<'a> AsRef<File> for FileGuard<'a> {
fn as_ref(&self) -> &File {
// This unwrap is safe because we only create `FileGuard`s
// if we know that the file is Some.
self.slot_guard.file.as_ref().unwrap()
}
}
#[cfg(test)]
impl VirtualFile {
pub(crate) async fn read_blk(
@@ -571,20 +596,39 @@ impl VirtualFile {
impl Drop for VirtualFile {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut().unwrap();
let handle = self.handle.get_mut();
// We could check with a read-lock first, to avoid waiting on an
// unrelated I/O.
let slot = &get_open_files().slots[handle.index];
let mut slot_guard = slot.inner.write().unwrap();
if slot_guard.tag == handle.tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also operation "close-by-replace" for closes done on eviction for
// comparison.
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Close)
.observe_closure_duration(|| drop(slot_guard.file.take()));
fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) {
if slot_guard.tag == tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also the `CloseByReplace` operation for closes done on eviction for
// comparison.
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Close)
.observe_closure_duration(|| drop(slot_guard.file.take()));
}
}
// We don't have async drop so we cannot directly await the lock here.
// Instead, first do a best-effort attempt at closing the underlying
// file descriptor by using `try_write`, and if that fails, spawn
// a tokio task to do it asynchronously: we just want it to be
// cleaned up eventually.
// Most of the time, the `try_lock` should succeed though,
// as we have `&mut self` access. In other words, if the slot
// is still occupied by our file, there should be no access from
// other I/O operations; the only other possible place to lock
// the slot is the lock algorithm looking for free slots.
let slot = &get_open_files().slots[handle.index];
if let Ok(slot_guard) = slot.inner.try_write() {
clean_slot(slot, slot_guard, handle.tag);
} else {
let tag = handle.tag;
tokio::spawn(async move {
let slot_guard = slot.inner.write().await;
clean_slot(slot, slot_guard, tag);
});
};
}
}

View File

@@ -0,0 +1,52 @@
import queue
import threading
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.types import TenantId
"""
553 sudo mkfs.ext4 /dev/nvme1n1
555 mkdir test_output
556 sudo mount /dev/nvme1n1 test_output
557 htop
559 ./scripts/pysync
560 NEON_BIN=/home/admin/neon/target/release DEFAULT_PG_VERSION=15 ./scripts/pytest --preserve-database-files --timeout=0 ./test_runner/performance/test_pageserver_startup_many_tenants.py
561 sudo chown -R admin:admin test_output
cargo build_testing --release
562 NEON_BIN=$PWD/target/release DEFAULT_PG_VERSION=15 ./scripts/pytest --preserve-database-files --timeout=0 ./test_runner/performance/test_pageserver_startup_many_tenants.py
cd test_output/test_pageserver_startup_many_tenants/repo
sudo env NEON_REPO_DIR=$PWD prlimit --nofile=300000:300000 ../../../target/release/neon_local start
# watch initial load complete, then background jobs start. That's the interesting part.
sudo env NEON_REPO_DIR=$PWD prlimit --nofile=300000:300000 ../../../target/release/neon_local stop
# usually pageserver won't be responsive, kill with
sudo pkill -9 pageserver
"""
def test_pageserver_startup_many_tenants(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env = neon_env_builder.init_start()
# below doesn't work because summaries contain tenant and timeline ids and we check for them
tenant_id, timeline_id = env.initial_tenant, env.initial_timeline
pshttp = env.pageserver.http_client()
ep = env.endpoints.create_start("main")
ep.safe_psql("create table foo(b text)")
for i in range(0, 8):
ep.safe_psql("insert into foo(b) values ('some text')")
# pg_bin.run_capture(["pgbench", "-i", "-s1", ep.connstr()])
wait_for_last_flush_lsn(env, ep, tenant_id, timeline_id)
pshttp.timeline_checkpoint(tenant_id, timeline_id)
ep.stop_and_destroy()
env.pageserver.stop()
for sk in env.safekeepers:
sk.stop()
tenant_dir = env.repo_dir / "pageserver_1" / "tenants" / str(env.initial_tenant)
for i in range(0, 20_000):
import shutil
shutil.copytree(tenant_dir, tenant_dir.parent / str(TenantId.generate()))