mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 00:42:54 +00:00
use tokio-epoll-uring impl
This commit is contained in:
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -7199,7 +7199,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "tokio-epoll-uring"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#781989bb540a1408b0b93daa1e9d1fa452195497"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=devin%2F1746710596-expose-fallocate-modes#8bd6d0e99d4937096acfe40b64fd63aa9ad2e2ea"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"nix 0.26.4",
|
||||
@@ -7810,7 +7810,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "uring-common"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#781989bb540a1408b0b93daa1e9d1fa452195497"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=devin%2F1746710596-expose-fallocate-modes#8bd6d0e99d4937096acfe40b64fd63aa9ad2e2ea"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"io-uring",
|
||||
|
||||
@@ -187,7 +187,7 @@ thiserror = "1.0"
|
||||
tikv-jemallocator = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] }
|
||||
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
|
||||
tokio = { version = "1.43.1", features = ["macros"] }
|
||||
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
|
||||
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "devin/1746710596-expose-fallocate-modes" }
|
||||
tokio-io-timeout = "1.2.0"
|
||||
tokio-postgres-rustls = "0.12.0"
|
||||
tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]}
|
||||
|
||||
@@ -105,13 +105,13 @@ pub async fn download_layer_file<'a>(
|
||||
.map_err(DownloadError::Other)?,
|
||||
gate.enter().map_err(|_| DownloadError::Cancelled)?,
|
||||
);
|
||||
if let Ok(file_size) = TryInto::<i64>::try_into(layer_metadata.file_size.next_multiple_of(
|
||||
64 * 1024 /* TODO this is the max roundtup size by the buffered writer set_len_then_truncate */
|
||||
|
||||
)) {
|
||||
{
|
||||
temp_file.fallocate(
|
||||
0,
|
||||
file_size,
|
||||
layer_metadata.file_size.next_multiple_of(
|
||||
64 * 1024 /* TODO this is the max roundtup size by the buffered writer set_len_then_truncate */
|
||||
|
||||
),
|
||||
ctx,
|
||||
).await.unwrap();
|
||||
};
|
||||
|
||||
@@ -156,8 +156,8 @@ impl VirtualFile {
|
||||
|
||||
pub async fn fallocate(
|
||||
&self,
|
||||
offset: i64,
|
||||
size: i64,
|
||||
offset: u64,
|
||||
size: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), Error> {
|
||||
self.inner.fallocate(offset, size, ctx).await
|
||||
@@ -643,8 +643,8 @@ impl VirtualFileInner {
|
||||
|
||||
pub async fn fallocate(
|
||||
&self,
|
||||
offset: i64,
|
||||
size: i64,
|
||||
offset: u64,
|
||||
size: u64,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<(), Error> {
|
||||
with_file!(self, StorageIoOperation::Fallocate, |file_guard| {
|
||||
|
||||
@@ -11,7 +11,6 @@
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
pub(super) mod tokio_epoll_uring_ext;
|
||||
use nix::fcntl::{FallocateFlags, fallocate};
|
||||
|
||||
use tokio_epoll_uring::IoBuf;
|
||||
use tracing::Instrument;
|
||||
@@ -242,7 +241,7 @@ impl IoEngine {
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
IoEngine::TokioEpollUring => {
|
||||
// TODO: ftruncate op for tokio-epoll-uring
|
||||
// TODO: ftruncate op for tokio-epoll-uring: https://github.com/neondatabase/neon/issues/11817
|
||||
// Don't forget to use retry_ecanceled_once
|
||||
let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
|
||||
(file_guard, res)
|
||||
@@ -253,34 +252,44 @@ impl IoEngine {
|
||||
pub(super) async fn fallocate(
|
||||
&self,
|
||||
file_guard: FileGuard,
|
||||
offset: i64,
|
||||
len: i64,
|
||||
offset: u64,
|
||||
len: u64,
|
||||
) -> (FileGuard, std::io::Result<()>) {
|
||||
// TODO io_uring implementation
|
||||
// NB: if you ever think of using FALLOC_FL_KEEP_SIZE, keep
|
||||
// in mind that I have found it to be punting to io_uring worker threads
|
||||
// on Debian Bookworm Linux 6.1.0-32-amd64 and 6.12.25 mainline.
|
||||
// => https://gist.github.com/problame/ed876bea40b915ba53267b8265e99352
|
||||
match self {
|
||||
IoEngine::NotSet => panic!("not initialized"),
|
||||
IoEngine::StdFs => {
|
||||
unimplemented!()
|
||||
let flags = nix::fcntl::FallocateFlags::empty();
|
||||
let Ok(offset) = nix::libc::off_t::try_from(offset) else {
|
||||
return (
|
||||
file_guard,
|
||||
Err(std::io::Error::from_raw_os_error(nix::libc::EINVAL)),
|
||||
);
|
||||
};
|
||||
let Ok(len) = nix::libc::off_t::try_from(len) else {
|
||||
return (
|
||||
file_guard,
|
||||
Err(std::io::Error::from_raw_os_error(nix::libc::EINVAL)),
|
||||
);
|
||||
};
|
||||
let res = file_guard.with_std_file(|std_file| {
|
||||
nix::fcntl::fallocate(std_file.as_raw_fd(), flags, offset, len)
|
||||
});
|
||||
let res = res.map_err(|e: nix::errno::Errno| e.into());
|
||||
(file_guard, res)
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
IoEngine::TokioEpollUring => {
|
||||
// TODO: fallocate op for tokio-epoll-uring
|
||||
file_guard.with_std_file(|std_file| {
|
||||
fallocate(
|
||||
std_file.as_raw_fd(),
|
||||
// NB: if you ever think of using FALLOC_FL_KEEP_SIZE, keep
|
||||
// in mind that I have found it to be punting to io_uring worker threads
|
||||
// on Debian Bookworm Linux 6.1.0-32-amd64 and 6.12.25 mainline.
|
||||
// => https://gist.github.com/problame/ed876bea40b915ba53267b8265e99352
|
||||
FallocateFlags::empty(),
|
||||
offset,
|
||||
len,
|
||||
)
|
||||
.expect("TODO");
|
||||
std_file.sync_all().unwrap();
|
||||
()
|
||||
});
|
||||
(file_guard, Ok(()))
|
||||
let flags = tokio_epoll_uring::FallocateFlags::empty();
|
||||
let system = tokio_epoll_uring_ext::thread_local_system().await;
|
||||
let (file_guard, res) = retry_ecanceled_once(file_guard, async |file_guard| {
|
||||
system.fallocate(file_guard, flags, offset, len).await
|
||||
})
|
||||
.await;
|
||||
(file_guard, res.map_err(epoll_uring_error_to_std))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user