Compare commits

...

36 Commits

Author SHA1 Message Date
Yuchen Liang
afa0fe0e87 Merge branch 'yuchen/direct-io-for-read' into yuchen/direct-io-for-read-test 2024-10-21 09:28:20 -04:00
Yuchen Liang
656ddbce2c Merge branch 'main' into yuchen/direct-io-for-read 2024-10-21 09:27:59 -04:00
Yuchen Liang
36850cb047 Merge branch 'yuchen/direct-io-for-read' into yuchen/direct-io-for-read-test 2024-10-18 14:22:41 -04:00
Yuchen Liang
a5a86bedb2 fix clippy
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-18 18:22:02 +00:00
Yuchen Liang
9653b64569 Merge branch 'yuchen/direct-io-for-read' into yuchen/direct-io-for-read-test 2024-10-18 14:08:26 -04:00
Yuchen Liang
e92d0fa83f Merge branch 'main' into yuchen/direct-io-for-read 2024-10-18 14:08:06 -04:00
Yuchen Liang
ad44e11a69 follow bytes::Bytes convention for AlignedBuffer
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-18 18:06:16 +00:00
Yuchen Liang
d99a61bd75 review: remove outdated todo
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-18 17:54:59 +00:00
Yuchen Liang
3b88998f8e review: clarify IoBuf safety comments
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-18 17:53:49 +00:00
Yuchen Liang
4b4a3edb80 review: remove allow(unused)
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-18 17:53:11 +00:00
Yuchen Liang
0d0ba568d2 test: use direct on linux
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-14 09:10:03 -04:00
Yuchen Liang
cb51ddc949 Merge branch 'main' into yuchen/direct-io-for-read 2024-10-14 00:41:57 -04:00
Yuchen Liang
156237d553 add more comments
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-11 17:30:49 -04:00
Yuchen Liang
136006907b Merge branch 'main' into yuchen/direct-io-for-read 2024-10-09 14:00:17 -04:00
Yuchen Liang
929c8d42d3 use io mode from config
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-09 17:36:30 +00:00
Yuchen Liang
e377177680 use IoBuffer instead of Bytes for inmemory_layer put_bytes
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-09 12:39:19 -04:00
Yuchen Liang
84b0902e6a fix clippy
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-09 12:32:52 -04:00
Yuchen Liang
8f9679ce95 refactor aligned buffer
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-09 10:30:25 -04:00
Yuchen Liang
62722e8559 fix clippy
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-09 09:39:24 -04:00
Yuchen Liang
b418c343e1 Merge branch 'main' into yuchen/direct-io-for-read 2024-10-09 09:20:47 -04:00
Yuchen Liang
1c61b68c3b use aligned buffer marker trait
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-08 17:25:57 -04:00
Yuchen Liang
84e2242673 use aligned buffer for inmemory layer
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-08 17:25:57 -04:00
Yuchen Liang
e9d9663fcd use aligned buffer for page cache
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-08 17:25:57 -04:00
Yuchen Liang
f28dd95022 use aligned buffer for image and delta layers
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-08 17:25:57 -04:00
Yuchen Liang
ee4600034e pageserver: implement aligned io buffer
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-08 17:25:57 -04:00
Yuchen Liang
12a4e33022 Merge branch 'main' into yuchen/virtual-file-config 2024-10-08 17:04:43 -04:00
Yuchen Liang
6d03e2810d review: use a inner and mode member for VirtualFile
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-08 17:21:58 +00:00
Yuchen Liang
4e1309475c review: clone open_options instead of taking mut
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-08 17:11:04 +00:00
Yuchen Liang
f1418cad52 Merge branch 'main' into yuchen/virtual-file-config 2024-10-07 15:15:26 -04:00
Yuchen Liang
a04cfd754b get rid of io_buffer_alignment config (always 512)
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-07 12:16:11 -04:00
Yuchen Liang
bc13310e56 Merge branch 'main' into yuchen/virtual-file-config 2024-10-07 11:49:13 -04:00
Yuchen Liang
5c76b2d474 fix put_io_mode to use the correct http endpoint
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 10:58:47 -04:00
Yuchen Liang
97f7b0b86f simplify virtual file wrapper
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 08:31:30 -04:00
Yuchen Liang
3a5b44ea53 add set_io_mode option to getpage_latest_lsn
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 08:16:18 -04:00
Yuchen Liang
95554c7377 fix clippy
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 07:59:15 -04:00
Yuchen Liang
a85bd88866 pageserver: add direct io config to virtual file
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-30 23:54:14 -04:00
23 changed files with 905 additions and 79 deletions

View File

@@ -1013,6 +1013,12 @@ pub mod virtual_file {
}
impl IoMode {
#[cfg(target_os = "linux")]
pub const fn preferred() -> Self {
Self::Direct
}
#[cfg(target_os = "macos")]
pub const fn preferred() -> Self {
Self::Buffered
}

View File

@@ -164,7 +164,11 @@ fn criterion_benchmark(c: &mut Criterion) {
let conf: &'static PageServerConf = Box::leak(Box::new(
pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()),
));
virtual_file::init(16384, virtual_file::io_engine_for_bench());
virtual_file::init(
16384,
virtual_file::io_engine_for_bench(),
conf.virtual_file_io_mode,
);
page_cache::init(conf.page_cache_size);
{

View File

@@ -7,6 +7,7 @@ use camino::{Utf8Path, Utf8PathBuf};
use pageserver::context::{DownloadBehavior, RequestContext};
use pageserver::task_mgr::TaskKind;
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use pageserver::virtual_file::api::IoMode;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::ops::Range;
@@ -152,7 +153,11 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
IoMode::preferred(),
);
pageserver::page_cache::init(100);
let mut total_delta_layers = 0usize;

View File

@@ -11,6 +11,7 @@ use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
use pageserver::tenant::storage_layer::{delta_layer, image_layer};
use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer};
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use pageserver::virtual_file::api::IoMode;
use pageserver::{page_cache, virtual_file};
use pageserver::{
repository::{Key, KEY_SIZE},
@@ -59,7 +60,11 @@ pub(crate) enum LayerCmd {
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
IoMode::preferred(),
);
page_cache::init(100);
let file = VirtualFile::open(path, ctx).await?;
let file_id = page_cache::next_file_id();
@@ -190,7 +195,11 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
new_tenant_id,
new_timeline_id,
} => {
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
IoMode::preferred(),
);
pageserver::page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);

