Compare commits

...

4 Commits

Author SHA1 Message Date
Christian Schwarz
5d6254cbb7 use tokio-epoll-uring impl 2025-05-09 10:58:00 +02:00
Christian Schwarz
8c809e3586 Merge remote-tracking branch 'origin/main' into problame/fallocate 2025-05-09 10:37:13 +02:00
Christian Schwarz
6146a82771 incorporate findings from research 2025-04-29 13:27:39 +02:00
Christian Schwarz
80a07f6319 WIP: fallocate files before writing 2025-04-28 15:18:17 +02:00
9 changed files with 97 additions and 5 deletions

4
Cargo.lock generated
View File

@@ -7199,7 +7199,7 @@ dependencies = [
[[package]]
name = "tokio-epoll-uring"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#781989bb540a1408b0b93daa1e9d1fa452195497"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=devin%2F1746710596-expose-fallocate-modes#8bd6d0e99d4937096acfe40b64fd63aa9ad2e2ea"
dependencies = [
"futures",
"nix 0.26.4",
@@ -7810,7 +7810,7 @@ dependencies = [
[[package]]
name = "uring-common"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#781989bb540a1408b0b93daa1e9d1fa452195497"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=devin%2F1746710596-expose-fallocate-modes#8bd6d0e99d4937096acfe40b64fd63aa9ad2e2ea"
dependencies = [
"bytes",
"io-uring",

View File

@@ -187,7 +187,7 @@ thiserror = "1.0"
tikv-jemallocator = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] }
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
tokio = { version = "1.43.1", features = ["macros"] }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "devin/1746710596-expose-fallocate-modes" }
tokio-io-timeout = "1.2.0"
tokio-postgres-rustls = "0.12.0"
tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]}

View File

@@ -1308,6 +1308,7 @@ pub(crate) enum StorageIoOperation {
Fsync,
Metadata,
SetLen,
Fallocate,
}
impl StorageIoOperation {
@@ -1323,6 +1324,7 @@ impl StorageIoOperation {
StorageIoOperation::Fsync => "fsync",
StorageIoOperation::Metadata => "metadata",
StorageIoOperation::SetLen => "set_len",
StorageIoOperation::Fallocate => "fallocate",
}
}
}

View File

@@ -88,6 +88,10 @@ impl EphemeralFile {
gate.enter()?,
);
file.fallocate(0, 1 * 1024 * 1024 * 1024, ctx)
.await
.unwrap();
let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
Ok(EphemeralFile {

View File

@@ -76,6 +76,8 @@ pub async fn download_layer_file<'a>(
layer_metadata.generation,
);
let expected = layer_metadata.file_size;
let (bytes_amount, temp_file) = download_retry(
|| async {
// TempVirtualFile requires us to never reuse a filename while an old
@@ -103,6 +105,16 @@ pub async fn download_layer_file<'a>(
.map_err(DownloadError::Other)?,
gate.enter().map_err(|_| DownloadError::Cancelled)?,
);
{
temp_file.fallocate(
0,
layer_metadata.file_size.next_multiple_of(
64 * 1024 /* TODO this is the max roundtup size by the buffered writer set_len_then_truncate */
),
ctx,
).await.unwrap();
};
download_object(storage, &remote_path, temp_file, gate, cancel, ctx).await
},
&format!("download {remote_path:?}"),
@@ -110,7 +122,6 @@ pub async fn download_layer_file<'a>(
)
.await?;
let expected = layer_metadata.file_size;
if expected != bytes_amount {
return Err(DownloadError::Other(anyhow!(
"According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {:?}",

View File

@@ -441,6 +441,10 @@ impl DeltaLayerWriterInner {
gate.enter()?,
);
file.fallocate(0, 1 * 1024 * 1024 * 1024, ctx)
.await
.unwrap();
// Start at PAGE_SZ, make room for the header block
let blob_writer = BlobWriter::new(
file,

View File

@@ -799,6 +799,10 @@ impl ImageLayerWriterInner {
gate.enter()?,
);
file.fallocate(0, 1 * 1024 * 1024 * 1024, ctx)
.await
.unwrap();
// Start at `PAGE_SZ` to make room for the header block.
let blob_writer = BlobWriter::new(
file,

View File

@@ -154,6 +154,15 @@ impl VirtualFile {
self.inner.set_len(len, ctx).await
}
pub async fn fallocate(
&self,
offset: u64,
size: u64,
ctx: &RequestContext,
) -> Result<(), Error> {
self.inner.fallocate(offset, size, ctx).await
}
pub async fn metadata(&self) -> Result<Metadata, Error> {
self.inner.metadata().await
}
@@ -632,6 +641,18 @@ impl VirtualFileInner {
})
}
pub async fn fallocate(
&self,
offset: u64,
size: u64,
_ctx: &RequestContext,
) -> Result<(), Error> {
with_file!(self, StorageIoOperation::Fallocate, |file_guard| {
let (_file_guard, res) = io_engine::get().fallocate(file_guard, offset, size).await;
res.maybe_fatal_err("fallocate") // TODO haven't thought about this
})
}
/// Helper function internal to `VirtualFile` that looks up the underlying File,
/// opens it and evicts some other File if necessary. The passed parameter is
/// assumed to be a function available for the physical `File`.

View File

@@ -109,6 +109,7 @@ pub(crate) fn get() -> IoEngine {
}
}
use std::os::fd::AsRawFd;
use std::os::unix::prelude::FileExt;
use std::sync::atomic::{AtomicU8, Ordering};
#[cfg(target_os = "linux")]
@@ -240,7 +241,7 @@ impl IoEngine {
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
// TODO: ftruncate op for tokio-epoll-uring
// TODO: ftruncate op for tokio-epoll-uring: https://github.com/neondatabase/neon/issues/11817
// Don't forget to use retry_ecanceled_once
let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
(file_guard, res)
@@ -248,6 +249,51 @@ impl IoEngine {
}
}
pub(super) async fn fallocate(
&self,
file_guard: FileGuard,
offset: u64,
len: u64,
) -> (FileGuard, std::io::Result<()>) {
// NB: if you ever think of using FALLOC_FL_KEEP_SIZE, keep
// in mind that I have found it to be punting to io_uring worker threads
// on Debian Bookworm Linux 6.1.0-32-amd64 and 6.12.25 mainline.
// => https://gist.github.com/problame/ed876bea40b915ba53267b8265e99352
match self {
IoEngine::NotSet => panic!("not initialized"),
IoEngine::StdFs => {
let flags = nix::fcntl::FallocateFlags::empty();
let Ok(offset) = nix::libc::off_t::try_from(offset) else {
return (
file_guard,
Err(std::io::Error::from_raw_os_error(nix::libc::EINVAL)),
);
};
let Ok(len) = nix::libc::off_t::try_from(len) else {
return (
file_guard,
Err(std::io::Error::from_raw_os_error(nix::libc::EINVAL)),
);
};
let res = file_guard.with_std_file(|std_file| {
nix::fcntl::fallocate(std_file.as_raw_fd(), flags, offset, len)
});
let res = res.map_err(|e: nix::errno::Errno| e.into());
(file_guard, res)
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let flags = tokio_epoll_uring::FallocateFlags::empty();
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (file_guard, res) = retry_ecanceled_once(file_guard, async |file_guard| {
system.fallocate(file_guard, flags, offset, len).await
})
.await;
(file_guard, res.map_err(epoll_uring_error_to_std))
}
}
}
pub(super) async fn write_at<B: IoBuf + Send>(
&self,
file_guard: FileGuard,