mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 11:40:38 +00:00
Compare commits
36 Commits
release-75
...
yuchen/dir
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
afa0fe0e87 | ||
|
|
656ddbce2c | ||
|
|
36850cb047 | ||
|
|
a5a86bedb2 | ||
|
|
9653b64569 | ||
|
|
e92d0fa83f | ||
|
|
ad44e11a69 | ||
|
|
d99a61bd75 | ||
|
|
3b88998f8e | ||
|
|
4b4a3edb80 | ||
|
|
0d0ba568d2 | ||
|
|
cb51ddc949 | ||
|
|
156237d553 | ||
|
|
136006907b | ||
|
|
929c8d42d3 | ||
|
|
e377177680 | ||
|
|
84b0902e6a | ||
|
|
8f9679ce95 | ||
|
|
62722e8559 | ||
|
|
b418c343e1 | ||
|
|
1c61b68c3b | ||
|
|
84e2242673 | ||
|
|
e9d9663fcd | ||
|
|
f28dd95022 | ||
|
|
ee4600034e | ||
|
|
12a4e33022 | ||
|
|
6d03e2810d | ||
|
|
4e1309475c | ||
|
|
f1418cad52 | ||
|
|
a04cfd754b | ||
|
|
bc13310e56 | ||
|
|
5c76b2d474 | ||
|
|
97f7b0b86f | ||
|
|
3a5b44ea53 | ||
|
|
95554c7377 | ||
|
|
a85bd88866 |
@@ -1013,6 +1013,12 @@ pub mod virtual_file {
|
||||
}
|
||||
|
||||
impl IoMode {
|
||||
#[cfg(target_os = "linux")]
|
||||
pub const fn preferred() -> Self {
|
||||
Self::Direct
|
||||
}
|
||||
|
||||
#[cfg(target_os = "macos")]
|
||||
pub const fn preferred() -> Self {
|
||||
Self::Buffered
|
||||
}
|
||||
|
||||
@@ -164,7 +164,11 @@ fn criterion_benchmark(c: &mut Criterion) {
|
||||
let conf: &'static PageServerConf = Box::leak(Box::new(
|
||||
pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()),
|
||||
));
|
||||
virtual_file::init(16384, virtual_file::io_engine_for_bench());
|
||||
virtual_file::init(
|
||||
16384,
|
||||
virtual_file::io_engine_for_bench(),
|
||||
conf.virtual_file_io_mode,
|
||||
);
|
||||
page_cache::init(conf.page_cache_size);
|
||||
|
||||
{
|
||||
|
||||
@@ -7,6 +7,7 @@ use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver::context::{DownloadBehavior, RequestContext};
|
||||
use pageserver::task_mgr::TaskKind;
|
||||
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
|
||||
use pageserver::virtual_file::api::IoMode;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::BinaryHeap;
|
||||
use std::ops::Range;
|
||||
@@ -152,7 +153,11 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
|
||||
// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
|
||||
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
|
||||
pageserver::virtual_file::init(
|
||||
10,
|
||||
virtual_file::api::IoEngineKind::StdFs,
|
||||
IoMode::preferred(),
|
||||
);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let mut total_delta_layers = 0usize;
|
||||
|
||||
@@ -11,6 +11,7 @@ use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
|
||||
use pageserver::tenant::storage_layer::{delta_layer, image_layer};
|
||||
use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer};
|
||||
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
|
||||
use pageserver::virtual_file::api::IoMode;
|
||||
use pageserver::{page_cache, virtual_file};
|
||||
use pageserver::{
|
||||
repository::{Key, KEY_SIZE},
|
||||
@@ -59,7 +60,11 @@ pub(crate) enum LayerCmd {
|
||||
|
||||
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
|
||||
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
|
||||
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
|
||||
virtual_file::init(
|
||||
10,
|
||||
virtual_file::api::IoEngineKind::StdFs,
|
||||
IoMode::preferred(),
|
||||
);
|
||||
page_cache::init(100);
|
||||
let file = VirtualFile::open(path, ctx).await?;
|
||||
let file_id = page_cache::next_file_id();
|
||||
@@ -190,7 +195,11 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
new_tenant_id,
|
||||
new_timeline_id,
|
||||
} => {
|
||||
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
|
||||
pageserver::virtual_file::init(
|
||||
10,
|
||||
virtual_file::api::IoEngineKind::StdFs,
|
||||
IoMode::preferred(),
|
||||
);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
|
||||
@@ -24,7 +24,7 @@ use pageserver::{
|
||||
page_cache,
|
||||
task_mgr::TaskKind,
|
||||
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
|
||||
virtual_file,
|
||||
virtual_file::{self, api::IoMode},
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use postgres_ffi::ControlFileData;
|
||||
@@ -205,7 +205,11 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {
|
||||
|
||||
async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
|
||||
virtual_file::init(
|
||||
10,
|
||||
virtual_file::api::IoEngineKind::StdFs,
|
||||
IoMode::preferred(),
|
||||
);
|
||||
page_cache::init(100);
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
dump_layerfile_from_path(path, true, &ctx).await
|
||||
|
||||
@@ -167,7 +167,11 @@ fn main() -> anyhow::Result<()> {
|
||||
let scenario = failpoint_support::init();
|
||||
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(conf.max_file_descriptors, conf.virtual_file_io_engine);
|
||||
virtual_file::init(
|
||||
conf.max_file_descriptors,
|
||||
conf.virtual_file_io_engine,
|
||||
conf.virtual_file_io_mode,
|
||||
);
|
||||
page_cache::init(conf.page_cache_size);
|
||||
|
||||
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
|
||||
|
||||
@@ -82,6 +82,7 @@ use once_cell::sync::OnceCell;
|
||||
use crate::{
|
||||
context::RequestContext,
|
||||
metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
|
||||
virtual_file::{IoBufferMut, IoPageSlice},
|
||||
};
|
||||
|
||||
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
|
||||
@@ -144,7 +145,7 @@ struct SlotInner {
|
||||
key: Option<CacheKey>,
|
||||
// for `coalesce_readers_permit`
|
||||
permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
|
||||
buf: &'static mut [u8; PAGE_SZ],
|
||||
buf: IoPageSlice<'static>,
|
||||
}
|
||||
|
||||
impl Slot {
|
||||
@@ -234,13 +235,13 @@ impl std::ops::Deref for PageReadGuard<'_> {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.slot_guard.buf
|
||||
self.slot_guard.buf.deref()
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
|
||||
fn as_ref(&self) -> &[u8; PAGE_SZ] {
|
||||
self.slot_guard.buf
|
||||
self.slot_guard.buf.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -266,7 +267,7 @@ enum PageWriteGuardState<'i> {
|
||||
impl std::ops::DerefMut for PageWriteGuard<'_> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
match &mut self.state {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
|
||||
PageWriteGuardState::Invalid { inner, _permit } => inner.buf.deref_mut(),
|
||||
PageWriteGuardState::Downgraded => unreachable!(),
|
||||
}
|
||||
}
|
||||
@@ -277,7 +278,7 @@ impl std::ops::Deref for PageWriteGuard<'_> {
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
match &self.state {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
|
||||
PageWriteGuardState::Invalid { inner, _permit } => inner.buf.deref(),
|
||||
PageWriteGuardState::Downgraded => unreachable!(),
|
||||
}
|
||||
}
|
||||
@@ -643,7 +644,7 @@ impl PageCache {
|
||||
// We could use Vec::leak here, but that potentially also leaks
|
||||
// uninitialized reserved capacity. With into_boxed_slice and Box::leak
|
||||
// this is avoided.
|
||||
let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
|
||||
let page_buffer = IoBufferMut::with_capacity_zeroed(num_pages * PAGE_SZ).leak();
|
||||
|
||||
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
|
||||
size_metrics.max_bytes.set_page_sz(num_pages);
|
||||
@@ -652,7 +653,8 @@ impl PageCache {
|
||||
let slots = page_buffer
|
||||
.chunks_exact_mut(PAGE_SZ)
|
||||
.map(|chunk| {
|
||||
let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
|
||||
// SAFETY: Each chunk has `PAGE_SZ` (8192) bytes, greater than 512, still aligned.
|
||||
let buf = unsafe { IoPageSlice::new_unchecked(chunk.try_into().unwrap()) };
|
||||
|
||||
Slot {
|
||||
inner: tokio::sync::RwLock::new(SlotInner {
|
||||
|
||||
@@ -5,6 +5,8 @@
|
||||
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::{self, FileId, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
|
||||
#[cfg(test)]
|
||||
use crate::virtual_file::IoBufferMut;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use bytes::Bytes;
|
||||
use std::ops::Deref;
|
||||
@@ -40,7 +42,7 @@ pub enum BlockLease<'a> {
|
||||
#[cfg(test)]
|
||||
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
|
||||
#[cfg(test)]
|
||||
Vec(Vec<u8>),
|
||||
IoBufferMut(IoBufferMut),
|
||||
}
|
||||
|
||||
impl From<PageReadGuard<'static>> for BlockLease<'static> {
|
||||
@@ -67,7 +69,7 @@ impl Deref for BlockLease<'_> {
|
||||
#[cfg(test)]
|
||||
BlockLease::Arc(v) => v.deref(),
|
||||
#[cfg(test)]
|
||||
BlockLease::Vec(v) => {
|
||||
BlockLease::IoBufferMut(v) => {
|
||||
TryFrom::try_from(&v[..]).expect("caller must ensure that v has PAGE_SZ")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,10 +6,11 @@ use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache;
|
||||
use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
|
||||
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
|
||||
use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
|
||||
use crate::virtual_file::owned_buffers_io::write::Buffer;
|
||||
use crate::virtual_file::{self, owned_buffers_io, VirtualFile};
|
||||
use crate::virtual_file::{self, owned_buffers_io, IoBufferMut, VirtualFile};
|
||||
use bytes::BytesMut;
|
||||
use camino::Utf8PathBuf;
|
||||
use num_traits::Num;
|
||||
@@ -107,15 +108,18 @@ impl EphemeralFile {
|
||||
self.page_cache_file_id
|
||||
}
|
||||
|
||||
pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
|
||||
pub(crate) async fn load_to_io_buf(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<IoBufferMut, io::Error> {
|
||||
let size = self.len().into_usize();
|
||||
let vec = Vec::with_capacity(size);
|
||||
let (slice, nread) = self.read_exact_at_eof_ok(0, vec.slice_full(), ctx).await?;
|
||||
let buf = IoBufferMut::with_capacity(size);
|
||||
let (slice, nread) = self.read_exact_at_eof_ok(0, buf.slice_full(), ctx).await?;
|
||||
assert_eq!(nread, size);
|
||||
let vec = slice.into_inner();
|
||||
assert_eq!(vec.len(), nread);
|
||||
assert_eq!(vec.capacity(), size, "we shouldn't be reallocating");
|
||||
Ok(vec)
|
||||
let buf = slice.into_inner();
|
||||
assert_eq!(buf.len(), nread);
|
||||
assert_eq!(buf.capacity(), size, "we shouldn't be reallocating");
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
/// Returns the offset at which the first byte of the input was written, for use
|
||||
@@ -158,7 +162,7 @@ impl EphemeralFile {
|
||||
}
|
||||
|
||||
impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
|
||||
async fn read_exact_at_eof_ok<'a, 'b, B: tokio_epoll_uring::IoBufMut + Send>(
|
||||
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>(
|
||||
&'b self,
|
||||
start: u64,
|
||||
dst: tokio_epoll_uring::Slice<B>,
|
||||
@@ -345,7 +349,7 @@ mod tests {
|
||||
assert!(file.len() as usize == write_nbytes);
|
||||
for i in 0..write_nbytes {
|
||||
assert_eq!(value_offsets[i], i.into_u64());
|
||||
let buf = Vec::with_capacity(1);
|
||||
let buf = IoBufferMut::with_capacity(1);
|
||||
let (buf_slice, nread) = file
|
||||
.read_exact_at_eof_ok(i.into_u64(), buf.slice_full(), &ctx)
|
||||
.await
|
||||
@@ -385,7 +389,7 @@ mod tests {
|
||||
|
||||
// assert the state is as this test expects it to be
|
||||
assert_eq!(
|
||||
&file.load_to_vec(&ctx).await.unwrap(),
|
||||
&file.load_to_io_buf(&ctx).await.unwrap(),
|
||||
&content[0..cap + cap / 2]
|
||||
);
|
||||
let md = file
|
||||
@@ -440,7 +444,7 @@ mod tests {
|
||||
let (buf, nread) = file
|
||||
.read_exact_at_eof_ok(
|
||||
start.into_u64(),
|
||||
Vec::with_capacity(len).slice_full(),
|
||||
IoBufferMut::with_capacity(len).slice_full(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -44,11 +44,11 @@ use crate::tenant::vectored_blob_io::{
|
||||
};
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
|
||||
use crate::virtual_file::IoBufferMut;
|
||||
use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
|
||||
use crate::{walrecord, TEMP_FILE_SUFFIX};
|
||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::BytesMut;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use futures::StreamExt;
|
||||
use itertools::Itertools;
|
||||
@@ -1002,7 +1002,7 @@ impl DeltaLayerInner {
|
||||
.0
|
||||
.into();
|
||||
let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
|
||||
let mut buf = Some(BytesMut::with_capacity(buf_size));
|
||||
let mut buf = Some(IoBufferMut::with_capacity(buf_size));
|
||||
|
||||
// Note that reads are processed in reverse order (from highest key+lsn).
|
||||
// This is the order that `ReconstructState` requires such that it can
|
||||
@@ -1029,7 +1029,7 @@ impl DeltaLayerInner {
|
||||
|
||||
// We have "lost" the buffer since the lower level IO api
|
||||
// doesn't return the buffer on error. Allocate a new one.
|
||||
buf = Some(BytesMut::with_capacity(buf_size));
|
||||
buf = Some(IoBufferMut::with_capacity(buf_size));
|
||||
|
||||
continue;
|
||||
}
|
||||
@@ -1203,7 +1203,7 @@ impl DeltaLayerInner {
|
||||
.map(|x| x.0.get())
|
||||
.unwrap_or(8192);
|
||||
|
||||
let mut buffer = Some(BytesMut::with_capacity(max_read_size));
|
||||
let mut buffer = Some(IoBufferMut::with_capacity(max_read_size));
|
||||
|
||||
// FIXME: buffering of DeltaLayerWriter
|
||||
let mut per_blob_copy = Vec::new();
|
||||
@@ -1561,12 +1561,11 @@ impl<'a> DeltaLayerIterator<'a> {
|
||||
let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);
|
||||
let mut next_batch = std::collections::VecDeque::new();
|
||||
let buf_size = plan.size();
|
||||
let buf = BytesMut::with_capacity(buf_size);
|
||||
let buf = IoBufferMut::with_capacity(buf_size);
|
||||
let blobs_buf = vectored_blob_reader
|
||||
.read_blobs(&plan, buf, self.ctx)
|
||||
.await?;
|
||||
let frozen_buf = blobs_buf.buf.freeze();
|
||||
let view = BufView::new_bytes(frozen_buf);
|
||||
let view = BufView::new_slice(&blobs_buf.buf);
|
||||
for meta in blobs_buf.blobs.iter() {
|
||||
let blob_read = meta.read(&view).await?;
|
||||
let value = Value::des(&blob_read)?;
|
||||
@@ -1941,7 +1940,7 @@ pub(crate) mod test {
|
||||
&vectored_reads,
|
||||
constants::MAX_VECTORED_READ_BYTES,
|
||||
);
|
||||
let mut buf = Some(BytesMut::with_capacity(buf_size));
|
||||
let mut buf = Some(IoBufferMut::with_capacity(buf_size));
|
||||
|
||||
for read in vectored_reads {
|
||||
let blobs_buf = vectored_blob_reader
|
||||
|
||||
@@ -41,10 +41,11 @@ use crate::tenant::vectored_blob_io::{
|
||||
};
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
use crate::virtual_file::IoBufferMut;
|
||||
use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
|
||||
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use bytes::Bytes;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use hex;
|
||||
use itertools::Itertools;
|
||||
@@ -547,10 +548,10 @@ impl ImageLayerInner {
|
||||
for read in plan.into_iter() {
|
||||
let buf_size = read.size();
|
||||
|
||||
let buf = BytesMut::with_capacity(buf_size);
|
||||
let buf = IoBufferMut::with_capacity(buf_size);
|
||||
let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
|
||||
let frozen_buf = blobs_buf.buf.freeze();
|
||||
let view = BufView::new_bytes(frozen_buf);
|
||||
|
||||
let view = BufView::new_slice(&blobs_buf.buf);
|
||||
|
||||
for meta in blobs_buf.blobs.iter() {
|
||||
let img_buf = meta.read(&view).await?;
|
||||
@@ -609,13 +610,12 @@ impl ImageLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
let buf = BytesMut::with_capacity(buf_size);
|
||||
let buf = IoBufferMut::with_capacity(buf_size);
|
||||
let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
|
||||
|
||||
match res {
|
||||
Ok(blobs_buf) => {
|
||||
let frozen_buf = blobs_buf.buf.freeze();
|
||||
let view = BufView::new_bytes(frozen_buf);
|
||||
let view = BufView::new_slice(&blobs_buf.buf);
|
||||
for meta in blobs_buf.blobs.iter() {
|
||||
let img_buf = meta.read(&view).await;
|
||||
|
||||
@@ -1050,12 +1050,11 @@ impl<'a> ImageLayerIterator<'a> {
|
||||
let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
|
||||
let mut next_batch = std::collections::VecDeque::new();
|
||||
let buf_size = plan.size();
|
||||
let buf = BytesMut::with_capacity(buf_size);
|
||||
let buf = IoBufferMut::with_capacity(buf_size);
|
||||
let blobs_buf = vectored_blob_reader
|
||||
.read_blobs(&plan, buf, self.ctx)
|
||||
.await?;
|
||||
let frozen_buf = blobs_buf.buf.freeze();
|
||||
let view = BufView::new_bytes(frozen_buf);
|
||||
let view = BufView::new_slice(&blobs_buf.buf);
|
||||
for meta in blobs_buf.blobs.iter() {
|
||||
let img_buf = meta.read(&view).await?;
|
||||
next_batch.push_back((
|
||||
|
||||
@@ -14,7 +14,6 @@ use crate::tenant::PageReconstructError;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
use crate::{l0_flush, page_cache};
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::key::CompactKey;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
@@ -809,9 +808,8 @@ impl InMemoryLayer {
|
||||
|
||||
match l0_flush_global_state {
|
||||
l0_flush::Inner::Direct { .. } => {
|
||||
let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
|
||||
|
||||
let file_contents = Bytes::from(file_contents);
|
||||
let file_contents = inner.file.load_to_io_buf(ctx).await?;
|
||||
let file_contents = file_contents.freeze();
|
||||
|
||||
for (key, vec_map) in inner.index.iter() {
|
||||
// Write all page versions
|
||||
@@ -825,7 +823,7 @@ impl InMemoryLayer {
|
||||
len,
|
||||
will_init,
|
||||
} = entry;
|
||||
let buf = Bytes::slice(&file_contents, pos as usize..(pos + len) as usize);
|
||||
let buf = file_contents.slice(pos as usize..(pos + len) as usize);
|
||||
let (_buf, res) = delta_layer_writer
|
||||
.put_value_bytes(
|
||||
Key::from_compact(*key),
|
||||
|
||||
@@ -9,6 +9,7 @@ use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};
|
||||
use crate::{
|
||||
assert_u64_eq_usize::{U64IsUsize, UsizeIsU64},
|
||||
context::RequestContext,
|
||||
virtual_file::{owned_buffers_io::io_buf_aligned::IoBufAlignedMut, IoBufferMut},
|
||||
};
|
||||
|
||||
/// The file interface we require. At runtime, this is a [`crate::tenant::ephemeral_file::EphemeralFile`].
|
||||
@@ -24,7 +25,7 @@ pub trait File: Send {
|
||||
/// [`std::io::ErrorKind::UnexpectedEof`] error if the file is shorter than `start+dst.len()`.
|
||||
///
|
||||
/// No guarantees are made about the remaining bytes in `dst` in case of a short read.
|
||||
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>(
|
||||
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>(
|
||||
&'b self,
|
||||
start: u64,
|
||||
dst: Slice<B>,
|
||||
@@ -227,7 +228,7 @@ where
|
||||
|
||||
// Execute physical reads and fill the logical read buffers
|
||||
// TODO: pipelined reads; prefetch;
|
||||
let get_io_buffer = |nchunks| Vec::with_capacity(nchunks * DIO_CHUNK_SIZE);
|
||||
let get_io_buffer = |nchunks| IoBufferMut::with_capacity(nchunks * DIO_CHUNK_SIZE);
|
||||
for PhysicalRead {
|
||||
start_chunk_no,
|
||||
nchunks,
|
||||
@@ -459,7 +460,7 @@ mod tests {
|
||||
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
|
||||
let file = InMemoryFile::new_random(10);
|
||||
let test_read = |pos, len| {
|
||||
let buf = vec![0; len];
|
||||
let buf = IoBufferMut::with_capacity_zeroed(len);
|
||||
let fut = file.read_exact_at_eof_ok(pos, buf.slice_full(), &ctx);
|
||||
use futures::FutureExt;
|
||||
let (slice, nread) = fut
|
||||
@@ -470,9 +471,9 @@ mod tests {
|
||||
buf.truncate(nread);
|
||||
buf
|
||||
};
|
||||
assert_eq!(test_read(0, 1), &file.content[0..1]);
|
||||
assert_eq!(test_read(1, 2), &file.content[1..3]);
|
||||
assert_eq!(test_read(9, 2), &file.content[9..]);
|
||||
assert_eq!(&test_read(0, 1), &file.content[0..1]);
|
||||
assert_eq!(&test_read(1, 2), &file.content[1..3]);
|
||||
assert_eq!(&test_read(9, 2), &file.content[9..]);
|
||||
assert!(test_read(10, 2).is_empty());
|
||||
assert!(test_read(11, 2).is_empty());
|
||||
}
|
||||
@@ -609,7 +610,7 @@ mod tests {
|
||||
}
|
||||
|
||||
impl<'x> File for RecorderFile<'x> {
|
||||
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>(
|
||||
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>(
|
||||
&'b self,
|
||||
start: u64,
|
||||
dst: Slice<B>,
|
||||
@@ -782,7 +783,7 @@ mod tests {
|
||||
2048, 1024 => Err("foo".to_owned()),
|
||||
};
|
||||
|
||||
let buf = Vec::with_capacity(512);
|
||||
let buf = IoBufferMut::with_capacity(512);
|
||||
let (buf, nread) = mock_file
|
||||
.read_exact_at_eof_ok(0, buf.slice_full(), &ctx)
|
||||
.await
|
||||
@@ -790,7 +791,7 @@ mod tests {
|
||||
assert_eq!(nread, 512);
|
||||
assert_eq!(&buf.into_inner()[..nread], &[0; 512]);
|
||||
|
||||
let buf = Vec::with_capacity(512);
|
||||
let buf = IoBufferMut::with_capacity(512);
|
||||
let (buf, nread) = mock_file
|
||||
.read_exact_at_eof_ok(512, buf.slice_full(), &ctx)
|
||||
.await
|
||||
@@ -798,7 +799,7 @@ mod tests {
|
||||
assert_eq!(nread, 512);
|
||||
assert_eq!(&buf.into_inner()[..nread], &[1; 512]);
|
||||
|
||||
let buf = Vec::with_capacity(512);
|
||||
let buf = IoBufferMut::with_capacity(512);
|
||||
let (buf, nread) = mock_file
|
||||
.read_exact_at_eof_ok(1024, buf.slice_full(), &ctx)
|
||||
.await
|
||||
@@ -806,7 +807,7 @@ mod tests {
|
||||
assert_eq!(nread, 10);
|
||||
assert_eq!(&buf.into_inner()[..nread], &[2; 10]);
|
||||
|
||||
let buf = Vec::with_capacity(1024);
|
||||
let buf = IoBufferMut::with_capacity(1024);
|
||||
let err = mock_file
|
||||
.read_exact_at_eof_ok(2048, buf.slice_full(), &ctx)
|
||||
.await
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::ops::Deref;
|
||||
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::key::Key;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_epoll_uring::BoundedBuf;
|
||||
@@ -27,6 +27,7 @@ use utils::vec_map::VecMap;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
|
||||
use crate::virtual_file::IoBufferMut;
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
|
||||
/// Metadata bundled with the start and end offset of a blob.
|
||||
@@ -158,7 +159,7 @@ impl std::fmt::Display for VectoredBlob {
|
||||
/// Return type of [`VectoredBlobReader::read_blobs`]
|
||||
pub struct VectoredBlobsBuf {
|
||||
/// Buffer for all blobs in this read
|
||||
pub buf: BytesMut,
|
||||
pub buf: IoBufferMut,
|
||||
/// Offsets into the buffer and metadata for all blobs in this read
|
||||
pub blobs: Vec<VectoredBlob>,
|
||||
}
|
||||
@@ -441,7 +442,7 @@ impl<'a> VectoredBlobReader<'a> {
|
||||
pub async fn read_blobs(
|
||||
&self,
|
||||
read: &VectoredRead,
|
||||
buf: BytesMut,
|
||||
buf: IoBufferMut,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<VectoredBlobsBuf, std::io::Error> {
|
||||
assert!(read.size() > 0);
|
||||
@@ -916,7 +917,7 @@ mod tests {
|
||||
|
||||
// Multiply by two (compressed data might need more space), and add a few bytes for the header
|
||||
let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
|
||||
let mut buf = BytesMut::with_capacity(reserved_bytes);
|
||||
let mut buf = IoBufferMut::with_capacity(reserved_bytes);
|
||||
|
||||
let vectored_blob_reader = VectoredBlobReader::new(&file);
|
||||
let meta = BlobMeta {
|
||||
|
||||
@@ -18,6 +18,9 @@ use crate::page_cache::{PageWriteGuard, PAGE_SZ};
|
||||
use crate::tenant::TENANTS_SEGMENT_NAME;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use once_cell::sync::OnceCell;
|
||||
use owned_buffers_io::aligned_buffer::buffer::AlignedBuffer;
|
||||
use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlign};
|
||||
use owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
|
||||
use owned_buffers_io::io_buf_ext::FullSlice;
|
||||
use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -55,6 +58,8 @@ pub(crate) mod owned_buffers_io {
|
||||
//! but for the time being we're proving out the primitives in the neon.git repo
|
||||
//! for faster iteration.
|
||||
|
||||
pub(crate) mod aligned_buffer;
|
||||
pub(crate) mod io_buf_aligned;
|
||||
pub(crate) mod io_buf_ext;
|
||||
pub(crate) mod slice;
|
||||
pub(crate) mod write;
|
||||
@@ -196,7 +201,7 @@ impl VirtualFile {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Slice<Buf>, Error>
|
||||
where
|
||||
Buf: IoBufMut + Send,
|
||||
Buf: IoBufAlignedMut + Send,
|
||||
{
|
||||
self.inner.read_exact_at(slice, offset, ctx).await
|
||||
}
|
||||
@@ -771,7 +776,7 @@ impl VirtualFileInner {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Slice<Buf>, Error>
|
||||
where
|
||||
Buf: IoBufMut + Send,
|
||||
Buf: IoBufAlignedMut + Send,
|
||||
{
|
||||
let assert_we_return_original_bounds = if cfg!(debug_assertions) {
|
||||
Some((slice.stable_ptr() as usize, slice.bytes_total()))
|
||||
@@ -1222,12 +1227,14 @@ impl VirtualFileInner {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
let slice = Vec::with_capacity(PAGE_SZ).slice_full();
|
||||
let slice = IoBufferMut::with_capacity(PAGE_SZ).slice_full();
|
||||
assert_eq!(slice.bytes_total(), PAGE_SZ);
|
||||
let slice = self
|
||||
.read_exact_at(slice, blknum as u64 * (PAGE_SZ as u64), ctx)
|
||||
.await?;
|
||||
Ok(crate::tenant::block_io::BlockLease::Vec(slice.into_inner()))
|
||||
Ok(crate::tenant::block_io::BlockLease::IoBufferMut(
|
||||
slice.into_inner(),
|
||||
))
|
||||
}
|
||||
|
||||
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
|
||||
@@ -1325,10 +1332,11 @@ impl OpenFiles {
|
||||
/// server startup.
|
||||
///
|
||||
#[cfg(not(test))]
|
||||
pub fn init(num_slots: usize, engine: IoEngineKind) {
|
||||
pub fn init(num_slots: usize, engine: IoEngineKind, mode: IoMode) {
|
||||
if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
|
||||
panic!("virtual_file::init called twice");
|
||||
}
|
||||
set_io_mode(mode);
|
||||
io_engine::init(engine);
|
||||
crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
|
||||
}
|
||||
@@ -1357,6 +1365,11 @@ pub(crate) const fn get_io_buffer_alignment() -> usize {
|
||||
DEFAULT_IO_BUFFER_ALIGNMENT
|
||||
}
|
||||
|
||||
pub(crate) type IoBufferMut = AlignedBufferMut<ConstAlign<{ get_io_buffer_alignment() }>>;
|
||||
pub(crate) type IoBuffer = AlignedBuffer<ConstAlign<{ get_io_buffer_alignment() }>>;
|
||||
pub(crate) type IoPageSlice<'a> =
|
||||
AlignedSlice<'a, PAGE_SZ, ConstAlign<{ get_io_buffer_alignment() }>>;
|
||||
|
||||
static IO_MODE: AtomicU8 = AtomicU8::new(IoMode::preferred() as u8);
|
||||
|
||||
pub(crate) fn set_io_mode(mode: IoMode) {
|
||||
@@ -1395,10 +1408,10 @@ mod tests {
|
||||
impl MaybeVirtualFile {
|
||||
async fn read_exact_at(
|
||||
&self,
|
||||
mut slice: tokio_epoll_uring::Slice<Vec<u8>>,
|
||||
mut slice: tokio_epoll_uring::Slice<IoBufferMut>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<tokio_epoll_uring::Slice<Vec<u8>>, Error> {
|
||||
) -> Result<tokio_epoll_uring::Slice<IoBufferMut>, Error> {
|
||||
match self {
|
||||
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(slice, offset, ctx).await,
|
||||
MaybeVirtualFile::File(file) => {
|
||||
@@ -1466,12 +1479,13 @@ mod tests {
|
||||
len: usize,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<String, Error> {
|
||||
let slice = Vec::with_capacity(len).slice_full();
|
||||
let slice = IoBufferMut::with_capacity(len).slice_full();
|
||||
assert_eq!(slice.bytes_total(), len);
|
||||
let slice = self.read_exact_at(slice, pos, ctx).await?;
|
||||
let vec = slice.into_inner();
|
||||
assert_eq!(vec.len(), len);
|
||||
Ok(String::from_utf8(vec).unwrap())
|
||||
let buf = slice.into_inner();
|
||||
assert_eq!(buf.len(), len);
|
||||
|
||||
Ok(String::from_utf8(buf.to_vec()).unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1695,7 +1709,7 @@ mod tests {
|
||||
let files = files.clone();
|
||||
let ctx = ctx.detached_child(TaskKind::UnitTest, DownloadBehavior::Error);
|
||||
let hdl = rt.spawn(async move {
|
||||
let mut buf = vec![0u8; SIZE];
|
||||
let mut buf = IoBufferMut::with_capacity_zeroed(SIZE);
|
||||
let mut rng = rand::rngs::OsRng;
|
||||
for _ in 1..1000 {
|
||||
let f = &files[rng.gen_range(0..files.len())];
|
||||
@@ -1704,7 +1718,7 @@ mod tests {
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner();
|
||||
assert!(buf == SAMPLE);
|
||||
assert!(buf[..] == SAMPLE);
|
||||
}
|
||||
});
|
||||
hdls.push(hdl);
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
pub mod alignment;
|
||||
pub mod buffer;
|
||||
pub mod buffer_mut;
|
||||
pub mod raw;
|
||||
pub mod slice;
|
||||
|
||||
pub use alignment::*;
|
||||
pub use buffer_mut::AlignedBufferMut;
|
||||
pub use slice::AlignedSlice;
|
||||
@@ -0,0 +1,26 @@
|
||||
pub trait Alignment: std::marker::Unpin + 'static {
|
||||
/// Returns the required alignments.
|
||||
fn align(&self) -> usize;
|
||||
}
|
||||
|
||||
/// Alignment at compile time.
|
||||
#[derive(Debug)]
|
||||
pub struct ConstAlign<const A: usize>;
|
||||
|
||||
impl<const A: usize> Alignment for ConstAlign<A> {
|
||||
fn align(&self) -> usize {
|
||||
A
|
||||
}
|
||||
}
|
||||
|
||||
/// Alignment at run time.
|
||||
#[derive(Debug)]
|
||||
pub struct RuntimeAlign {
|
||||
align: usize,
|
||||
}
|
||||
|
||||
impl Alignment for RuntimeAlign {
|
||||
fn align(&self) -> usize {
|
||||
self.align
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,124 @@
|
||||
use std::{
|
||||
ops::{Deref, Range, RangeBounds},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use super::{alignment::Alignment, raw::RawAlignedBuffer};
|
||||
|
||||
/// An shared, immutable aligned buffer type.
|
||||
pub struct AlignedBuffer<A: Alignment> {
|
||||
/// Shared raw buffer.
|
||||
raw: Arc<RawAlignedBuffer<A>>,
|
||||
/// Range that specifies the current slice.
|
||||
range: Range<usize>,
|
||||
}
|
||||
|
||||
impl<A: Alignment> AlignedBuffer<A> {
|
||||
/// Creates an immutable `IoBuffer` from the raw buffer
|
||||
pub(super) fn from_raw(raw: RawAlignedBuffer<A>, range: Range<usize>) -> Self {
|
||||
AlignedBuffer {
|
||||
raw: Arc::new(raw),
|
||||
range,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the number of bytes in the buffer, also referred to as its 'length'.
|
||||
#[inline]
|
||||
pub fn len(&self) -> usize {
|
||||
self.range.len()
|
||||
}
|
||||
|
||||
/// Returns the alignment of the buffer.
|
||||
#[inline]
|
||||
pub fn align(&self) -> usize {
|
||||
self.raw.align()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn as_ptr(&self) -> *const u8 {
|
||||
// SAFETY: `self.range.start` is guaranteed to be within [0, self.len()).
|
||||
unsafe { self.raw.as_ptr().add(self.range.start) }
|
||||
}
|
||||
|
||||
/// Extracts a slice containing the entire buffer.
|
||||
///
|
||||
/// Equivalent to `&s[..]`.
|
||||
#[inline]
|
||||
fn as_slice(&self) -> &[u8] {
|
||||
&self.raw.as_slice()[self.range.start..self.range.end]
|
||||
}
|
||||
|
||||
/// Returns a slice of self for the index range `[begin..end)`.
|
||||
pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
|
||||
use core::ops::Bound;
|
||||
let len = self.len();
|
||||
|
||||
let begin = match range.start_bound() {
|
||||
Bound::Included(&n) => n,
|
||||
Bound::Excluded(&n) => n.checked_add(1).expect("out of range"),
|
||||
Bound::Unbounded => 0,
|
||||
};
|
||||
|
||||
let end = match range.end_bound() {
|
||||
Bound::Included(&n) => n.checked_add(1).expect("out of range"),
|
||||
Bound::Excluded(&n) => n,
|
||||
Bound::Unbounded => len,
|
||||
};
|
||||
|
||||
assert!(
|
||||
begin <= end,
|
||||
"range start must not be greater than end: {:?} <= {:?}",
|
||||
begin,
|
||||
end,
|
||||
);
|
||||
assert!(
|
||||
end <= len,
|
||||
"range end out of bounds: {:?} <= {:?}",
|
||||
end,
|
||||
len,
|
||||
);
|
||||
|
||||
let begin = self.range.start + begin;
|
||||
let end = self.range.start + end;
|
||||
|
||||
AlignedBuffer {
|
||||
raw: Arc::clone(&self.raw),
|
||||
range: begin..end,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: Alignment> Deref for AlignedBuffer<A> {
|
||||
type Target = [u8];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.as_slice()
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: Alignment> AsRef<[u8]> for AlignedBuffer<A> {
|
||||
fn as_ref(&self) -> &[u8] {
|
||||
self.as_slice()
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: Alignment> PartialEq<[u8]> for AlignedBuffer<A> {
|
||||
fn eq(&self, other: &[u8]) -> bool {
|
||||
self.as_slice().eq(other)
|
||||
}
|
||||
}
|
||||
|
||||
/// SAFETY: the underlying buffer references a stable memory region.
|
||||
unsafe impl<A: Alignment> tokio_epoll_uring::IoBuf for AlignedBuffer<A> {
|
||||
fn stable_ptr(&self) -> *const u8 {
|
||||
self.as_ptr()
|
||||
}
|
||||
|
||||
fn bytes_init(&self) -> usize {
|
||||
self.len()
|
||||
}
|
||||
|
||||
fn bytes_total(&self) -> usize {
|
||||
self.len()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,347 @@
|
||||
use std::ops::{Deref, DerefMut};
|
||||
|
||||
use super::{
|
||||
alignment::{Alignment, ConstAlign},
|
||||
buffer::AlignedBuffer,
|
||||
raw::RawAlignedBuffer,
|
||||
};
|
||||
|
||||
/// A mutable aligned buffer type.
|
||||
#[derive(Debug)]
|
||||
pub struct AlignedBufferMut<A: Alignment> {
|
||||
raw: RawAlignedBuffer<A>,
|
||||
}
|
||||
|
||||
impl<const A: usize> AlignedBufferMut<ConstAlign<A>> {
|
||||
/// Constructs a new, empty `IoBufferMut` with at least the specified capacity and alignment.
|
||||
///
|
||||
/// The buffer will be able to hold at most `capacity` elements and will never resize.
|
||||
///
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if the new capacity exceeds `isize::MAX` _bytes_, or if the following alignment requirement is not met:
|
||||
/// * `align` must not be zero,
|
||||
///
|
||||
/// * `align` must be a power of two,
|
||||
///
|
||||
/// * `capacity`, when rounded up to the nearest multiple of `align`,
|
||||
/// must not overflow isize (i.e., the rounded value must be
|
||||
/// less than or equal to `isize::MAX`).
|
||||
pub fn with_capacity(capacity: usize) -> Self {
|
||||
AlignedBufferMut {
|
||||
raw: RawAlignedBuffer::with_capacity(capacity),
|
||||
}
|
||||
}
|
||||
|
||||
/// Constructs a new `IoBufferMut` with at least the specified capacity and alignment, filled with zeros.
|
||||
pub fn with_capacity_zeroed(capacity: usize) -> Self {
|
||||
use bytes::BufMut;
|
||||
let mut buf = Self::with_capacity(capacity);
|
||||
buf.put_bytes(0, capacity);
|
||||
// SAFETY: `put_bytes` filled the entire buffer.
|
||||
unsafe { buf.set_len(capacity) };
|
||||
buf
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: Alignment> AlignedBufferMut<A> {
|
||||
/// Returns the total number of bytes the buffer can hold.
|
||||
#[inline]
|
||||
pub fn capacity(&self) -> usize {
|
||||
self.raw.capacity()
|
||||
}
|
||||
|
||||
/// Returns the alignment of the buffer.
|
||||
#[inline]
|
||||
pub fn align(&self) -> usize {
|
||||
self.raw.align()
|
||||
}
|
||||
|
||||
/// Returns the number of bytes in the buffer, also referred to as its 'length'.
|
||||
#[inline]
|
||||
pub fn len(&self) -> usize {
|
||||
self.raw.len()
|
||||
}
|
||||
|
||||
/// Force the length of the buffer to `new_len`.
|
||||
#[inline]
|
||||
unsafe fn set_len(&mut self, new_len: usize) {
|
||||
self.raw.set_len(new_len)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn as_ptr(&self) -> *const u8 {
|
||||
self.raw.as_ptr()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn as_mut_ptr(&mut self) -> *mut u8 {
|
||||
self.raw.as_mut_ptr()
|
||||
}
|
||||
|
||||
/// Extracts a slice containing the entire buffer.
|
||||
///
|
||||
/// Equivalent to `&s[..]`.
|
||||
#[inline]
|
||||
fn as_slice(&self) -> &[u8] {
|
||||
self.raw.as_slice()
|
||||
}
|
||||
|
||||
/// Extracts a mutable slice of the entire buffer.
|
||||
///
|
||||
/// Equivalent to `&mut s[..]`.
|
||||
fn as_mut_slice(&mut self) -> &mut [u8] {
|
||||
self.raw.as_mut_slice()
|
||||
}
|
||||
|
||||
/// Drops the all the contents of the buffer, setting its length to `0`.
|
||||
#[inline]
|
||||
pub fn clear(&mut self) {
|
||||
self.raw.clear()
|
||||
}
|
||||
|
||||
/// Reserves capacity for at least `additional` more bytes to be inserted
|
||||
/// in the given `IoBufferMut`. The collection may reserve more space to
|
||||
/// speculatively avoid frequent reallocations. After calling `reserve`,
|
||||
/// capacity will be greater than or equal to `self.len() + additional`.
|
||||
/// Does nothing if capacity is already sufficient.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if the new capacity exceeds `isize::MAX` _bytes_.
|
||||
pub fn reserve(&mut self, additional: usize) {
|
||||
self.raw.reserve(additional);
|
||||
}
|
||||
|
||||
/// Shortens the buffer, keeping the first len bytes.
|
||||
pub fn truncate(&mut self, len: usize) {
|
||||
self.raw.truncate(len);
|
||||
}
|
||||
|
||||
/// Consumes and leaks the `IoBufferMut`, returning a mutable reference to the contents, &'a mut [u8].
|
||||
pub fn leak<'a>(self) -> &'a mut [u8] {
|
||||
self.raw.leak()
|
||||
}
|
||||
|
||||
pub fn freeze(self) -> AlignedBuffer<A> {
|
||||
let len = self.len();
|
||||
AlignedBuffer::from_raw(self.raw, 0..len)
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: Alignment> Deref for AlignedBufferMut<A> {
|
||||
type Target = [u8];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.as_slice()
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: Alignment> DerefMut for AlignedBufferMut<A> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.as_mut_slice()
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: Alignment> AsRef<[u8]> for AlignedBufferMut<A> {
|
||||
fn as_ref(&self) -> &[u8] {
|
||||
self.as_slice()
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: Alignment> AsMut<[u8]> for AlignedBufferMut<A> {
|
||||
fn as_mut(&mut self) -> &mut [u8] {
|
||||
self.as_mut_slice()
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: Alignment> PartialEq<[u8]> for AlignedBufferMut<A> {
|
||||
fn eq(&self, other: &[u8]) -> bool {
|
||||
self.as_slice().eq(other)
|
||||
}
|
||||
}
|
||||
|
||||
/// SAFETY: When advancing the internal cursor, the caller needs to make sure the bytes advcanced past have been initialized.
|
||||
unsafe impl<A: Alignment> bytes::BufMut for AlignedBufferMut<A> {
|
||||
#[inline]
|
||||
fn remaining_mut(&self) -> usize {
|
||||
// Although a `Vec` can have at most isize::MAX bytes, we never want to grow `IoBufferMut`.
|
||||
// Thus, it can have at most `self.capacity` bytes.
|
||||
self.capacity() - self.len()
|
||||
}
|
||||
|
||||
// SAFETY: Caller needs to make sure the bytes being advanced past have been initialized.
|
||||
#[inline]
|
||||
unsafe fn advance_mut(&mut self, cnt: usize) {
|
||||
let len = self.len();
|
||||
let remaining = self.remaining_mut();
|
||||
|
||||
if remaining < cnt {
|
||||
panic_advance(cnt, remaining);
|
||||
}
|
||||
|
||||
// Addition will not overflow since the sum is at most the capacity.
|
||||
self.set_len(len + cnt);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
|
||||
let cap = self.capacity();
|
||||
let len = self.len();
|
||||
|
||||
// SAFETY: Since `self.ptr` is valid for `cap` bytes, `self.ptr.add(len)` must be
|
||||
// valid for `cap - len` bytes. The subtraction will not underflow since
|
||||
// `len <= cap`.
|
||||
unsafe {
|
||||
bytes::buf::UninitSlice::from_raw_parts_mut(self.as_mut_ptr().add(len), cap - len)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Panic with a nice error message.
|
||||
#[cold]
|
||||
fn panic_advance(idx: usize, len: usize) -> ! {
|
||||
panic!(
|
||||
"advance out of bounds: the len is {} but advancing by {}",
|
||||
len, idx
|
||||
);
|
||||
}
|
||||
|
||||
/// Safety: [`AlignedBufferMut`] has exclusive ownership of the io buffer,
|
||||
/// and the underlying pointer remains stable while io-uring is owning the buffer.
|
||||
/// The tokio-epoll-uring crate itself will not resize the buffer and will respect
|
||||
/// [`tokio_epoll_uring::IoBuf::bytes_total`].
|
||||
unsafe impl<A: Alignment> tokio_epoll_uring::IoBuf for AlignedBufferMut<A> {
|
||||
fn stable_ptr(&self) -> *const u8 {
|
||||
self.as_ptr()
|
||||
}
|
||||
|
||||
fn bytes_init(&self) -> usize {
|
||||
self.len()
|
||||
}
|
||||
|
||||
fn bytes_total(&self) -> usize {
|
||||
self.capacity()
|
||||
}
|
||||
}
|
||||
|
||||
// SAFETY: See above.
|
||||
unsafe impl<A: Alignment> tokio_epoll_uring::IoBufMut for AlignedBufferMut<A> {
|
||||
fn stable_mut_ptr(&mut self) -> *mut u8 {
|
||||
self.as_mut_ptr()
|
||||
}
|
||||
|
||||
unsafe fn set_init(&mut self, init_len: usize) {
|
||||
if self.len() < init_len {
|
||||
self.set_len(init_len);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::*;
|
||||
|
||||
const ALIGN: usize = 4 * 1024;
|
||||
type TestIoBufferMut = AlignedBufferMut<ConstAlign<ALIGN>>;
|
||||
|
||||
#[test]
|
||||
fn test_with_capacity() {
|
||||
let v = TestIoBufferMut::with_capacity(ALIGN * 4);
|
||||
assert_eq!(v.len(), 0);
|
||||
assert_eq!(v.capacity(), ALIGN * 4);
|
||||
assert_eq!(v.align(), ALIGN);
|
||||
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
|
||||
|
||||
let v = TestIoBufferMut::with_capacity(ALIGN / 2);
|
||||
assert_eq!(v.len(), 0);
|
||||
assert_eq!(v.capacity(), ALIGN / 2);
|
||||
assert_eq!(v.align(), ALIGN);
|
||||
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_with_capacity_zeroed() {
|
||||
let v = TestIoBufferMut::with_capacity_zeroed(ALIGN);
|
||||
assert_eq!(v.len(), ALIGN);
|
||||
assert_eq!(v.capacity(), ALIGN);
|
||||
assert_eq!(v.align(), ALIGN);
|
||||
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
|
||||
assert_eq!(&v[..], &[0; ALIGN])
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_reserve() {
|
||||
use bytes::BufMut;
|
||||
let mut v = TestIoBufferMut::with_capacity(ALIGN);
|
||||
let capacity = v.capacity();
|
||||
v.reserve(capacity);
|
||||
assert_eq!(v.capacity(), capacity);
|
||||
let data = [b'a'; ALIGN];
|
||||
v.put(&data[..]);
|
||||
v.reserve(capacity);
|
||||
assert!(v.capacity() >= capacity * 2);
|
||||
assert_eq!(&v[..], &data[..]);
|
||||
let capacity = v.capacity();
|
||||
v.clear();
|
||||
v.reserve(capacity);
|
||||
assert_eq!(capacity, v.capacity());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bytes_put() {
|
||||
use bytes::BufMut;
|
||||
let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
|
||||
let x = [b'a'; ALIGN];
|
||||
|
||||
for _ in 0..2 {
|
||||
for _ in 0..4 {
|
||||
v.put(&x[..]);
|
||||
}
|
||||
assert_eq!(v.len(), ALIGN * 4);
|
||||
assert_eq!(v.capacity(), ALIGN * 4);
|
||||
assert_eq!(v.align(), ALIGN);
|
||||
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
|
||||
v.clear()
|
||||
}
|
||||
assert_eq!(v.len(), 0);
|
||||
assert_eq!(v.capacity(), ALIGN * 4);
|
||||
assert_eq!(v.align(), ALIGN);
|
||||
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn test_bytes_put_panic() {
|
||||
use bytes::BufMut;
|
||||
const ALIGN: usize = 4 * 1024;
|
||||
let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
|
||||
let x = [b'a'; ALIGN];
|
||||
for _ in 0..5 {
|
||||
v.put_slice(&x[..]);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_io_buf_put_slice() {
|
||||
use tokio_epoll_uring::BoundedBufMut;
|
||||
const ALIGN: usize = 4 * 1024;
|
||||
let mut v = TestIoBufferMut::with_capacity(ALIGN);
|
||||
let x = [b'a'; ALIGN];
|
||||
|
||||
for _ in 0..2 {
|
||||
v.put_slice(&x[..]);
|
||||
assert_eq!(v.len(), ALIGN);
|
||||
assert_eq!(v.capacity(), ALIGN);
|
||||
assert_eq!(v.align(), ALIGN);
|
||||
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
|
||||
v.clear()
|
||||
}
|
||||
assert_eq!(v.len(), 0);
|
||||
assert_eq!(v.capacity(), ALIGN);
|
||||
assert_eq!(v.align(), ALIGN);
|
||||
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,216 @@
|
||||
use core::slice;
|
||||
use std::{
|
||||
alloc::{self, Layout},
|
||||
cmp,
|
||||
mem::ManuallyDrop,
|
||||
};
|
||||
|
||||
use super::alignment::{Alignment, ConstAlign};
|
||||
|
||||
#[derive(Debug)]
|
||||
struct AlignedBufferPtr(*mut u8);
|
||||
|
||||
// SAFETY: We gurantees no one besides `IoBufferPtr` itself has the raw pointer.
|
||||
unsafe impl Send for AlignedBufferPtr {}
|
||||
|
||||
// SAFETY: We gurantees no one besides `IoBufferPtr` itself has the raw pointer.
|
||||
unsafe impl Sync for AlignedBufferPtr {}
|
||||
|
||||
/// An aligned buffer type.
|
||||
#[derive(Debug)]
|
||||
pub struct RawAlignedBuffer<A: Alignment> {
|
||||
ptr: AlignedBufferPtr,
|
||||
capacity: usize,
|
||||
len: usize,
|
||||
align: A,
|
||||
}
|
||||
|
||||
impl<const A: usize> RawAlignedBuffer<ConstAlign<A>> {
|
||||
/// Constructs a new, empty `IoBufferMut` with at least the specified capacity and alignment.
|
||||
///
|
||||
/// The buffer will be able to hold at most `capacity` elements and will never resize.
|
||||
///
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if the new capacity exceeds `isize::MAX` _bytes_, or if the following alignment requirement is not met:
|
||||
/// * `align` must not be zero,
|
||||
///
|
||||
/// * `align` must be a power of two,
|
||||
///
|
||||
/// * `capacity`, when rounded up to the nearest multiple of `align`,
|
||||
/// must not overflow isize (i.e., the rounded value must be
|
||||
/// less than or equal to `isize::MAX`).
|
||||
pub fn with_capacity(capacity: usize) -> Self {
|
||||
let align = ConstAlign::<A>;
|
||||
let layout = Layout::from_size_align(capacity, align.align()).expect("Invalid layout");
|
||||
|
||||
// SAFETY: Making an allocation with a sized and aligned layout. The memory is manually freed with the same layout.
|
||||
let ptr = unsafe {
|
||||
let ptr = alloc::alloc(layout);
|
||||
if ptr.is_null() {
|
||||
alloc::handle_alloc_error(layout);
|
||||
}
|
||||
AlignedBufferPtr(ptr)
|
||||
};
|
||||
|
||||
RawAlignedBuffer {
|
||||
ptr,
|
||||
capacity,
|
||||
len: 0,
|
||||
align,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: Alignment> RawAlignedBuffer<A> {
|
||||
/// Returns the total number of bytes the buffer can hold.
|
||||
#[inline]
|
||||
pub fn capacity(&self) -> usize {
|
||||
self.capacity
|
||||
}
|
||||
|
||||
/// Returns the alignment of the buffer.
|
||||
#[inline]
|
||||
pub fn align(&self) -> usize {
|
||||
self.align.align()
|
||||
}
|
||||
|
||||
/// Returns the number of bytes in the buffer, also referred to as its 'length'.
|
||||
#[inline]
|
||||
pub fn len(&self) -> usize {
|
||||
self.len
|
||||
}
|
||||
|
||||
/// Force the length of the buffer to `new_len`.
|
||||
#[inline]
|
||||
pub unsafe fn set_len(&mut self, new_len: usize) {
|
||||
debug_assert!(new_len <= self.capacity());
|
||||
self.len = new_len;
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn as_ptr(&self) -> *const u8 {
|
||||
self.ptr.0
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn as_mut_ptr(&mut self) -> *mut u8 {
|
||||
self.ptr.0
|
||||
}
|
||||
|
||||
/// Extracts a slice containing the entire buffer.
|
||||
///
|
||||
/// Equivalent to `&s[..]`.
|
||||
#[inline]
|
||||
pub fn as_slice(&self) -> &[u8] {
|
||||
// SAFETY: The pointer is valid and `len` bytes are initialized.
|
||||
unsafe { slice::from_raw_parts(self.as_ptr(), self.len) }
|
||||
}
|
||||
|
||||
/// Extracts a mutable slice of the entire buffer.
|
||||
///
|
||||
/// Equivalent to `&mut s[..]`.
|
||||
pub fn as_mut_slice(&mut self) -> &mut [u8] {
|
||||
// SAFETY: The pointer is valid and `len` bytes are initialized.
|
||||
unsafe { slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) }
|
||||
}
|
||||
|
||||
/// Drops the all the contents of the buffer, setting its length to `0`.
|
||||
#[inline]
|
||||
pub fn clear(&mut self) {
|
||||
self.len = 0;
|
||||
}
|
||||
|
||||
/// Reserves capacity for at least `additional` more bytes to be inserted
|
||||
/// in the given `IoBufferMut`. The collection may reserve more space to
|
||||
/// speculatively avoid frequent reallocations. After calling `reserve`,
|
||||
/// capacity will be greater than or equal to `self.len() + additional`.
|
||||
/// Does nothing if capacity is already sufficient.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if the new capacity exceeds `isize::MAX` _bytes_.
|
||||
pub fn reserve(&mut self, additional: usize) {
|
||||
if additional > self.capacity() - self.len() {
|
||||
self.reserve_inner(additional);
|
||||
}
|
||||
}
|
||||
|
||||
fn reserve_inner(&mut self, additional: usize) {
|
||||
let Some(required_cap) = self.len().checked_add(additional) else {
|
||||
capacity_overflow()
|
||||
};
|
||||
|
||||
let old_capacity = self.capacity();
|
||||
let align = self.align();
|
||||
// This guarantees exponential growth. The doubling cannot overflow
|
||||
// because `cap <= isize::MAX` and the type of `cap` is `usize`.
|
||||
let cap = cmp::max(old_capacity * 2, required_cap);
|
||||
|
||||
if !is_valid_alloc(cap) {
|
||||
capacity_overflow()
|
||||
}
|
||||
let new_layout = Layout::from_size_align(cap, self.align()).expect("Invalid layout");
|
||||
|
||||
let old_ptr = self.as_mut_ptr();
|
||||
|
||||
// SAFETY: old allocation was allocated with std::alloc::alloc with the same layout,
|
||||
// and we panics on null pointer.
|
||||
let (ptr, cap) = unsafe {
|
||||
let old_layout = Layout::from_size_align_unchecked(old_capacity, align);
|
||||
let ptr = alloc::realloc(old_ptr, old_layout, new_layout.size());
|
||||
if ptr.is_null() {
|
||||
alloc::handle_alloc_error(new_layout);
|
||||
}
|
||||
(AlignedBufferPtr(ptr), cap)
|
||||
};
|
||||
|
||||
self.ptr = ptr;
|
||||
self.capacity = cap;
|
||||
}
|
||||
|
||||
/// Shortens the buffer, keeping the first len bytes.
|
||||
pub fn truncate(&mut self, len: usize) {
|
||||
if len > self.len {
|
||||
return;
|
||||
}
|
||||
self.len = len;
|
||||
}
|
||||
|
||||
/// Consumes and leaks the `IoBufferMut`, returning a mutable reference to the contents, &'a mut [u8].
|
||||
pub fn leak<'a>(self) -> &'a mut [u8] {
|
||||
let mut buf = ManuallyDrop::new(self);
|
||||
// SAFETY: leaking the buffer as intended.
|
||||
unsafe { slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.len) }
|
||||
}
|
||||
}
|
||||
|
||||
fn capacity_overflow() -> ! {
|
||||
panic!("capacity overflow")
|
||||
}
|
||||
|
||||
// We need to guarantee the following:
|
||||
// * We don't ever allocate `> isize::MAX` byte-size objects.
|
||||
// * We don't overflow `usize::MAX` and actually allocate too little.
|
||||
//
|
||||
// On 64-bit we just need to check for overflow since trying to allocate
|
||||
// `> isize::MAX` bytes will surely fail. On 32-bit and 16-bit we need to add
|
||||
// an extra guard for this in case we're running on a platform which can use
|
||||
// all 4GB in user-space, e.g., PAE or x32.
|
||||
#[inline]
|
||||
fn is_valid_alloc(alloc_size: usize) -> bool {
|
||||
!(usize::BITS < 64 && alloc_size > isize::MAX as usize)
|
||||
}
|
||||
|
||||
impl<A: Alignment> Drop for RawAlignedBuffer<A> {
|
||||
fn drop(&mut self) {
|
||||
// SAFETY: memory was allocated with std::alloc::alloc with the same layout.
|
||||
unsafe {
|
||||
alloc::dealloc(
|
||||
self.as_mut_ptr(),
|
||||
Layout::from_size_align_unchecked(self.capacity, self.align.align()),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
use std::ops::{Deref, DerefMut};
|
||||
|
||||
use super::alignment::{Alignment, ConstAlign};
|
||||
|
||||
/// Newtype for an aligned slice.
|
||||
pub struct AlignedSlice<'a, const N: usize, A: Alignment> {
|
||||
/// underlying byte slice
|
||||
buf: &'a mut [u8; N],
|
||||
/// alignment marker
|
||||
_align: A,
|
||||
}
|
||||
|
||||
impl<'a, const N: usize, const A: usize> AlignedSlice<'a, N, ConstAlign<A>> {
|
||||
/// Create a new aligned slice from a mutable byte slice. The input must already satisify the alignment.
|
||||
pub unsafe fn new_unchecked(buf: &'a mut [u8; N]) -> Self {
|
||||
let _align = ConstAlign::<A>;
|
||||
assert_eq!(buf.as_ptr().align_offset(_align.align()), 0);
|
||||
AlignedSlice { buf, _align }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, const N: usize, A: Alignment> Deref for AlignedSlice<'a, N, A> {
|
||||
type Target = [u8; N];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.buf
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, const N: usize, A: Alignment> DerefMut for AlignedSlice<'a, N, A> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.buf
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, const N: usize, A: Alignment> AsRef<[u8; N]> for AlignedSlice<'a, N, A> {
|
||||
fn as_ref(&self) -> &[u8; N] {
|
||||
self.buf
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,9 @@
|
||||
use tokio_epoll_uring::IoBufMut;
|
||||
|
||||
use crate::virtual_file::{IoBufferMut, PageWriteGuardBuf};
|
||||
|
||||
pub trait IoBufAlignedMut: IoBufMut {}
|
||||
|
||||
impl IoBufAlignedMut for IoBufferMut {}
|
||||
|
||||
impl IoBufAlignedMut for PageWriteGuardBuf {}
|
||||
@@ -1,5 +1,6 @@
|
||||
//! See [`FullSlice`].
|
||||
|
||||
use crate::virtual_file::{IoBuffer, IoBufferMut};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use std::ops::{Deref, Range};
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
|
||||
@@ -76,3 +77,5 @@ macro_rules! impl_io_buf_ext {
|
||||
impl_io_buf_ext!(Bytes);
|
||||
impl_io_buf_ext!(BytesMut);
|
||||
impl_io_buf_ext!(Vec<u8>);
|
||||
impl_io_buf_ext!(IoBufferMut);
|
||||
impl_io_buf_ext!(IoBuffer);
|
||||
|
||||
Reference in New Issue
Block a user