mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 03:20:36 +00:00
Remove dependency on io_uring, use plain std::fs ops instead
io_uring is a great idea in the long term, but for now, let's make it easier to develop locally on macos, where io_uring is not available.
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -1360,7 +1360,6 @@ dependencies = [
|
||||
"prost 0.13.3",
|
||||
"thiserror 1.0.69",
|
||||
"tokio",
|
||||
"tokio-epoll-uring",
|
||||
"tokio-pipe",
|
||||
"tonic",
|
||||
"tracing",
|
||||
|
||||
@@ -25,8 +25,7 @@ zerocopy = "0.8.0"
|
||||
zerocopy-derive = "0.8.0"
|
||||
|
||||
metrics.workspace = true
|
||||
tokio-epoll-uring.workspace = true
|
||||
uring-common.workspace = true
|
||||
uring-common = { workspace = true, features = ["bytes"] }
|
||||
|
||||
pageserver_client_grpc.workspace = true
|
||||
pageserver_page_api.workspace = true
|
||||
|
||||
@@ -9,19 +9,18 @@
|
||||
//! process. The backend processes *also* read the file (and sometimes also
|
||||
//! write it? ), but the backends use direct C library calls for that.
|
||||
use std::fs::File;
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use tokio_epoll_uring;
|
||||
|
||||
use crate::BLCKSZ;
|
||||
|
||||
use tokio::task::spawn_blocking;
|
||||
|
||||
pub type CacheBlock = u64;
|
||||
|
||||
pub struct FileCache {
|
||||
uring_system: tokio_epoll_uring::SystemHandle,
|
||||
|
||||
file: Arc<File>,
|
||||
|
||||
free_list: Mutex<FreeList>,
|
||||
@@ -43,7 +42,6 @@ impl FileCache {
|
||||
pub fn new(
|
||||
file_cache_path: &Path,
|
||||
mut initial_size: u64,
|
||||
uring_system: tokio_epoll_uring::SystemHandle,
|
||||
) -> Result<FileCache, std::io::Error> {
|
||||
if initial_size < 100 {
|
||||
tracing::warn!(
|
||||
@@ -75,7 +73,6 @@ impl FileCache {
|
||||
|
||||
Ok(FileCache {
|
||||
file: Arc::new(file),
|
||||
uring_system,
|
||||
free_list: Mutex::new(FreeList {
|
||||
next_free_block: 0,
|
||||
max_blocks: initial_size,
|
||||
@@ -91,21 +88,14 @@ impl FileCache {
|
||||
pub async fn read_block(
|
||||
&self,
|
||||
cache_block: CacheBlock,
|
||||
dst: impl uring_common::buf::IoBufMut + Send + Sync,
|
||||
mut dst: impl uring_common::buf::IoBufMut + Send + Sync,
|
||||
) -> Result<(), std::io::Error> {
|
||||
assert!(dst.bytes_total() == BLCKSZ);
|
||||
let file = self.file.clone();
|
||||
|
||||
let ((_file, _buf), res) = self
|
||||
.uring_system
|
||||
.read(file, cache_block as u64 * BLCKSZ as u64, dst)
|
||||
.await;
|
||||
|
||||
let res = res.map_err(map_io_uring_error)?;
|
||||
if res != BLCKSZ {
|
||||
panic!("unexpected read result");
|
||||
}
|
||||
let dst_ref = unsafe { std::slice::from_raw_parts_mut(dst.stable_mut_ptr(), BLCKSZ) };
|
||||
|
||||
spawn_blocking(move || file.read_exact_at(dst_ref, cache_block as u64 * BLCKSZ as u64)).await??;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -117,14 +107,9 @@ impl FileCache {
|
||||
assert!(src.bytes_init() == BLCKSZ);
|
||||
let file = self.file.clone();
|
||||
|
||||
let ((_file, _buf), res) = self
|
||||
.uring_system
|
||||
.write(file, cache_block as u64 * BLCKSZ as u64, src)
|
||||
.await;
|
||||
let res = res.map_err(map_io_uring_error)?;
|
||||
if res != BLCKSZ {
|
||||
panic!("unexpected read result");
|
||||
}
|
||||
let src_ref = unsafe { std::slice::from_raw_parts(src.stable_ptr(), BLCKSZ) };
|
||||
|
||||
spawn_blocking(move || file.write_all_at(src_ref, cache_block as u64 * BLCKSZ as u64)).await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -148,15 +133,6 @@ impl FileCache {
|
||||
}
|
||||
}
|
||||
|
||||
fn map_io_uring_error(err: tokio_epoll_uring::Error<std::io::Error>) -> std::io::Error {
|
||||
match err {
|
||||
tokio_epoll_uring::Error::Op(err) => err,
|
||||
tokio_epoll_uring::Error::System(err) => {
|
||||
std::io::Error::new(std::io::ErrorKind::Other, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl metrics::core::Collector for FileCache {
|
||||
fn desc(&self) -> Vec<&metrics::core::Desc> {
|
||||
let mut descs = Vec::new();
|
||||
|
||||
@@ -12,7 +12,7 @@ use pageserver_client_grpc::PageserverClient;
|
||||
use pageserver_page_api::model;
|
||||
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio_epoll_uring::IoBuf;
|
||||
use uring_common::buf::IoBuf;
|
||||
use tokio_pipe::PipeRead;
|
||||
|
||||
use super::callbacks::{get_request_lsn, notify_proc};
|
||||
@@ -44,17 +44,15 @@ pub(super) async fn init(
|
||||
) -> CommunicatorWorkerProcessStruct<'static> {
|
||||
let last_lsn = get_request_lsn();
|
||||
|
||||
let uring_system = tokio_epoll_uring::System::launch().await.unwrap();
|
||||
|
||||
let file_cache = if let Some(path) = file_cache_path {
|
||||
Some(
|
||||
FileCache::new(&path, file_cache_size, uring_system)
|
||||
FileCache::new(&path, file_cache_size)
|
||||
.expect("could not create cache file"),
|
||||
)
|
||||
} else {
|
||||
// FIXME: temporarily for testing, use LFC even if disabled
|
||||
Some(
|
||||
FileCache::new(&PathBuf::from("new_filecache"), 1000, uring_system)
|
||||
FileCache::new(&PathBuf::from("new_filecache"), 1000)
|
||||
.expect("could not create cache file"),
|
||||
)
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user