Compare commits

...

29 Commits

Author SHA1 Message Date
Yuchen Liang
e565c4fbe9 Merge branch 'yuchen/direct-io-aligned-alloc' into yuchen/direct-io-aligned-alloc-usage-wip 2024-10-07 13:08:55 -04:00
Yuchen Liang
a46757b769 add with_capacity_aligned_zeroed and leak
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-07 13:06:00 -04:00
Yuchen Liang
9c32bfee3b fix put_io_mode to use the correct http endpoint
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 16:47:36 -04:00
Yuchen Liang
69ef8caf58 simplify virtual file wrapper
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 12:31:55 +00:00
Yuchen Liang
b7443dd643 add set_io_mode option to getpage_latest_lsn
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 12:16:42 +00:00
Yuchen Liang
cc433c76a3 fix clippy
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 12:05:01 +00:00
Yuchen Liang
2034ec906a remove unused imports
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 12:04:11 +00:00
Yuchen Liang
f48ab8bcaa use O_DIRECT as preferred
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 03:57:58 +00:00
Yuchen Liang
2607a57990 pageserver: add direct io config to virtual file
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 03:57:13 +00:00
Yuchen Liang
f04c1c230c incr len in with_capacity_aligned_zeroed add a test
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-29 23:47:00 +00:00
Yuchen Liang
13f1931a09 fix build
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-29 21:18:29 +00:00
Yuchen Liang
e98a4eb5e2 add safety comments
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-29 21:00:05 +00:00
Yuchen Liang
e01d145066 remove example; add with_capacity_aligned_zeroed
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-29 21:00:05 +00:00
Yuchen Liang
9e9d76d6f2 use IoBufferMut for pagecache
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-29 21:00:05 +00:00
Yuchen Liang
14ec379d2b enable O_DIRECT for delta and image layers
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-29 21:00:05 +00:00
Yuchen Liang
ebfe88a463 use IoBufferMut for delta and image layers
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-29 21:00:05 +00:00
Yuchen Liang
eb16aa9e81 Merge branch 'main' into yuchen/direct-io-aligned-alloc 2024-09-29 16:59:27 -04:00
Yuchen Liang
f6d0ed6454 implement reserve for IoBufferMut
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-26 14:12:49 +00:00
Yuchen Liang
a2be8a440b Merge branch 'main' into yuchen/direct-io-aligned-alloc 2024-09-24 21:29:33 -04:00
Yuchen Liang
ff4a1db223 make sure we can Send IoBufferMut
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-03 23:52:34 -04:00
Yuchen Liang
29d54ccd20 Merge branch 'main' into yuchen/direct-io-aligned-alloc 2024-09-03 11:41:49 -04:00
Yuchen Liang
68a1fe20f2 review: use doc comments to reference struct in safety comment
Co-authored-by: Christian Schwarz <christian@neon.tech>
2024-09-03 11:41:02 -04:00
Yuchen Liang
e8408c797a remove unused comments
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-03 11:38:58 -04:00
Yuchen Liang
027f28deb9 remove Vec dependency
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-03 11:38:00 -04:00
Yuchen Liang
ea6f9798c6 add safety comment
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-08-16 18:06:16 +00:00
Yuchen Liang
253e4d5843 Merge branch 'main' into yuchen/direct-io-aligned-alloc 2024-08-16 13:14:20 -04:00
Yuchen Liang
852099bc83 remove aligned-vec, use ManuallyDrop<Vec<u8>>
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-08-16 17:13:30 +00:00
Yuchen Liang
148e230d11 add mut version marker trait
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-08-15 04:03:13 +00:00
Yuchen Liang
6d664788c1 feat(pageserver): newtype aligned-vec as aligned buffer allocation
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-08-15 03:43:58 +00:00
22 changed files with 797 additions and 131 deletions

View File

@@ -104,7 +104,7 @@ pub struct ConfigToml {
pub image_compression: ImageCompressionAlgorithm,
pub ephemeral_bytes_per_memory_kb: usize,
pub l0_flush: Option<crate::models::L0FlushConfig>,
pub virtual_file_direct_io: crate::models::virtual_file::DirectIoMode,
pub virtual_file_io_mode: Option<crate::models::virtual_file::IoMode>,
pub io_buffer_alignment: usize,
}
@@ -381,7 +381,7 @@ impl Default for ConfigToml {
image_compression: (DEFAULT_IMAGE_COMPRESSION),
ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: None,
virtual_file_direct_io: crate::models::virtual_file::DirectIoMode::default(),
virtual_file_io_mode: None,
io_buffer_alignment: DEFAULT_IO_BUFFER_ALIGNMENT,

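With this change, deployments opt in through the pageserver TOML config, e.g. `virtual_file_io_mode = "direct"`; when the key is absent, the server falls back to `IoMode::preferred()` (see the `PageServerConf` diff further down).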
View File

@@ -972,8 +972,6 @@ pub struct TopTenantShardsResponse {
}
pub mod virtual_file {
use std::path::PathBuf;
#[derive(
Copy,
Clone,
@@ -994,50 +992,49 @@ pub mod virtual_file {
}
/// Direct IO modes for a pageserver.
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum DirectIoMode {
/// Direct IO disabled (uses usual buffered IO).
#[default]
Disabled,
/// Direct IO disabled (performs checks and perf simulations).
Evaluate {
/// Alignment check level
alignment_check: DirectIoAlignmentCheckLevel,
/// Latency padded for performance simulation.
latency_padding: DirectIoLatencyPadding,
},
/// Direct IO enabled.
Enabled {
/// Actions to perform on alignment error.
on_alignment_error: DirectIoOnAlignmentErrorAction,
},
#[derive(
Copy,
Clone,
PartialEq,
Eq,
Hash,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
Debug,
)]
#[strum(serialize_all = "kebab-case")]
#[repr(u8)]
pub enum IoMode {
/// Uses buffered IO.
Buffered,
/// Uses direct IO; errors out if the operation fails.
#[cfg(target_os = "linux")]
Direct,
}
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum DirectIoAlignmentCheckLevel {
#[default]
Error,
Log,
None,
impl IoMode {
pub const fn preferred() -> Self {
if cfg!(target_os = "linux") {
Self::Direct
} else {
Self::Buffered
}
}
}
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum DirectIoOnAlignmentErrorAction {
Error,
#[default]
FallbackToBuffered,
}
impl TryFrom<u8> for IoMode {
type Error = u8;
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum DirectIoLatencyPadding {
/// Pad virtual file operations with IO to a fake file.
FakeFileRW { path: PathBuf },
#[default]
None,
fn try_from(value: u8) -> Result<Self, Self::Error> {
Ok(match value {
v if v == (IoMode::Buffered as u8) => IoMode::Buffered,
#[cfg(target_os = "linux")]
v if v == (IoMode::Direct as u8) => IoMode::Direct,
x => return Err(x),
})
}
}
}
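A quick sketch of the conversions the new `IoMode` derives (kebab-case strings via strum, plus the `u8` round-trip used by the atomic global in `virtual_file.rs` below):

```rust
use std::str::FromStr;

let mode = IoMode::from_str("buffered").unwrap(); // strum EnumString, kebab-case
assert_eq!(mode.to_string(), "buffered");         // strum Display
assert_eq!(IoMode::try_from(IoMode::Buffered as u8), Ok(IoMode::Buffered));
#[cfg(target_os = "linux")]
assert_eq!(IoMode::preferred(), IoMode::Direct); // Buffered on non-Linux targets
```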

View File

@@ -164,12 +164,10 @@ fn criterion_benchmark(c: &mut Criterion) {
let conf: &'static PageServerConf = Box::leak(Box::new(
pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()),
));
virtual_file::init(
16384,
virtual_file::io_engine_for_bench(),
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
);
page_cache::init(conf.page_cache_size);
let align = pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
virtual_file::init(16384, virtual_file::io_engine_for_bench(), align);
page_cache::init(conf.page_cache_size, align);
{
let mut group = c.benchmark_group("ingest-small-values");

View File

@@ -550,6 +550,19 @@ impl Client {
.map_err(Error::ReceiveBody)
}
/// Configures the io mode at runtime.
pub async fn put_io_mode(
&self,
mode: &pageserver_api::models::virtual_file::IoMode,
) -> Result<()> {
let uri = format!("{}/v1/io_mode", self.mgmt_api_endpoint);
self.request(Method::PUT, uri, mode)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
pub async fn get_utilization(&self) -> Result<PageserverUtilization> {
let uri = format!("{}/v1/utilization", self.mgmt_api_endpoint);
self.get(uri)

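A minimal usage sketch for the new method, assuming an already-constructed mgmt API `client` of the surrounding `Client` type:

```rust
use pageserver_api::models::virtual_file::IoMode;

// Live-switch the pageserver to buffered IO; errors surface as mgmt_api errors.
client.put_io_mode(&IoMode::Buffered).await?;
```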
View File

@@ -151,13 +151,10 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
let max_holes = cmd.max_holes.unwrap_or(DEFAULT_MAX_HOLES);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
let align = pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
// Initialize virtual_file (file descriptor cache) and page cache, which are needed to access the layer's persistent B-Tree.
pageserver::virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
);
pageserver::page_cache::init(100);
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs, align);
pageserver::page_cache::init(100, align);
let mut total_delta_layers = 0usize;
let mut total_image_layers = 0usize;

View File

@@ -59,8 +59,9 @@ pub(crate) enum LayerCmd {
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs, 1);
page_cache::init(100);
let align = pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs, align);
page_cache::init(100, align);
let file = VirtualFile::open(path, ctx).await?;
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
@@ -190,12 +191,10 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
new_tenant_id,
new_timeline_id,
} => {
pageserver::virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
);
pageserver::page_cache::init(100);
let align = pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs, align);
pageserver::page_cache::init(100, align);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);

View File

@@ -205,12 +205,9 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {
async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
DEFAULT_IO_BUFFER_ALIGNMENT,
);
page_cache::init(100);
let align = DEFAULT_IO_BUFFER_ALIGNMENT;
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs, align);
page_cache::init(100, align);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx).await
}

View File

@@ -63,6 +63,10 @@ pub(crate) struct Args {
#[clap(long)]
set_io_alignment: Option<usize>,
/// Before starting the benchmark, live-reconfigure the pageserver to use the specified io mode (buffered vs. direct).
#[clap(long)]
set_io_mode: Option<pageserver_api::models::virtual_file::IoMode>,
targets: Option<Vec<TenantTimelineId>>,
}
@@ -133,6 +137,10 @@ async fn main_impl(
mgmt_api_client.put_io_alignment(align).await?;
}
if let Some(mode) = &args.set_io_mode {
mgmt_api_client.put_io_mode(mode).await?;
}
// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
&mgmt_api_client,

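Because `IoMode` implements strum's `EnumString` with kebab-case serialization, the new flag is spelled `--set-io-mode=buffered` or `--set-io-mode=direct` (the latter only exists on Linux builds).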
View File

@@ -125,7 +125,7 @@ fn main() -> anyhow::Result<()> {
// after setting up logging, log the effective IO engine choice and read path implementations
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_direct_io, "starting with virtual_file Direct IO settings");
info!(?conf.virtual_file_io_mode, "starting with virtual_file Direct IO settings");
info!(?conf.io_buffer_alignment, "starting with setting for IO buffer alignment");
// The tenants directory contains all the pageserver local disk state.
@@ -173,7 +173,7 @@ fn main() -> anyhow::Result<()> {
conf.virtual_file_io_engine,
conf.io_buffer_alignment,
);
page_cache::init(conf.page_cache_size);
page_cache::init(conf.page_cache_size, conf.io_buffer_alignment);
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;

View File

@@ -174,7 +174,7 @@ pub struct PageServerConf {
pub l0_flush: crate::l0_flush::L0FlushConfig,
/// Direct IO settings
pub virtual_file_direct_io: virtual_file::DirectIoMode,
pub virtual_file_io_mode: virtual_file::IoMode,
pub io_buffer_alignment: usize,
}
@@ -325,7 +325,7 @@ impl PageServerConf {
image_compression,
ephemeral_bytes_per_memory_kb,
l0_flush,
virtual_file_direct_io,
virtual_file_io_mode,
concurrent_tenant_warmup,
concurrent_tenant_size_logical_size_queries,
virtual_file_io_engine,
@@ -368,7 +368,6 @@ impl PageServerConf {
max_vectored_read_bytes,
image_compression,
ephemeral_bytes_per_memory_kb,
virtual_file_direct_io,
io_buffer_alignment,
// ------------------------------------------------------------
@@ -408,6 +407,7 @@ impl PageServerConf {
l0_flush: l0_flush
.map(crate::l0_flush::L0FlushConfig::from)
.unwrap_or_default(),
virtual_file_io_mode: virtual_file_io_mode.unwrap_or(virtual_file::IoMode::preferred()),
};
// ------------------------------------------------------------

View File

@@ -17,6 +17,7 @@ use hyper::header;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::virtual_file::IoMode;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
use pageserver_api::models::IngestAuxFilesRequest;
@@ -2381,6 +2382,16 @@ async fn put_io_alignment_handler(
json_response(StatusCode::OK, ())
}
async fn put_io_mode_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&r, None)?;
let mode: IoMode = json_request(&mut r).await?;
crate::virtual_file::set_io_mode(mode);
json_response(StatusCode::OK, ())
}
/// Polled by control plane.
///
/// See [`crate::utilization`].
@@ -3071,6 +3082,7 @@ pub fn make_router(
.put("/v1/io_alignment", |r| {
api_handler(r, put_io_alignment_handler)
})
.put("/v1/io_mode", |r| api_handler(r, put_io_mode_handler))
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/force_aux_policy_switch",
|r| api_handler(r, force_aux_policy_switch_handler),

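For manual testing, the endpoint can also be hit directly. Since `IoMode` uses `serde_with::SerializeDisplay`/`DeserializeFromStr`, the JSON body is a bare string. A hedged sketch using `reqwest` (not part of this PR; the endpoint address is a placeholder):

```rust
use pageserver_api::models::virtual_file::IoMode;

let mgmt_endpoint = "http://127.0.0.1:9898"; // placeholder management address
let resp = reqwest::Client::new()
    .put(format!("{mgmt_endpoint}/v1/io_mode"))
    .json(&IoMode::Buffered) // serialized as the JSON string "buffered"
    .send()
    .await?;
assert!(resp.status().is_success());
```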
View File

@@ -82,6 +82,7 @@ use once_cell::sync::OnceCell;
use crate::{
context::RequestContext,
metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
virtual_file::{self, dio::IoBufferMut},
};
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
@@ -90,8 +91,8 @@ const TEST_PAGE_CACHE_SIZE: usize = 50;
///
/// Initialize the page cache. This must be called once at page server startup.
///
pub fn init(size: usize) {
if PAGE_CACHE.set(PageCache::new(size)).is_err() {
pub fn init(size: usize, align: usize) {
if PAGE_CACHE.set(PageCache::new(size, align)).is_err() {
panic!("page cache already initialized");
}
}
@@ -106,7 +107,12 @@ pub fn get() -> &'static PageCache {
// page cache is usable in unit tests.
//
if cfg!(test) {
PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
PAGE_CACHE.get_or_init(|| {
PageCache::new(
TEST_PAGE_CACHE_SIZE,
virtual_file::get_io_buffer_alignment(),
)
})
} else {
PAGE_CACHE.get().expect("page cache not initialized")
}
@@ -637,13 +643,11 @@ impl PageCache {
/// Initialize a new page cache
///
/// This should be called only once at page server startup.
fn new(num_pages: usize) -> Self {
fn new(num_pages: usize, align: usize) -> Self {
assert!(num_pages > 0, "page cache size must be > 0");
// We could use Vec::leak here, but that potentially also leaks
// uninitialized reserved capacity. With into_boxed_slice and Box::leak
// this is avoided.
let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
let page_buffer =
IoBufferMut::with_capacity_aligned_zeroed(num_pages * PAGE_SZ, align).leak();
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
size_metrics.max_bytes.set_page_sz(num_pages);

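The `leak()` call replaces the old `Box::leak(vec![0u8; ...].into_boxed_slice())` pattern: the buffer is fully zero-initialized before it is leaked, so no uninitialized reserved capacity can escape, and the resulting slice is aligned for direct IO. A minimal sketch of the allocation path, assuming `PAGE_SZ` is 8192 as elsewhere in the pageserver:

```rust
use crate::virtual_file::dio::IoBufferMut;

const PAGE_SZ: usize = 8192; // assumption: matches the pageserver's page size
let align = 4096;
let num_pages = 4;
let page_buffer: &'static mut [u8] =
    IoBufferMut::with_capacity_aligned_zeroed(num_pages * PAGE_SZ, align).leak();
assert_eq!(page_buffer.len(), num_pages * PAGE_SZ);
assert_eq!(page_buffer.as_ptr().align_offset(align), 0);
```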
View File

@@ -84,7 +84,7 @@ impl Drop for EphemeralFile {
fn drop(&mut self) {
// unlink the file
// we are clear to do this, because we have entered a gate
let path = &self.buffered_writer.as_inner().as_inner().path;
let path = self.buffered_writer.as_inner().as_inner().path();
let res = std::fs::remove_file(path);
if let Err(e) = res {
if e.kind() != std::io::ErrorKind::NotFound {
@@ -356,7 +356,7 @@ mod tests {
}
let file_contents =
std::fs::read(&file.buffered_writer.as_inner().as_inner().path).unwrap();
std::fs::read(file.buffered_writer.as_inner().as_inner().path()).unwrap();
assert_eq!(file_contents, &content[0..cap]);
let buffer_contents = file.buffered_writer.inspect_buffer();
@@ -392,7 +392,7 @@ mod tests {
.buffered_writer
.as_inner()
.as_inner()
.path
.path()
.metadata()
.unwrap();
assert_eq!(

View File

@@ -43,12 +43,12 @@ use crate::tenant::vectored_blob_io::{
VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::dio::IoBufferMut;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::BytesMut;
use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
@@ -572,7 +572,7 @@ impl DeltaLayerWriterInner {
ensure!(
metadata.len() <= S3_UPLOAD_LIMIT,
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
file.path,
file.path(),
metadata.len()
);
@@ -790,7 +790,7 @@ impl DeltaLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path, ctx)
let file = VirtualFile::open_v2(path, ctx)
.await
.context("open layer file")?;
@@ -991,7 +991,8 @@ impl DeltaLayerInner {
.0
.into();
let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
let mut buf = Some(BytesMut::with_capacity(buf_size));
let align = virtual_file::get_io_buffer_alignment();
let mut buf = Some(IoBufferMut::with_capacity_aligned(buf_size, align));
// Note that reads are processed in reverse order (from highest key+lsn).
// This is the order that `ReconstructState` requires such that it can
@@ -1010,7 +1011,7 @@ impl DeltaLayerInner {
blob_meta.key,
PageReconstructError::Other(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.file.path,
self.file.path(),
kind
)),
);
@@ -1018,7 +1019,7 @@ impl DeltaLayerInner {
// We have "lost" the buffer since the lower level IO api
// doesn't return the buffer on error. Allocate a new one.
buf = Some(BytesMut::with_capacity(buf_size));
buf = Some(IoBufferMut::with_capacity_aligned(buf_size, align));
continue;
}
@@ -1036,7 +1037,7 @@ impl DeltaLayerInner {
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to decompress blob from virtual file {}",
self.file.path,
self.file.path(),
))),
);
@@ -1054,7 +1055,7 @@ impl DeltaLayerInner {
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to deserialize blob from virtual file {}",
self.file.path,
self.file.path(),
))),
);
@@ -1186,14 +1187,14 @@ impl DeltaLayerInner {
let mut prev: Option<(Key, Lsn, BlobRef)> = None;
let mut read_builder: Option<ChunkedVectoredReadBuilder> = None;
let align = virtual_file::get_io_buffer_alignment();
let max_read_size = self
.max_vectored_read_bytes
.map(|x| x.0.get())
.unwrap_or(8192);
let mut buffer = Some(BytesMut::with_capacity(max_read_size));
let align = virtual_file::get_io_buffer_alignment();
let mut buffer = Some(IoBufferMut::with_capacity_aligned(max_read_size, align));
// FIXME: buffering of DeltaLayerWriter
let mut per_blob_copy = Vec::new();
@@ -1552,12 +1553,12 @@ impl<'a> DeltaLayerIterator<'a> {
let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);
let mut next_batch = std::collections::VecDeque::new();
let buf_size = plan.size();
let buf = BytesMut::with_capacity(buf_size);
let align = virtual_file::get_io_buffer_alignment();
let buf = IoBufferMut::with_capacity_aligned(buf_size, align);
let blobs_buf = vectored_blob_reader
.read_blobs(&plan, buf, self.ctx)
.await?;
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let blob_read = meta.read(&view).await?;
let value = Value::des(&blob_read)?;
@@ -1932,7 +1933,9 @@ pub(crate) mod test {
&vectored_reads,
constants::MAX_VECTORED_READ_BYTES,
);
let mut buf = Some(BytesMut::with_capacity(buf_size));
let align = virtual_file::get_io_buffer_alignment();
let mut buf = Some(IoBufferMut::with_capacity_aligned(buf_size, align));
for read in vectored_reads {
let blobs_buf = vectored_blob_reader

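The switch from `BytesMut` to `IoBufferMut` matters because `O_DIRECT` reads fail unless the buffer is suitably aligned. A hypothetical helper (not part of this PR) illustrating the constraint the aligned buffers are meant to satisfy:

```rust
/// Hypothetical check: O_DIRECT generally requires the buffer address, file
/// offset, and transfer length to all be multiples of the alignment
/// (the logical block size, commonly 512 or 4096 bytes).
fn is_direct_io_compatible(ptr: *const u8, offset: u64, len: usize, align: usize) -> bool {
    debug_assert!(align.is_power_of_two());
    (ptr as usize) % align == 0 && offset % (align as u64) == 0 && len % align == 0
}
```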
View File

@@ -40,11 +40,12 @@ use crate::tenant::vectored_blob_io::{
VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::dio::IoBufferMut;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use hex;
use itertools::Itertools;
@@ -388,7 +389,7 @@ impl ImageLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path, ctx)
let file = VirtualFile::open_v2(path, ctx)
.await
.context("open layer file")?;
let file_id = page_cache::next_file_id();
@@ -542,14 +543,15 @@ impl ImageLayerInner {
.await?;
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
let align = virtual_file::get_io_buffer_alignment();
let mut key_count = 0;
for read in plan.into_iter() {
let buf_size = read.size();
let buf = BytesMut::with_capacity(buf_size);
let buf = IoBufferMut::with_capacity_aligned(buf_size, align);
let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = meta.read(&view).await?;
@@ -597,13 +599,13 @@ impl ImageLayerInner {
);
}
let buf = BytesMut::with_capacity(buf_size);
let align = virtual_file::get_io_buffer_alignment();
let buf = IoBufferMut::with_capacity_aligned(buf_size, align);
let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
match res {
Ok(blobs_buf) => {
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = meta.read(&view).await;
@@ -614,7 +616,7 @@ impl ImageLayerInner {
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to decompress blob from virtual file {}",
self.file.path,
self.file.path(),
))),
);
@@ -635,7 +637,7 @@ impl ImageLayerInner {
blob_meta.key,
PageReconstructError::from(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.file.path,
self.file.path(),
kind
)),
);
@@ -1039,12 +1041,12 @@ impl<'a> ImageLayerIterator<'a> {
let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
let mut next_batch = std::collections::VecDeque::new();
let buf_size = plan.size();
let buf = BytesMut::with_capacity(buf_size);
let align = virtual_file::get_io_buffer_alignment();
let buf = IoBufferMut::with_capacity_aligned(buf_size, align);
let blobs_buf = vectored_blob_reader
.read_blobs(&plan, buf, self.ctx)
.await?;
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = meta.read(&view).await?;
next_batch.push_back((

View File

@@ -18,7 +18,7 @@
use std::collections::BTreeMap;
use std::ops::Deref;
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use pageserver_api::key::Key;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::BoundedBuf;
@@ -27,6 +27,7 @@ use utils::vec_map::VecMap;
use crate::context::RequestContext;
use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
use crate::virtual_file::dio::IoBufferMut;
use crate::virtual_file::{self, VirtualFile};
/// Metadata bundled with the start and end offset of a blob.
@@ -158,7 +159,7 @@ impl std::fmt::Display for VectoredBlob {
/// Return type of [`VectoredBlobReader::read_blobs`]
pub struct VectoredBlobsBuf {
/// Buffer for all blobs in this read
pub buf: BytesMut,
pub buf: IoBufferMut,
/// Offsets into the buffer and metadata for all blobs in this read
pub blobs: Vec<VectoredBlob>,
}
@@ -460,7 +461,7 @@ impl<'a> VectoredBlobReader<'a> {
pub async fn read_blobs(
&self,
read: &VectoredRead,
buf: BytesMut,
buf: IoBufferMut,
ctx: &RequestContext,
) -> Result<VectoredBlobsBuf, std::io::Error> {
assert!(read.size() > 0);
@@ -945,7 +946,8 @@ mod tests {
// Multiply by two (compressed data might need more space), and add a few bytes for the header
let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
let mut buf = BytesMut::with_capacity(reserved_bytes);
let align = virtual_file::get_io_buffer_alignment();
let mut buf = IoBufferMut::with_capacity_aligned(reserved_bytes, align);
let align = virtual_file::get_io_buffer_alignment();
let vectored_blob_reader = VectoredBlobReader::new(&file);

View File

@@ -23,10 +23,12 @@ use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
use pageserver_api::shard::TenantShardId;
use std::fs::File;
use std::io::{Error, ErrorKind, Seek, SeekFrom};
#[cfg(target_os = "linux")]
use std::os::unix::fs::OpenOptionsExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
@@ -38,10 +40,11 @@ pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
mod metadata;
mod open_options;
use self::owned_buffers_io::write::OwnedAsyncWriter;
pub(crate) use api::DirectIoMode;
pub(crate) use api::IoMode;
pub(crate) use io_engine::IoEngineKind;
pub(crate) use metadata::Metadata;
pub(crate) use open_options::*;
pub(crate) mod dio;
pub(crate) mod owned_buffers_io {
//! Abstractions for IO with owned buffers.
@@ -53,6 +56,7 @@ pub(crate) mod owned_buffers_io {
//! but for the time being we're proving out the primitives in the neon.git repo
//! for faster iteration.
pub(crate) mod io_buf_aligned;
pub(crate) mod io_buf_ext;
pub(crate) mod slice;
pub(crate) mod write;
@@ -61,6 +65,176 @@ pub(crate) mod owned_buffers_io {
}
}
#[derive(Debug)]
pub enum VirtualFile {
Buffered(VirtualFileInner),
Direct(VirtualFileInner),
}
impl VirtualFile {
fn inner(&self) -> &VirtualFileInner {
match self {
Self::Buffered(file) => file,
Self::Direct(file) => file,
}
}
fn inner_mut(&mut self) -> &mut VirtualFileInner {
match self {
Self::Buffered(file) => file,
Self::Direct(file) => file,
}
}
fn into_inner(self) -> VirtualFileInner {
match self {
Self::Buffered(file) => file,
Self::Direct(file) => file,
}
}
/// Open a file in read-only mode. Like File::open.
pub async fn open<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
let file = VirtualFileInner::open(path, ctx).await?;
Ok(Self::Buffered(file))
}
/// Open a file in read-only mode. Like File::open.
///
/// `O_DIRECT` will be enabled based on `virtual_file_io_mode`.
pub async fn open_v2<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
Self::open_with_options_v2(path.as_ref(), OpenOptions::new().read(true), ctx).await
}
pub async fn create<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
let file = VirtualFileInner::create(path, ctx).await?;
Ok(Self::Buffered(file))
}
pub async fn create_v2<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
VirtualFile::open_with_options_v2(
path.as_ref(),
OpenOptions::new().write(true).create(true).truncate(true),
ctx,
)
.await
}
pub async fn open_with_options<P: AsRef<Utf8Path>>(
path: P,
open_options: &OpenOptions,
ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
) -> Result<Self, std::io::Error> {
let file = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
Ok(Self::Buffered(file))
}
pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
path: P,
open_options: &mut OpenOptions, // Uses `&mut` here to add `O_DIRECT`.
ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
) -> Result<Self, std::io::Error> {
let file = match get_io_mode() {
IoMode::Buffered => {
let file = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
Self::Buffered(file)
}
#[cfg(target_os = "linux")]
IoMode::Direct => {
let file = VirtualFileInner::open_with_options(
path,
open_options.custom_flags(nix::libc::O_DIRECT),
ctx,
)
.await?;
Self::Direct(file)
}
};
Ok(file)
}
pub fn path(&self) -> &Utf8Path {
self.inner().path.as_path()
}
pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
final_path: Utf8PathBuf,
tmp_path: Utf8PathBuf,
content: B,
) -> std::io::Result<()> {
VirtualFileInner::crashsafe_overwrite(final_path, tmp_path, content).await
}
pub async fn sync_all(&self) -> Result<(), Error> {
self.inner().sync_all().await
}
pub async fn sync_data(&self) -> Result<(), Error> {
self.inner().sync_data().await
}
pub async fn metadata(&self) -> Result<Metadata, Error> {
self.inner().metadata().await
}
pub fn remove(self) {
self.into_inner().remove();
}
pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
self.inner_mut().seek(pos).await
}
pub async fn read_exact_at<Buf>(
&self,
slice: Slice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> Result<Slice<Buf>, Error>
where
Buf: IoBufMut + Send,
{
self.inner().read_exact_at(slice, offset, ctx).await
}
pub async fn read_exact_at_page(
&self,
page: PageWriteGuard<'static>,
offset: u64,
ctx: &RequestContext,
) -> Result<PageWriteGuard<'static>, Error> {
self.inner().read_exact_at_page(page, offset, ctx).await
}
pub async fn write_all_at<Buf: IoBuf + Send>(
&self,
buf: FullSlice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), Error>) {
self.inner().write_all_at(buf, offset, ctx).await
}
pub async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<usize, Error>) {
self.inner_mut().write_all(buf, ctx).await
}
}
///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
/// the underlying file is closed if the system is low on file descriptors,
@@ -77,7 +251,7 @@ pub(crate) mod owned_buffers_io {
/// 'tag' field is used to detect whether the handle still is valid or not.
///
#[derive(Debug)]
pub struct VirtualFile {
pub struct VirtualFileInner {
/// Lazy handle to the global file descriptor cache. The slot that this points to
/// might contain our File, or it may be empty, or it may contain a File that
/// belongs to a different VirtualFile.
@@ -350,12 +524,12 @@ macro_rules! with_file {
}};
}
impl VirtualFile {
impl VirtualFileInner {
/// Open a file in read-only mode. Like File::open.
pub async fn open<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<VirtualFile, std::io::Error> {
) -> Result<VirtualFileInner, std::io::Error> {
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
}
@@ -364,7 +538,7 @@ impl VirtualFile {
pub async fn create<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<VirtualFile, std::io::Error> {
) -> Result<VirtualFileInner, std::io::Error> {
Self::open_with_options(
path.as_ref(),
OpenOptions::new().write(true).create(true).truncate(true),
@@ -382,7 +556,7 @@ impl VirtualFile {
path: P,
open_options: &OpenOptions,
_ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
) -> Result<VirtualFile, std::io::Error> {
) -> Result<VirtualFileInner, std::io::Error> {
let path_ref = path.as_ref();
let path_str = path_ref.to_string();
let parts = path_str.split('/').collect::<Vec<&str>>();
@@ -413,7 +587,7 @@ impl VirtualFile {
open_options.open(path_ref.as_std_path()).await?
});
// Strip all options other than read and write.
// Strip all options other than read and write (and the O_DIRECT custom flag).
//
// It would perhaps be nicer to check just for the read and write flags
// explicitly, but OpenOptions doesn't contain any functions to read flags,
@@ -423,7 +597,7 @@ impl VirtualFile {
reopen_options.create_new(false);
reopen_options.truncate(false);
let vfile = VirtualFile {
let vfile = VirtualFileInner {
handle: RwLock::new(handle),
pos: 0,
path: path_ref.to_path_buf(),
@@ -1034,6 +1208,21 @@ impl tokio_epoll_uring::IoFd for FileGuard {
#[cfg(test)]
impl VirtualFile {
pub(crate) async fn read_blk(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
self.inner().read_blk(blknum, ctx).await
}
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
self.inner_mut().read_to_end(buf, ctx).await
}
}
#[cfg(test)]
impl VirtualFileInner {
pub(crate) async fn read_blk(
&self,
blknum: u32,
@@ -1067,7 +1256,7 @@ impl VirtualFile {
}
}
impl Drop for VirtualFile {
impl Drop for VirtualFileInner {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut();
@@ -1216,6 +1405,15 @@ pub(crate) fn get_io_buffer_alignment() -> usize {
}
}
static IO_MODE: AtomicU8 = AtomicU8::new(IoMode::preferred() as u8);
pub(crate) fn set_io_mode(mode: IoMode) {
IO_MODE.store(mode as u8, std::sync::atomic::Ordering::Relaxed);
}
pub(crate) fn get_io_mode() -> IoMode {
IoMode::try_from(IO_MODE.load(Ordering::Relaxed)).unwrap()
}
#[cfg(test)]
mod tests {
use crate::context::DownloadBehavior;
@@ -1524,7 +1722,7 @@ mod tests {
// Open the file many times.
let mut files = Vec::new();
for _ in 0..VIRTUAL_FILES {
let f = VirtualFile::open_with_options(
let f = VirtualFileInner::open_with_options(
&test_file_path,
OpenOptions::new().read(true),
&ctx,
@@ -1576,7 +1774,7 @@ mod tests {
let path = testdir.join("myfile");
let tmp_path = testdir.join("myfile.tmp");
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
@@ -1585,7 +1783,7 @@ mod tests {
assert!(!tmp_path.exists());
drop(file);
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
@@ -1608,7 +1806,7 @@ mod tests {
std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
assert!(tmp_path.exists());
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
.await
.unwrap();

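Putting the pieces together, a sketch of the intended call pattern for the new wrapper, assuming a `RequestContext` `ctx` and a layer-file `path` are in scope:

```rust
// Runtime-selectable IO mode: Direct on Linux, Buffered elsewhere.
virtual_file::set_io_mode(IoMode::preferred());

// open_v2 consults the global mode and adds O_DIRECT when it is IoMode::Direct.
let file = VirtualFile::open_v2(&path, &ctx).await?;
assert_eq!(virtual_file::get_io_mode(), IoMode::preferred());
```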
View File

@@ -0,0 +1,410 @@
#![allow(unused)]
use core::slice;
use std::{
alloc::{self, Layout},
cmp,
mem::{ManuallyDrop, MaybeUninit},
ops::{Deref, DerefMut},
ptr::{addr_of_mut, NonNull},
};
use bytes::buf::UninitSlice;
struct IoBufferPtr(*mut u8);
// SAFETY: We guarantee that no one besides `IoBufferPtr` itself has the raw pointer.
unsafe impl Send for IoBufferPtr {}
/// An aligned buffer type used for I/O.
pub struct IoBufferMut {
ptr: IoBufferPtr,
capacity: usize,
len: usize,
align: usize,
}
impl IoBufferMut {
/// Constructs a new, empty `IoBufferMut` with at least the specified capacity and alignment.
///
/// The buffer will be able to hold at most `capacity` bytes without reallocating.
///
/// # Panics
///
/// Panics if the new capacity exceeds `isize::MAX` _bytes_, or if the following alignment requirement is not met:
/// * `align` must not be zero,
///
/// * `align` must be a power of two,
///
/// * `capacity`, when rounded up to the nearest multiple of `align`,
/// must not overflow isize (i.e., the rounded value must be
/// less than or equal to `isize::MAX`).
pub fn with_capacity_aligned(capacity: usize, align: usize) -> Self {
let layout = Layout::from_size_align(capacity, align).expect("Invalid layout");
// SAFETY: Making an allocation with a sized and aligned layout. The memory is manually freed with the same layout.
let ptr = unsafe {
let ptr = alloc::alloc(layout);
if ptr.is_null() {
alloc::handle_alloc_error(layout);
}
IoBufferPtr(ptr)
};
IoBufferMut {
ptr,
capacity,
len: 0,
align,
}
}
/// Constructs a new `IoBufferMut` with at least the specified capacity and alignment, filled with zeros.
pub fn with_capacity_aligned_zeroed(capacity: usize, align: usize) -> Self {
use bytes::BufMut;
let mut buf = Self::with_capacity_aligned(capacity, align);
buf.put_bytes(0, capacity);
buf.len = capacity;
buf
}
/// Returns the total number of bytes the buffer can hold.
#[inline]
pub fn capacity(&self) -> usize {
self.capacity
}
/// Returns the alignment of the buffer.
#[inline]
pub fn align(&self) -> usize {
self.align
}
/// Returns the number of bytes in the buffer, also referred to as its 'length'.
#[inline]
pub fn len(&self) -> usize {
self.len
}
/// Force the length of the buffer to `new_len`.
#[inline]
unsafe fn set_len(&mut self, new_len: usize) {
debug_assert!(new_len <= self.capacity());
self.len = new_len;
}
#[inline]
fn as_ptr(&self) -> *const u8 {
self.ptr.0
}
#[inline]
fn as_mut_ptr(&mut self) -> *mut u8 {
self.ptr.0
}
/// Extracts a slice containing the entire buffer.
///
/// Equivalent to `&s[..]`.
#[inline]
fn as_slice(&self) -> &[u8] {
// SAFETY: The pointer is valid and `len` bytes are initialized.
unsafe { slice::from_raw_parts(self.as_ptr(), self.len) }
}
/// Extracts a mutable slice of the entire buffer.
///
/// Equivalent to `&mut s[..]`.
fn as_mut_slice(&mut self) -> &mut [u8] {
// SAFETY: The pointer is valid and `len` bytes are initialized.
unsafe { slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) }
}
/// Drops the all the contents of the buffer, setting its length to `0`.
#[inline]
pub fn clear(&mut self) {
self.len = 0;
}
/// Reserves capacity for at least `additional` more bytes to be inserted
/// in the given `IoBufferMut`. The collection may reserve more space to
/// speculatively avoid frequent reallocations. After calling `reserve`,
/// capacity will be greater than or equal to `self.len() + additional`.
/// Does nothing if capacity is already sufficient.
///
/// # Panics
///
/// Panics if the new capacity exceeds `isize::MAX` _bytes_.
pub fn reserve(&mut self, additional: usize) {
if additional > self.capacity() - self.len() {
self.reserve_inner(additional);
}
}
fn reserve_inner(&mut self, additional: usize) {
let Some(required_cap) = self.len().checked_add(additional) else {
capacity_overflow()
};
let old_capacity = self.capacity();
let align = self.align();
// This guarantees exponential growth. The doubling cannot overflow
// because `cap <= isize::MAX` and the type of `cap` is `usize`.
let cap = cmp::max(old_capacity * 2, required_cap);
if !is_valid_alloc(cap) {
capacity_overflow()
}
let new_layout = Layout::from_size_align(cap, self.align()).expect("Invalid layout");
let old_ptr = self.as_mut_ptr();
// SAFETY: old allocation was allocated with std::alloc::alloc with the same layout,
// and we panic on a null pointer.
let (ptr, cap) = unsafe {
let old_layout = Layout::from_size_align_unchecked(old_capacity, align);
let ptr = alloc::realloc(old_ptr, old_layout, new_layout.size());
if ptr.is_null() {
alloc::handle_alloc_error(new_layout);
}
(IoBufferPtr(ptr), cap)
};
self.ptr = ptr;
self.capacity = cap;
}
/// Consumes and leaks the `IoBufferMut`, returning a mutable reference to the contents, `&'a mut [u8]`.
pub fn leak<'a>(self) -> &'a mut [u8] {
let mut buf = ManuallyDrop::new(self);
// SAFETY: leaking the buffer as intended.
unsafe { slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.len) }
}
}
fn capacity_overflow() -> ! {
panic!("capacity overflow")
}
// We need to guarantee the following:
// * We don't ever allocate `> isize::MAX` byte-size objects.
// * We don't overflow `usize::MAX` and actually allocate too little.
//
// On 64-bit we just need to check for overflow since trying to allocate
// `> isize::MAX` bytes will surely fail. On 32-bit and 16-bit we need to add
// an extra guard for this in case we're running on a platform which can use
// all 4GB in user-space, e.g., PAE or x32.
#[inline]
fn is_valid_alloc(alloc_size: usize) -> bool {
!(usize::BITS < 64 && alloc_size > isize::MAX as usize)
}
impl Drop for IoBufferMut {
fn drop(&mut self) {
// SAFETY: memory was allocated with std::alloc::alloc with the same layout.
unsafe {
alloc::dealloc(
self.as_mut_ptr(),
Layout::from_size_align_unchecked(self.capacity, self.align),
)
}
}
}
impl Deref for IoBufferMut {
type Target = [u8];
fn deref(&self) -> &Self::Target {
self.as_slice()
}
}
impl DerefMut for IoBufferMut {
fn deref_mut(&mut self) -> &mut Self::Target {
self.as_mut_slice()
}
}
/// SAFETY: When advancing the internal cursor, the caller needs to make sure the bytes advanced past have been initialized.
unsafe impl bytes::BufMut for IoBufferMut {
#[inline]
fn remaining_mut(&self) -> usize {
// Although a `Vec` can have at most isize::MAX bytes, we never want to grow `IoBufferMut`.
// Thus, it can have at most `self.capacity` bytes.
self.capacity() - self.len()
}
// SAFETY: Caller needs to make sure the bytes being advanced past have been initialized.
#[inline]
unsafe fn advance_mut(&mut self, cnt: usize) {
let len: usize = self.len();
let remaining = self.remaining_mut();
if remaining < cnt {
panic_advance(cnt, remaining);
}
// Addition will not overflow since the sum is at most the capacity.
self.set_len(len + cnt);
}
#[inline]
fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
let cap = self.capacity();
let len = self.len();
// SAFETY: Since `self.ptr` is valid for `cap` bytes, `self.ptr.add(len)` must be
// valid for `cap - len` bytes. The subtraction will not underflow since
// `len <= cap`.
unsafe { UninitSlice::from_raw_parts_mut(self.as_mut_ptr().add(len), cap - len) }
}
}
/// Panic with a nice error message.
#[cold]
fn panic_advance(idx: usize, len: usize) -> ! {
panic!(
"advance out of bounds: the len is {} but advancing by {}",
len, idx
);
}
// SAFETY: [`IoBufferMut`] has exclusive ownership of the io buffer,
// and the location remains stable even if [`Self`] is moved.
unsafe impl tokio_epoll_uring::IoBuf for IoBufferMut {
fn stable_ptr(&self) -> *const u8 {
self.as_ptr()
}
fn bytes_init(&self) -> usize {
self.len()
}
fn bytes_total(&self) -> usize {
self.capacity()
}
}
// SAFETY: See above.
unsafe impl tokio_epoll_uring::IoBufMut for IoBufferMut {
fn stable_mut_ptr(&mut self) -> *mut u8 {
self.as_mut_ptr()
}
unsafe fn set_init(&mut self, init_len: usize) {
if self.len() < init_len {
self.set_len(init_len);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_with_capacity_aligned() {
const ALIGN: usize = 4 * 1024;
let v = IoBufferMut::with_capacity_aligned(ALIGN * 4, ALIGN);
assert_eq!(v.len(), 0);
assert_eq!(v.capacity(), ALIGN * 4);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
let v = IoBufferMut::with_capacity_aligned(ALIGN / 2, ALIGN);
assert_eq!(v.len(), 0);
assert_eq!(v.capacity(), ALIGN / 2);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
}
#[test]
fn test_with_capacity_aligned_zeroed() {
const ALIGN: usize = 4 * 1024;
let v = IoBufferMut::with_capacity_aligned_zeroed(ALIGN, ALIGN);
assert_eq!(v.len(), ALIGN);
assert_eq!(v.capacity(), ALIGN);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
assert_eq!(&v[..], &[0; ALIGN])
}
#[test]
fn test_reserve() {
use bytes::BufMut;
const ALIGN: usize = 4 * 1024;
let mut v = IoBufferMut::with_capacity_aligned(ALIGN, ALIGN);
let capacity = v.capacity();
v.reserve(capacity);
assert_eq!(v.capacity(), capacity);
let data = [b'a'; ALIGN];
v.put(&data[..]);
v.reserve(capacity);
assert!(v.capacity() >= capacity * 2);
assert_eq!(&v[..], &data[..]);
let capacity = v.capacity();
v.clear();
v.reserve(capacity);
assert_eq!(capacity, v.capacity());
}
#[test]
fn test_bytes_put() {
use bytes::BufMut;
const ALIGN: usize = 4 * 1024;
let mut v = IoBufferMut::with_capacity_aligned(ALIGN * 4, ALIGN);
let x = [b'a'; ALIGN];
for _ in 0..2 {
for _ in 0..4 {
v.put(&x[..]);
}
assert_eq!(v.len(), ALIGN * 4);
assert_eq!(v.capacity(), ALIGN * 4);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
v.clear()
}
assert_eq!(v.len(), 0);
assert_eq!(v.capacity(), ALIGN * 4);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
}
#[test]
#[should_panic]
fn test_bytes_put_panic() {
use bytes::BufMut;
const ALIGN: usize = 4 * 1024;
let mut v = IoBufferMut::with_capacity_aligned(ALIGN * 4, ALIGN);
let x = [b'a'; ALIGN];
for _ in 0..5 {
v.put_slice(&x[..]);
}
}
#[test]
fn test_io_buf_put_slice() {
use tokio_epoll_uring::BoundedBufMut;
const ALIGN: usize = 4 * 1024;
let mut v = IoBufferMut::with_capacity_aligned(ALIGN, ALIGN);
let x = [b'a'; ALIGN];
for _ in 0..2 {
v.put_slice(&x[..]);
assert_eq!(v.len(), ALIGN);
assert_eq!(v.capacity(), ALIGN);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
v.clear()
}
assert_eq!(v.len(), 0);
assert_eq!(v.capacity(), ALIGN);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
}
}

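Outside the unit tests above, typical use pairs `bytes::BufMut` for filling with `Deref<Target = [u8]>` for reading back; a minimal sketch:

```rust
use bytes::BufMut;

let align = 4096;
let mut buf = IoBufferMut::with_capacity_aligned(2 * align, align);
buf.put_slice(&[0xAB; 4096]); // len grows to 4096; the pointer stays aligned
assert_eq!(buf.len(), 4096);
assert_eq!(buf[..].as_ptr().align_offset(align), 0);
buf.clear(); // contents dropped; capacity and alignment retained
```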
View File

@@ -0,0 +1,9 @@
#![allow(unused)]
use tokio_epoll_uring::IoBufMut;
use crate::virtual_file::dio::IoBufferMut;
pub(crate) trait IoBufAlignedMut: IoBufMut {}
impl IoBufAlignedMut for IoBufferMut {}

View File

@@ -1,5 +1,6 @@
//! See [`FullSlice`].
use crate::virtual_file::dio::IoBufferMut;
use bytes::{Bytes, BytesMut};
use std::ops::{Deref, Range};
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
@@ -76,3 +77,4 @@ macro_rules! impl_io_buf_ext {
impl_io_buf_ext!(Bytes);
impl_io_buf_ext!(BytesMut);
impl_io_buf_ext!(Vec<u8>);
impl_io_buf_ext!(IoBufferMut);

View File

@@ -401,6 +401,7 @@ class NeonEnvBuilder:
safekeeper_extra_opts: Optional[list[str]] = None,
storage_controller_port_override: Optional[int] = None,
pageserver_io_buffer_alignment: Optional[int] = None,
pageserver_virtual_file_io_mode: Optional[str] = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -455,6 +456,7 @@ class NeonEnvBuilder:
self.storage_controller_port_override = storage_controller_port_override
self.pageserver_io_buffer_alignment = pageserver_io_buffer_alignment
self.pageserver_virtual_file_io_mode = pageserver_virtual_file_io_mode
assert test_name.startswith(
"test_"
@@ -1028,6 +1030,7 @@ class NeonEnv:
self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine
self.pageserver_aux_file_policy = config.pageserver_aux_file_policy
self.pageserver_io_buffer_alignment = config.pageserver_io_buffer_alignment
self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode
# Create the neon_local's `NeonLocalInitConf`
cfg: Dict[str, Any] = {
@@ -1091,7 +1094,10 @@ class NeonEnv:
for key, value in override.items():
ps_cfg[key] = value
ps_cfg["io_buffer_alignment"] = self.pageserver_io_buffer_alignment
if self.pageserver_io_buffer_alignment is not None:
ps_cfg["io_buffer_alignment"] = self.pageserver_io_buffer_alignment
if self.pageserver_virtual_file_io_mode is not None:
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
# Create a corresponding NeonPageserver object
self.pageservers.append(
@@ -1330,6 +1336,7 @@ def neon_simple_env(
pageserver_aux_file_policy: Optional[AuxFileStore],
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]],
pageserver_io_buffer_alignment: Optional[int],
pageserver_virtual_file_io_mode: Optional[str],
) -> Iterator[NeonEnv]:
"""
Simple Neon environment, with no authentication and no safekeepers.
@@ -1356,6 +1363,7 @@ def neon_simple_env(
pageserver_aux_file_policy=pageserver_aux_file_policy,
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
pageserver_virtual_file_io_mode=pageserver_virtual_file_io_mode,
) as builder:
env = builder.init_start()
@@ -1380,6 +1388,7 @@ def neon_env_builder(
pageserver_aux_file_policy: Optional[AuxFileStore],
record_property: Callable[[str, object], None],
pageserver_io_buffer_alignment: Optional[int],
pageserver_virtual_file_io_mode: Optional[str],
) -> Iterator[NeonEnvBuilder]:
"""
Fixture to create a Neon environment for test.
@@ -1415,6 +1424,7 @@ def neon_env_builder(
pageserver_aux_file_policy=pageserver_aux_file_policy,
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
pageserver_virtual_file_io_mode=pageserver_virtual_file_io_mode,
) as builder:
yield builder
# Propagate `preserve_database_files` to make it possible to use it in other fixtures,

View File

@@ -39,6 +39,11 @@ def pageserver_io_buffer_alignment() -> Optional[int]:
return None
@pytest.fixture(scope="function", autouse=True)
def pageserver_virtual_file_io_mode() -> Optional[str]:
return os.getenv("PAGESERVER_VIRTUAL_FILE_IO_MODE")
@pytest.fixture(scope="function", autouse=True)
def pageserver_aux_file_policy() -> Optional[AuxFileStore]:
return None
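With this fixture wired through `NeonEnvBuilder` above, exporting `PAGESERVER_VIRTUAL_FILE_IO_MODE=direct` opts the whole Python test suite into the new mode; when it is unset, the config key is omitted and the pageserver falls back to `IoMode::preferred()`.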