mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-25 17:10:38 +00:00
# Problem The Pageserver read path exclusively uses direct IO if `virtual_file_io_mode=direct`. The write path is half-finished. Here is what the various writing components use: |what|buffering|flags on <br/>`v_f_io_mode`<br/>=`buffered`|flags on <br/>`virtual_file_io_mode`<br/>=`direct`| |-|-|-|-| |`DeltaLayerWriter`| BlobWriter<BUFFERED=true> | () | () | |`ImageLayerWriter`| BlobWriter<BUFFERED=false> | () | () | |`download_layer_file`|BufferedWriter|()|()| |`InMemoryLayer`|BufferedWriter|()|O_DIRECT| The vehicle towards direct IO support is `BufferedWriter` which - largely takes care of O_DIRECT alignment & size-multiple requirements - double-buffering to mask latency `DeltaLayerWriter`, `ImageLayerWriter` use `blob_io::BlobWriter` , which has neither of these. # Changes ## High-Level At a high-level this PR makes the following primary changes: - switch the two layer writer types to use `BufferedWriter` & make sensitive to `virtual_file_io_mode` (via open_with_options_**v2**) - make `download_layer_file` sensitive to `virtual_file_io_mode` (also via open_with_options_**v2**) - add `virtual_file_io_mode=direct-rw` as a feature gate - we're hackish-ly piggybacking on OpenOptions's ask for write access here - this means with just `=direct` InMemoryLayer reads and writes no longer uses O_DIRECT - this is transitory and we'll remove the `direct-rw` variant once the rollout is complete (The `_v2` APIs for opening / creating VirtualFile are those that are sensitive to `virtual_file_io_mode`) The result is: |what|uses <br/>`BufferedWriter`|flags on <br/>`v_f_io_mode`<br/>=`buffered`|flags on <br/>`v_f_io_mode`<br/>=`direct`|flags on <br/>`v_f_io_mode`<br/>=`direct-rw`| |-|-|-|-|-| |`DeltaLayerWriter`| ~~Blob~~BufferedWriter | () | () | O_DIRECT | |`ImageLayerWriter`| ~~Blob~~BufferedWriter | () | () | O_DIRECT | |`download_layer_file`|BufferedWriter|()|()|O_DIRECT| |`InMemoryLayer`|BufferedWriter|()|~~O_DIRECT~~()|O_DIRECT| ## Code-Level The main change is: - Switch `blob_io::BlobWriter` away from its own buffering method to use `BufferedWriter`. Additional prep for upholding `O_DIRECT` requirements: - Layer writer `finish()` methods switched to use IoBufferMut for guaranteed buffer address alignment. The size of the buffers is PAGE_SZ and thereby implicitly assumed to fulfill O_DIRECT requirements. For the hacky feature-gating via `=direct-rw`: - Track `OpenOptions::write(true|false)` in a field; bunch of mechanical churn. - Consolidate the APIs in which we "open" or "create" VirtualFile for better overview over which parts of the code use the `_v2` APIs. Necessary refactorings & infra work: - Add doc comments explaining how BufferedWriter ensures that writes are compliant with O_DIRECT alignment & size constraints. This isn't new, but should be spelled out. - Add the concept of shutdown modes to `BufferedWriter::shutdown` to make writer shutdown adhere to these constraints. - The `PadThenTruncate` mode might not be necessary in practice because I believe all layer files ever written are sized in multiples `PAGE_SZ` and since `PAGE_SZ` is larger than the current alignment requirements (512/4k depending on platform), it won't be necesary to pad. - Some test (I believe `round_trip_test_compressed`?) required it though - [ ] TODO: decide if we want to accept that complexity; if we do then address TODO in the code to separate alignment requirement from buffer capacity - Add `set_len` (=`ftruncate`) VirtualFile operation to support the above. - Allow `BufferedWriter` to start at a non-zero offset (to make room for the summary block). Cleanups unlocked by this change: - Remove non-positional APIs from VirtualFile (e.g. seek, write_full, read_full) Drive-by fixes: - PR https://github.com/neondatabase/neon/pull/11585 aimed to run unit tests for all `virtual_file_io_mode` combinations but didn't because of a missing `_` in the env var. # Performance This section assesses this PR's impact on deployments with current production setting (`=direct`) and anticipated impact of switching to (`=direct-rw`). For `DeltaLayerWriter`, `=direct` should remain unchanged to slightly improved on throughput because the `BlobWriter`'s buffer had the same size as the `BufferedWriter`'s buffer, but it didn't have the double-buffering that `BufferedWriter` has. The `=direct-rw` enables direct IO; throughput should not be suffering because of double-buffering; benchmarks will show if this is true. The `ImageLayerWriter` was previously not doing any buffering (`BUFFERED=false`). It went straight to issuing the IO operation to the underlying VirtualFile and the buffering was done by the kernel. The switch to `BufferedWriter` under `=direct` adds an additional memcpy into the BufferedWriter's buffer. We will win back that memcpy when enabling direct IO via `=direct-rw`. A nice win from the switch to `BufferedWriter` is that ImageLayerWriter performs >=16x fewer write operations to VirtualFile (the BlobWriter performs one write per len field and one write per image value). This should save low tens of microseconds of CPU overhead from doing all these syscalls/io_uring operations, regardless of `=direct` or `=direct-rw`. Aside from problems with alignment, this write frequency without double-buffering is prohibitive if we actually have to wait for the disk, which is what will happen when we enable direct IO via (`=direct-rw`). Throughput should not be suffering because of BufferedWrite's double-buffering; benchmarks will show if this is true. `InMemoryLayer` at `=direct` will flip back to using buffered IO but remain on BufferedWriter. The buffered IO adds back one memcpy of CPU overhead. Throughput should not suffer and will might improve on not-memory-pressured Pageservers but let's remember that we're doing the whole direct IO thing to eliminate global memory pressure as a source of perf variability. ## bench_ingest I reran `bench_ingest` on `im4gn.2xlarge` and `Hetzner AX102`. Use `git diff` with `--word-diff` or similar to see the change. General guidance on interpretation: - immediate production impact of this PR without production config change can be gauged by comparing the same `io_mode=Direct` - end state of production switched over to `io_mode=DirectRw` can be gauged by comparing old results' `io_mode=Direct` to new results' `io_mode=DirectRw` Given above guidance, on `im4gn.2xlarge` - immediate impact is a significant improvement in all cases - end state after switching has same significant improvements in all cases - ... except `ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes` which only achieves `238 MiB/s` instead of `253.43 MiB/s` - this is a 6% degradation - this workload is typical for image layer creation # Refs - epic https://github.com/neondatabase/neon/issues/9868 - stacked atop - preliminary refactor https://github.com/neondatabase/neon/pull/11549 - bench_ingest overhaul https://github.com/neondatabase/neon/pull/11667 - derived from https://github.com/neondatabase/neon/pull/10063 Co-authored-by: Yuchen Liang <yuchen@neon.tech>
389 lines
13 KiB
Rust
389 lines
13 KiB
Rust
//! [`super::VirtualFile`] supports different IO engines.
|
|
//!
|
|
//! The [`IoEngineKind`] enum identifies them.
|
|
//!
|
|
//! The choice of IO engine is global.
|
|
//! Initialize using [`init`].
|
|
//!
|
|
//! Then use [`get`] and [`super::OpenOptions`].
|
|
//!
|
|
//!
|
|
|
|
#[cfg(target_os = "linux")]
|
|
pub(super) mod tokio_epoll_uring_ext;
|
|
|
|
use tokio_epoll_uring::IoBuf;
|
|
use tracing::Instrument;
|
|
|
|
pub(crate) use super::api::IoEngineKind;
|
|
#[derive(Clone, Copy)]
|
|
#[repr(u8)]
|
|
pub(crate) enum IoEngine {
|
|
NotSet,
|
|
StdFs,
|
|
#[cfg(target_os = "linux")]
|
|
TokioEpollUring,
|
|
}
|
|
|
|
impl From<IoEngineKind> for IoEngine {
|
|
fn from(value: IoEngineKind) -> Self {
|
|
match value {
|
|
IoEngineKind::StdFs => IoEngine::StdFs,
|
|
#[cfg(target_os = "linux")]
|
|
IoEngineKind::TokioEpollUring => IoEngine::TokioEpollUring,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl TryFrom<u8> for IoEngine {
|
|
type Error = u8;
|
|
|
|
fn try_from(value: u8) -> Result<Self, Self::Error> {
|
|
Ok(match value {
|
|
v if v == (IoEngine::NotSet as u8) => IoEngine::NotSet,
|
|
v if v == (IoEngine::StdFs as u8) => IoEngine::StdFs,
|
|
#[cfg(target_os = "linux")]
|
|
v if v == (IoEngine::TokioEpollUring as u8) => IoEngine::TokioEpollUring,
|
|
x => return Err(x),
|
|
})
|
|
}
|
|
}
|
|
|
|
static IO_ENGINE: AtomicU8 = AtomicU8::new(IoEngine::NotSet as u8);
|
|
|
|
pub(crate) fn set(engine_kind: IoEngineKind) {
|
|
let engine: IoEngine = engine_kind.into();
|
|
IO_ENGINE.store(engine as u8, std::sync::atomic::Ordering::Relaxed);
|
|
#[cfg(not(test))]
|
|
{
|
|
let metric = &crate::metrics::virtual_file_io_engine::KIND;
|
|
metric.reset();
|
|
metric
|
|
.with_label_values(&[&format!("{engine_kind}")])
|
|
.set(1);
|
|
}
|
|
}
|
|
|
|
#[cfg(not(test))]
|
|
pub(super) fn init(engine_kind: IoEngineKind) {
|
|
set(engine_kind);
|
|
}
|
|
|
|
/// Longer-term, this API should only be used by [`super::VirtualFile`].
|
|
pub(crate) fn get() -> IoEngine {
|
|
let cur = IoEngine::try_from(IO_ENGINE.load(Ordering::Relaxed)).unwrap();
|
|
if cfg!(test) {
|
|
let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
|
|
match cur {
|
|
IoEngine::NotSet => {
|
|
let kind = match std::env::var(env_var_name) {
|
|
Ok(v) => match v.parse::<IoEngineKind>() {
|
|
Ok(engine_kind) => engine_kind,
|
|
Err(e) => {
|
|
panic!(
|
|
"invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}"
|
|
)
|
|
}
|
|
},
|
|
Err(std::env::VarError::NotPresent) => {
|
|
#[cfg(target_os = "linux")]
|
|
{
|
|
IoEngineKind::TokioEpollUring
|
|
}
|
|
#[cfg(not(target_os = "linux"))]
|
|
{
|
|
IoEngineKind::StdFs
|
|
}
|
|
}
|
|
Err(std::env::VarError::NotUnicode(_)) => {
|
|
panic!("env var {env_var_name} is not unicode");
|
|
}
|
|
};
|
|
self::set(kind);
|
|
self::get()
|
|
}
|
|
x => x,
|
|
}
|
|
} else {
|
|
cur
|
|
}
|
|
}
|
|
|
|
use std::os::unix::prelude::FileExt;
|
|
use std::sync::atomic::{AtomicU8, Ordering};
|
|
|
|
use super::owned_buffers_io::io_buf_ext::FullSlice;
|
|
use super::owned_buffers_io::slice::SliceMutExt;
|
|
use super::{FileGuard, Metadata};
|
|
|
|
#[cfg(target_os = "linux")]
|
|
fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error<std::io::Error>) -> std::io::Error {
|
|
match e {
|
|
tokio_epoll_uring::Error::Op(e) => e,
|
|
tokio_epoll_uring::Error::System(system) => {
|
|
std::io::Error::new(std::io::ErrorKind::Other, system)
|
|
}
|
|
}
|
|
}
|
|
|
|
impl IoEngine {
|
|
pub(super) async fn read_at<Buf>(
|
|
&self,
|
|
file_guard: FileGuard,
|
|
offset: u64,
|
|
mut slice: tokio_epoll_uring::Slice<Buf>,
|
|
) -> (
|
|
(FileGuard, tokio_epoll_uring::Slice<Buf>),
|
|
std::io::Result<usize>,
|
|
)
|
|
where
|
|
Buf: tokio_epoll_uring::IoBufMut + Send,
|
|
{
|
|
match self {
|
|
IoEngine::NotSet => panic!("not initialized"),
|
|
IoEngine::StdFs => {
|
|
let rust_slice = slice.as_mut_rust_slice_full_zeroed();
|
|
let res = file_guard.with_std_file(|std_file| std_file.read_at(rust_slice, offset));
|
|
((file_guard, slice), res)
|
|
}
|
|
#[cfg(target_os = "linux")]
|
|
IoEngine::TokioEpollUring => {
|
|
let system = tokio_epoll_uring_ext::thread_local_system().await;
|
|
let (resources, res) = system.read(file_guard, offset, slice).await;
|
|
(resources, res.map_err(epoll_uring_error_to_std))
|
|
}
|
|
}
|
|
}
|
|
pub(super) async fn sync_all(&self, file_guard: FileGuard) -> (FileGuard, std::io::Result<()>) {
|
|
match self {
|
|
IoEngine::NotSet => panic!("not initialized"),
|
|
IoEngine::StdFs => {
|
|
let res = file_guard.with_std_file(|std_file| std_file.sync_all());
|
|
(file_guard, res)
|
|
}
|
|
#[cfg(target_os = "linux")]
|
|
IoEngine::TokioEpollUring => {
|
|
let system = tokio_epoll_uring_ext::thread_local_system().await;
|
|
let (resources, res) = system.fsync(file_guard).await;
|
|
(resources, res.map_err(epoll_uring_error_to_std))
|
|
}
|
|
}
|
|
}
|
|
pub(super) async fn sync_data(
|
|
&self,
|
|
file_guard: FileGuard,
|
|
) -> (FileGuard, std::io::Result<()>) {
|
|
match self {
|
|
IoEngine::NotSet => panic!("not initialized"),
|
|
IoEngine::StdFs => {
|
|
let res = file_guard.with_std_file(|std_file| std_file.sync_data());
|
|
(file_guard, res)
|
|
}
|
|
#[cfg(target_os = "linux")]
|
|
IoEngine::TokioEpollUring => {
|
|
let system = tokio_epoll_uring_ext::thread_local_system().await;
|
|
let (resources, res) = system.fdatasync(file_guard).await;
|
|
(resources, res.map_err(epoll_uring_error_to_std))
|
|
}
|
|
}
|
|
}
|
|
pub(super) async fn metadata(
|
|
&self,
|
|
file_guard: FileGuard,
|
|
) -> (FileGuard, std::io::Result<Metadata>) {
|
|
match self {
|
|
IoEngine::NotSet => panic!("not initialized"),
|
|
IoEngine::StdFs => {
|
|
let res =
|
|
file_guard.with_std_file(|std_file| std_file.metadata().map(Metadata::from));
|
|
(file_guard, res)
|
|
}
|
|
#[cfg(target_os = "linux")]
|
|
IoEngine::TokioEpollUring => {
|
|
let system = tokio_epoll_uring_ext::thread_local_system().await;
|
|
let (resources, res) = system.statx(file_guard).await;
|
|
(
|
|
resources,
|
|
res.map_err(epoll_uring_error_to_std).map(Metadata::from),
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
pub(super) async fn set_len(
|
|
&self,
|
|
file_guard: FileGuard,
|
|
len: u64,
|
|
) -> (FileGuard, std::io::Result<()>) {
|
|
match self {
|
|
IoEngine::NotSet => panic!("not initialized"),
|
|
IoEngine::StdFs => {
|
|
let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
|
|
(file_guard, res)
|
|
}
|
|
#[cfg(target_os = "linux")]
|
|
IoEngine::TokioEpollUring => {
|
|
// TODO: ftruncate op for tokio-epoll-uring
|
|
let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
|
|
(file_guard, res)
|
|
}
|
|
}
|
|
}
|
|
|
|
pub(super) async fn write_at<B: IoBuf + Send>(
|
|
&self,
|
|
file_guard: FileGuard,
|
|
offset: u64,
|
|
buf: FullSlice<B>,
|
|
) -> ((FileGuard, FullSlice<B>), std::io::Result<usize>) {
|
|
match self {
|
|
IoEngine::NotSet => panic!("not initialized"),
|
|
IoEngine::StdFs => {
|
|
let result = file_guard.with_std_file(|std_file| std_file.write_at(&buf, offset));
|
|
((file_guard, buf), result)
|
|
}
|
|
#[cfg(target_os = "linux")]
|
|
IoEngine::TokioEpollUring => {
|
|
let system = tokio_epoll_uring_ext::thread_local_system().await;
|
|
let ((file_guard, slice), res) =
|
|
system.write(file_guard, offset, buf.into_raw_slice()).await;
|
|
(
|
|
(file_guard, FullSlice::must_new(slice)),
|
|
res.map_err(epoll_uring_error_to_std),
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
/// If we switch a user of [`tokio::fs`] to use [`super::io_engine`],
|
|
/// they'd start blocking the executor thread if [`IoEngine::StdFs`] is configured
|
|
/// whereas before the switch to [`super::io_engine`], that wasn't the case.
|
|
/// This method helps avoid such a regression.
|
|
///
|
|
/// Panics if the `spawn_blocking` fails, see [`tokio::task::JoinError`] for reasons why that can happen.
|
|
pub(crate) async fn spawn_blocking_and_block_on_if_std<Fut, R>(&self, work: Fut) -> R
|
|
where
|
|
Fut: 'static + Send + std::future::Future<Output = R>,
|
|
R: 'static + Send,
|
|
{
|
|
match self {
|
|
IoEngine::NotSet => panic!("not initialized"),
|
|
IoEngine::StdFs => {
|
|
let span = tracing::info_span!("spawn_blocking_block_on_if_std");
|
|
tokio::task::spawn_blocking({
|
|
move || tokio::runtime::Handle::current().block_on(work.instrument(span))
|
|
})
|
|
.await
|
|
.expect("failed to join blocking code most likely it panicked, panicking as well")
|
|
}
|
|
#[cfg(target_os = "linux")]
|
|
IoEngine::TokioEpollUring => work.await,
|
|
}
|
|
}
|
|
}
|
|
|
|
pub enum FeatureTestResult {
|
|
PlatformPreferred(IoEngineKind),
|
|
Worse {
|
|
engine: IoEngineKind,
|
|
remark: String,
|
|
},
|
|
}
|
|
|
|
impl FeatureTestResult {
|
|
#[cfg(target_os = "linux")]
|
|
const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::TokioEpollUring;
|
|
#[cfg(not(target_os = "linux"))]
|
|
const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::StdFs;
|
|
}
|
|
|
|
impl From<FeatureTestResult> for IoEngineKind {
|
|
fn from(val: FeatureTestResult) -> Self {
|
|
match val {
|
|
FeatureTestResult::PlatformPreferred(e) => e,
|
|
FeatureTestResult::Worse { engine, .. } => engine,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Somewhat costly under the hood, do only once.
|
|
/// Panics if we can't set up the feature test.
|
|
pub fn feature_test() -> anyhow::Result<FeatureTestResult> {
|
|
std::thread::spawn(|| {
|
|
|
|
#[cfg(not(target_os = "linux"))]
|
|
{
|
|
Ok(FeatureTestResult::PlatformPreferred(
|
|
FeatureTestResult::PLATFORM_PREFERRED,
|
|
))
|
|
}
|
|
#[cfg(target_os = "linux")]
|
|
{
|
|
let rt = tokio::runtime::Builder::new_current_thread()
|
|
.enable_all()
|
|
.build()
|
|
.unwrap();
|
|
Ok(match rt.block_on(tokio_epoll_uring::System::launch()) {
|
|
Ok(_) => FeatureTestResult::PlatformPreferred({
|
|
assert!(matches!(
|
|
IoEngineKind::TokioEpollUring,
|
|
FeatureTestResult::PLATFORM_PREFERRED
|
|
));
|
|
FeatureTestResult::PLATFORM_PREFERRED
|
|
}),
|
|
Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) => {
|
|
let remark = match e.raw_os_error() {
|
|
Some(nix::libc::EPERM) => {
|
|
// fall back
|
|
"creating tokio-epoll-uring fails with EPERM, assuming it's admin-disabled "
|
|
.to_string()
|
|
}
|
|
Some(nix::libc::EFAULT) => {
|
|
// fail feature test
|
|
anyhow::bail!(
|
|
"creating tokio-epoll-uring fails with EFAULT, might have corrupted memory"
|
|
);
|
|
}
|
|
Some(_) | None => {
|
|
// fall back
|
|
format!("creating tokio-epoll-uring fails with error: {e:#}")
|
|
}
|
|
};
|
|
FeatureTestResult::Worse {
|
|
engine: IoEngineKind::StdFs,
|
|
remark,
|
|
}
|
|
}
|
|
})
|
|
}
|
|
})
|
|
.join()
|
|
.unwrap()
|
|
}
|
|
|
|
/// For use in benchmark binaries only.
|
|
///
|
|
/// Benchmarks which initialize `virtual_file` need to know what engine to use, but we also
|
|
/// don't want to silently fall back to slower I/O engines in a benchmark: this could waste
|
|
/// developer time trying to figure out why it's slow.
|
|
///
|
|
/// In practice, this method will either return IoEngineKind::TokioEpollUring, or panic.
|
|
pub fn io_engine_for_bench() -> IoEngineKind {
|
|
#[cfg(not(target_os = "linux"))]
|
|
{
|
|
panic!("This benchmark does I/O and can only give a representative result on Linux");
|
|
}
|
|
#[cfg(target_os = "linux")]
|
|
{
|
|
match feature_test().unwrap() {
|
|
FeatureTestResult::PlatformPreferred(engine) => engine,
|
|
FeatureTestResult::Worse {
|
|
engine: _engine,
|
|
remark,
|
|
} => {
|
|
panic!("This benchmark does I/O can requires the preferred I/O engine: {remark}");
|
|
}
|
|
}
|
|
}
|
|
}
|