diff --git a/Cargo.lock b/Cargo.lock index a9b54ffcaf..655542c12d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1360,7 +1360,6 @@ dependencies = [ "prost 0.13.3", "thiserror 1.0.69", "tokio", - "tokio-epoll-uring", "tokio-pipe", "tonic", "tracing", diff --git a/pgxn/neon/communicator/Cargo.toml b/pgxn/neon/communicator/Cargo.toml index b34c3843e2..fba4a5067c 100644 --- a/pgxn/neon/communicator/Cargo.toml +++ b/pgxn/neon/communicator/Cargo.toml @@ -25,8 +25,7 @@ zerocopy = "0.8.0" zerocopy-derive = "0.8.0" metrics.workspace = true -tokio-epoll-uring.workspace = true -uring-common.workspace = true +uring-common = { workspace = true, features = ["bytes"] } pageserver_client_grpc.workspace = true pageserver_page_api.workspace = true diff --git a/pgxn/neon/communicator/src/file_cache.rs b/pgxn/neon/communicator/src/file_cache.rs index cc85cc2f57..ee3964e283 100644 --- a/pgxn/neon/communicator/src/file_cache.rs +++ b/pgxn/neon/communicator/src/file_cache.rs @@ -9,19 +9,18 @@ //! process. The backend processes *also* read the file (and sometimes also //! write it? ), but the backends use direct C library calls for that. use std::fs::File; +use std::os::unix::fs::FileExt; use std::path::Path; use std::sync::Arc; use std::sync::Mutex; -use tokio_epoll_uring; - use crate::BLCKSZ; +use tokio::task::spawn_blocking; + pub type CacheBlock = u64; pub struct FileCache { - uring_system: tokio_epoll_uring::SystemHandle, - file: Arc, free_list: Mutex, @@ -43,7 +42,6 @@ impl FileCache { pub fn new( file_cache_path: &Path, mut initial_size: u64, - uring_system: tokio_epoll_uring::SystemHandle, ) -> Result { if initial_size < 100 { tracing::warn!( @@ -75,7 +73,6 @@ impl FileCache { Ok(FileCache { file: Arc::new(file), - uring_system, free_list: Mutex::new(FreeList { next_free_block: 0, max_blocks: initial_size, @@ -91,21 +88,14 @@ impl FileCache { pub async fn read_block( &self, cache_block: CacheBlock, - dst: impl uring_common::buf::IoBufMut + Send + Sync, + mut dst: impl uring_common::buf::IoBufMut + Send + Sync, ) -> Result<(), std::io::Error> { assert!(dst.bytes_total() == BLCKSZ); let file = self.file.clone(); - let ((_file, _buf), res) = self - .uring_system - .read(file, cache_block as u64 * BLCKSZ as u64, dst) - .await; - - let res = res.map_err(map_io_uring_error)?; - if res != BLCKSZ { - panic!("unexpected read result"); - } + let dst_ref = unsafe { std::slice::from_raw_parts_mut(dst.stable_mut_ptr(), BLCKSZ) }; + spawn_blocking(move || file.read_exact_at(dst_ref, cache_block as u64 * BLCKSZ as u64)).await??; Ok(()) } @@ -117,14 +107,9 @@ impl FileCache { assert!(src.bytes_init() == BLCKSZ); let file = self.file.clone(); - let ((_file, _buf), res) = self - .uring_system - .write(file, cache_block as u64 * BLCKSZ as u64, src) - .await; - let res = res.map_err(map_io_uring_error)?; - if res != BLCKSZ { - panic!("unexpected read result"); - } + let src_ref = unsafe { std::slice::from_raw_parts(src.stable_ptr(), BLCKSZ) }; + + spawn_blocking(move || file.write_all_at(src_ref, cache_block as u64 * BLCKSZ as u64)).await??; Ok(()) } @@ -148,15 +133,6 @@ impl FileCache { } } -fn map_io_uring_error(err: tokio_epoll_uring::Error) -> std::io::Error { - match err { - tokio_epoll_uring::Error::Op(err) => err, - tokio_epoll_uring::Error::System(err) => { - std::io::Error::new(std::io::ErrorKind::Other, err) - } - } -} - impl metrics::core::Collector for FileCache { fn desc(&self) -> Vec<&metrics::core::Desc> { let mut descs = Vec::new(); diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index 622acc8361..90574da6f3 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -12,7 +12,7 @@ use pageserver_client_grpc::PageserverClient; use pageserver_page_api::model; use tokio::io::AsyncReadExt; -use tokio_epoll_uring::IoBuf; +use uring_common::buf::IoBuf; use tokio_pipe::PipeRead; use super::callbacks::{get_request_lsn, notify_proc}; @@ -44,17 +44,15 @@ pub(super) async fn init( ) -> CommunicatorWorkerProcessStruct<'static> { let last_lsn = get_request_lsn(); - let uring_system = tokio_epoll_uring::System::launch().await.unwrap(); - let file_cache = if let Some(path) = file_cache_path { Some( - FileCache::new(&path, file_cache_size, uring_system) + FileCache::new(&path, file_cache_size) .expect("could not create cache file"), ) } else { // FIXME: temporarily for testing, use LFC even if disabled Some( - FileCache::new(&PathBuf::from("new_filecache"), 1000, uring_system) + FileCache::new(&PathBuf::from("new_filecache"), 1000) .expect("could not create cache file"), ) };