From aca2d7bdeaadd6cc95a074d8426302cb46d84cbe Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 29 Jan 2024 16:20:24 +0000 Subject: [PATCH] use O_DIRECT for VirtualFile reads --- pageserver/src/buffer_pool.rs | 19 ++++++-- pageserver/src/page_cache.rs | 47 ++++++++++--------- .../src/tenant/storage_layer/delta_layer.rs | 9 +++- .../src/tenant/storage_layer/image_layer.rs | 9 +++- pageserver/src/virtual_file/open_options.rs | 19 +++++++- 5 files changed, 73 insertions(+), 30 deletions(-) diff --git a/pageserver/src/buffer_pool.rs b/pageserver/src/buffer_pool.rs index 1078ee50e9..176693439f 100644 --- a/pageserver/src/buffer_pool.rs +++ b/pageserver/src/buffer_pool.rs @@ -2,18 +2,27 @@ use std::cell::RefCell; use crate::tenant::disk_btree::PAGE_SZ; -pub struct Buffer(Option>); +#[repr(C, align(8192))] +struct BufferContent([u8; PAGE_SZ]); + +impl BufferContent { + fn empty() -> Self { + BufferContent(std::array::from_fn(|_| 0)) + } +} + +pub struct Buffer(Option>); // Thread-local list of re-usable buffers. thread_local! { - static POOL: RefCell>> = RefCell::new(Vec::new()); + static POOL: RefCell>> = RefCell::new(Vec::new()); } pub(crate) fn get() -> Buffer { let maybe = POOL.with(|rc| rc.borrow_mut().pop()); match maybe { Some(buf) => Buffer(Some(buf)), - None => Buffer(Some(Box::new([0; PAGE_SZ]))), + None => Buffer(Some(Box::new(BufferContent::empty()))), } } @@ -28,13 +37,13 @@ impl std::ops::Deref for Buffer { type Target = [u8; PAGE_SZ]; fn deref(&self) -> &Self::Target { - self.0.as_ref().unwrap().as_ref() + &self.0.as_ref().unwrap().as_ref().0 } } impl std::ops::DerefMut for Buffer { fn deref_mut(&mut self) -> &mut Self::Target { - self.0.as_mut().unwrap().as_mut() + &mut self.0.as_mut().unwrap().as_mut().0 } } diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index f6ad4c4021..064df52f9e 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -165,7 +165,17 @@ struct SlotInner { key: Option, // for `coalesce_readers_permit` permit: std::sync::Mutex>, - buf: &'static mut [u8; PAGE_SZ], + buf: &'static mut SlotContents, +} + +#[derive(Clone)] +#[repr(C, align(8192))] +struct SlotContents([u8; PAGE_SZ]); + +impl SlotContents { + fn empty() -> Self { + Self(std::array::from_fn(|_| 0)) + } } impl Slot { @@ -253,13 +263,13 @@ impl std::ops::Deref for PageReadGuard<'_> { type Target = [u8; PAGE_SZ]; fn deref(&self) -> &Self::Target { - self.slot_guard.buf + &self.slot_guard.buf.0 } } impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> { fn as_ref(&self) -> &[u8; PAGE_SZ] { - self.slot_guard.buf + &self.slot_guard.buf.0 } } @@ -285,7 +295,7 @@ enum PageWriteGuardState<'i> { impl std::ops::DerefMut for PageWriteGuard<'_> { fn deref_mut(&mut self) -> &mut Self::Target { match &mut self.state { - PageWriteGuardState::Invalid { inner, _permit } => inner.buf, + PageWriteGuardState::Invalid { inner, _permit } => &mut inner.buf.0, PageWriteGuardState::Downgraded => unreachable!(), } } @@ -296,7 +306,7 @@ impl std::ops::Deref for PageWriteGuard<'_> { fn deref(&self) -> &Self::Target { match &self.state { - PageWriteGuardState::Invalid { inner, _permit } => inner.buf, + PageWriteGuardState::Invalid { inner, _permit } => &inner.buf.0, PageWriteGuardState::Downgraded => unreachable!(), } } @@ -672,29 +682,22 @@ impl PageCache { fn new(num_pages: usize) -> Self { assert!(num_pages > 0, "page cache size must be > 0"); - // We could use Vec::leak here, but that potentially also leaks - // uninitialized reserved capacity. With into_boxed_slice and Box::leak - // this is avoided. - let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice()); + let slot_contents = Box::leak(vec![SlotContents::empty(); num_pages].into_boxed_slice()); let size_metrics = &crate::metrics::PAGE_CACHE_SIZE; size_metrics.max_bytes.set_page_sz(num_pages); size_metrics.current_bytes_immutable.set_page_sz(0); size_metrics.current_bytes_materialized_page.set_page_sz(0); - let slots = page_buffer - .chunks_exact_mut(PAGE_SZ) - .map(|chunk| { - let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap(); - - Slot { - inner: tokio::sync::RwLock::new(SlotInner { - key: None, - buf, - permit: std::sync::Mutex::new(Weak::new()), - }), - usage_count: AtomicU8::new(0), - } + let slots = slot_contents + .into_iter() + .map(|slot_contents| Slot { + inner: tokio::sync::RwLock::new(SlotInner { + key: None, + buf: slot_contents, + permit: std::sync::Mutex::new(Weak::new()), + }), + usage_count: AtomicU8::new(0), }) .collect(); diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 3a445ef71e..3e7a8e10e5 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -688,7 +688,14 @@ impl DeltaLayerInner { summary: Option, ctx: &RequestContext, ) -> Result, anyhow::Error> { - let file = match VirtualFile::open(path).await { + let file = match VirtualFile::open_with_options( + path, + virtual_file::OpenOptions::new() + .read(true) + .custom_flags(nix::libc::O_DIRECT), + ) + .await + { Ok(file) => file, Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))), }; diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index c62e6aed51..f266ebb60b 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -367,7 +367,14 @@ impl ImageLayerInner { summary: Option, ctx: &RequestContext, ) -> Result, anyhow::Error> { - let file = match VirtualFile::open(path).await { + let file = match VirtualFile::open_with_options( + path, + virtual_file::OpenOptions::new() + .read(true) + .custom_flags(nix::libc::O_DIRECT), + ) + .await + { Ok(file) => file, Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))), }; diff --git a/pageserver/src/virtual_file/open_options.rs b/pageserver/src/virtual_file/open_options.rs index 1e5ffe15cc..5445bd6ca2 100644 --- a/pageserver/src/virtual_file/open_options.rs +++ b/pageserver/src/virtual_file/open_options.rs @@ -1,7 +1,12 @@ //! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`]; +use nix::libc; + use super::IoEngineKind; -use std::{os::fd::OwnedFd, path::Path}; +use std::{ + os::{fd::OwnedFd, unix::fs::OpenOptionsExt}, + path::Path, +}; #[derive(Debug, Clone)] pub enum OpenOptions { @@ -92,6 +97,18 @@ impl OpenOptions { self } + pub fn custom_flags(&mut self, custom_flags: libc::c_int) -> &mut OpenOptions { + match self { + OpenOptions::StdFs(x) => { + let _ = x.custom_flags(custom_flags); + } + OpenOptions::TokioEpollUring(x) => { + let _ = x.custom_flags(custom_flags); + } + } + self + } + pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result { match self { OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()),