QUICK HACK: rip out virtual file cache

This commit is contained in:
Christian Schwarz
2023-08-29 11:33:28 +02:00
committed by Christian Schwarz
parent 2d5d046062
commit 4d69192ae5
6 changed files with 7 additions and 539 deletions

View File

@@ -136,7 +136,6 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
let max_holes = cmd.max_holes.unwrap_or(DEFAULT_MAX_HOLES);
// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
pageserver::virtual_file::init(10);
pageserver::page_cache::init(100);
let mut total_delta_layers = 0usize;

View File

@@ -2,10 +2,10 @@ use std::path::{Path, PathBuf};
use anyhow::Result;
use clap::Subcommand;
use pageserver::page_cache;
use pageserver::tenant::block_io::BlockCursor;
use pageserver::tenant::disk_btree::DiskBtreeReader;
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
use pageserver::{page_cache, virtual_file};
use pageserver::{
repository::{Key, KEY_SIZE},
tenant::{
@@ -45,7 +45,6 @@ pub(crate) enum LayerCmd {
async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
let path = path.as_ref();
virtual_file::init(10);
page_cache::init(100);
let file = FileBlockReader::new(VirtualFile::open(path)?);
let summary_blk = file.read_blk(0).await?;

View File

@@ -15,7 +15,6 @@ use pageserver::{
page_cache,
task_mgr::TaskKind,
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
virtual_file,
};
use postgres_ffi::ControlFileData;
use std::path::{Path, PathBuf};
@@ -116,7 +115,6 @@ fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> {
async fn print_layerfile(path: &Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(10);
page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx).await

View File

@@ -24,7 +24,6 @@ use pageserver::{
task_mgr::TaskKind,
task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
tenant::mgr,
virtual_file,
};
use postgres_backend::AuthType;
use utils::logging::TracingErrorLayerEnablement;
@@ -125,7 +124,6 @@ fn main() -> anyhow::Result<()> {
let scenario = pageserver::failpoint_support::init();
// Basic initialization of things that don't change after startup
virtual_file::init(conf.max_file_descriptors);
page_cache::init(conf.page_cache_size);
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;

View File

@@ -87,8 +87,7 @@ impl EphemeralFile {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at_async(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
.await?;
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?;
write_guard.mark_valid();
// Swap for read lock

View File

@@ -11,14 +11,12 @@
//! src/backend/storage/file/fd.c
//!
use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME};
use once_cell::sync::OnceCell;
use std::fs::{self, File, OpenOptions};
use std::io::{Error, ErrorKind, Seek, SeekFrom, Write};
use std::ops::Deref;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
@@ -40,7 +38,7 @@ pub struct VirtualFile {
/// Lazy handle to the global file descriptor cache. The slot that this points to
/// might contain our File, or it may be empty, or it may contain a File that
/// belongs to a different VirtualFile.
handle: RwLock<SlotHandle>,
handle: File,
/// Current file position
pos: u64,
@@ -52,7 +50,6 @@ pub struct VirtualFile {
/// opened, in the VirtualFile::create() function, and strip the flag before
/// storing it here.
pub path: PathBuf,
open_options: OpenOptions,
// These are strings becase we only use them for metrics, and those expect strings.
// It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
@@ -61,118 +58,6 @@ pub struct VirtualFile {
timeline_id: String,
}
#[derive(Debug, PartialEq, Clone, Copy)]
struct SlotHandle {
/// Index into OPEN_FILES.slots
index: usize,
/// Value of 'tag' in the slot. If slot's tag doesn't match, then the slot has
/// been recycled and no longer contains the FD for this virtual file.
tag: u64,
}
/// OPEN_FILES is the global array that holds the physical file descriptors that
/// are currently open. Each slot in the array is protected by a separate lock,
/// so that different files can be accessed independently. The lock must be held
/// in write mode to replace the slot with a different file, but a read mode
/// is enough to operate on the file, whether you're reading or writing to it.
///
/// OPEN_FILES starts in uninitialized state, and it's initialized by
/// the virtual_file::init() function. It must be called exactly once at page
/// server startup.
static OPEN_FILES: OnceCell<OpenFiles> = OnceCell::new();
struct OpenFiles {
slots: &'static [Slot],
/// clock arm for the clock algorithm
next: AtomicUsize,
}
struct Slot {
inner: tokio::sync::RwLock<SlotInner>,
/// has this file been used since last clock sweep?
recently_used: AtomicBool,
}
struct SlotInner {
/// Counter that's incremented every time a different file is stored here.
/// To avoid the ABA problem.
tag: u64,
/// the underlying file
file: Option<File>,
}
impl OpenFiles {
/// Find a slot to use, evicting an existing file descriptor if needed.
///
/// On return, we hold a lock on the slot, and its 'tag' has been updated
/// recently_used has been set. It's all ready for reuse.
fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
//
// Run the clock algorithm to find a slot to replace.
//
let num_slots = self.slots.len();
let mut retries = 0;
let mut slot;
let mut slot_guard;
let index;
loop {
let next = self.next.fetch_add(1, Ordering::AcqRel) % num_slots;
slot = &self.slots[next];
// If the recently_used flag on this slot is set, continue the clock
// sweep. Otherwise try to use this slot. If we cannot acquire the
// lock, also continue the clock sweep.
//
// We only continue in this manner for a while, though. If we loop
// through the array twice without finding a victim, just pick the
// next slot and wait until we can reuse it. This way, we avoid
// spinning in the extreme case that all the slots are busy with an
// I/O operation.
if retries < num_slots * 2 {
if !slot.recently_used.swap(false, Ordering::Release) {
if let Ok(guard) = slot.inner.try_write() {
slot_guard = guard;
index = next;
break;
}
}
retries += 1;
} else {
slot_guard = slot.inner.write().unwrap();
index = next;
break;
}
}
//
// We now have the victim slot locked. If it was in use previously, close the
// old file.
//
if let Some(old_file) = slot_guard.file.take() {
// the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
// distinguish the two.
STORAGE_IO_TIME
.with_label_values(&["close-by-replace"])
.observe_closure_duration(|| drop(old_file));
}
// Prepare the slot for reuse and return it
slot_guard.tag += 1;
slot.recently_used.store(true, Ordering::Relaxed);
(
SlotHandle {
index,
tag: slot_guard.tag,
},
slot_guard,
)
}
}
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub fn open(path: &Path) -> Result<VirtualFile, std::io::Error> {
@@ -208,7 +93,6 @@ impl VirtualFile {
tenant_id = "*".to_string();
timeline_id = "*".to_string();
}
let (handle, mut slot_guard) = get_open_files().find_victim_slot();
let file = STORAGE_IO_TIME
.with_label_values(&["open"])
.observe_closure_duration(|| open_options.open(path))?;
@@ -224,16 +108,13 @@ impl VirtualFile {
reopen_options.truncate(false);
let vfile = VirtualFile {
handle: RwLock::new(handle),
handle: file,
pos: 0,
path: path.to_path_buf(),
open_options: reopen_options,
tenant_id,
timeline_id,
};
slot_guard.file.replace(file);
Ok(vfile)
}
@@ -247,23 +128,6 @@ impl VirtualFile {
}
}
/// The function that constructs the guard guarantees that the respective [`SlotInner::file`] is `Some`.
enum FileSlotGuard {
Write(tokio::sync::RwLockWriteGuard<'static, SlotInner>),
Read(tokio::sync::RwLockReadGuard<'static, SlotInner>),
}
impl Deref for FileSlotGuard {
type Target = File;
fn deref(&self) -> &Self::Target {
match self {
FileSlotGuard::Write(guard) => guard.file.as_ref().unwrap(),
FileSlotGuard::Read(guard) => guard.file.as_ref().unwrap(),
}
}
}
impl VirtualFile {
/// Helper function that looks up the underlying File for this VirtualFile,
/// opening it and evicting some other File if necessary. It calls 'func'
@@ -272,85 +136,9 @@ impl VirtualFile {
where
F: FnMut(&File) -> R,
{
let guard = self.get_file_guard()?;
return Ok(STORAGE_IO_TIME
.with_label_values(&[op])
.observe_closure_duration(|| func(&guard)));
}
/// Like with_file, but, takes a func that returns a future.
async fn with_file_async<F, Fut, R>(&self, op: &str, mut factory: F) -> Result<R, Error>
where
F: FnOnce(FileSlotGuard) -> Fut,
Fut: std::future::Future<Output = R>,
{
let guard = self.get_file_guard()?;
let start = std::time::Instant::now();
let res = factory(guard).await;
STORAGE_IO_TIME
.with_label_values(&[op])
.observe(start.elapsed().as_secs_f64());
Ok(res)
}
fn get_file_guard(&self) -> Result<FileSlotGuard, Error> {
let open_files = get_open_files();
let mut handle_guard = {
// Read the cached slot handle, and see if the slot that it points to still
// contains our File.
//
// We only need to hold the handle lock while we read the current handle. If
// another thread closes the file and recycles the slot for a different file,
// we will notice that the handle we read is no longer valid and retry.
let mut handle = *self.handle.read().unwrap();
loop {
// Check if the slot contains our File
{
let slot = &open_files.slots[handle.index];
let slot_guard = slot.inner.read().unwrap();
if slot_guard.tag == handle.tag {
if let Some(_) = &slot_guard.file {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(FileSlotGuard::Read(slot_guard));
}
}
}
// The slot didn't contain our File. We will have to open it ourselves,
// but before that, grab a write lock on handle in the VirtualFile, so
// that no other thread will try to concurrently open the same file.
let handle_guard = self.handle.write().unwrap();
// If another thread changed the handle while we were not holding the lock,
// then the handle might now be valid again. Loop back to retry.
if *handle_guard != handle {
handle = *handle_guard;
continue;
}
break handle_guard;
}
};
// We need to open the file ourselves. The handle in the VirtualFile is
// now locked in write-mode. Find a free slot to put it in.
let (handle, mut slot_guard) = open_files.find_victim_slot();
// Open the physical file
let file = STORAGE_IO_TIME
.with_label_values(&["open"])
.observe_closure_duration(|| self.open_options.open(&self.path))?;
// Store the File in the slot and update the handle in the VirtualFile
// to point to it.
slot_guard.file.replace(file);
*handle_guard = handle;
Ok(FileSlotGuard::Write(slot_guard))
.observe_closure_duration(|| func(&self.handle)));
}
pub fn remove(self) {
@@ -360,26 +148,6 @@ impl VirtualFile {
}
}
impl Drop for VirtualFile {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut().unwrap();
// We could check with a read-lock first, to avoid waiting on an
// unrelated I/O.
let slot = &get_open_files().slots[handle.index];
let mut slot_guard = slot.inner.write().unwrap();
if slot_guard.tag == handle.tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also operation "close-by-replace" for closes done on eviction for
// comparison.
STORAGE_IO_TIME
.with_label_values(&["close"])
.observe_closure_duration(|| drop(slot_guard.file.take()));
}
}
}
impl Write for VirtualFile {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
let pos = self.pos;
@@ -442,31 +210,6 @@ impl VirtualFile {
Ok(())
}
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
pub async fn read_exact_at_async(
&self,
mut buf: &mut [u8],
mut offset: u64,
) -> Result<(), Error> {
while !buf.is_empty() {
match self.read_at_async(buf, offset).await {
Ok(0) => {
return Err(Error::new(
std::io::ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
))
}
Ok(n) => {
buf = &mut buf[n..];
offset += n as u64;
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}
Ok(())
}
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
pub fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> {
while !buf.is_empty() {
@@ -498,21 +241,6 @@ impl VirtualFile {
result
}
async fn read_at_async(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
let result = self
.with_file_async("read", |file| async move {
// todo: use async IO here
file.read_at(buf, offset)
})
.await?;
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
.add(size as i64);
}
result
}
pub fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = self.with_file("write", |file| file.write_at(buf, offset))?;
if let Ok(size) = result {
@@ -523,256 +251,3 @@ impl VirtualFile {
result
}
}
impl OpenFiles {
fn new(num_slots: usize) -> OpenFiles {
let mut slots = Box::new(Vec::with_capacity(num_slots));
for _ in 0..num_slots {
let slot = Slot {
recently_used: AtomicBool::new(false),
inner: RwLock::new(SlotInner { tag: 0, file: None }),
};
slots.push(slot);
}
OpenFiles {
next: AtomicUsize::new(0),
slots: Box::leak(slots),
}
}
}
///
/// Initialize the virtual file module. This must be called once at page
/// server startup.
///
pub fn init(num_slots: usize) {
if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
panic!("virtual_file::init called twice");
}
}
const TEST_MAX_FILE_DESCRIPTORS: usize = 10;
// Get a handle to the global slots array.
fn get_open_files() -> &'static OpenFiles {
//
// In unit tests, page server startup doesn't happen and no one calls
// virtual_file::init(). Initialize it here, with a small array.
//
// This applies to the virtual file tests below, but all other unit
// tests too, so the virtual file facility is always usable in
// unit tests.
//
if cfg!(test) {
OPEN_FILES.get_or_init(|| OpenFiles::new(TEST_MAX_FILE_DESCRIPTORS))
} else {
OPEN_FILES.get().expect("virtual_file::init not called yet")
}
}
#[cfg(test)]
mod tests {
use super::*;
use rand::seq::SliceRandom;
use rand::thread_rng;
use rand::Rng;
use std::sync::Arc;
use std::thread;
// Helper function to slurp contents of a file, starting at the current position,
// into a string
fn read_string<FD>(vfile: &mut FD) -> Result<String, Error>
where
FD: Read,
{
let mut buf = String::new();
vfile.read_to_string(&mut buf)?;
Ok(buf)
}
// Helper function to slurp a portion of a file into a string
fn read_string_at<FD>(vfile: &mut FD, pos: u64, len: usize) -> Result<String, Error>
where
FD: FileExt,
{
let mut buf = Vec::new();
buf.resize(len, 0);
vfile.read_exact_at(&mut buf, pos)?;
Ok(String::from_utf8(buf).unwrap())
}
#[test]
fn test_virtual_files() -> Result<(), Error> {
// The real work is done in the test_files() helper function. This
// allows us to run the same set of tests against a native File, and
// VirtualFile. We trust the native Files and wouldn't need to test them,
// but this allows us to verify that the operations return the same
// results with VirtualFiles as with native Files. (Except that with
// native files, you will run out of file descriptors if the ulimit
// is low enough.)
test_files("virtual_files", |path, open_options| {
VirtualFile::open_with_options(path, open_options)
})
}
#[test]
fn test_physical_files() -> Result<(), Error> {
test_files("physical_files", |path, open_options| {
open_options.open(path)
})
}
fn test_files<OF, FD>(testname: &str, openfunc: OF) -> Result<(), Error>
where
FD: Read + Write + Seek + FileExt,
OF: Fn(&Path, &OpenOptions) -> Result<FD, std::io::Error>,
{
let testdir = crate::config::PageServerConf::test_repo_dir(testname);
std::fs::create_dir_all(&testdir)?;
let path_a = testdir.join("file_a");
let mut file_a = openfunc(
&path_a,
OpenOptions::new().write(true).create(true).truncate(true),
)?;
file_a.write_all(b"foobar")?;
// cannot read from a file opened in write-only mode
assert!(read_string(&mut file_a).is_err());
// Close the file and re-open for reading
let mut file_a = openfunc(&path_a, OpenOptions::new().read(true))?;
// cannot write to a file opened in read-only mode
assert!(file_a.write(b"bar").is_err());
// Try simple read
assert_eq!("foobar", read_string(&mut file_a)?);
// It's positioned at the EOF now.
assert_eq!("", read_string(&mut file_a)?);
// Test seeks.
assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
assert_eq!("oobar", read_string(&mut file_a)?);
assert_eq!(file_a.seek(SeekFrom::End(-2))?, 4);
assert_eq!("ar", read_string(&mut file_a)?);
assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
assert_eq!(file_a.seek(SeekFrom::Current(2))?, 3);
assert_eq!("bar", read_string(&mut file_a)?);
assert_eq!(file_a.seek(SeekFrom::Current(-5))?, 1);
assert_eq!("oobar", read_string(&mut file_a)?);
// Test erroneous seeks to before byte 0
assert!(file_a.seek(SeekFrom::End(-7)).is_err());
assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
assert!(file_a.seek(SeekFrom::Current(-2)).is_err());
// the erroneous seek should have left the position unchanged
assert_eq!("oobar", read_string(&mut file_a)?);
// Create another test file, and try FileExt functions on it.
let path_b = testdir.join("file_b");
let mut file_b = openfunc(
&path_b,
OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true),
)?;
file_b.write_all_at(b"BAR", 3)?;
file_b.write_all_at(b"FOO", 0)?;
assert_eq!(read_string_at(&mut file_b, 2, 3)?, "OBA");
// Open a lot of files, enough to cause some evictions. (Or to be precise,
// open the same file many times. The effect is the same.)
//
// leave file_a positioned at offset 1 before we start
assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
let mut vfiles = Vec::new();
for _ in 0..100 {
let mut vfile = openfunc(&path_b, OpenOptions::new().read(true))?;
assert_eq!("FOOBAR", read_string(&mut vfile)?);
vfiles.push(vfile);
}
// make sure we opened enough files to definitely cause evictions.
assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
// The underlying file descriptor for 'file_a' should be closed now. Try to read
// from it again. We left the file positioned at offset 1 above.
assert_eq!("oobar", read_string(&mut file_a)?);
// Check that all the other FDs still work too. Use them in random order for
// good measure.
vfiles.as_mut_slice().shuffle(&mut thread_rng());
for vfile in vfiles.iter_mut() {
assert_eq!("OOBAR", read_string_at(vfile, 1, 5)?);
}
Ok(())
}
/// Test using VirtualFiles from many threads concurrently. This tests both using
/// a lot of VirtualFiles concurrently, causing evictions, and also using the same
/// VirtualFile from multiple threads concurrently.
#[test]
fn test_vfile_concurrency() -> Result<(), Error> {
const SIZE: usize = 8 * 1024;
const VIRTUAL_FILES: usize = 100;
const THREADS: usize = 100;
const SAMPLE: [u8; SIZE] = [0xADu8; SIZE];
let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency");
std::fs::create_dir_all(&testdir)?;
// Create a test file.
let test_file_path = testdir.join("concurrency_test_file");
{
let file = File::create(&test_file_path)?;
file.write_all_at(&SAMPLE, 0)?;
}
// Open the file many times.
let mut files = Vec::new();
for _ in 0..VIRTUAL_FILES {
let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))?;
files.push(f);
}
let files = Arc::new(files);
// Launch many threads, and use the virtual files concurrently in random order.
let mut threads = Vec::new();
for threadno in 0..THREADS {
let builder =
thread::Builder::new().name(format!("test_vfile_concurrency thread {}", threadno));
let files = files.clone();
let thread = builder
.spawn(move || {
let mut buf = [0u8; SIZE];
let mut rng = rand::thread_rng();
for _ in 1..1000 {
let f = &files[rng.gen_range(0..files.len())];
f.read_exact_at(&mut buf, 0).unwrap();
assert!(buf == SAMPLE);
}
})
.unwrap();
threads.push(thread);
}
for thread in threads {
thread.join().unwrap();
}
Ok(())
}
}