From 80a07f631964b0d98b57ea4da9e39ce418f26c71 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 28 Apr 2025 15:17:19 +0200 Subject: [PATCH] WIP: fallocate files before writing --- pageserver/src/metrics.rs | 2 ++ pageserver/src/tenant/ephemeral_file.rs | 4 +++ .../tenant/remote_timeline_client/download.rs | 13 +++++++- .../src/tenant/storage_layer/delta_layer.rs | 4 +++ .../src/tenant/storage_layer/image_layer.rs | 4 +++ pageserver/src/virtual_file.rs | 23 ++++++++++++++ pageserver/src/virtual_file/io_engine.rs | 31 +++++++++++++++++++ 7 files changed, 80 insertions(+), 1 deletion(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 9a6c3f2378..15a60effcc 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1290,6 +1290,7 @@ pub(crate) enum StorageIoOperation { Fsync, Metadata, SetLen, + Fallocate, } impl StorageIoOperation { @@ -1305,6 +1306,7 @@ impl StorageIoOperation { StorageIoOperation::Fsync => "fsync", StorageIoOperation::Metadata => "metadata", StorageIoOperation::SetLen => "set_len", + StorageIoOperation::Fallocate => "fallocate", } } } diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 2edf22e9fd..3bbaeb8946 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -88,6 +88,10 @@ impl EphemeralFile { gate.enter()?, ); + file.fallocate_keep_size(0, 1 * 1024 * 1024 * 1024, ctx) + .await + .unwrap(); + let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore Ok(EphemeralFile { diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 84989e0fb8..0cee73afba 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -76,6 +76,8 @@ pub async fn download_layer_file<'a>( layer_metadata.generation, ); + let expected = layer_metadata.file_size; + let (bytes_amount, temp_file) = download_retry( || async { // TempVirtualFile requires us to never reuse a filename while an old @@ -103,6 +105,16 @@ pub async fn download_layer_file<'a>( .map_err(DownloadError::Other)?, gate.enter().map_err(|_| DownloadError::Cancelled)?, ); + if let Ok(file_size) = TryInto::::try_into(layer_metadata.file_size.next_multiple_of( +64 * 1024 /* TODO this is the max roundtup size by the buffered writer set_len_then_truncate */ + + )) { + temp_file.fallocate_keep_size( + 0, + file_size, + ctx, + ).await.unwrap(); + }; download_object(storage, &remote_path, temp_file, gate, cancel, ctx).await }, &format!("download {remote_path:?}"), @@ -110,7 +122,6 @@ pub async fn download_layer_file<'a>( ) .await?; - let expected = layer_metadata.file_size; if expected != bytes_amount { return Err(DownloadError::Other(anyhow!( "According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {:?}", diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 607b0d513c..79bb7b3092 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -440,6 +440,10 @@ impl DeltaLayerWriterInner { gate.enter()?, ); + file.fallocate_keep_size(0, 1 * 1024 * 1024 * 1024, ctx) + .await + .unwrap(); + // Start at PAGE_SZ, make room for the header block let blob_writer = BlobWriter::new( file, diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 2f7c5715bb..9e8e923414 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -806,6 +806,10 @@ impl ImageLayerWriterInner { gate.enter()?, ); + file.fallocate_keep_size(0, 1 * 1024 * 1024 * 1024, ctx) + .await + .unwrap(); + // Start at `PAGE_SZ` to make room for the header block. let blob_writer = BlobWriter::new( file, diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 58953407b1..7df045310d 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -161,6 +161,15 @@ impl VirtualFile { self.inner.set_len(len, ctx).await } + pub async fn fallocate_keep_size( + &self, + offset: i64, + size: i64, + ctx: &RequestContext, + ) -> Result<(), Error> { + self.inner.fallocate_keep_size(offset, size, ctx).await + } + pub async fn metadata(&self) -> Result { self.inner.metadata().await } @@ -638,6 +647,20 @@ impl VirtualFileInner { }) } + pub async fn fallocate_keep_size( + &self, + offset: i64, + size: i64, + _ctx: &RequestContext, + ) -> Result<(), Error> { + with_file!(self, StorageIoOperation::Fallocate, |file_guard| { + let (_file_guard, res) = io_engine::get() + .fallocate_keep_size(file_guard, offset, size) + .await; + res.maybe_fatal_err("fallocate") // TODO haven't thought about this + }) + } + /// Helper function internal to `VirtualFile` that looks up the underlying File, /// opens it and evicts some other File if necessary. The passed parameter is /// assumed to be a function available for the physical `File`. diff --git a/pageserver/src/virtual_file/io_engine.rs b/pageserver/src/virtual_file/io_engine.rs index dd04fb561a..7128f4b59a 100644 --- a/pageserver/src/virtual_file/io_engine.rs +++ b/pageserver/src/virtual_file/io_engine.rs @@ -11,6 +11,7 @@ #[cfg(target_os = "linux")] pub(super) mod tokio_epoll_uring_ext; +use nix::fcntl::{FallocateFlags, fallocate}; use tokio_epoll_uring::IoBuf; use tracing::Instrument; @@ -109,6 +110,7 @@ pub(crate) fn get() -> IoEngine { } } +use std::os::fd::AsRawFd; use std::os::unix::prelude::FileExt; use std::sync::atomic::{AtomicU8, Ordering}; @@ -230,6 +232,35 @@ impl IoEngine { } } + pub(super) async fn fallocate_keep_size( + &self, + file_guard: FileGuard, + offset: i64, + len: i64, + ) -> (FileGuard, std::io::Result<()>) { + // TODO io_uring implementation + match self { + IoEngine::NotSet => panic!("not initialized"), + IoEngine::StdFs => { + unimplemented!() + } + #[cfg(target_os = "linux")] + IoEngine::TokioEpollUring => { + // TODO: fallocate op for tokio-epoll-uring + file_guard.with_std_file(|std_file| { + fallocate( + std_file.as_raw_fd(), + FallocateFlags::FALLOC_FL_KEEP_SIZE, + offset, + len, + ) + .expect("TODO") + }); + (file_guard, Ok(())) + } + } + } + pub(super) async fn write_at( &self, file_guard: FileGuard,