View File

@@ -24,7 +24,7 @@ use pageserver::{
page_cache,
task_mgr::TaskKind,
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
virtual_file,
virtual_file::{self, api::IoMode},
};
use pageserver_api::shard::TenantShardId;
use postgres_ffi::ControlFileData;
@@ -205,7 +205,11 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {
async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
IoMode::preferred(),
);
page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx).await

View File

@@ -167,7 +167,11 @@ fn main() -> anyhow::Result<()> {
let scenario = failpoint_support::init();
// Basic initialization of things that don't change after startup
virtual_file::init(conf.max_file_descriptors, conf.virtual_file_io_engine);
virtual_file::init(
conf.max_file_descriptors,
conf.virtual_file_io_engine,
conf.virtual_file_io_mode,
);
page_cache::init(conf.page_cache_size);
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;

View File

@@ -82,6 +82,7 @@ use once_cell::sync::OnceCell;
use crate::{
context::RequestContext,
metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
virtual_file::{IoBufferMut, IoPageSlice},
};
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
@@ -144,7 +145,7 @@ struct SlotInner {
key: Option<CacheKey>,
// for `coalesce_readers_permit`
permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
buf: &'static mut [u8; PAGE_SZ],
buf: IoPageSlice<'static>,
}
impl Slot {
@@ -234,13 +235,13 @@ impl std::ops::Deref for PageReadGuard<'_> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
self.slot_guard.buf
self.slot_guard.buf.deref()
}
}
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
fn as_ref(&self) -> &[u8; PAGE_SZ] {
self.slot_guard.buf
self.slot_guard.buf.as_ref()
}
}
@@ -266,7 +267,7 @@ enum PageWriteGuardState<'i> {
impl std::ops::DerefMut for PageWriteGuard<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
PageWriteGuardState::Invalid { inner, _permit } => inner.buf.deref_mut(),
PageWriteGuardState::Downgraded => unreachable!(),
}
}
@@ -277,7 +278,7 @@ impl std::ops::Deref for PageWriteGuard<'_> {
fn deref(&self) -> &Self::Target {
match &self.state {
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
PageWriteGuardState::Invalid { inner, _permit } => inner.buf.deref(),
PageWriteGuardState::Downgraded => unreachable!(),
}
}
@@ -643,7 +644,7 @@ impl PageCache {
// We could use Vec::leak here, but that potentially also leaks
// uninitialized reserved capacity. With into_boxed_slice and Box::leak
// this is avoided.
let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
let page_buffer = IoBufferMut::with_capacity_zeroed(num_pages * PAGE_SZ).leak();
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
size_metrics.max_bytes.set_page_sz(num_pages);
@@ -652,7 +653,8 @@ impl PageCache {
let slots = page_buffer
.chunks_exact_mut(PAGE_SZ)
.map(|chunk| {
let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
// SAFETY: Each chunk has `PAGE_SZ` (8192) bytes, greater than 512, still aligned.
let buf = unsafe { IoPageSlice::new_unchecked(chunk.try_into().unwrap()) };
Slot {
inner: tokio::sync::RwLock::new(SlotInner {

View File

@@ -5,6 +5,8 @@
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::context::RequestContext;
use crate::page_cache::{self, FileId, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
#[cfg(test)]
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::VirtualFile;
use bytes::Bytes;
use std::ops::Deref;
@@ -40,7 +42,7 @@ pub enum BlockLease<'a> {
#[cfg(test)]
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
#[cfg(test)]
Vec(Vec<u8>),
IoBufferMut(IoBufferMut),
}
impl From<PageReadGuard<'static>> for BlockLease<'static> {
@@ -67,7 +69,7 @@ impl Deref for BlockLease<'_> {
#[cfg(test)]
BlockLease::Arc(v) => v.deref(),
#[cfg(test)]
BlockLease::Vec(v) => {
BlockLease::IoBufferMut(v) => {
TryFrom::try_from(&v[..]).expect("caller must ensure that v has PAGE_SZ")
}
}

View File

@@ -6,10 +6,11 @@ use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache;
use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
use crate::virtual_file::owned_buffers_io::write::Buffer;
use crate::virtual_file::{self, owned_buffers_io, VirtualFile};
use crate::virtual_file::{self, owned_buffers_io, IoBufferMut, VirtualFile};
use bytes::BytesMut;
use camino::Utf8PathBuf;
use num_traits::Num;
@@ -107,15 +108,18 @@ impl EphemeralFile {
self.page_cache_file_id
}
pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
pub(crate) async fn load_to_io_buf(
&self,
ctx: &RequestContext,
) -> Result<IoBufferMut, io::Error> {
let size = self.len().into_usize();
let vec = Vec::with_capacity(size);
let (slice, nread) = self.read_exact_at_eof_ok(0, vec.slice_full(), ctx).await?;
let buf = IoBufferMut::with_capacity(size);
let (slice, nread) = self.read_exact_at_eof_ok(0, buf.slice_full(), ctx).await?;
assert_eq!(nread, size);
let vec = slice.into_inner();
assert_eq!(vec.len(), nread);
assert_eq!(vec.capacity(), size, "we shouldn't be reallocating");
Ok(vec)
let buf = slice.into_inner();
assert_eq!(buf.len(), nread);
assert_eq!(buf.capacity(), size, "we shouldn't be reallocating");
Ok(buf)
}
/// Returns the offset at which the first byte of the input was written, for use
@@ -158,7 +162,7 @@ impl EphemeralFile {
}
impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
async fn read_exact_at_eof_ok<'a, 'b, B: tokio_epoll_uring::IoBufMut + Send>(
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>(
&'b self,
start: u64,
dst: tokio_epoll_uring::Slice<B>,
@@ -345,7 +349,7 @@ mod tests {
assert!(file.len() as usize == write_nbytes);
for i in 0..write_nbytes {
assert_eq!(value_offsets[i], i.into_u64());
let buf = Vec::with_capacity(1);
let buf = IoBufferMut::with_capacity(1);
let (buf_slice, nread) = file
.read_exact_at_eof_ok(i.into_u64(), buf.slice_full(), &ctx)
.await
@@ -385,7 +389,7 @@ mod tests {
// assert the state is as this test expects it to be
assert_eq!(
&file.load_to_vec(&ctx).await.unwrap(),
&file.load_to_io_buf(&ctx).await.unwrap(),
&content[0..cap + cap / 2]
);
let md = file
@@ -440,7 +444,7 @@ mod tests {
let (buf, nread) = file
.read_exact_at_eof_ok(
start.into_u64(),
Vec::with_capacity(len).slice_full(),
IoBufferMut::with_capacity(len).slice_full(),
ctx,
)
.await

View File

@@ -44,11 +44,11 @@ use crate::tenant::vectored_blob_io::{
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::BytesMut;
use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
@@ -1002,7 +1002,7 @@ impl DeltaLayerInner {
.0
.into();
let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
let mut buf = Some(BytesMut::with_capacity(buf_size));
let mut buf = Some(IoBufferMut::with_capacity(buf_size));
// Note that reads are processed in reverse order (from highest key+lsn).
// This is the order that `ReconstructState` requires such that it can
@@ -1029,7 +1029,7 @@ impl DeltaLayerInner {
// We have "lost" the buffer since the lower level IO api
// doesn't return the buffer on error. Allocate a new one.
buf = Some(BytesMut::with_capacity(buf_size));
buf = Some(IoBufferMut::with_capacity(buf_size));
continue;
}
@@ -1203,7 +1203,7 @@ impl DeltaLayerInner {
.map(|x| x.0.get())
.unwrap_or(8192);
let mut buffer = Some(BytesMut::with_capacity(max_read_size));
let mut buffer = Some(IoBufferMut::with_capacity(max_read_size));
// FIXME: buffering of DeltaLayerWriter
let mut per_blob_copy = Vec::new();
@@ -1561,12 +1561,11 @@ impl<'a> DeltaLayerIterator<'a> {
let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);
let mut next_batch = std::collections::VecDeque::new();
let buf_size = plan.size();
let buf = BytesMut::with_capacity(buf_size);
let buf = IoBufferMut::with_capacity(buf_size);
let blobs_buf = vectored_blob_reader
.read_blobs(&plan, buf, self.ctx)
.await?;
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let blob_read = meta.read(&view).await?;
let value = Value::des(&blob_read)?;
@@ -1941,7 +1940,7 @@ pub(crate) mod test {
&vectored_reads,
constants::MAX_VECTORED_READ_BYTES,
);
let mut buf = Some(BytesMut::with_capacity(buf_size));
let mut buf = Some(IoBufferMut::with_capacity(buf_size));
for read in vectored_reads {
let blobs_buf = vectored_blob_reader

View File

@@ -41,10 +41,11 @@ use crate::tenant::vectored_blob_io::{
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use hex;
use itertools::Itertools;
@@ -547,10 +548,10 @@ impl ImageLayerInner {
for read in plan.into_iter() {
let buf_size = read.size();
let buf = BytesMut::with_capacity(buf_size);
let buf = IoBufferMut::with_capacity(buf_size);
let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = meta.read(&view).await?;
@@ -609,13 +610,12 @@ impl ImageLayerInner {
}
}
let buf = BytesMut::with_capacity(buf_size);
let buf = IoBufferMut::with_capacity(buf_size);
let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
match res {
Ok(blobs_buf) => {
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = meta.read(&view).await;
@@ -1050,12 +1050,11 @@ impl<'a> ImageLayerIterator<'a> {
let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
let mut next_batch = std::collections::VecDeque::new();
let buf_size = plan.size();
let buf = BytesMut::with_capacity(buf_size);
let buf = IoBufferMut::with_capacity(buf_size);
let blobs_buf = vectored_blob_reader
.read_blobs(&plan, buf, self.ctx)
.await?;
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = meta.read(&view).await?;
next_batch.push_back((

View File

@@ -14,7 +14,6 @@ use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::{l0_flush, page_cache};
use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
use camino::Utf8PathBuf;
use pageserver_api::key::CompactKey;
use pageserver_api::keyspace::KeySpace;
@@ -809,9 +808,8 @@ impl InMemoryLayer {
match l0_flush_global_state {
l0_flush::Inner::Direct { .. } => {
let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
let file_contents = Bytes::from(file_contents);
let file_contents = inner.file.load_to_io_buf(ctx).await?;
let file_contents = file_contents.freeze();
for (key, vec_map) in inner.index.iter() {
// Write all page versions
@@ -825,7 +823,7 @@ impl InMemoryLayer {
len,
will_init,
} = entry;
let buf = Bytes::slice(&file_contents, pos as usize..(pos + len) as usize);
let buf = file_contents.slice(pos as usize..(pos + len) as usize);
let (_buf, res) = delta_layer_writer
.put_value_bytes(
Key::from_compact(*key),

View File

@@ -9,6 +9,7 @@ use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};
use crate::{
assert_u64_eq_usize::{U64IsUsize, UsizeIsU64},
context::RequestContext,
virtual_file::{owned_buffers_io::io_buf_aligned::IoBufAlignedMut, IoBufferMut},
};
/// The file interface we require. At runtime, this is a [`crate::tenant::ephemeral_file::EphemeralFile`].
@@ -24,7 +25,7 @@ pub trait File: Send {
/// [`std::io::ErrorKind::UnexpectedEof`] error if the file is shorter than `start+dst.len()`.
///
/// No guarantees are made about the remaining bytes in `dst` in case of a short read.
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>(
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>(
&'b self,
start: u64,
dst: Slice<B>,
@@ -227,7 +228,7 @@ where
// Execute physical reads and fill the logical read buffers
// TODO: pipelined reads; prefetch;
let get_io_buffer = |nchunks| Vec::with_capacity(nchunks * DIO_CHUNK_SIZE);
let get_io_buffer = |nchunks| IoBufferMut::with_capacity(nchunks * DIO_CHUNK_SIZE);
for PhysicalRead {
start_chunk_no,
nchunks,
@@ -459,7 +460,7 @@ mod tests {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let file = InMemoryFile::new_random(10);
let test_read = |pos, len| {
let buf = vec![0; len];
let buf = IoBufferMut::with_capacity_zeroed(len);
let fut = file.read_exact_at_eof_ok(pos, buf.slice_full(), &ctx);
use futures::FutureExt;
let (slice, nread) = fut
@@ -470,9 +471,9 @@ mod tests {
buf.truncate(nread);
buf
};
assert_eq!(test_read(0, 1), &file.content[0..1]);
assert_eq!(test_read(1, 2), &file.content[1..3]);
assert_eq!(test_read(9, 2), &file.content[9..]);
assert_eq!(&test_read(0, 1), &file.content[0..1]);
assert_eq!(&test_read(1, 2), &file.content[1..3]);
assert_eq!(&test_read(9, 2), &file.content[9..]);
assert!(test_read(10, 2).is_empty());
assert!(test_read(11, 2).is_empty());
}
@@ -609,7 +610,7 @@ mod tests {
}
impl<'x> File for RecorderFile<'x> {
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>(
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>(
&'b self,
start: u64,
dst: Slice<B>,
@@ -782,7 +783,7 @@ mod tests {
2048, 1024 => Err("foo".to_owned()),
};
let buf = Vec::with_capacity(512);
let buf = IoBufferMut::with_capacity(512);
let (buf, nread) = mock_file
.read_exact_at_eof_ok(0, buf.slice_full(), &ctx)
.await
@@ -790,7 +791,7 @@ mod tests {
assert_eq!(nread, 512);
assert_eq!(&buf.into_inner()[..nread], &[0; 512]);
let buf = Vec::with_capacity(512);
let buf = IoBufferMut::with_capacity(512);
let (buf, nread) = mock_file
.read_exact_at_eof_ok(512, buf.slice_full(), &ctx)
.await
@@ -798,7 +799,7 @@ mod tests {
assert_eq!(nread, 512);
assert_eq!(&buf.into_inner()[..nread], &[1; 512]);
let buf = Vec::with_capacity(512);
let buf = IoBufferMut::with_capacity(512);
let (buf, nread) = mock_file
.read_exact_at_eof_ok(1024, buf.slice_full(), &ctx)
.await
@@ -806,7 +807,7 @@ mod tests {
assert_eq!(nread, 10);
assert_eq!(&buf.into_inner()[..nread], &[2; 10]);
let buf = Vec::with_capacity(1024);
let buf = IoBufferMut::with_capacity(1024);
let err = mock_file
.read_exact_at_eof_ok(2048, buf.slice_full(), &ctx)
.await

View File

@@ -18,7 +18,7 @@
use std::collections::BTreeMap;
use std::ops::Deref;
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use pageserver_api::key::Key;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::BoundedBuf;
@@ -27,6 +27,7 @@ use utils::vec_map::VecMap;
use crate::context::RequestContext;
use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::{self, VirtualFile};
/// Metadata bundled with the start and end offset of a blob.
@@ -158,7 +159,7 @@ impl std::fmt::Display for VectoredBlob {
/// Return type of [`VectoredBlobReader::read_blobs`]
pub struct VectoredBlobsBuf {
/// Buffer for all blobs in this read
pub buf: BytesMut,
pub buf: IoBufferMut,
/// Offsets into the buffer and metadata for all blobs in this read
pub blobs: Vec<VectoredBlob>,
}
@@ -441,7 +442,7 @@ impl<'a> VectoredBlobReader<'a> {
pub async fn read_blobs(
&self,
read: &VectoredRead,
buf: BytesMut,
buf: IoBufferMut,
ctx: &RequestContext,
) -> Result<VectoredBlobsBuf, std::io::Error> {
assert!(read.size() > 0);
@@ -916,7 +917,7 @@ mod tests {
// Multiply by two (compressed data might need more space), and add a few bytes for the header
let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
let mut buf = BytesMut::with_capacity(reserved_bytes);
let mut buf = IoBufferMut::with_capacity(reserved_bytes);
let vectored_blob_reader = VectoredBlobReader::new(&file);
let meta = BlobMeta {

View File

@@ -18,6 +18,9 @@ use crate::page_cache::{PageWriteGuard, PAGE_SZ};
use crate::tenant::TENANTS_SEGMENT_NAME;
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use owned_buffers_io::aligned_buffer::buffer::AlignedBuffer;
use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlign};
use owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
use owned_buffers_io::io_buf_ext::FullSlice;
use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
use pageserver_api::shard::TenantShardId;
@@ -55,6 +58,8 @@ pub(crate) mod owned_buffers_io {
//! but for the time being we're proving out the primitives in the neon.git repo
//! for faster iteration.
pub(crate) mod aligned_buffer;
pub(crate) mod io_buf_aligned;
pub(crate) mod io_buf_ext;
pub(crate) mod slice;
pub(crate) mod write;
@@ -196,7 +201,7 @@ impl VirtualFile {
ctx: &RequestContext,
) -> Result<Slice<Buf>, Error>
where
Buf: IoBufMut + Send,
Buf: IoBufAlignedMut + Send,
{
self.inner.read_exact_at(slice, offset, ctx).await
}
@@ -771,7 +776,7 @@ impl VirtualFileInner {
ctx: &RequestContext,
) -> Result<Slice<Buf>, Error>
where
Buf: IoBufMut + Send,
Buf: IoBufAlignedMut + Send,
{
let assert_we_return_original_bounds = if cfg!(debug_assertions) {
Some((slice.stable_ptr() as usize, slice.bytes_total()))
@@ -1222,12 +1227,14 @@ impl VirtualFileInner {
ctx: &RequestContext,
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
use crate::page_cache::PAGE_SZ;
let slice = Vec::with_capacity(PAGE_SZ).slice_full();
let slice = IoBufferMut::with_capacity(PAGE_SZ).slice_full();
assert_eq!(slice.bytes_total(), PAGE_SZ);
let slice = self
.read_exact_at(slice, blknum as u64 * (PAGE_SZ as u64), ctx)
.await?;
Ok(crate::tenant::block_io::BlockLease::Vec(slice.into_inner()))
Ok(crate::tenant::block_io::BlockLease::IoBufferMut(
slice.into_inner(),
))
}
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
@@ -1325,10 +1332,11 @@ impl OpenFiles {
/// server startup.
///
#[cfg(not(test))]
pub fn init(num_slots: usize, engine: IoEngineKind) {
pub fn init(num_slots: usize, engine: IoEngineKind, mode: IoMode) {
if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
panic!("virtual_file::init called twice");
}
set_io_mode(mode);
io_engine::init(engine);
crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
}
@@ -1357,6 +1365,11 @@ pub(crate) const fn get_io_buffer_alignment() -> usize {
DEFAULT_IO_BUFFER_ALIGNMENT
}
pub(crate) type IoBufferMut = AlignedBufferMut<ConstAlign<{ get_io_buffer_alignment() }>>;
pub(crate) type IoBuffer = AlignedBuffer<ConstAlign<{ get_io_buffer_alignment() }>>;
pub(crate) type IoPageSlice<'a> =
AlignedSlice<'a, PAGE_SZ, ConstAlign<{ get_io_buffer_alignment() }>>;
static IO_MODE: AtomicU8 = AtomicU8::new(IoMode::preferred() as u8);
pub(crate) fn set_io_mode(mode: IoMode) {
@@ -1395,10 +1408,10 @@ mod tests {
impl MaybeVirtualFile {
async fn read_exact_at(
&self,
mut slice: tokio_epoll_uring::Slice<Vec<u8>>,
mut slice: tokio_epoll_uring::Slice<IoBufferMut>,
offset: u64,
ctx: &RequestContext,
) -> Result<tokio_epoll_uring::Slice<Vec<u8>>, Error> {
) -> Result<tokio_epoll_uring::Slice<IoBufferMut>, Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(slice, offset, ctx).await,
MaybeVirtualFile::File(file) => {
@@ -1466,12 +1479,13 @@ mod tests {
len: usize,
ctx: &RequestContext,
) -> Result<String, Error> {
let slice = Vec::with_capacity(len).slice_full();
let slice = IoBufferMut::with_capacity(len).slice_full();
assert_eq!(slice.bytes_total(), len);
let slice = self.read_exact_at(slice, pos, ctx).await?;
let vec = slice.into_inner();
assert_eq!(vec.len(), len);
Ok(String::from_utf8(vec).unwrap())
let buf = slice.into_inner();
assert_eq!(buf.len(), len);
Ok(String::from_utf8(buf.to_vec()).unwrap())
}
}
@@ -1695,7 +1709,7 @@ mod tests {
let files = files.clone();
let ctx = ctx.detached_child(TaskKind::UnitTest, DownloadBehavior::Error);
let hdl = rt.spawn(async move {
let mut buf = vec![0u8; SIZE];
let mut buf = IoBufferMut::with_capacity_zeroed(SIZE);
let mut rng = rand::rngs::OsRng;
for _ in 1..1000 {
let f = &files[rng.gen_range(0..files.len())];
@@ -1704,7 +1718,7 @@ mod tests {
.await
.unwrap()
.into_inner();
assert!(buf == SAMPLE);
assert!(buf[..] == SAMPLE);
}
});
hdls.push(hdl);

View File

@@ -0,0 +1,9 @@
pub mod alignment;
pub mod buffer;
pub mod buffer_mut;
pub mod raw;
pub mod slice;
pub use alignment::*;
pub use buffer_mut::AlignedBufferMut;
pub use slice::AlignedSlice;

View File

@@ -0,0 +1,26 @@
pub trait Alignment: std::marker::Unpin + 'static {
/// Returns the required alignments.
fn align(&self) -> usize;
}
/// Alignment at compile time.
#[derive(Debug)]
pub struct ConstAlign<const A: usize>;
impl<const A: usize> Alignment for ConstAlign<A> {
fn align(&self) -> usize {
A
}
}
/// Alignment at run time.
#[derive(Debug)]
pub struct RuntimeAlign {
align: usize,
}
impl Alignment for RuntimeAlign {
fn align(&self) -> usize {
self.align
}
}

View File

@@ -0,0 +1,124 @@
use std::{
ops::{Deref, Range, RangeBounds},
sync::Arc,
};
use super::{alignment::Alignment, raw::RawAlignedBuffer};
/// An shared, immutable aligned buffer type.
pub struct AlignedBuffer<A: Alignment> {
/// Shared raw buffer.
raw: Arc<RawAlignedBuffer<A>>,
/// Range that specifies the current slice.
range: Range<usize>,
}
impl<A: Alignment> AlignedBuffer<A> {
/// Creates an immutable `IoBuffer` from the raw buffer
pub(super) fn from_raw(raw: RawAlignedBuffer<A>, range: Range<usize>) -> Self {
AlignedBuffer {
raw: Arc::new(raw),
range,
}
}
/// Returns the number of bytes in the buffer, also referred to as its 'length'.
#[inline]
pub fn len(&self) -> usize {
self.range.len()
}
/// Returns the alignment of the buffer.
#[inline]
pub fn align(&self) -> usize {
self.raw.align()
}
#[inline]
fn as_ptr(&self) -> *const u8 {
// SAFETY: `self.range.start` is guaranteed to be within [0, self.len()).
unsafe { self.raw.as_ptr().add(self.range.start) }
}
/// Extracts a slice containing the entire buffer.
///
/// Equivalent to `&s[..]`.
#[inline]
fn as_slice(&self) -> &[u8] {
&self.raw.as_slice()[self.range.start..self.range.end]
}
/// Returns a slice of self for the index range `[begin..end)`.
pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
use core::ops::Bound;
let len = self.len();
let begin = match range.start_bound() {
Bound::Included(&n) => n,
Bound::Excluded(&n) => n.checked_add(1).expect("out of range"),
Bound::Unbounded => 0,
};
let end = match range.end_bound() {
Bound::Included(&n) => n.checked_add(1).expect("out of range"),
Bound::Excluded(&n) => n,
Bound::Unbounded => len,
};
assert!(
begin <= end,
"range start must not be greater than end: {:?} <= {:?}",
begin,
end,
);
assert!(
end <= len,
"range end out of bounds: {:?} <= {:?}",
end,
len,
);
let begin = self.range.start + begin;
let end = self.range.start + end;
AlignedBuffer {
raw: Arc::clone(&self.raw),
range: begin..end,
}
}
}
impl<A: Alignment> Deref for AlignedBuffer<A> {
type Target = [u8];
fn deref(&self) -> &Self::Target {
self.as_slice()
}
}
impl<A: Alignment> AsRef<[u8]> for AlignedBuffer<A> {
fn as_ref(&self) -> &[u8] {
self.as_slice()
}
}
impl<A: Alignment> PartialEq<[u8]> for AlignedBuffer<A> {
fn eq(&self, other: &[u8]) -> bool {
self.as_slice().eq(other)
}
}
/// SAFETY: the underlying buffer references a stable memory region.
unsafe impl<A: Alignment> tokio_epoll_uring::IoBuf for AlignedBuffer<A> {
fn stable_ptr(&self) -> *const u8 {
self.as_ptr()
}
fn bytes_init(&self) -> usize {
self.len()
}
fn bytes_total(&self) -> usize {
self.len()
}
}

View File

@@ -0,0 +1,347 @@
use std::ops::{Deref, DerefMut};
use super::{
alignment::{Alignment, ConstAlign},
buffer::AlignedBuffer,
raw::RawAlignedBuffer,
};
/// A mutable aligned buffer type.
#[derive(Debug)]
pub struct AlignedBufferMut<A: Alignment> {
raw: RawAlignedBuffer<A>,
}
impl<const A: usize> AlignedBufferMut<ConstAlign<A>> {
/// Constructs a new, empty `IoBufferMut` with at least the specified capacity and alignment.
///
/// The buffer will be able to hold at most `capacity` elements and will never resize.
///
///
/// # Panics
///
/// Panics if the new capacity exceeds `isize::MAX` _bytes_, or if the following alignment requirement is not met:
/// * `align` must not be zero,
///
/// * `align` must be a power of two,
///
/// * `capacity`, when rounded up to the nearest multiple of `align`,
/// must not overflow isize (i.e., the rounded value must be
/// less than or equal to `isize::MAX`).
pub fn with_capacity(capacity: usize) -> Self {
AlignedBufferMut {
raw: RawAlignedBuffer::with_capacity(capacity),
}
}
/// Constructs a new `IoBufferMut` with at least the specified capacity and alignment, filled with zeros.
pub fn with_capacity_zeroed(capacity: usize) -> Self {
use bytes::BufMut;
let mut buf = Self::with_capacity(capacity);
buf.put_bytes(0, capacity);
// SAFETY: `put_bytes` filled the entire buffer.
unsafe { buf.set_len(capacity) };
buf
}
}
impl<A: Alignment> AlignedBufferMut<A> {
/// Returns the total number of bytes the buffer can hold.
#[inline]
pub fn capacity(&self) -> usize {
self.raw.capacity()
}
/// Returns the alignment of the buffer.
#[inline]
pub fn align(&self) -> usize {
self.raw.align()
}
/// Returns the number of bytes in the buffer, also referred to as its 'length'.
#[inline]
pub fn len(&self) -> usize {
self.raw.len()
}
/// Force the length of the buffer to `new_len`.
#[inline]
unsafe fn set_len(&mut self, new_len: usize) {
self.raw.set_len(new_len)
}
#[inline]
fn as_ptr(&self) -> *const u8 {
self.raw.as_ptr()
}
#[inline]
fn as_mut_ptr(&mut self) -> *mut u8 {
self.raw.as_mut_ptr()
}
/// Extracts a slice containing the entire buffer.
///
/// Equivalent to `&s[..]`.
#[inline]
fn as_slice(&self) -> &[u8] {
self.raw.as_slice()
}
/// Extracts a mutable slice of the entire buffer.
///
/// Equivalent to `&mut s[..]`.
fn as_mut_slice(&mut self) -> &mut [u8] {
self.raw.as_mut_slice()
}
/// Drops the all the contents of the buffer, setting its length to `0`.
#[inline]
pub fn clear(&mut self) {
self.raw.clear()
}
/// Reserves capacity for at least `additional` more bytes to be inserted
/// in the given `IoBufferMut`. The collection may reserve more space to
/// speculatively avoid frequent reallocations. After calling `reserve`,
/// capacity will be greater than or equal to `self.len() + additional`.
/// Does nothing if capacity is already sufficient.
///
/// # Panics
///
/// Panics if the new capacity exceeds `isize::MAX` _bytes_.
pub fn reserve(&mut self, additional: usize) {
self.raw.reserve(additional);
}
/// Shortens the buffer, keeping the first len bytes.
pub fn truncate(&mut self, len: usize) {
self.raw.truncate(len);
}
/// Consumes and leaks the `IoBufferMut`, returning a mutable reference to the contents, &'a mut [u8].
pub fn leak<'a>(self) -> &'a mut [u8] {
self.raw.leak()
}
pub fn freeze(self) -> AlignedBuffer<A> {
let len = self.len();
AlignedBuffer::from_raw(self.raw, 0..len)
}
}
impl<A: Alignment> Deref for AlignedBufferMut<A> {
type Target = [u8];
fn deref(&self) -> &Self::Target {
self.as_slice()
}
}
impl<A: Alignment> DerefMut for AlignedBufferMut<A> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.as_mut_slice()
}
}
impl<A: Alignment> AsRef<[u8]> for AlignedBufferMut<A> {
fn as_ref(&self) -> &[u8] {
self.as_slice()
}
}
impl<A: Alignment> AsMut<[u8]> for AlignedBufferMut<A> {
fn as_mut(&mut self) -> &mut [u8] {
self.as_mut_slice()
}
}
impl<A: Alignment> PartialEq<[u8]> for AlignedBufferMut<A> {
fn eq(&self, other: &[u8]) -> bool {
self.as_slice().eq(other)
}
}
/// SAFETY: When advancing the internal cursor, the caller needs to make sure the bytes advcanced past have been initialized.
unsafe impl<A: Alignment> bytes::BufMut for AlignedBufferMut<A> {
#[inline]
fn remaining_mut(&self) -> usize {
// Although a `Vec` can have at most isize::MAX bytes, we never want to grow `IoBufferMut`.
// Thus, it can have at most `self.capacity` bytes.
self.capacity() - self.len()
}
// SAFETY: Caller needs to make sure the bytes being advanced past have been initialized.
#[inline]
unsafe fn advance_mut(&mut self, cnt: usize) {
let len = self.len();
let remaining = self.remaining_mut();
if remaining < cnt {
panic_advance(cnt, remaining);
}
// Addition will not overflow since the sum is at most the capacity.
self.set_len(len + cnt);
}
#[inline]
fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
let cap = self.capacity();
let len = self.len();
// SAFETY: Since `self.ptr` is valid for `cap` bytes, `self.ptr.add(len)` must be
// valid for `cap - len` bytes. The subtraction will not underflow since
// `len <= cap`.
unsafe {
bytes::buf::UninitSlice::from_raw_parts_mut(self.as_mut_ptr().add(len), cap - len)
}
}
}
/// Panic with a nice error message.
#[cold]
fn panic_advance(idx: usize, len: usize) -> ! {
panic!(
"advance out of bounds: the len is {} but advancing by {}",
len, idx
);
}
/// Safety: [`AlignedBufferMut`] has exclusive ownership of the io buffer,
/// and the underlying pointer remains stable while io-uring is owning the buffer.
/// The tokio-epoll-uring crate itself will not resize the buffer and will respect
/// [`tokio_epoll_uring::IoBuf::bytes_total`].
unsafe impl<A: Alignment> tokio_epoll_uring::IoBuf for AlignedBufferMut<A> {
fn stable_ptr(&self) -> *const u8 {
self.as_ptr()
}
fn bytes_init(&self) -> usize {
self.len()
}
fn bytes_total(&self) -> usize {
self.capacity()
}
}
// SAFETY: See above.
unsafe impl<A: Alignment> tokio_epoll_uring::IoBufMut for AlignedBufferMut<A> {
fn stable_mut_ptr(&mut self) -> *mut u8 {
self.as_mut_ptr()
}
unsafe fn set_init(&mut self, init_len: usize) {
if self.len() < init_len {
self.set_len(init_len);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
const ALIGN: usize = 4 * 1024;
type TestIoBufferMut = AlignedBufferMut<ConstAlign<ALIGN>>;
#[test]
fn test_with_capacity() {
let v = TestIoBufferMut::with_capacity(ALIGN * 4);
assert_eq!(v.len(), 0);
assert_eq!(v.capacity(), ALIGN * 4);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
let v = TestIoBufferMut::with_capacity(ALIGN / 2);
assert_eq!(v.len(), 0);
assert_eq!(v.capacity(), ALIGN / 2);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
}
#[test]
fn test_with_capacity_zeroed() {
let v = TestIoBufferMut::with_capacity_zeroed(ALIGN);
assert_eq!(v.len(), ALIGN);
assert_eq!(v.capacity(), ALIGN);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
assert_eq!(&v[..], &[0; ALIGN])
}
#[test]
fn test_reserve() {
use bytes::BufMut;
let mut v = TestIoBufferMut::with_capacity(ALIGN);
let capacity = v.capacity();
v.reserve(capacity);
assert_eq!(v.capacity(), capacity);
let data = [b'a'; ALIGN];
v.put(&data[..]);
v.reserve(capacity);
assert!(v.capacity() >= capacity * 2);
assert_eq!(&v[..], &data[..]);
let capacity = v.capacity();
v.clear();
v.reserve(capacity);
assert_eq!(capacity, v.capacity());
}
#[test]
fn test_bytes_put() {
use bytes::BufMut;
let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
let x = [b'a'; ALIGN];
for _ in 0..2 {
for _ in 0..4 {
v.put(&x[..]);
}
assert_eq!(v.len(), ALIGN * 4);
assert_eq!(v.capacity(), ALIGN * 4);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
v.clear()
}
assert_eq!(v.len(), 0);
assert_eq!(v.capacity(), ALIGN * 4);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
}
#[test]
#[should_panic]
fn test_bytes_put_panic() {
use bytes::BufMut;
const ALIGN: usize = 4 * 1024;
let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
let x = [b'a'; ALIGN];
for _ in 0..5 {
v.put_slice(&x[..]);
}
}
#[test]
fn test_io_buf_put_slice() {
use tokio_epoll_uring::BoundedBufMut;
const ALIGN: usize = 4 * 1024;
let mut v = TestIoBufferMut::with_capacity(ALIGN);
let x = [b'a'; ALIGN];
for _ in 0..2 {
v.put_slice(&x[..]);
assert_eq!(v.len(), ALIGN);
assert_eq!(v.capacity(), ALIGN);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
v.clear()
}
assert_eq!(v.len(), 0);
assert_eq!(v.capacity(), ALIGN);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
}
}

View File

@@ -0,0 +1,216 @@
use core::slice;
use std::{
alloc::{self, Layout},
cmp,
mem::ManuallyDrop,
};
use super::alignment::{Alignment, ConstAlign};
#[derive(Debug)]
struct AlignedBufferPtr(*mut u8);
// SAFETY: We gurantees no one besides `IoBufferPtr` itself has the raw pointer.
unsafe impl Send for AlignedBufferPtr {}
// SAFETY: We gurantees no one besides `IoBufferPtr` itself has the raw pointer.
unsafe impl Sync for AlignedBufferPtr {}
/// An aligned buffer type.
#[derive(Debug)]
pub struct RawAlignedBuffer<A: Alignment> {
ptr: AlignedBufferPtr,
capacity: usize,
len: usize,
align: A,
}
impl<const A: usize> RawAlignedBuffer<ConstAlign<A>> {
/// Constructs a new, empty `IoBufferMut` with at least the specified capacity and alignment.
///
/// The buffer will be able to hold at most `capacity` elements and will never resize.
///
///
/// # Panics
///
/// Panics if the new capacity exceeds `isize::MAX` _bytes_, or if the following alignment requirement is not met:
/// * `align` must not be zero,
///
/// * `align` must be a power of two,
///
/// * `capacity`, when rounded up to the nearest multiple of `align`,
/// must not overflow isize (i.e., the rounded value must be
/// less than or equal to `isize::MAX`).
pub fn with_capacity(capacity: usize) -> Self {
let align = ConstAlign::<A>;
let layout = Layout::from_size_align(capacity, align.align()).expect("Invalid layout");
// SAFETY: Making an allocation with a sized and aligned layout. The memory is manually freed with the same layout.
let ptr = unsafe {
let ptr = alloc::alloc(layout);
if ptr.is_null() {
alloc::handle_alloc_error(layout);
}
AlignedBufferPtr(ptr)
};
RawAlignedBuffer {
ptr,
capacity,
len: 0,
align,
}
}
}
impl<A: Alignment> RawAlignedBuffer<A> {
/// Returns the total number of bytes the buffer can hold.
#[inline]
pub fn capacity(&self) -> usize {
self.capacity
}
/// Returns the alignment of the buffer.
#[inline]
pub fn align(&self) -> usize {
self.align.align()
}
/// Returns the number of bytes in the buffer, also referred to as its 'length'.
#[inline]
pub fn len(&self) -> usize {
self.len
}
/// Force the length of the buffer to `new_len`.
#[inline]
pub unsafe fn set_len(&mut self, new_len: usize) {
debug_assert!(new_len <= self.capacity());
self.len = new_len;
}
#[inline]
pub fn as_ptr(&self) -> *const u8 {
self.ptr.0
}
#[inline]
pub fn as_mut_ptr(&mut self) -> *mut u8 {
self.ptr.0
}
/// Extracts a slice containing the entire buffer.
///
/// Equivalent to `&s[..]`.
#[inline]
pub fn as_slice(&self) -> &[u8] {
// SAFETY: The pointer is valid and `len` bytes are initialized.
unsafe { slice::from_raw_parts(self.as_ptr(), self.len) }
}
/// Extracts a mutable slice of the entire buffer.
///
/// Equivalent to `&mut s[..]`.
pub fn as_mut_slice(&mut self) -> &mut [u8] {
// SAFETY: The pointer is valid and `len` bytes are initialized.
unsafe { slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) }
}
/// Drops the all the contents of the buffer, setting its length to `0`.
#[inline]
pub fn clear(&mut self) {
self.len = 0;
}
/// Reserves capacity for at least `additional` more bytes to be inserted
/// in the given `IoBufferMut`. The collection may reserve more space to
/// speculatively avoid frequent reallocations. After calling `reserve`,
/// capacity will be greater than or equal to `self.len() + additional`.
/// Does nothing if capacity is already sufficient.
///
/// # Panics
///
/// Panics if the new capacity exceeds `isize::MAX` _bytes_.
pub fn reserve(&mut self, additional: usize) {
if additional > self.capacity() - self.len() {
self.reserve_inner(additional);
}
}
fn reserve_inner(&mut self, additional: usize) {
let Some(required_cap) = self.len().checked_add(additional) else {
capacity_overflow()
};
let old_capacity = self.capacity();
let align = self.align();
// This guarantees exponential growth. The doubling cannot overflow
// because `cap <= isize::MAX` and the type of `cap` is `usize`.
let cap = cmp::max(old_capacity * 2, required_cap);
if !is_valid_alloc(cap) {
capacity_overflow()
}
let new_layout = Layout::from_size_align(cap, self.align()).expect("Invalid layout");
let old_ptr = self.as_mut_ptr();
// SAFETY: old allocation was allocated with std::alloc::alloc with the same layout,
// and we panics on null pointer.
let (ptr, cap) = unsafe {
let old_layout = Layout::from_size_align_unchecked(old_capacity, align);
let ptr = alloc::realloc(old_ptr, old_layout, new_layout.size());
if ptr.is_null() {
alloc::handle_alloc_error(new_layout);
}
(AlignedBufferPtr(ptr), cap)
};
self.ptr = ptr;
self.capacity = cap;
}
/// Shortens the buffer, keeping the first len bytes.
pub fn truncate(&mut self, len: usize) {
if len > self.len {
return;
}
self.len = len;
}
/// Consumes and leaks the `IoBufferMut`, returning a mutable reference to the contents, &'a mut [u8].
pub fn leak<'a>(self) -> &'a mut [u8] {
let mut buf = ManuallyDrop::new(self);
// SAFETY: leaking the buffer as intended.
unsafe { slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.len) }
}
}
fn capacity_overflow() -> ! {
panic!("capacity overflow")
}
// We need to guarantee the following:
// * We don't ever allocate `> isize::MAX` byte-size objects.
// * We don't overflow `usize::MAX` and actually allocate too little.
//
// On 64-bit we just need to check for overflow since trying to allocate
// `> isize::MAX` bytes will surely fail. On 32-bit and 16-bit we need to add
// an extra guard for this in case we're running on a platform which can use
// all 4GB in user-space, e.g., PAE or x32.
#[inline]
fn is_valid_alloc(alloc_size: usize) -> bool {
!(usize::BITS < 64 && alloc_size > isize::MAX as usize)
}
impl<A: Alignment> Drop for RawAlignedBuffer<A> {
fn drop(&mut self) {
// SAFETY: memory was allocated with std::alloc::alloc with the same layout.
unsafe {
alloc::dealloc(
self.as_mut_ptr(),
Layout::from_size_align_unchecked(self.capacity, self.align.align()),
)
}
}
}

View File

@@ -0,0 +1,40 @@
use std::ops::{Deref, DerefMut};
use super::alignment::{Alignment, ConstAlign};
/// Newtype for an aligned slice.
pub struct AlignedSlice<'a, const N: usize, A: Alignment> {
/// underlying byte slice
buf: &'a mut [u8; N],
/// alignment marker
_align: A,
}
impl<'a, const N: usize, const A: usize> AlignedSlice<'a, N, ConstAlign<A>> {
/// Create a new aligned slice from a mutable byte slice. The input must already satisify the alignment.
pub unsafe fn new_unchecked(buf: &'a mut [u8; N]) -> Self {
let _align = ConstAlign::<A>;
assert_eq!(buf.as_ptr().align_offset(_align.align()), 0);
AlignedSlice { buf, _align }
}
}
impl<'a, const N: usize, A: Alignment> Deref for AlignedSlice<'a, N, A> {
type Target = [u8; N];
fn deref(&self) -> &Self::Target {
self.buf
}
}
impl<'a, const N: usize, A: Alignment> DerefMut for AlignedSlice<'a, N, A> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.buf
}
}
impl<'a, const N: usize, A: Alignment> AsRef<[u8; N]> for AlignedSlice<'a, N, A> {
fn as_ref(&self) -> &[u8; N] {
self.buf
}
}

View File

@@ -0,0 +1,9 @@
use tokio_epoll_uring::IoBufMut;
use crate::virtual_file::{IoBufferMut, PageWriteGuardBuf};
pub trait IoBufAlignedMut: IoBufMut {}
impl IoBufAlignedMut for IoBufferMut {}
impl IoBufAlignedMut for PageWriteGuardBuf {}

View File

@@ -1,5 +1,6 @@
//! See [`FullSlice`].
use crate::virtual_file::{IoBuffer, IoBufferMut};
use bytes::{Bytes, BytesMut};
use std::ops::{Deref, Range};
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
@@ -76,3 +77,5 @@ macro_rules! impl_io_buf_ext {
impl_io_buf_ext!(Bytes);
impl_io_buf_ext!(BytesMut);
impl_io_buf_ext!(Vec<u8>);
impl_io_buf_ext!(IoBufferMut);
impl_io_buf_ext!(IoBuffer);