mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 12:10:37 +00:00
Compare commits
8 Commits
sk-patch-c
...
problame/i
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ccd2de94cc | ||
|
|
c9d0c67d1e | ||
|
|
29daa2b633 | ||
|
|
9c9744ba4b | ||
|
|
21fd448722 | ||
|
|
33c56a383c | ||
|
|
5c53e04fdf | ||
|
|
5b2199f477 |
8
.github/workflows/build_and_test.yml
vendored
8
.github/workflows/build_and_test.yml
vendored
@@ -341,7 +341,9 @@ jobs:
|
||||
|
||||
- name: Run rust tests
|
||||
run: |
|
||||
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES
|
||||
for io_engine in std-fs tokio-epoll-uring ; do
|
||||
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES
|
||||
done
|
||||
|
||||
# Run separate tests for real S3
|
||||
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
|
||||
@@ -426,6 +428,7 @@ jobs:
|
||||
matrix:
|
||||
build_type: [ debug, release ]
|
||||
pg_version: [ v14, v15, v16 ]
|
||||
pageserver_virtual_file_io_engine: [ std-fs, tokio-epoll-uring ]
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
@@ -448,6 +451,7 @@ jobs:
|
||||
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
|
||||
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
|
||||
BUILD_TAG: ${{ needs.tag.outputs.build-tag }}
|
||||
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: ${{ matrix.pageserver_virtual_file_io_engine }}
|
||||
|
||||
- name: Merge and upload coverage data
|
||||
if: matrix.build_type == 'debug' && matrix.pg_version == 'v14'
|
||||
@@ -466,6 +470,7 @@ jobs:
|
||||
matrix:
|
||||
pytest_split_group: [ 1, 2, 3, 4 ]
|
||||
build_type: [ release ]
|
||||
pageserver_virtual_file_io_engine: [ std-fs, tokio-epoll-uring ]
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
@@ -482,6 +487,7 @@ jobs:
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
TEST_RESULT_CONNSTR: "${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}"
|
||||
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: "${{ matrix.pageserver_virtual_file_io_engine }}"
|
||||
# XXX: no coverage data handling here, since benchmarks are run on release builds,
|
||||
# while coverage is currently collected for the debug ones
|
||||
|
||||
|
||||
43
Cargo.lock
generated
43
Cargo.lock
generated
@@ -2520,6 +2520,16 @@ dependencies = [
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "io-uring"
|
||||
version = "0.6.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "460648e47a07a43110fbfa2e0b14afb2be920093c31e5dccc50e49568e099762"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ipnet"
|
||||
version = "2.9.0"
|
||||
@@ -3315,6 +3325,7 @@ dependencies = [
|
||||
"tenant_size_model",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-epoll-uring",
|
||||
"tokio-io-timeout",
|
||||
"tokio-postgres",
|
||||
"tokio-stream",
|
||||
@@ -5334,18 +5345,18 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
version = "1.0.40"
|
||||
version = "1.0.47"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac"
|
||||
checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f"
|
||||
dependencies = [
|
||||
"thiserror-impl",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thiserror-impl"
|
||||
version = "1.0.40"
|
||||
version = "1.0.47"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f"
|
||||
checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -5469,6 +5480,21 @@ dependencies = [
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-epoll-uring"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#9c5ea716add1357dc8c180167e43a2ae596eba68"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"once_cell",
|
||||
"scopeguard",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"uring-common",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-io-timeout"
|
||||
version = "1.2.0"
|
||||
@@ -6020,6 +6046,15 @@ dependencies = [
|
||||
"webpki-roots 0.23.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "uring-common"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#9c5ea716add1357dc8c180167e43a2ae596eba68"
|
||||
dependencies = [
|
||||
"io-uring",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "url"
|
||||
version = "2.3.1"
|
||||
|
||||
@@ -150,6 +150,8 @@ test-context = "0.1"
|
||||
thiserror = "1.0"
|
||||
tls-listener = { version = "0.7", features = ["rustls", "hyper-h1"] }
|
||||
tokio = { version = "1.17", features = ["macros"] }
|
||||
#tokio-epoll-uring = { path = "../tokio-epoll-uring/tokio-epoll-uring" }
|
||||
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
|
||||
tokio-io-timeout = "1.2.0"
|
||||
tokio-postgres-rustls = "0.10.0"
|
||||
tokio-rustls = "0.24"
|
||||
|
||||
@@ -61,6 +61,7 @@ sync_wrapper.workspace = true
|
||||
tokio-tar.workspace = true
|
||||
thiserror.workspace = true
|
||||
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
|
||||
tokio-epoll-uring.workspace = true
|
||||
tokio-io-timeout.workspace = true
|
||||
tokio-postgres.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
|
||||
@@ -18,7 +18,7 @@ use pageserver::tenant::block_io::FileBlockReader;
|
||||
use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection};
|
||||
use pageserver::tenant::storage_layer::delta_layer::{Summary, DELTA_KEY_SIZE};
|
||||
use pageserver::tenant::storage_layer::range_overlaps;
|
||||
use pageserver::virtual_file::VirtualFile;
|
||||
use pageserver::virtual_file::{self, VirtualFile};
|
||||
|
||||
use utils::{bin_ser::BeSer, lsn::Lsn};
|
||||
|
||||
@@ -142,7 +142,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
|
||||
// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
|
||||
pageserver::virtual_file::init(10);
|
||||
pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let mut total_delta_layers = 0usize;
|
||||
|
||||
@@ -59,7 +59,7 @@ pub(crate) enum LayerCmd {
|
||||
|
||||
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
|
||||
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
|
||||
virtual_file::init(10);
|
||||
virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
|
||||
page_cache::init(100);
|
||||
let file = FileBlockReader::new(VirtualFile::open(path).await?);
|
||||
let summary_blk = file.read_blk(0, ctx).await?;
|
||||
@@ -187,7 +187,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
new_tenant_id,
|
||||
new_timeline_id,
|
||||
} => {
|
||||
pageserver::virtual_file::init(10);
|
||||
pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
|
||||
@@ -123,7 +123,7 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {
|
||||
|
||||
async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(10);
|
||||
virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
|
||||
page_cache::init(100);
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
dump_layerfile_from_path(path, true, &ctx).await
|
||||
|
||||
@@ -130,7 +130,7 @@ fn main() -> anyhow::Result<()> {
|
||||
let scenario = failpoint_support::init();
|
||||
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(conf.max_file_descriptors);
|
||||
virtual_file::init(conf.max_file_descriptors, conf.virtual_file_io_engine);
|
||||
page_cache::init(conf.page_cache_size);
|
||||
|
||||
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
|
||||
|
||||
@@ -36,6 +36,7 @@ use crate::tenant::config::TenantConfOpt;
|
||||
use crate::tenant::{
|
||||
TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
|
||||
};
|
||||
use crate::virtual_file;
|
||||
use crate::{
|
||||
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_HEATMAP_BASENAME,
|
||||
TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX, TIMELINE_UNINIT_MARK_SUFFIX,
|
||||
@@ -43,6 +44,8 @@ use crate::{
|
||||
|
||||
use self::defaults::DEFAULT_CONCURRENT_TENANT_WARMUP;
|
||||
|
||||
use self::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE;
|
||||
|
||||
pub mod defaults {
|
||||
use crate::tenant::config::defaults::*;
|
||||
use const_format::formatcp;
|
||||
@@ -79,6 +82,8 @@ pub mod defaults {
|
||||
|
||||
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
|
||||
|
||||
pub const DEFAULT_VIRTUAL_FILE_IO_ENGINE: &str = "std-fs";
|
||||
|
||||
///
|
||||
/// Default built-in configuration file.
|
||||
///
|
||||
@@ -114,6 +119,8 @@ pub mod defaults {
|
||||
|
||||
#ingest_batch_size = {DEFAULT_INGEST_BATCH_SIZE}
|
||||
|
||||
#virtual_file_io_engine = '{DEFAULT_VIRTUAL_FILE_IO_ENGINE}'
|
||||
|
||||
[tenant_config]
|
||||
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
|
||||
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
|
||||
@@ -247,6 +254,8 @@ pub struct PageServerConf {
|
||||
|
||||
/// Maximum number of WAL records to be ingested and committed at the same time
|
||||
pub ingest_batch_size: u64,
|
||||
|
||||
pub virtual_file_io_engine: virtual_file::IoEngineKind,
|
||||
}
|
||||
|
||||
/// We do not want to store this in a PageServerConf because the latter may be logged
|
||||
@@ -331,6 +340,8 @@ struct PageServerConfigBuilder {
|
||||
secondary_download_concurrency: BuilderValue<usize>,
|
||||
|
||||
ingest_batch_size: BuilderValue<u64>,
|
||||
|
||||
virtual_file_io_engine: BuilderValue<virtual_file::IoEngineKind>,
|
||||
}
|
||||
|
||||
impl Default for PageServerConfigBuilder {
|
||||
@@ -406,6 +417,8 @@ impl Default for PageServerConfigBuilder {
|
||||
secondary_download_concurrency: Set(DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY),
|
||||
|
||||
ingest_batch_size: Set(DEFAULT_INGEST_BATCH_SIZE),
|
||||
|
||||
virtual_file_io_engine: Set(DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -562,6 +575,10 @@ impl PageServerConfigBuilder {
|
||||
self.ingest_batch_size = BuilderValue::Set(ingest_batch_size)
|
||||
}
|
||||
|
||||
pub fn virtual_file_io_engine(&mut self, value: virtual_file::IoEngineKind) {
|
||||
self.virtual_file_io_engine = BuilderValue::Set(value);
|
||||
}
|
||||
|
||||
pub fn build(self) -> anyhow::Result<PageServerConf> {
|
||||
let concurrent_tenant_warmup = self
|
||||
.concurrent_tenant_warmup
|
||||
@@ -669,6 +686,9 @@ impl PageServerConfigBuilder {
|
||||
ingest_batch_size: self
|
||||
.ingest_batch_size
|
||||
.ok_or(anyhow!("missing ingest_batch_size"))?,
|
||||
virtual_file_io_engine: self
|
||||
.virtual_file_io_engine
|
||||
.ok_or(anyhow!("missing virtual_file_io_engine"))?,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -920,6 +940,9 @@ impl PageServerConf {
|
||||
builder.secondary_download_concurrency(parse_toml_u64(key, item)? as usize)
|
||||
},
|
||||
"ingest_batch_size" => builder.ingest_batch_size(parse_toml_u64(key, item)?),
|
||||
"virtual_file_io_engine" => {
|
||||
builder.virtual_file_io_engine(parse_toml_from_str("virtual_file_io_engine", item)?)
|
||||
}
|
||||
_ => bail!("unrecognized pageserver option '{key}'"),
|
||||
}
|
||||
}
|
||||
@@ -993,6 +1016,7 @@ impl PageServerConf {
|
||||
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
|
||||
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
|
||||
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
|
||||
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1224,6 +1248,7 @@ background_task_maximum_delay = '334 s'
|
||||
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
|
||||
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
|
||||
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
|
||||
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
|
||||
},
|
||||
"Correct defaults should be used when no config values are provided"
|
||||
);
|
||||
@@ -1287,6 +1312,7 @@ background_task_maximum_delay = '334 s'
|
||||
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
|
||||
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
|
||||
ingest_batch_size: 100,
|
||||
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
|
||||
},
|
||||
"Should be able to parse all basic config values correctly"
|
||||
);
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
#![recursion_limit = "300"]
|
||||
#![deny(clippy::undocumented_unsafe_blocks)]
|
||||
|
||||
mod auth;
|
||||
|
||||
@@ -941,6 +941,7 @@ pub(crate) static STORAGE_IO_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
#[cfg(not(test))]
|
||||
pub(crate) mod virtual_file_descriptor_cache {
|
||||
use super::*;
|
||||
|
||||
|
||||
@@ -5,10 +5,10 @@
|
||||
use super::ephemeral_file::EphemeralFile;
|
||||
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
|
||||
use crate::page_cache::{self, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use bytes::Bytes;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::ops::Deref;
|
||||
|
||||
/// This is implemented by anything that can read 8 kB (PAGE_SZ)
|
||||
/// blocks, using the page cache
|
||||
@@ -39,6 +39,8 @@ pub enum BlockLease<'a> {
|
||||
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
|
||||
#[cfg(test)]
|
||||
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
|
||||
#[cfg(test)]
|
||||
Vec(Vec<u8>),
|
||||
}
|
||||
|
||||
impl From<PageReadGuard<'static>> for BlockLease<'static> {
|
||||
@@ -63,6 +65,13 @@ impl<'a> Deref for BlockLease<'a> {
|
||||
BlockLease::EphemeralFileMutableTail(v) => v,
|
||||
#[cfg(test)]
|
||||
BlockLease::Arc(v) => v.deref(),
|
||||
#[cfg(test)]
|
||||
BlockLease::Vec(v) => {
|
||||
let v: &Vec<u8> = v;
|
||||
assert_eq!(v.len(), PAGE_SZ, "caller must ensure that v has PAGE_SZ");
|
||||
// Safety: see above assertion.
|
||||
unsafe { &*(v.as_ptr() as *const [u8; PAGE_SZ]) }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -169,10 +178,14 @@ impl FileBlockReader {
|
||||
}
|
||||
|
||||
/// Read a page from the underlying file into given buffer.
|
||||
async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
|
||||
async fn fill_buffer(
|
||||
&self,
|
||||
buf: PageWriteGuard<'static>,
|
||||
blkno: u32,
|
||||
) -> Result<PageWriteGuard<'static>, std::io::Error> {
|
||||
assert!(buf.len() == PAGE_SZ);
|
||||
self.file
|
||||
.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
|
||||
.read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64)
|
||||
.await
|
||||
}
|
||||
/// Read a block.
|
||||
@@ -196,9 +209,9 @@ impl FileBlockReader {
|
||||
)
|
||||
})? {
|
||||
ReadBufResult::Found(guard) => Ok(guard.into()),
|
||||
ReadBufResult::NotFound(mut write_guard) => {
|
||||
ReadBufResult::NotFound(write_guard) => {
|
||||
// Read the page from disk into the buffer
|
||||
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
|
||||
let write_guard = self.fill_buffer(write_guard, blknum).await?;
|
||||
Ok(write_guard.mark_valid().into())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,11 +5,11 @@ use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::{self, PAGE_SZ};
|
||||
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::cmp::min;
|
||||
use std::fs::OpenOptions;
|
||||
|
||||
use std::io::{self, ErrorKind};
|
||||
use std::ops::DerefMut;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
@@ -47,7 +47,10 @@ impl EphemeralFile {
|
||||
|
||||
let file = VirtualFile::open_with_options(
|
||||
&filename,
|
||||
OpenOptions::new().read(true).write(true).create(true),
|
||||
virtual_file::OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -89,11 +92,10 @@ impl EphemeralFile {
|
||||
page_cache::ReadBufResult::Found(guard) => {
|
||||
return Ok(BlockLease::PageReadGuard(guard))
|
||||
}
|
||||
page_cache::ReadBufResult::NotFound(mut write_guard) => {
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
self.file
|
||||
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
|
||||
page_cache::ReadBufResult::NotFound(write_guard) => {
|
||||
let write_guard = self
|
||||
.file
|
||||
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
|
||||
.await?;
|
||||
let read_guard = write_guard.mark_valid();
|
||||
return Ok(BlockLease::PageReadGuard(read_guard));
|
||||
|
||||
@@ -36,7 +36,7 @@ use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, Fi
|
||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
|
||||
use crate::tenant::Timeline;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use crate::{walrecord, TEMP_FILE_SUFFIX};
|
||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
@@ -649,7 +649,7 @@ impl DeltaLayer {
|
||||
{
|
||||
let file = VirtualFile::open_with_options(
|
||||
path,
|
||||
&*std::fs::OpenOptions::new().read(true).write(true),
|
||||
virtual_file::OpenOptions::new().read(true).write(true),
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file '{}'", path))?;
|
||||
|
||||
@@ -34,7 +34,7 @@ use crate::tenant::storage_layer::{
|
||||
LayerAccessStats, ValueReconstructResult, ValueReconstructState,
|
||||
};
|
||||
use crate::tenant::Timeline;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use bytes::Bytes;
|
||||
@@ -327,7 +327,7 @@ impl ImageLayer {
|
||||
{
|
||||
let file = VirtualFile::open_with_options(
|
||||
path,
|
||||
&*std::fs::OpenOptions::new().read(true).write(true),
|
||||
virtual_file::OpenOptions::new().read(true).write(true),
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file '{}'", path))?;
|
||||
@@ -492,11 +492,15 @@ impl ImageLayerWriterInner {
|
||||
},
|
||||
);
|
||||
info!("new image layer {path}");
|
||||
let mut file = VirtualFile::open_with_options(
|
||||
&path,
|
||||
std::fs::OpenOptions::new().write(true).create_new(true),
|
||||
)
|
||||
.await?;
|
||||
let mut file = {
|
||||
VirtualFile::open_with_options(
|
||||
&path,
|
||||
virtual_file::OpenOptions::new()
|
||||
.write(true)
|
||||
.create_new(true),
|
||||
)
|
||||
.await?
|
||||
};
|
||||
// make room for the header block
|
||||
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
|
||||
|
||||
@@ -11,17 +11,27 @@
|
||||
//! src/backend/storage/file/fd.c
|
||||
//!
|
||||
use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
|
||||
|
||||
use crate::page_cache::PageWriteGuard;
|
||||
use crate::tenant::TENANTS_SEGMENT_NAME;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use once_cell::sync::OnceCell;
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::fs::{self, File};
|
||||
use std::io::{Error, ErrorKind, Seek, SeekFrom};
|
||||
use tokio_epoll_uring::IoBufMut;
|
||||
|
||||
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use tokio::time::Instant;
|
||||
use utils::fs_ext;
|
||||
|
||||
mod io_engine;
|
||||
mod open_options;
|
||||
pub use io_engine::IoEngineKind;
|
||||
pub(crate) use open_options::*;
|
||||
|
||||
///
|
||||
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
|
||||
/// the underlying file is closed if the system is low on file descriptors,
|
||||
@@ -104,7 +114,38 @@ struct SlotInner {
|
||||
tag: u64,
|
||||
|
||||
/// the underlying file
|
||||
file: Option<File>,
|
||||
file: Option<OwnedFd>,
|
||||
}
|
||||
|
||||
/// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`].
|
||||
struct PageWriteGuardBuf {
|
||||
page: PageWriteGuard<'static>,
|
||||
init_up_to: usize,
|
||||
}
|
||||
// Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
|
||||
// and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
|
||||
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
|
||||
fn stable_ptr(&self) -> *const u8 {
|
||||
self.page.as_ptr()
|
||||
}
|
||||
fn bytes_init(&self) -> usize {
|
||||
self.init_up_to
|
||||
}
|
||||
fn bytes_total(&self) -> usize {
|
||||
self.page.len()
|
||||
}
|
||||
}
|
||||
// Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access,
|
||||
// hence it's safe to hand out the `stable_mut_ptr()`.
|
||||
unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
|
||||
fn stable_mut_ptr(&mut self) -> *mut u8 {
|
||||
self.page.as_mut_ptr()
|
||||
}
|
||||
|
||||
unsafe fn set_init(&mut self, pos: usize) {
|
||||
assert!(pos <= self.page.len());
|
||||
self.init_up_to = pos;
|
||||
}
|
||||
}
|
||||
|
||||
impl OpenFiles {
|
||||
@@ -272,6 +313,10 @@ macro_rules! with_file {
|
||||
let $ident = $this.lock_file().await?;
|
||||
observe_duration!($op, $($body)*)
|
||||
}};
|
||||
($this:expr, $op:expr, | mut $ident:ident | $($body:tt)*) => {{
|
||||
let mut $ident = $this.lock_file().await?;
|
||||
observe_duration!($op, $($body)*)
|
||||
}};
|
||||
}
|
||||
|
||||
impl VirtualFile {
|
||||
@@ -315,7 +360,9 @@ impl VirtualFile {
|
||||
// NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
|
||||
// where our caller doesn't get to use the returned VirtualFile before its
|
||||
// slot gets re-used by someone else.
|
||||
let file = observe_duration!(StorageIoOperation::Open, open_options.open(path))?;
|
||||
let file = observe_duration!(StorageIoOperation::Open, {
|
||||
open_options.open(path.as_std_path()).await?
|
||||
});
|
||||
|
||||
// Strip all options other than read and write.
|
||||
//
|
||||
@@ -388,15 +435,13 @@ impl VirtualFile {
|
||||
|
||||
/// Call File::sync_all() on the underlying File.
|
||||
pub async fn sync_all(&self) -> Result<(), Error> {
|
||||
with_file!(self, StorageIoOperation::Fsync, |file| file
|
||||
.as_ref()
|
||||
.sync_all())
|
||||
with_file!(self, StorageIoOperation::Fsync, |file_guard| file_guard
|
||||
.with_std_file(|std_file| std_file.sync_all()))
|
||||
}
|
||||
|
||||
pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
|
||||
with_file!(self, StorageIoOperation::Metadata, |file| file
|
||||
.as_ref()
|
||||
.metadata())
|
||||
with_file!(self, StorageIoOperation::Metadata, |file_guard| file_guard
|
||||
.with_std_file(|std_file| std_file.metadata()))
|
||||
}
|
||||
|
||||
/// Helper function internal to `VirtualFile` that looks up the underlying File,
|
||||
@@ -405,7 +450,7 @@ impl VirtualFile {
|
||||
///
|
||||
/// We are doing it via a macro as Rust doesn't support async closures that
|
||||
/// take on parameters with lifetimes.
|
||||
async fn lock_file(&self) -> Result<FileGuard<'_>, Error> {
|
||||
async fn lock_file(&self) -> Result<FileGuard, Error> {
|
||||
let open_files = get_open_files();
|
||||
|
||||
let mut handle_guard = {
|
||||
@@ -451,10 +496,9 @@ impl VirtualFile {
|
||||
// NB: we use StorageIoOperation::OpenAferReplace for this to distinguish this
|
||||
// case from StorageIoOperation::Open. This helps with identifying thrashing
|
||||
// of the virtual file descriptor cache.
|
||||
let file = observe_duration!(
|
||||
StorageIoOperation::OpenAfterReplace,
|
||||
self.open_options.open(&self.path)
|
||||
)?;
|
||||
let file = observe_duration!(StorageIoOperation::OpenAfterReplace, {
|
||||
self.open_options.open(self.path.as_std_path()).await?
|
||||
});
|
||||
|
||||
// Store the File in the slot and update the handle in the VirtualFile
|
||||
// to point to it.
|
||||
@@ -479,9 +523,8 @@ impl VirtualFile {
|
||||
self.pos = offset;
|
||||
}
|
||||
SeekFrom::End(offset) => {
|
||||
self.pos = with_file!(self, StorageIoOperation::Seek, |file| file
|
||||
.as_ref()
|
||||
.seek(SeekFrom::End(offset)))?
|
||||
self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard
|
||||
.with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))?
|
||||
}
|
||||
SeekFrom::Current(offset) => {
|
||||
let pos = self.pos as i128 + offset as i128;
|
||||
@@ -501,24 +544,48 @@ impl VirtualFile {
|
||||
}
|
||||
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
|
||||
pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> {
|
||||
while !buf.is_empty() {
|
||||
match self.read_at(buf, offset).await {
|
||||
Ok(0) => {
|
||||
return Err(Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
"failed to fill whole buffer",
|
||||
))
|
||||
}
|
||||
pub async fn read_exact_at<B>(&self, buf: B, mut offset: u64) -> Result<B, Error>
|
||||
where
|
||||
B: IoBufMut + Send,
|
||||
{
|
||||
use tokio_epoll_uring::BoundedBuf;
|
||||
let mut buf: tokio_epoll_uring::Slice<B> = buf.slice_full();
|
||||
while buf.bytes_total() != 0 {
|
||||
let res;
|
||||
(buf, res) = self.read_at(buf, offset).await;
|
||||
match res {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
buf = &mut buf[n..];
|
||||
buf = buf.slice(n..);
|
||||
offset += n as u64;
|
||||
}
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
if !buf.is_empty() {
|
||||
Err(std::io::Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
"failed to fill whole buffer",
|
||||
))
|
||||
} else {
|
||||
Ok(buf.into_inner())
|
||||
}
|
||||
}
|
||||
|
||||
/// Like [`Self::read_exact_at`] but for [`PageWriteGuard`].
|
||||
pub async fn read_exact_at_page(
|
||||
&self,
|
||||
page: PageWriteGuard<'static>,
|
||||
offset: u64,
|
||||
) -> Result<PageWriteGuard<'static>, Error> {
|
||||
let buf = PageWriteGuardBuf {
|
||||
page,
|
||||
init_up_to: 0,
|
||||
};
|
||||
let res = self.read_exact_at(buf, offset).await;
|
||||
res.map(|PageWriteGuardBuf { page, .. }| page)
|
||||
.map_err(|e| Error::new(ErrorKind::Other, e))
|
||||
}
|
||||
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
|
||||
@@ -568,22 +635,30 @@ impl VirtualFile {
|
||||
Ok(n)
|
||||
}
|
||||
|
||||
pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
|
||||
let result = with_file!(self, StorageIoOperation::Read, |file| file
|
||||
.as_ref()
|
||||
.read_at(buf, offset));
|
||||
if let Ok(size) = result {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
|
||||
.add(size as i64);
|
||||
}
|
||||
result
|
||||
pub(crate) async fn read_at<B>(&self, buf: B, offset: u64) -> (B, Result<usize, Error>)
|
||||
where
|
||||
B: tokio_epoll_uring::BoundedBufMut + Send,
|
||||
{
|
||||
let file_guard = match self.lock_file().await {
|
||||
Ok(file_guard) => file_guard,
|
||||
Err(e) => return (buf, Err(e)),
|
||||
};
|
||||
|
||||
observe_duration!(StorageIoOperation::Read, {
|
||||
let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
|
||||
if let Ok(size) = res {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
|
||||
.add(size as i64);
|
||||
}
|
||||
(buf, res)
|
||||
})
|
||||
}
|
||||
|
||||
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
|
||||
let result = with_file!(self, StorageIoOperation::Write, |file| file
|
||||
.as_ref()
|
||||
.write_at(buf, offset));
|
||||
let result = with_file!(self, StorageIoOperation::Write, |file_guard| {
|
||||
file_guard.with_std_file(|std_file| std_file.write_at(buf, offset))
|
||||
});
|
||||
if let Ok(size) = result {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&["write", &self.tenant_id, &self.timeline_id])
|
||||
@@ -593,18 +668,54 @@ impl VirtualFile {
|
||||
}
|
||||
}
|
||||
|
||||
struct FileGuard<'a> {
|
||||
slot_guard: RwLockReadGuard<'a, SlotInner>,
|
||||
struct FileGuard {
|
||||
slot_guard: RwLockReadGuard<'static, SlotInner>,
|
||||
}
|
||||
|
||||
impl<'a> AsRef<File> for FileGuard<'a> {
|
||||
fn as_ref(&self) -> &File {
|
||||
impl AsRef<OwnedFd> for FileGuard {
|
||||
fn as_ref(&self) -> &OwnedFd {
|
||||
// This unwrap is safe because we only create `FileGuard`s
|
||||
// if we know that the file is Some.
|
||||
self.slot_guard.file.as_ref().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl FileGuard {
|
||||
/// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
|
||||
fn with_std_file<F, R>(&self, with: F) -> R
|
||||
where
|
||||
F: FnOnce(&File) -> R,
|
||||
{
|
||||
// SAFETY:
|
||||
// - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
|
||||
// - `&` usage below: `self` is `&`, hence Rust typesystem guarantees there are is no `&mut`
|
||||
let file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
|
||||
let res = with(&file);
|
||||
let _ = file.into_raw_fd();
|
||||
res
|
||||
}
|
||||
/// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
|
||||
fn with_std_file_mut<F, R>(&mut self, with: F) -> R
|
||||
where
|
||||
F: FnOnce(&mut File) -> R,
|
||||
{
|
||||
// SAFETY:
|
||||
// - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
|
||||
// - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
|
||||
let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
|
||||
let res = with(&mut file);
|
||||
let _ = file.into_raw_fd();
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
impl tokio_epoll_uring::IoFd for FileGuard {
|
||||
unsafe fn as_fd(&self) -> RawFd {
|
||||
let owned_fd: &OwnedFd = self.as_ref();
|
||||
owned_fd.as_raw_fd()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl VirtualFile {
|
||||
pub(crate) async fn read_blk(
|
||||
@@ -612,16 +723,19 @@ impl VirtualFile {
|
||||
blknum: u32,
|
||||
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
let mut buf = [0; PAGE_SZ];
|
||||
self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64))
|
||||
let buf = vec![0; PAGE_SZ];
|
||||
let buf = self
|
||||
.read_exact_at(buf, blknum as u64 * (PAGE_SZ as u64))
|
||||
.await?;
|
||||
Ok(std::sync::Arc::new(buf).into())
|
||||
Ok(crate::tenant::block_io::BlockLease::Vec(buf))
|
||||
}
|
||||
|
||||
async fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<(), Error> {
|
||||
let mut tmp = vec![0; 128];
|
||||
loop {
|
||||
let mut tmp = [0; 128];
|
||||
match self.read_at(&mut tmp, self.pos).await {
|
||||
let res;
|
||||
(tmp, res) = self.read_at(tmp, self.pos).await;
|
||||
match res {
|
||||
Ok(0) => return Ok(()),
|
||||
Ok(n) => {
|
||||
self.pos += n as u64;
|
||||
@@ -697,10 +811,12 @@ impl OpenFiles {
|
||||
/// Initialize the virtual file module. This must be called once at page
|
||||
/// server startup.
|
||||
///
|
||||
pub fn init(num_slots: usize) {
|
||||
#[cfg(not(test))]
|
||||
pub fn init(num_slots: usize, engine: IoEngineKind) {
|
||||
if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
|
||||
panic!("virtual_file::init called twice");
|
||||
}
|
||||
io_engine::init(engine);
|
||||
crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
|
||||
}
|
||||
|
||||
@@ -745,10 +861,10 @@ mod tests {
|
||||
}
|
||||
|
||||
impl MaybeVirtualFile {
|
||||
async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
|
||||
async fn read_exact_at(&self, mut buf: Vec<u8>, offset: u64) -> Result<Vec<u8>, Error> {
|
||||
match self {
|
||||
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await,
|
||||
MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset),
|
||||
MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
|
||||
}
|
||||
}
|
||||
async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
|
||||
@@ -790,14 +906,14 @@ mod tests {
|
||||
|
||||
// Helper function to slurp a portion of a file into a string
|
||||
async fn read_string_at(&mut self, pos: u64, len: usize) -> Result<String, Error> {
|
||||
let mut buf = vec![0; len];
|
||||
self.read_exact_at(&mut buf, pos).await?;
|
||||
let buf = vec![0; len];
|
||||
let buf = self.read_exact_at(buf, pos).await?;
|
||||
Ok(String::from_utf8(buf).unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_virtual_files() -> Result<(), Error> {
|
||||
async fn test_virtual_files() -> anyhow::Result<()> {
|
||||
// The real work is done in the test_files() helper function. This
|
||||
// allows us to run the same set of tests against a native File, and
|
||||
// VirtualFile. We trust the native Files and wouldn't need to test them,
|
||||
@@ -813,14 +929,17 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_physical_files() -> Result<(), Error> {
|
||||
async fn test_physical_files() -> anyhow::Result<()> {
|
||||
test_files("physical_files", |path, open_options| async move {
|
||||
Ok(MaybeVirtualFile::File(open_options.open(path)?))
|
||||
Ok(MaybeVirtualFile::File({
|
||||
let owned_fd = open_options.open(path.as_std_path()).await?;
|
||||
File::from(owned_fd)
|
||||
}))
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> Result<(), Error>
|
||||
async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> anyhow::Result<()>
|
||||
where
|
||||
OF: Fn(Utf8PathBuf, OpenOptions) -> FT,
|
||||
FT: Future<Output = Result<MaybeVirtualFile, std::io::Error>>,
|
||||
@@ -964,11 +1083,11 @@ mod tests {
|
||||
for _threadno in 0..THREADS {
|
||||
let files = files.clone();
|
||||
let hdl = rt.spawn(async move {
|
||||
let mut buf = [0u8; SIZE];
|
||||
let mut buf = vec![0u8; SIZE];
|
||||
let mut rng = rand::rngs::OsRng;
|
||||
for _ in 1..1000 {
|
||||
let f = &files[rng.gen_range(0..files.len())];
|
||||
f.read_exact_at(&mut buf, 0).await.unwrap();
|
||||
buf = f.read_exact_at(buf, 0).await.unwrap();
|
||||
assert!(buf == SAMPLE);
|
||||
}
|
||||
});
|
||||
|
||||
109
pageserver/src/virtual_file/io_engine.rs
Normal file
109
pageserver/src/virtual_file/io_engine.rs
Normal file
@@ -0,0 +1,109 @@
|
||||
//! [`super::VirtualFile`] supports different IO engines.
|
||||
//!
|
||||
//! The [`IoEngineKind`] enum identifies them.
|
||||
//!
|
||||
//! The choice of IO engine is global.
|
||||
//! Initialize using [`init`].
|
||||
//!
|
||||
//! Then use [`get`] and [`OpenOptions`].
|
||||
|
||||
#[derive(
|
||||
Copy,
|
||||
Clone,
|
||||
PartialEq,
|
||||
Eq,
|
||||
Hash,
|
||||
strum_macros::EnumString,
|
||||
strum_macros::Display,
|
||||
serde_with::DeserializeFromStr,
|
||||
serde_with::SerializeDisplay,
|
||||
Debug,
|
||||
)]
|
||||
#[strum(serialize_all = "kebab-case")]
|
||||
pub enum IoEngineKind {
|
||||
StdFs,
|
||||
TokioEpollUring,
|
||||
}
|
||||
|
||||
static IO_ENGINE: once_cell::sync::OnceCell<IoEngineKind> = once_cell::sync::OnceCell::new();
|
||||
|
||||
#[cfg(not(test))]
|
||||
pub(super) fn init(engine: IoEngineKind) {
|
||||
if IO_ENGINE.set(engine).is_err() {
|
||||
panic!("called twice");
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn get() -> &'static IoEngineKind {
|
||||
#[cfg(test)]
|
||||
{
|
||||
let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
|
||||
IO_ENGINE.get_or_init(|| match std::env::var(env_var_name) {
|
||||
Ok(v) => match v.parse::<IoEngineKind>() {
|
||||
Ok(engine_kind) => engine_kind,
|
||||
Err(e) => {
|
||||
panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}")
|
||||
}
|
||||
},
|
||||
Err(std::env::VarError::NotPresent) => {
|
||||
crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE
|
||||
.parse()
|
||||
.unwrap()
|
||||
}
|
||||
Err(std::env::VarError::NotUnicode(_)) => {
|
||||
panic!("env var {env_var_name} is not unicode");
|
||||
}
|
||||
})
|
||||
}
|
||||
#[cfg(not(test))]
|
||||
IO_ENGINE.get().unwrap()
|
||||
}
|
||||
|
||||
use std::os::unix::prelude::FileExt;
|
||||
|
||||
use super::FileGuard;
|
||||
|
||||
impl IoEngineKind {
|
||||
pub(super) async fn read_at<B>(
|
||||
&self,
|
||||
file_guard: FileGuard,
|
||||
offset: u64,
|
||||
mut buf: B,
|
||||
) -> ((FileGuard, B), std::io::Result<usize>)
|
||||
where
|
||||
B: tokio_epoll_uring::BoundedBufMut + Send,
|
||||
{
|
||||
match self {
|
||||
IoEngineKind::StdFs => {
|
||||
// SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory.
|
||||
let dst = unsafe {
|
||||
std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total())
|
||||
};
|
||||
let res = file_guard.with_std_file(|std_file| std_file.read_at(dst, offset));
|
||||
if let Ok(nbytes) = &res {
|
||||
assert!(*nbytes <= buf.bytes_total());
|
||||
// SAFETY: see above assertion
|
||||
unsafe {
|
||||
buf.set_init(*nbytes);
|
||||
}
|
||||
}
|
||||
#[allow(dropping_references)]
|
||||
drop(dst);
|
||||
((file_guard, buf), res)
|
||||
}
|
||||
IoEngineKind::TokioEpollUring => {
|
||||
let system = tokio_epoll_uring::thread_local_system().await;
|
||||
let (resources, res) = system.read(file_guard, offset, buf).await;
|
||||
(
|
||||
resources,
|
||||
res.map_err(|e| match e {
|
||||
tokio_epoll_uring::Error::Op(e) => e,
|
||||
tokio_epoll_uring::Error::System(system) => {
|
||||
std::io::Error::new(std::io::ErrorKind::Other, system)
|
||||
}
|
||||
}),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
129
pageserver/src/virtual_file/open_options.rs
Normal file
129
pageserver/src/virtual_file/open_options.rs
Normal file
@@ -0,0 +1,129 @@
|
||||
//! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`];
|
||||
|
||||
use std::{os::fd::OwnedFd, path::Path};
|
||||
|
||||
use super::IoEngineKind;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum OpenOptions {
|
||||
StdFs(std::fs::OpenOptions),
|
||||
TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions),
|
||||
}
|
||||
|
||||
impl Default for OpenOptions {
|
||||
fn default() -> Self {
|
||||
match super::io_engine::get() {
|
||||
IoEngineKind::StdFs => Self::StdFs(std::fs::OpenOptions::new()),
|
||||
IoEngineKind::TokioEpollUring => {
|
||||
Self::TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions::new())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl OpenOptions {
|
||||
pub fn new() -> OpenOptions {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub fn read(&mut self, read: bool) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.read(read);
|
||||
}
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.read(read);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
pub fn write(&mut self, write: bool) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.write(write);
|
||||
}
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.write(write);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
pub fn create(&mut self, create: bool) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.create(create);
|
||||
}
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.create(create);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.create_new(create_new);
|
||||
}
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.create_new(create_new);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.truncate(truncate);
|
||||
}
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.truncate(truncate);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result<OwnedFd> {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()),
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let system = tokio_epoll_uring::thread_local_system().await;
|
||||
system.open(path, x).await.map_err(|e| match e {
|
||||
tokio_epoll_uring::Error::Op(e) => e,
|
||||
tokio_epoll_uring::Error::System(system) => {
|
||||
std::io::Error::new(std::io::ErrorKind::Other, system)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
|
||||
fn mode(&mut self, mode: u32) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.mode(mode);
|
||||
}
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.mode(mode);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.custom_flags(flags);
|
||||
}
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.custom_flags(flags);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
}
|
||||
@@ -440,6 +440,7 @@ class NeonEnvBuilder:
|
||||
preserve_database_files: bool = False,
|
||||
initial_tenant: Optional[TenantId] = None,
|
||||
initial_timeline: Optional[TimelineId] = None,
|
||||
pageserver_virtual_file_io_engine: Optional[str] = None,
|
||||
):
|
||||
self.repo_dir = repo_dir
|
||||
self.rust_log_override = rust_log_override
|
||||
@@ -473,6 +474,8 @@ class NeonEnvBuilder:
|
||||
self.test_overlay_dir = test_overlay_dir
|
||||
self.overlay_mounts_created_by_us: List[Tuple[str, Path]] = []
|
||||
|
||||
self.pageserver_virtual_file_io_engine: Optional[str] = pageserver_virtual_file_io_engine
|
||||
|
||||
assert test_name.startswith(
|
||||
"test_"
|
||||
), "Unexpectedly instantiated from outside a test function"
|
||||
@@ -870,6 +873,8 @@ class NeonEnv:
|
||||
self.control_plane_api: str = f"http://127.0.0.1:{attachment_service_port}"
|
||||
self.attachment_service: NeonAttachmentService = NeonAttachmentService(self)
|
||||
|
||||
self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine
|
||||
|
||||
# Create a config file corresponding to the options
|
||||
cfg: Dict[str, Any] = {
|
||||
"default_tenant_id": str(self.initial_tenant),
|
||||
@@ -901,6 +906,9 @@ class NeonEnv:
|
||||
"pg_auth_type": pg_auth_type,
|
||||
"http_auth_type": http_auth_type,
|
||||
}
|
||||
if self.pageserver_virtual_file_io_engine is not None:
|
||||
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
|
||||
|
||||
# Create a corresponding NeonPageserver object
|
||||
self.pageservers.append(
|
||||
NeonPageserver(
|
||||
@@ -1040,6 +1048,7 @@ def _shared_simple_env(
|
||||
neon_binpath: Path,
|
||||
pg_distrib_dir: Path,
|
||||
pg_version: PgVersion,
|
||||
pageserver_virtual_file_io_engine: str,
|
||||
) -> Iterator[NeonEnv]:
|
||||
"""
|
||||
# Internal fixture backing the `neon_simple_env` fixture. If TEST_SHARED_FIXTURES
|
||||
@@ -1068,6 +1077,7 @@ def _shared_simple_env(
|
||||
preserve_database_files=pytestconfig.getoption("--preserve-database-files"),
|
||||
test_name=request.node.name,
|
||||
test_output_dir=test_output_dir,
|
||||
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
|
||||
) as builder:
|
||||
env = builder.init_start()
|
||||
|
||||
@@ -1105,6 +1115,7 @@ def neon_env_builder(
|
||||
run_id: uuid.UUID,
|
||||
request: FixtureRequest,
|
||||
test_overlay_dir: Path,
|
||||
pageserver_virtual_file_io_engine: str,
|
||||
) -> Iterator[NeonEnvBuilder]:
|
||||
"""
|
||||
Fixture to create a Neon environment for test.
|
||||
@@ -1133,6 +1144,7 @@ def neon_env_builder(
|
||||
broker=default_broker,
|
||||
run_id=run_id,
|
||||
preserve_database_files=pytestconfig.getoption("--preserve-database-files"),
|
||||
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
|
||||
test_name=request.node.name,
|
||||
test_output_dir=test_output_dir,
|
||||
test_overlay_dir=test_overlay_dir,
|
||||
|
||||
@@ -8,7 +8,7 @@ from _pytest.python import Metafunc
|
||||
from fixtures.pg_version import PgVersion
|
||||
|
||||
"""
|
||||
Dynamically parametrize tests by Postgres version and build type (debug/release/remote)
|
||||
Dynamically parametrize tests by Postgres version, build type (debug/release/remote), and possibly by other parameters
|
||||
"""
|
||||
|
||||
|
||||
@@ -31,11 +31,12 @@ def build_type(request: FixtureRequest) -> Optional[str]:
|
||||
return None
|
||||
|
||||
|
||||
def pytest_generate_tests(metafunc: Metafunc):
|
||||
# Do not parametrize performance tests yet, we need to prepare grafana charts first
|
||||
if "test_runner/performance" in metafunc.definition._nodeid:
|
||||
return
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def pageserver_virtual_file_io_engine(request: FixtureRequest) -> Optional[str]:
|
||||
return None
|
||||
|
||||
|
||||
def pytest_generate_tests(metafunc: Metafunc):
|
||||
if (v := os.environ.get("DEFAULT_PG_VERSION")) is None:
|
||||
pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET]
|
||||
else:
|
||||
@@ -46,5 +47,14 @@ def pytest_generate_tests(metafunc: Metafunc):
|
||||
else:
|
||||
build_types = [bt.lower()]
|
||||
|
||||
# A hacky way to parametrize performance tests only for `pageserver_virtual_file_io_engine=tokio-epoll-uring`
|
||||
# And do not change test name for default `pageserver_virtual_file_io_engine=std-fs` to keep perf tests statistics
|
||||
if (io_engine := os.environ.get("PAGESERVER_VIRTUAL_FILE_IO_ENGINE", "")) not in ("", "std-fs"):
|
||||
metafunc.parametrize("pageserver_virtual_file_io_engine", [io_engine])
|
||||
|
||||
# Do not parametrize performance tests yet by Postgres version or build type, we need to prepare grafana charts first
|
||||
if "test_runner/performance" in metafunc.definition._nodeid:
|
||||
return
|
||||
|
||||
metafunc.parametrize("build_type", build_types)
|
||||
metafunc.parametrize("pg_version", pg_versions, ids=map(lambda v: f"pg{v}", pg_versions))
|
||||
|
||||
Reference in New Issue
Block a user