Compare commits

...

9 Commits

Author SHA1 Message Date
Yuchen Liang
3ee98775df pageserver: implement aligned io buffer
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-07 15:57:49 -04:00
Yuchen Liang
f1418cad52 Merge branch 'main' into yuchen/virtual-file-config 2024-10-07 15:15:26 -04:00
Yuchen Liang
a04cfd754b get rid of io_buffer_alignment config (always 512)
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-07 12:16:11 -04:00
Yuchen Liang
bc13310e56 Merge branch 'main' into yuchen/virtual-file-config 2024-10-07 11:49:13 -04:00
Yuchen Liang
5c76b2d474 fix put_io_mode to use the correct http endpoint
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 10:58:47 -04:00
Yuchen Liang
97f7b0b86f simplify virtual file wrapper
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 08:31:30 -04:00
Yuchen Liang
3a5b44ea53 add set_io_mode option to getpage_latest_lsn
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 08:16:18 -04:00
Yuchen Liang
95554c7377 fix clippy
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 07:59:15 -04:00
Yuchen Liang
a85bd88866 pageserver: add direct io config to virtual file
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-30 23:54:14 -04:00
21 changed files with 760 additions and 250 deletions

View File

@@ -104,8 +104,7 @@ pub struct ConfigToml {
pub image_compression: ImageCompressionAlgorithm,
pub ephemeral_bytes_per_memory_kb: usize,
pub l0_flush: Option<crate::models::L0FlushConfig>,
pub virtual_file_direct_io: crate::models::virtual_file::DirectIoMode,
pub io_buffer_alignment: usize,
pub virtual_file_io_mode: Option<crate::models::virtual_file::IoMode>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -388,10 +387,7 @@ impl Default for ConfigToml {
image_compression: (DEFAULT_IMAGE_COMPRESSION),
ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: None,
virtual_file_direct_io: crate::models::virtual_file::DirectIoMode::default(),
io_buffer_alignment: DEFAULT_IO_BUFFER_ALIGNMENT,
virtual_file_io_mode: None,
tenant_config: TenantConfigToml::default(),
}
}
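Note: the net effect of this hunk is that the separate direct-IO and alignment knobs collapse into a single optional virtual_file_io_mode key, with the alignment hard-wired to 512 (see the "get rid of io_buffer_alignment" commit above). A minimal, self-contained sketch of the Option-with-default pattern, with illustrative names rather than the real ConfigToml:

// Hypothetical stand-in for the real logic, which does
// `virtual_file_io_mode.unwrap_or(virtual_file::IoMode::preferred())`.
fn effective_mode(configured: Option<&str>) -> &str {
    configured.unwrap_or("buffered") // IoMode::preferred() is Buffered
}

fn main() {
    assert_eq!(effective_mode(None), "buffered"); // key absent from the config
    assert_eq!(effective_mode(Some("direct")), "direct"); // explicit override
}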

View File

@@ -972,8 +972,6 @@ pub struct TopTenantShardsResponse {
}
pub mod virtual_file {
use std::path::PathBuf;
#[derive(
Copy,
Clone,
@@ -994,50 +992,45 @@ pub mod virtual_file {
}
/// Direct IO modes for a pageserver.
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum DirectIoMode {
/// Direct IO disabled (uses usual buffered IO).
#[default]
Disabled,
/// Direct IO still disabled, but performs alignment checks and perf simulations.
Evaluate {
/// Alignment check level
alignment_check: DirectIoAlignmentCheckLevel,
/// Latency padding for performance simulation.
latency_padding: DirectIoLatencyPadding,
},
/// Direct IO enabled.
Enabled {
/// Actions to perform on alignment error.
on_alignment_error: DirectIoOnAlignmentErrorAction,
},
#[derive(
Copy,
Clone,
PartialEq,
Eq,
Hash,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
Debug,
)]
#[strum(serialize_all = "kebab-case")]
#[repr(u8)]
pub enum IoMode {
/// Uses buffered IO.
Buffered,
/// Uses direct IO; errors out if the operation fails.
#[cfg(target_os = "linux")]
Direct,
}
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum DirectIoAlignmentCheckLevel {
#[default]
Error,
Log,
None,
impl IoMode {
pub const fn preferred() -> Self {
Self::Buffered
}
}
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum DirectIoOnAlignmentErrorAction {
Error,
#[default]
FallbackToBuffered,
}
impl TryFrom<u8> for IoMode {
type Error = u8;
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum DirectIoLatencyPadding {
/// Pad virtual file operations with IO to a fake file.
FakeFileRW { path: PathBuf },
#[default]
None,
fn try_from(value: u8) -> Result<Self, Self::Error> {
Ok(match value {
v if v == (IoMode::Buffered as u8) => IoMode::Buffered,
#[cfg(target_os = "linux")]
v if v == (IoMode::Direct as u8) => IoMode::Direct,
x => return Err(x),
})
}
}
}
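Note: because IoMode derives strum's EnumString/Display with serialize_all = "kebab-case" and is serialized via serde_with's SerializeDisplay, the wire format is the plain strings "buffered" and "direct", while the repr(u8) discriminant backs the TryFrom<u8> impl used for the runtime atomic. A round-trip sketch assuming the same derives (requires the strum_macros, serde_with, and serde_json crates):

use std::str::FromStr;

#[derive(Copy, Clone, Debug, PartialEq, Eq,
         strum_macros::EnumString, strum_macros::Display,
         serde_with::DeserializeFromStr, serde_with::SerializeDisplay)]
#[strum(serialize_all = "kebab-case")]
#[repr(u8)]
enum IoMode { Buffered, Direct }

fn main() {
    // strum: kebab-case string <-> enum
    assert_eq!(IoMode::from_str("direct").unwrap(), IoMode::Direct);
    assert_eq!(IoMode::Buffered.to_string(), "buffered");
    // serde_with routes JSON through the same strings
    assert_eq!(serde_json::to_string(&IoMode::Direct).unwrap(), "\"direct\"");
    // repr(u8): the discriminants behind the TryFrom<u8> impl above
    assert_eq!(IoMode::Buffered as u8, 0);
    assert_eq!(IoMode::Direct as u8, 1);
}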

View File

@@ -164,11 +164,7 @@ fn criterion_benchmark(c: &mut Criterion) {
let conf: &'static PageServerConf = Box::leak(Box::new(
pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()),
));
virtual_file::init(
16384,
virtual_file::io_engine_for_bench(),
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
);
virtual_file::init(16384, virtual_file::io_engine_for_bench());
page_cache::init(conf.page_cache_size);
{

View File

@@ -540,10 +540,13 @@ impl Client {
.map_err(Error::ReceiveBody)
}
/// Configures the io buffer alignment at runtime.
pub async fn put_io_alignment(&self, align: usize) -> Result<()> {
let uri = format!("{}/v1/io_alignment", self.mgmt_api_endpoint);
self.request(Method::PUT, uri, align)
/// Configures the io mode at runtime.
pub async fn put_io_mode(
&self,
mode: &pageserver_api::models::virtual_file::IoMode,
) -> Result<()> {
let uri = format!("{}/v1/io_mode", self.mgmt_api_endpoint);
self.request(Method::PUT, uri, mode)
.await?
.json()
.await
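Note: a hedged usage sketch of the new client method; `Client` and `Result` are this module's own types, and the error is assumed to propagate as usual:

use pageserver_api::models::virtual_file::IoMode;

// Sketch only: `client` is an already-constructed mgmt_api Client.
async fn enable_direct_io(client: &Client) -> Result<()> {
    // Serializes IoMode::Direct as the JSON string "direct" and PUTs it
    // to {mgmt_api_endpoint}/v1/io_mode (the Direct variant is Linux-only).
    client.put_io_mode(&IoMode::Direct).await
}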

View File

@@ -152,11 +152,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
// Initialize virtual_file (file descriptor cache) and page cache which are needed to access layer persistent B-Tree.
pageserver::virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
);
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::page_cache::init(100);
let mut total_delta_layers = 0usize;

View File

@@ -59,7 +59,7 @@ pub(crate) enum LayerCmd {
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs, 1);
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
page_cache::init(100);
let file = VirtualFile::open(path, ctx).await?;
let file_id = page_cache::next_file_id();
@@ -190,11 +190,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
new_tenant_id,
new_timeline_id,
} => {
pageserver::virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
);
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);

View File

@@ -26,7 +26,7 @@ use pageserver::{
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
virtual_file,
};
use pageserver_api::{config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT, shard::TenantShardId};
use pageserver_api::shard::TenantShardId;
use postgres_ffi::ControlFileData;
use remote_storage::{RemotePath, RemoteStorageConfig};
use tokio_util::sync::CancellationToken;
@@ -205,11 +205,7 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {
async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
DEFAULT_IO_BUFFER_ALIGNMENT,
);
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx).await

View File

@@ -59,9 +59,9 @@ pub(crate) struct Args {
#[clap(long)]
set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,
/// Before starting the benchmark, live-reconfigure the pageserver to use the specified alignment for io buffers.
/// Before starting the benchmark, live-reconfigure the pageserver to use the specified io mode (buffered vs. direct).
#[clap(long)]
set_io_alignment: Option<usize>,
set_io_mode: Option<pageserver_api::models::virtual_file::IoMode>,
targets: Option<Vec<TenantTimelineId>>,
}
@@ -129,8 +129,8 @@ async fn main_impl(
mgmt_api_client.put_io_engine(engine_str).await?;
}
if let Some(align) = args.set_io_alignment {
mgmt_api_client.put_io_alignment(align).await?;
if let Some(mode) = &args.set_io_mode {
mgmt_api_client.put_io_mode(mode).await?;
}
// discover targets
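Note: given the kebab-case strum derive on IoMode, the new flag presumably takes --set-io-mode buffered or --set-io-mode direct, parsed by clap through the enum's FromStr implementation, mirroring the existing --set-io-engine flag.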

View File

@@ -125,8 +125,7 @@ fn main() -> anyhow::Result<()> {
// after setting up logging, log the effective IO engine choice and read path implementations
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_direct_io, "starting with virtual_file Direct IO settings");
info!(?conf.io_buffer_alignment, "starting with setting for IO buffer alignment");
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
// The tenants directory contains all the pageserver local disk state.
// Create if not exists and make sure all the contents are durable before proceeding.
@@ -168,11 +167,7 @@ fn main() -> anyhow::Result<()> {
let scenario = failpoint_support::init();
// Basic initialization of things that don't change after startup
virtual_file::init(
conf.max_file_descriptors,
conf.virtual_file_io_engine,
conf.io_buffer_alignment,
);
virtual_file::init(conf.max_file_descriptors, conf.virtual_file_io_engine);
page_cache::init(conf.page_cache_size);
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;

View File

@@ -174,9 +174,7 @@ pub struct PageServerConf {
pub l0_flush: crate::l0_flush::L0FlushConfig,
/// Direct IO settings
pub virtual_file_direct_io: virtual_file::DirectIoMode,
pub io_buffer_alignment: usize,
pub virtual_file_io_mode: virtual_file::IoMode,
}
/// Token for authentication to safekeepers
@@ -325,11 +323,10 @@ impl PageServerConf {
image_compression,
ephemeral_bytes_per_memory_kb,
l0_flush,
virtual_file_direct_io,
virtual_file_io_mode,
concurrent_tenant_warmup,
concurrent_tenant_size_logical_size_queries,
virtual_file_io_engine,
io_buffer_alignment,
tenant_config,
} = config_toml;
@@ -368,8 +365,6 @@ impl PageServerConf {
max_vectored_read_bytes,
image_compression,
ephemeral_bytes_per_memory_kb,
virtual_file_direct_io,
io_buffer_alignment,
// ------------------------------------------------------------
// fields that require additional validation or custom handling
@@ -408,6 +403,7 @@ impl PageServerConf {
l0_flush: l0_flush
.map(crate::l0_flush::L0FlushConfig::from)
.unwrap_or_default(),
virtual_file_io_mode: virtual_file_io_mode.unwrap_or(virtual_file::IoMode::preferred()),
};
// ------------------------------------------------------------

View File

@@ -17,6 +17,7 @@ use hyper::header;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::virtual_file::IoMode;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
use pageserver_api::models::IngestAuxFilesRequest;
@@ -2379,17 +2380,13 @@ async fn put_io_engine_handler(
json_response(StatusCode::OK, ())
}
async fn put_io_alignment_handler(
async fn put_io_mode_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&r, None)?;
let align: usize = json_request(&mut r).await?;
crate::virtual_file::set_io_buffer_alignment(align).map_err(|align| {
ApiError::PreconditionFailed(
format!("Requested io alignment ({align}) is not a power of two").into(),
)
})?;
let mode: IoMode = json_request(&mut r).await?;
crate::virtual_file::set_io_mode(mode);
json_response(StatusCode::OK, ())
}
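Note: since IoMode deserializes from its kebab-case name, the endpoint presumably accepts a bare JSON string body, e.g. PUT /v1/io_mode with body "direct" (the direct variant only exists on Linux builds). Unlike the old /v1/io_alignment handler, there is no validation failure path: once the JSON parses, the value is stored as-is and the handler returns 200 OK.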
@@ -3080,9 +3077,7 @@ pub fn make_router(
|r| api_handler(r, timeline_collect_keyspace),
)
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
.put("/v1/io_alignment", |r| {
api_handler(r, put_io_alignment_handler)
})
.put("/v1/io_mode", |r| api_handler(r, put_io_mode_handler))
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/force_aux_policy_switch",
|r| api_handler(r, force_aux_policy_switch_handler),

View File

@@ -84,7 +84,7 @@ impl Drop for EphemeralFile {
fn drop(&mut self) {
// unlink the file
// we are clear to do this, because we have entered a gate
let path = &self.buffered_writer.as_inner().as_inner().path;
let path = self.buffered_writer.as_inner().as_inner().path();
let res = std::fs::remove_file(path);
if let Err(e) = res {
if e.kind() != std::io::ErrorKind::NotFound {
@@ -356,7 +356,7 @@ mod tests {
}
let file_contents =
std::fs::read(&file.buffered_writer.as_inner().as_inner().path).unwrap();
std::fs::read(file.buffered_writer.as_inner().as_inner().path()).unwrap();
assert_eq!(file_contents, &content[0..cap]);
let buffer_contents = file.buffered_writer.inspect_buffer();
@@ -392,7 +392,7 @@ mod tests {
.buffered_writer
.as_inner()
.as_inner()
.path
.path()
.metadata()
.unwrap();
assert_eq!(

View File

@@ -573,7 +573,7 @@ impl DeltaLayerWriterInner {
ensure!(
metadata.len() <= S3_UPLOAD_LIMIT,
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
file.path,
file.path(),
metadata.len()
);
@@ -791,7 +791,7 @@ impl DeltaLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path, ctx)
let file = VirtualFile::open_v2(path, ctx)
.await
.context("open layer file")?;
@@ -1022,7 +1022,7 @@ impl DeltaLayerInner {
blob_meta.key,
PageReconstructError::Other(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.file.path,
self.file.path(),
kind
)),
);
@@ -1048,7 +1048,7 @@ impl DeltaLayerInner {
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to decompress blob from virtual file {}",
self.file.path,
self.file.path(),
))),
);
@@ -1066,7 +1066,7 @@ impl DeltaLayerInner {
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to deserialize blob from virtual file {}",
self.file.path,
self.file.path(),
))),
);
@@ -1198,7 +1198,6 @@ impl DeltaLayerInner {
let mut prev: Option<(Key, Lsn, BlobRef)> = None;
let mut read_builder: Option<ChunkedVectoredReadBuilder> = None;
let align = virtual_file::get_io_buffer_alignment();
let max_read_size = self
.max_vectored_read_bytes
@@ -1247,7 +1246,6 @@ impl DeltaLayerInner {
offsets.end.pos(),
meta,
max_read_size,
align,
))
}
} else {

View File

@@ -389,7 +389,7 @@ impl ImageLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path, ctx)
let file = VirtualFile::open_v2(path, ctx)
.await
.context("open layer file")?;
let file_id = page_cache::next_file_id();
@@ -626,7 +626,7 @@ impl ImageLayerInner {
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to decompress blob from virtual file {}",
self.file.path,
self.file.path(),
))),
);
@@ -647,7 +647,7 @@ impl ImageLayerInner {
blob_meta.key,
PageReconstructError::from(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.file.path,
self.file.path(),
kind
)),
);

View File

@@ -194,8 +194,6 @@ pub(crate) struct ChunkedVectoredReadBuilder {
/// Start offset and metadata for each blob in this read
blobs_at: VecMap<u64, BlobMeta>,
max_read_size: Option<usize>,
/// Chunk size reads are coalesced into.
chunk_size: usize,
}
/// Computes x / d rounded up.
@@ -204,6 +202,7 @@ fn div_round_up(x: usize, d: usize) -> usize {
}
impl ChunkedVectoredReadBuilder {
const CHUNK_SIZE: usize = virtual_file::get_io_buffer_alignment();
/// Start building a new vectored read.
///
/// Note that by design, this does not check against reading more than `max_read_size` to
@@ -214,21 +213,19 @@ impl ChunkedVectoredReadBuilder {
end_offset: u64,
meta: BlobMeta,
max_read_size: Option<usize>,
chunk_size: usize,
) -> Self {
let mut blobs_at = VecMap::default();
blobs_at
.append(start_offset, meta)
.expect("First insertion always succeeds");
let start_blk_no = start_offset as usize / chunk_size;
let end_blk_no = div_round_up(end_offset as usize, chunk_size);
let start_blk_no = start_offset as usize / Self::CHUNK_SIZE;
let end_blk_no = div_round_up(end_offset as usize, Self::CHUNK_SIZE);
Self {
start_blk_no,
end_blk_no,
blobs_at,
max_read_size,
chunk_size,
}
}
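Note: div_round_up is ceiling division, and together with CHUNK_SIZE it widens a blob's byte range to whole chunks in new_impl above. A self-contained sketch of that arithmetic, assuming the usual (x + d - 1) / d implementation and the now-constant 512-byte chunk size:

const CHUNK_SIZE: usize = 512; // get_io_buffer_alignment() is now a constant

fn div_round_up(x: usize, d: usize) -> usize {
    (x + d - 1) / d
}

fn main() {
    let (start_offset, end_offset) = (100usize, 1500usize);
    let start_blk_no = start_offset / CHUNK_SIZE; // 0
    let end_blk_no = div_round_up(end_offset, CHUNK_SIZE); // 3
    // The planned read covers whole chunks: bytes [0, 1536).
    assert_eq!((start_blk_no * CHUNK_SIZE, end_blk_no * CHUNK_SIZE), (0, 1536));
}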
@@ -237,18 +234,12 @@ impl ChunkedVectoredReadBuilder {
end_offset: u64,
meta: BlobMeta,
max_read_size: usize,
align: usize,
) -> Self {
Self::new_impl(start_offset, end_offset, meta, Some(max_read_size), align)
Self::new_impl(start_offset, end_offset, meta, Some(max_read_size))
}
pub(crate) fn new_streaming(
start_offset: u64,
end_offset: u64,
meta: BlobMeta,
align: usize,
) -> Self {
Self::new_impl(start_offset, end_offset, meta, None, align)
pub(crate) fn new_streaming(start_offset: u64, end_offset: u64, meta: BlobMeta) -> Self {
Self::new_impl(start_offset, end_offset, meta, None)
}
/// Attempts to extend the current read with a new blob if the new blob resides in the same or the immediate next chunk.
@@ -256,12 +247,12 @@ impl ChunkedVectoredReadBuilder {
/// The resulting size also must be below the max read size.
pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
tracing::trace!(start, end, "trying to extend");
let start_blk_no = start as usize / self.chunk_size;
let end_blk_no = div_round_up(end as usize, self.chunk_size);
let start_blk_no = start as usize / Self::CHUNK_SIZE;
let end_blk_no = div_round_up(end as usize, Self::CHUNK_SIZE);
let not_limited_by_max_read_size = {
if let Some(max_read_size) = self.max_read_size {
let coalesced_size = (end_blk_no - self.start_blk_no) * self.chunk_size;
let coalesced_size = (end_blk_no - self.start_blk_no) * Self::CHUNK_SIZE;
coalesced_size <= max_read_size
} else {
true
@@ -292,12 +283,12 @@ impl ChunkedVectoredReadBuilder {
}
pub(crate) fn size(&self) -> usize {
(self.end_blk_no - self.start_blk_no) * self.chunk_size
(self.end_blk_no - self.start_blk_no) * Self::CHUNK_SIZE
}
pub(crate) fn build(self) -> VectoredRead {
let start = (self.start_blk_no * self.chunk_size) as u64;
let end = (self.end_blk_no * self.chunk_size) as u64;
let start = (self.start_blk_no * Self::CHUNK_SIZE) as u64;
let end = (self.end_blk_no * Self::CHUNK_SIZE) as u64;
VectoredRead {
start,
end,
@@ -328,18 +319,14 @@ pub struct VectoredReadPlanner {
prev: Option<(Key, Lsn, u64, BlobFlag)>,
max_read_size: usize,
align: usize,
}
impl VectoredReadPlanner {
pub fn new(max_read_size: usize) -> Self {
let align = virtual_file::get_io_buffer_alignment();
Self {
blobs: BTreeMap::new(),
prev: None,
max_read_size,
align,
}
}
@@ -418,7 +405,6 @@ impl VectoredReadPlanner {
end_offset,
BlobMeta { key, lsn },
self.max_read_size,
self.align,
);
let prev_read_builder = current_read_builder.replace(next_read_builder);
@@ -472,13 +458,13 @@ impl<'a> VectoredBlobReader<'a> {
);
if cfg!(debug_assertions) {
let align = virtual_file::get_io_buffer_alignment() as u64;
const ALIGN: u64 = virtual_file::get_io_buffer_alignment() as u64;
debug_assert_eq!(
read.start % align,
read.start % ALIGN,
0,
"Read start at {} does not satisfy the required io buffer alignment ({} bytes)",
read.start,
align
ALIGN
);
}
@@ -553,22 +539,18 @@ pub struct StreamingVectoredReadPlanner {
max_cnt: usize,
/// Size of the current batch
cnt: usize,
align: usize,
}
impl StreamingVectoredReadPlanner {
pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
assert!(max_cnt > 0);
assert!(max_read_size > 0);
let align = virtual_file::get_io_buffer_alignment();
Self {
read_builder: None,
prev: None,
max_cnt,
max_read_size,
cnt: 0,
align,
}
}
@@ -621,7 +603,6 @@ impl StreamingVectoredReadPlanner {
start_offset,
end_offset,
BlobMeta { key, lsn },
self.align,
))
};
}
@@ -656,9 +637,9 @@ mod tests {
use super::*;
fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
let align = virtual_file::get_io_buffer_alignment() as u64;
assert_eq!(read.start % align, 0);
assert_eq!(read.start / align, offset_range.first().unwrap().2 / align);
const ALIGN: u64 = virtual_file::get_io_buffer_alignment() as u64;
assert_eq!(read.start % ALIGN, 0);
assert_eq!(read.start / ALIGN, offset_range.first().unwrap().2 / ALIGN);
let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
@@ -676,32 +657,27 @@ mod tests {
fn planner_chunked_coalesce_all_test() {
use crate::virtual_file;
let chunk_size = virtual_file::get_io_buffer_alignment() as u64;
const CHUNK_SIZE: u64 = virtual_file::get_io_buffer_alignment() as u64;
// The test explicitly does not check chunk size < 512
if chunk_size < 512 {
return;
}
let max_read_size = chunk_size as usize * 8;
let max_read_size = CHUNK_SIZE as usize * 8;
let key = Key::MIN;
let lsn = Lsn(0);
let blob_descriptions = [
(key, lsn, chunk_size / 8, BlobFlag::None), // Read 1 BEGIN
(key, lsn, chunk_size / 4, BlobFlag::Ignore), // Gap
(key, lsn, chunk_size / 2, BlobFlag::None),
(key, lsn, chunk_size - 2, BlobFlag::Ignore), // Gap
(key, lsn, chunk_size, BlobFlag::None),
(key, lsn, chunk_size * 2 - 1, BlobFlag::None),
(key, lsn, chunk_size * 2 + 1, BlobFlag::Ignore), // Gap
(key, lsn, chunk_size * 3 + 1, BlobFlag::None),
(key, lsn, chunk_size * 5 + 1, BlobFlag::None),
(key, lsn, chunk_size * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce.
(key, lsn, chunk_size * 7 + 1, BlobFlag::None),
(key, lsn, chunk_size * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size)
(key, lsn, chunk_size * 9, BlobFlag::Ignore), // ==== skipped a chunk
(key, lsn, chunk_size * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce)
(key, lsn, CHUNK_SIZE / 8, BlobFlag::None), // Read 1 BEGIN
(key, lsn, CHUNK_SIZE / 4, BlobFlag::Ignore), // Gap
(key, lsn, CHUNK_SIZE / 2, BlobFlag::None),
(key, lsn, CHUNK_SIZE - 2, BlobFlag::Ignore), // Gap
(key, lsn, CHUNK_SIZE, BlobFlag::None),
(key, lsn, CHUNK_SIZE * 2 - 1, BlobFlag::None),
(key, lsn, CHUNK_SIZE * 2 + 1, BlobFlag::Ignore), // Gap
(key, lsn, CHUNK_SIZE * 3 + 1, BlobFlag::None),
(key, lsn, CHUNK_SIZE * 5 + 1, BlobFlag::None),
(key, lsn, CHUNK_SIZE * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce.
(key, lsn, CHUNK_SIZE * 7 + 1, BlobFlag::None),
(key, lsn, CHUNK_SIZE * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size)
(key, lsn, CHUNK_SIZE * 9, BlobFlag::Ignore), // ==== skipped a chunk
(key, lsn, CHUNK_SIZE * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce)
];
let ranges = [
@@ -780,19 +756,19 @@ mod tests {
#[test]
fn planner_replacement_test() {
let chunk_size = virtual_file::get_io_buffer_alignment() as u64;
let max_read_size = 128 * chunk_size as usize;
const CHUNK_SIZE: u64 = virtual_file::get_io_buffer_alignment() as u64;
let max_read_size = 128 * CHUNK_SIZE as usize;
let first_key = Key::MIN;
let second_key = first_key.next();
let lsn = Lsn(0);
let blob_descriptions = vec![
(first_key, lsn, 0, BlobFlag::None), // First in read 1
(first_key, lsn, chunk_size, BlobFlag::None), // Last in read 1
(second_key, lsn, 2 * chunk_size, BlobFlag::ReplaceAll),
(second_key, lsn, 3 * chunk_size, BlobFlag::None),
(second_key, lsn, 4 * chunk_size, BlobFlag::ReplaceAll), // First in read 2
(second_key, lsn, 5 * chunk_size, BlobFlag::None), // Last in read 2
(first_key, lsn, CHUNK_SIZE, BlobFlag::None), // Last in read 1
(second_key, lsn, 2 * CHUNK_SIZE, BlobFlag::ReplaceAll),
(second_key, lsn, 3 * CHUNK_SIZE, BlobFlag::None),
(second_key, lsn, 4 * CHUNK_SIZE, BlobFlag::ReplaceAll), // First in read 2
(second_key, lsn, 5 * CHUNK_SIZE, BlobFlag::None), // Last in read 2
];
let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
@@ -802,7 +778,7 @@ mod tests {
planner.handle(key, lsn, offset, flag);
}
planner.handle_range_end(6 * chunk_size);
planner.handle_range_end(6 * CHUNK_SIZE);
let reads = planner.finish();
assert_eq!(reads.len(), 2);
@@ -947,7 +923,6 @@ mod tests {
let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
let mut buf = BytesMut::with_capacity(reserved_bytes);
let align = virtual_file::get_io_buffer_alignment();
let vectored_blob_reader = VectoredBlobReader::new(&file);
let meta = BlobMeta {
key: Key::MIN,
@@ -959,8 +934,7 @@ mod tests {
if idx + 1 == offsets.len() {
continue;
}
let read_builder =
ChunkedVectoredReadBuilder::new(*offset, *end, meta, 16 * 4096, align);
let read_builder = ChunkedVectoredReadBuilder::new(*offset, *end, meta, 16 * 4096);
let read = read_builder.build();
let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
assert_eq!(result.blobs.len(), 1);

View File

@@ -23,10 +23,12 @@ use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
use pageserver_api::shard::TenantShardId;
use std::fs::File;
use std::io::{Error, ErrorKind, Seek, SeekFrom};
#[cfg(target_os = "linux")]
use std::os::unix::fs::OpenOptionsExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
@@ -38,10 +40,11 @@ pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
mod metadata;
mod open_options;
use self::owned_buffers_io::write::OwnedAsyncWriter;
pub(crate) use api::DirectIoMode;
pub(crate) use api::IoMode;
pub(crate) use io_engine::IoEngineKind;
pub(crate) use metadata::Metadata;
pub(crate) use open_options::*;
pub(crate) mod dio;
pub(crate) mod owned_buffers_io {
//! Abstractions for IO with owned buffers.
@@ -53,6 +56,7 @@ pub(crate) mod owned_buffers_io {
//! but for the time being we're proving out the primitives in the neon.git repo
//! for faster iteration.
pub(crate) mod io_buf_aligned;
pub(crate) mod io_buf_ext;
pub(crate) mod slice;
pub(crate) mod write;
@@ -61,6 +65,176 @@ pub(crate) mod owned_buffers_io {
}
}
#[derive(Debug)]
pub enum VirtualFile {
Buffered(VirtualFileInner),
Direct(VirtualFileInner),
}
impl VirtualFile {
fn inner(&self) -> &VirtualFileInner {
match self {
Self::Buffered(file) => file,
Self::Direct(file) => file,
}
}
fn inner_mut(&mut self) -> &mut VirtualFileInner {
match self {
Self::Buffered(file) => file,
Self::Direct(file) => file,
}
}
fn into_inner(self) -> VirtualFileInner {
match self {
Self::Buffered(file) => file,
Self::Direct(file) => file,
}
}
/// Open a file in read-only mode. Like File::open.
pub async fn open<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
let file = VirtualFileInner::open(path, ctx).await?;
Ok(Self::Buffered(file))
}
/// Open a file in read-only mode. Like File::open.
///
/// `O_DIRECT` will be enabled based on `virtual_file_io_mode`.
pub async fn open_v2<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
Self::open_with_options_v2(path.as_ref(), OpenOptions::new().read(true), ctx).await
}
pub async fn create<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
let file = VirtualFileInner::create(path, ctx).await?;
Ok(Self::Buffered(file))
}
pub async fn create_v2<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
VirtualFile::open_with_options_v2(
path.as_ref(),
OpenOptions::new().write(true).create(true).truncate(true),
ctx,
)
.await
}
pub async fn open_with_options<P: AsRef<Utf8Path>>(
path: P,
open_options: &OpenOptions,
ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
) -> Result<Self, std::io::Error> {
let file = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
Ok(Self::Buffered(file))
}
pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
path: P,
open_options: &mut OpenOptions, // Uses `&mut` here to add `O_DIRECT`.
ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
) -> Result<Self, std::io::Error> {
let file = match get_io_mode() {
IoMode::Buffered => {
let file = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
Self::Buffered(file)
}
#[cfg(target_os = "linux")]
IoMode::Direct => {
let file = VirtualFileInner::open_with_options(
path,
open_options.custom_flags(nix::libc::O_DIRECT),
ctx,
)
.await?;
Self::Direct(file)
}
};
Ok(file)
}
pub fn path(&self) -> &Utf8Path {
self.inner().path.as_path()
}
pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
final_path: Utf8PathBuf,
tmp_path: Utf8PathBuf,
content: B,
) -> std::io::Result<()> {
VirtualFileInner::crashsafe_overwrite(final_path, tmp_path, content).await
}
pub async fn sync_all(&self) -> Result<(), Error> {
self.inner().sync_all().await
}
pub async fn sync_data(&self) -> Result<(), Error> {
self.inner().sync_data().await
}
pub async fn metadata(&self) -> Result<Metadata, Error> {
self.inner().metadata().await
}
pub fn remove(self) {
self.into_inner().remove();
}
pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
self.inner_mut().seek(pos).await
}
pub async fn read_exact_at<Buf>(
&self,
slice: Slice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> Result<Slice<Buf>, Error>
where
Buf: IoBufMut + Send,
{
self.inner().read_exact_at(slice, offset, ctx).await
}
pub async fn read_exact_at_page(
&self,
page: PageWriteGuard<'static>,
offset: u64,
ctx: &RequestContext,
) -> Result<PageWriteGuard<'static>, Error> {
self.inner().read_exact_at_page(page, offset, ctx).await
}
pub async fn write_all_at<Buf: IoBuf + Send>(
&self,
buf: FullSlice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), Error>) {
self.inner().write_all_at(buf, offset, ctx).await
}
pub async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<usize, Error>) {
self.inner_mut().write_all(buf, ctx).await
}
}
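Note: the _v2 constructors consult the process-wide IO mode once, at open time: buffered mode opens exactly as before, while direct mode (Linux only) ORs O_DIRECT into the open flags via custom_flags, after which reads must use suitably aligned buffers such as the IoBufferMut defined further down. A hedged call-site sketch, with RequestContext construction elided:

// Sketch: call sites switch from open() to open_v2() and need no other changes.
async fn open_layer_file(
    path: &camino::Utf8Path,
    ctx: &RequestContext,
) -> std::io::Result<VirtualFile> {
    // Yields VirtualFile::Buffered or VirtualFile::Direct depending on get_io_mode().
    VirtualFile::open_v2(path, ctx).await
}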
///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
/// the underlying file is closed if the system is low on file descriptors,
@@ -77,7 +251,7 @@ pub(crate) mod owned_buffers_io {
/// 'tag' field is used to detect whether the handle still is valid or not.
///
#[derive(Debug)]
pub struct VirtualFile {
pub struct VirtualFileInner {
/// Lazy handle to the global file descriptor cache. The slot that this points to
/// might contain our File, or it may be empty, or it may contain a File that
/// belongs to a different VirtualFile.
@@ -350,12 +524,12 @@ macro_rules! with_file {
}};
}
impl VirtualFile {
impl VirtualFileInner {
/// Open a file in read-only mode. Like File::open.
pub async fn open<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<VirtualFile, std::io::Error> {
) -> Result<VirtualFileInner, std::io::Error> {
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
}
@@ -364,7 +538,7 @@ impl VirtualFile {
pub async fn create<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<VirtualFile, std::io::Error> {
) -> Result<VirtualFileInner, std::io::Error> {
Self::open_with_options(
path.as_ref(),
OpenOptions::new().write(true).create(true).truncate(true),
@@ -382,7 +556,7 @@ impl VirtualFile {
path: P,
open_options: &OpenOptions,
_ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
) -> Result<VirtualFile, std::io::Error> {
) -> Result<VirtualFileInner, std::io::Error> {
let path_ref = path.as_ref();
let path_str = path_ref.to_string();
let parts = path_str.split('/').collect::<Vec<&str>>();
@@ -423,7 +597,7 @@ impl VirtualFile {
reopen_options.create_new(false);
reopen_options.truncate(false);
let vfile = VirtualFile {
let vfile = VirtualFileInner {
handle: RwLock::new(handle),
pos: 0,
path: path_ref.to_path_buf(),
@@ -1034,6 +1208,21 @@ impl tokio_epoll_uring::IoFd for FileGuard {
#[cfg(test)]
impl VirtualFile {
pub(crate) async fn read_blk(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
self.inner().read_blk(blknum, ctx).await
}
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
self.inner_mut().read_to_end(buf, ctx).await
}
}
#[cfg(test)]
impl VirtualFileInner {
pub(crate) async fn read_blk(
&self,
blknum: u32,
@@ -1067,7 +1256,7 @@ impl VirtualFile {
}
}
impl Drop for VirtualFile {
impl Drop for VirtualFileInner {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut();
@@ -1143,15 +1332,10 @@ impl OpenFiles {
/// server startup.
///
#[cfg(not(test))]
pub fn init(num_slots: usize, engine: IoEngineKind, io_buffer_alignment: usize) {
pub fn init(num_slots: usize, engine: IoEngineKind) {
if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
panic!("virtual_file::init called twice");
}
if set_io_buffer_alignment(io_buffer_alignment).is_err() {
panic!(
"IO buffer alignment needs to be a power of two and greater than 512, got {io_buffer_alignment}"
);
}
io_engine::init(engine);
crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
}
@@ -1175,47 +1359,22 @@ fn get_open_files() -> &'static OpenFiles {
}
}
static IO_BUFFER_ALIGNMENT: AtomicUsize = AtomicUsize::new(DEFAULT_IO_BUFFER_ALIGNMENT);
/// Returns true if the alignment is a power of two and greater than or equal to 512.
fn is_valid_io_buffer_alignment(align: usize) -> bool {
align.is_power_of_two() && align >= 512
}
/// Sets the IO buffer alignment requirement. Returns an error if the alignment is
/// not a power of two or is less than 512 bytes.
#[allow(unused)]
pub(crate) fn set_io_buffer_alignment(align: usize) -> Result<(), usize> {
if is_valid_io_buffer_alignment(align) {
IO_BUFFER_ALIGNMENT.store(align, std::sync::atomic::Ordering::Relaxed);
Ok(())
} else {
Err(align)
}
}
/// Gets the io buffer alignment.
///
/// This function should be used for getting the actual alignment value to use.
pub(crate) fn get_io_buffer_alignment() -> usize {
let align = IO_BUFFER_ALIGNMENT.load(std::sync::atomic::Ordering::Relaxed);
if cfg!(test) {
let env_var_name = "NEON_PAGESERVER_UNIT_TEST_IO_BUFFER_ALIGNMENT";
if let Some(test_align) = utils::env::var(env_var_name) {
if is_valid_io_buffer_alignment(test_align) {
test_align
} else {
panic!("IO buffer alignment needs to be a power of two and greater than 512, got {test_align}");
}
} else {
align
}
} else {
align
}
pub(crate) const fn get_io_buffer_alignment() -> usize {
DEFAULT_IO_BUFFER_ALIGNMENT
}
pub(crate) type IoBufferMut = dio::AlignedBufferMut<{ get_io_buffer_alignment() }>;
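Note: making get_io_buffer_alignment() a const fn is what lets it appear in the const-generic position of this alias, so every IoBufferMut is 512-byte aligned by construction. A small sketch of the guarantee, mirroring the tests in the new dio module:

fn aligned_buffer_demo() {
    let buf = dio::AlignedBufferMut::<512>::with_capacity_zeroed(4096);
    assert_eq!(buf.align(), 512);
    assert_eq!(buf.len(), 4096);
    // The allocation start satisfies the O_DIRECT alignment requirement.
    assert_eq!((&buf[..]).as_ptr().align_offset(512), 0);
}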
static IO_MODE: AtomicU8 = AtomicU8::new(IoMode::preferred() as u8);
pub(crate) fn set_io_mode(mode: IoMode) {
IO_MODE.store(mode as u8, std::sync::atomic::Ordering::Relaxed);
}
pub(crate) fn get_io_mode() -> IoMode {
IoMode::try_from(IO_MODE.load(Ordering::Relaxed)).unwrap()
}
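Note: the mode is stored as its repr(u8) discriminant, and the TryFrom<u8> impl in the models module makes the load infallible in practice, since only set_io_mode ever writes the atomic. A compact, self-contained sketch of the same pattern:

use std::sync::atomic::{AtomicU8, Ordering};

#[derive(Copy, Clone, Debug, PartialEq)]
#[repr(u8)]
enum Mode { Buffered, Direct }

static MODE: AtomicU8 = AtomicU8::new(Mode::Buffered as u8);

fn set_mode(m: Mode) {
    MODE.store(m as u8, Ordering::Relaxed);
}

fn get_mode() -> Mode {
    match MODE.load(Ordering::Relaxed) {
        v if v == Mode::Buffered as u8 => Mode::Buffered,
        v if v == Mode::Direct as u8 => Mode::Direct,
        _ => unreachable!("only set_mode writes this atomic"),
    }
}

fn main() {
    set_mode(Mode::Direct);
    assert_eq!(get_mode(), Mode::Direct);
}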
#[cfg(test)]
mod tests {
use crate::context::DownloadBehavior;
@@ -1524,7 +1683,7 @@ mod tests {
// Open the file many times.
let mut files = Vec::new();
for _ in 0..VIRTUAL_FILES {
let f = VirtualFile::open_with_options(
let f = VirtualFileInner::open_with_options(
&test_file_path,
OpenOptions::new().read(true),
&ctx,
@@ -1576,7 +1735,7 @@ mod tests {
let path = testdir.join("myfile");
let tmp_path = testdir.join("myfile.tmp");
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
@@ -1585,7 +1744,7 @@ mod tests {
assert!(!tmp_path.exists());
drop(file);
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
@@ -1608,7 +1767,7 @@ mod tests {
std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
assert!(tmp_path.exists());
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
.await
.unwrap();

View File

@@ -0,0 +1,405 @@
#![allow(unused)]
use core::slice;
use std::{
alloc::{self, Layout},
cmp,
mem::{ManuallyDrop, MaybeUninit},
ops::{Deref, DerefMut},
ptr::{addr_of_mut, NonNull},
};
use bytes::buf::UninitSlice;
struct IoBufferPtr(*mut u8);
// SAFETY: We guarantee that no one besides `IoBufferPtr` itself has the raw pointer.
unsafe impl Send for IoBufferPtr {}
/// An aligned buffer type used for I/O.
pub struct AlignedBufferMut<const ALIGN: usize> {
ptr: IoBufferPtr,
capacity: usize,
len: usize,
}
impl<const ALIGN: usize> AlignedBufferMut<ALIGN> {
/// Constructs a new, empty `IoBufferMut` with at least the specified capacity and alignment.
///
/// The buffer will be able to hold at most `capacity` bytes without reallocating.
///
///
/// # Panics
///
/// Panics if the new capacity exceeds `isize::MAX` _bytes_, or if the following alignment requirement is not met:
/// * `align` must not be zero,
///
/// * `align` must be a power of two,
///
/// * `capacity`, when rounded up to the nearest multiple of `align`,
/// must not overflow isize (i.e., the rounded value must be
/// less than or equal to `isize::MAX`).
pub fn with_capacity(capacity: usize) -> Self {
let layout = Layout::from_size_align(capacity, ALIGN).expect("Invalid layout");
// SAFETY: Making an allocation with a sized and aligned layout. The memory is manually freed with the same layout.
let ptr = unsafe {
let ptr = alloc::alloc(layout);
if ptr.is_null() {
alloc::handle_alloc_error(layout);
}
IoBufferPtr(ptr)
};
AlignedBufferMut {
ptr,
capacity,
len: 0,
}
}
/// Constructs a new `IoBufferMut` with at least the specified capacity and alignment, filled with zeros.
pub fn with_capacity_zeroed(capacity: usize) -> Self {
use bytes::BufMut;
let mut buf = Self::with_capacity(capacity);
buf.put_bytes(0, capacity);
buf.len = capacity;
buf
}
/// Returns the total number of bytes the buffer can hold.
#[inline]
pub fn capacity(&self) -> usize {
self.capacity
}
/// Returns the alignment of the buffer.
#[inline]
pub const fn align(&self) -> usize {
ALIGN
}
/// Returns the number of bytes in the buffer, also referred to as its 'length'.
#[inline]
pub fn len(&self) -> usize {
self.len
}
/// Force the length of the buffer to `new_len`.
#[inline]
unsafe fn set_len(&mut self, new_len: usize) {
debug_assert!(new_len <= self.capacity());
self.len = new_len;
}
#[inline]
fn as_ptr(&self) -> *const u8 {
self.ptr.0
}
#[inline]
fn as_mut_ptr(&mut self) -> *mut u8 {
self.ptr.0
}
/// Extracts a slice containing the entire buffer.
///
/// Equivalent to `&s[..]`.
#[inline]
fn as_slice(&self) -> &[u8] {
// SAFETY: The pointer is valid and `len` bytes are initialized.
unsafe { slice::from_raw_parts(self.as_ptr(), self.len) }
}
/// Extracts a mutable slice of the entire buffer.
///
/// Equivalent to `&mut s[..]`.
fn as_mut_slice(&mut self) -> &mut [u8] {
// SAFETY: The pointer is valid and `len` bytes are initialized.
unsafe { slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) }
}
/// Drops all the contents of the buffer, setting its length to `0`.
#[inline]
pub fn clear(&mut self) {
self.len = 0;
}
/// Reserves capacity for at least `additional` more bytes to be inserted
/// in the given `IoBufferMut`. The collection may reserve more space to
/// speculatively avoid frequent reallocations. After calling `reserve`,
/// capacity will be greater than or equal to `self.len() + additional`.
/// Does nothing if capacity is already sufficient.
///
/// # Panics
///
/// Panics if the new capacity exceeds `isize::MAX` _bytes_.
pub fn reserve(&mut self, additional: usize) {
if additional > self.capacity() - self.len() {
self.reserve_inner(additional);
}
}
fn reserve_inner(&mut self, additional: usize) {
let Some(required_cap) = self.len().checked_add(additional) else {
capacity_overflow()
};
let old_capacity = self.capacity();
let align = self.align();
// This guarantees exponential growth. The doubling cannot overflow
// because `cap <= isize::MAX` and the type of `cap` is `usize`.
let cap = cmp::max(old_capacity * 2, required_cap);
if !is_valid_alloc(cap) {
capacity_overflow()
}
let new_layout = Layout::from_size_align(cap, self.align()).expect("Invalid layout");
let old_ptr = self.as_mut_ptr();
// SAFETY: the old allocation was allocated with std::alloc::alloc with the same layout,
// and we panic on a null pointer.
let (ptr, cap) = unsafe {
let old_layout = Layout::from_size_align_unchecked(old_capacity, align);
let ptr = alloc::realloc(old_ptr, old_layout, new_layout.size());
if ptr.is_null() {
alloc::handle_alloc_error(new_layout);
}
(IoBufferPtr(ptr), cap)
};
self.ptr = ptr;
self.capacity = cap;
}
/// Consumes and leaks the `IoBufferMut`, returning a mutable reference to the contents, &'a mut [u8].
pub fn leak<'a>(self) -> &'a mut [u8] {
let mut buf = ManuallyDrop::new(self);
// SAFETY: leaking the buffer as intended.
unsafe { slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.len) }
}
}
fn capacity_overflow() -> ! {
panic!("capacity overflow")
}
// We need to guarantee the following:
// * We don't ever allocate `> isize::MAX` byte-size objects.
// * We don't overflow `usize::MAX` and actually allocate too little.
//
// On 64-bit we just need to check for overflow since trying to allocate
// `> isize::MAX` bytes will surely fail. On 32-bit and 16-bit we need to add
// an extra guard for this in case we're running on a platform which can use
// all 4GB in user-space, e.g., PAE or x32.
#[inline]
fn is_valid_alloc(alloc_size: usize) -> bool {
!(usize::BITS < 64 && alloc_size > isize::MAX as usize)
}
impl<const ALIGN: usize> Drop for AlignedBufferMut<ALIGN> {
fn drop(&mut self) {
// SAFETY: memory was allocated with std::alloc::alloc with the same layout.
unsafe {
alloc::dealloc(
self.as_mut_ptr(),
Layout::from_size_align_unchecked(self.capacity, ALIGN),
)
}
}
}
impl<const ALIGN: usize> Deref for AlignedBufferMut<ALIGN> {
type Target = [u8];
fn deref(&self) -> &Self::Target {
self.as_slice()
}
}
impl<const ALIGN: usize> DerefMut for AlignedBufferMut<ALIGN> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.as_mut_slice()
}
}
/// SAFETY: When advancing the internal cursor, the caller needs to make sure the bytes advanced past have been initialized.
unsafe impl<const ALIGN: usize> bytes::BufMut for AlignedBufferMut<ALIGN> {
#[inline]
fn remaining_mut(&self) -> usize {
// Although a `Vec` can have at most isize::MAX bytes, we never want to grow `IoBufferMut`.
// Thus, it can have at most `self.capacity` bytes.
self.capacity() - self.len()
}
// SAFETY: Caller needs to make sure the bytes being advanced past have been initialized.
#[inline]
unsafe fn advance_mut(&mut self, cnt: usize) {
let len = self.len();
let remaining = self.remaining_mut();
if remaining < cnt {
panic_advance(cnt, remaining);
}
// Addition will not overflow since the sum is at most the capacity.
self.set_len(len + cnt);
}
#[inline]
fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
let cap = self.capacity();
let len = self.len();
// SAFETY: Since `self.ptr` is valid for `cap` bytes, `self.ptr.add(len)` must be
// valid for `cap - len` bytes. The subtraction will not underflow since
// `len <= cap`.
unsafe { UninitSlice::from_raw_parts_mut(self.as_mut_ptr().add(len), cap - len) }
}
}
/// Panic with a nice error message.
#[cold]
fn panic_advance(idx: usize, len: usize) -> ! {
panic!(
"advance out of bounds: the len is {} but advancing by {}",
len, idx
);
}
/// Safety: [`IoBufferMut`] has exclusive ownership of the io buffer,
/// and the location remains stable even if [`Self`] is moved.
unsafe impl<const ALIGN: usize> tokio_epoll_uring::IoBuf for AlignedBufferMut<ALIGN> {
fn stable_ptr(&self) -> *const u8 {
self.as_ptr()
}
fn bytes_init(&self) -> usize {
self.len()
}
fn bytes_total(&self) -> usize {
self.capacity()
}
}
// SAFETY: See above.
unsafe impl<const ALIGN: usize> tokio_epoll_uring::IoBufMut for AlignedBufferMut<ALIGN> {
fn stable_mut_ptr(&mut self) -> *mut u8 {
self.as_mut_ptr()
}
unsafe fn set_init(&mut self, init_len: usize) {
if self.len() < init_len {
self.set_len(init_len);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
const ALIGN: usize = 4 * 1024;
type TestIoBufferMut = AlignedBufferMut<ALIGN>;
#[test]
fn test_with_capacity() {
let v = TestIoBufferMut::with_capacity(ALIGN * 4);
assert_eq!(v.len(), 0);
assert_eq!(v.capacity(), ALIGN * 4);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
let v = TestIoBufferMut::with_capacity(ALIGN / 2);
assert_eq!(v.len(), 0);
assert_eq!(v.capacity(), ALIGN / 2);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
}
#[test]
fn test_with_capacity_zeroed() {
let v = TestIoBufferMut::with_capacity_zeroed(ALIGN);
assert_eq!(v.len(), ALIGN);
assert_eq!(v.capacity(), ALIGN);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
assert_eq!(&v[..], &[0; ALIGN])
}
#[test]
fn test_reserve() {
use bytes::BufMut;
let mut v = TestIoBufferMut::with_capacity(ALIGN);
let capacity = v.capacity();
v.reserve(capacity);
assert_eq!(v.capacity(), capacity);
let data = [b'a'; ALIGN];
v.put(&data[..]);
v.reserve(capacity);
assert!(v.capacity() >= capacity * 2);
assert_eq!(&v[..], &data[..]);
let capacity = v.capacity();
v.clear();
v.reserve(capacity);
assert_eq!(capacity, v.capacity());
}
#[test]
fn test_bytes_put() {
use bytes::BufMut;
let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
let x = [b'a'; ALIGN];
for _ in 0..2 {
for _ in 0..4 {
v.put(&x[..]);
}
assert_eq!(v.len(), ALIGN * 4);
assert_eq!(v.capacity(), ALIGN * 4);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
v.clear()
}
assert_eq!(v.len(), 0);
assert_eq!(v.capacity(), ALIGN * 4);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
}
#[test]
#[should_panic]
fn test_bytes_put_panic() {
use bytes::BufMut;
const ALIGN: usize = 4 * 1024;
let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
let x = [b'a'; ALIGN];
for _ in 0..5 {
v.put_slice(&x[..]);
}
}
#[test]
fn test_io_buf_put_slice() {
use tokio_epoll_uring::BoundedBufMut;
const ALIGN: usize = 4 * 1024;
let mut v = TestIoBufferMut::with_capacity(ALIGN);
let x = [b'a'; ALIGN];
for _ in 0..2 {
v.put_slice(&x[..]);
assert_eq!(v.len(), ALIGN);
assert_eq!(v.capacity(), ALIGN);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
v.clear()
}
assert_eq!(v.len(), 0);
assert_eq!(v.capacity(), ALIGN);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
}
}

View File

@@ -0,0 +1,9 @@
#![allow(unused)]
use tokio_epoll_uring::IoBufMut;
use crate::virtual_file::IoBufferMut;
pub(crate) trait IoBufAlignedMut: IoBufMut {}
impl IoBufAlignedMut for IoBufferMut {}

View File

@@ -1,5 +1,6 @@
//! See [`FullSlice`].
use crate::virtual_file::IoBufferMut;
use bytes::{Bytes, BytesMut};
use std::ops::{Deref, Range};
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
@@ -76,3 +77,4 @@ macro_rules! impl_io_buf_ext {
impl_io_buf_ext!(Bytes);
impl_io_buf_ext!(BytesMut);
impl_io_buf_ext!(Vec<u8>);
impl_io_buf_ext!(IoBufferMut);

View File

@@ -398,7 +398,7 @@ class NeonEnvBuilder:
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]] = None,
safekeeper_extra_opts: Optional[list[str]] = None,
storage_controller_port_override: Optional[int] = None,
pageserver_io_buffer_alignment: Optional[int] = None,
pageserver_virtual_file_io_mode: Optional[str] = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -452,7 +452,7 @@ class NeonEnvBuilder:
self.storage_controller_port_override = storage_controller_port_override
self.pageserver_io_buffer_alignment = pageserver_io_buffer_alignment
self.pageserver_virtual_file_io_mode = pageserver_virtual_file_io_mode
assert test_name.startswith(
"test_"
@@ -1041,7 +1041,7 @@ class NeonEnv:
self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine
self.pageserver_aux_file_policy = config.pageserver_aux_file_policy
self.pageserver_io_buffer_alignment = config.pageserver_io_buffer_alignment
self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode
# Create the neon_local's `NeonLocalInitConf`
cfg: Dict[str, Any] = {
@@ -1105,7 +1105,8 @@ class NeonEnv:
for key, value in override.items():
ps_cfg[key] = value
ps_cfg["io_buffer_alignment"] = self.pageserver_io_buffer_alignment
if self.pageserver_virtual_file_io_mode is not None:
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
# Create a corresponding NeonPageserver object
self.pageservers.append(
@@ -1411,7 +1412,7 @@ def neon_simple_env(
pageserver_virtual_file_io_engine: str,
pageserver_aux_file_policy: Optional[AuxFileStore],
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]],
pageserver_io_buffer_alignment: Optional[int],
pageserver_virtual_file_io_mode: Optional[str],
) -> Iterator[NeonEnv]:
"""
Simple Neon environment, with no authentication and no safekeepers.
@@ -1437,7 +1438,7 @@ def neon_simple_env(
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
pageserver_aux_file_policy=pageserver_aux_file_policy,
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
pageserver_virtual_file_io_mode=pageserver_virtual_file_io_mode,
) as builder:
env = builder.init_start()
@@ -1461,7 +1462,7 @@ def neon_env_builder(
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]],
pageserver_aux_file_policy: Optional[AuxFileStore],
record_property: Callable[[str, object], None],
pageserver_io_buffer_alignment: Optional[int],
pageserver_virtual_file_io_mode: Optional[str],
) -> Iterator[NeonEnvBuilder]:
"""
Fixture to create a Neon environment for test.
@@ -1496,7 +1497,7 @@ def neon_env_builder(
test_overlay_dir=test_overlay_dir,
pageserver_aux_file_policy=pageserver_aux_file_policy,
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
pageserver_virtual_file_io_mode=pageserver_virtual_file_io_mode,
) as builder:
yield builder
# Propagate `preserve_database_files` to make it possible to use in other fixtures,

View File

@@ -35,8 +35,8 @@ def pageserver_virtual_file_io_engine() -> Optional[str]:
@pytest.fixture(scope="function", autouse=True)
def pageserver_io_buffer_alignment() -> Optional[int]:
return None
def pageserver_virtual_file_io_mode() -> Optional[str]:
return os.getenv("PAGESERVER_VIRTUAL_FILE_IO_MODE")
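Note: this fixture replaces the old alignment fixture; setting the PAGESERVER_VIRTUAL_FILE_IO_MODE environment variable (presumably to one of the kebab-case names, buffered or direct) before a test run selects the pageserver IO mode for the whole suite.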
@pytest.fixture(scope="function", autouse=True)