mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 09:22:55 +00:00
Revert "CP tokio_epoll_uring for read path"
This reverts commit 1556234d9a.
This commit is contained in:
78
Cargo.lock
generated
78
Cargo.lock
generated
@@ -2066,26 +2066,6 @@ dependencies = [
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "io-uring"
|
||||
version = "0.5.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dd1e1a01cfb924fd8c5c43b6827965db394f5a3a16c599ce03452266e1cf984c"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "io-uring"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "141a0f4546a50b2ed637c7a6df0d7dff45c9f41523254996764461c8ae0d9424"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ipnet"
|
||||
version = "2.7.2"
|
||||
@@ -2185,9 +2165,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.147"
|
||||
version = "0.2.144"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3"
|
||||
checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1"
|
||||
|
||||
[[package]]
|
||||
name = "libloading"
|
||||
@@ -2767,7 +2747,6 @@ dependencies = [
|
||||
"tenant_size_model",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-epoll-uring",
|
||||
"tokio-io-timeout",
|
||||
"tokio-postgres",
|
||||
"tokio-tar",
|
||||
@@ -3888,12 +3867,6 @@ dependencies = [
|
||||
"windows-sys 0.42.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scoped-tls"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294"
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.1.0"
|
||||
@@ -4478,18 +4451,18 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
version = "1.0.47"
|
||||
version = "1.0.40"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f"
|
||||
checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac"
|
||||
dependencies = [
|
||||
"thiserror-impl",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thiserror-impl"
|
||||
version = "1.0.47"
|
||||
version = "1.0.40"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b"
|
||||
checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -4574,39 +4547,22 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.32.0"
|
||||
version = "1.28.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9"
|
||||
checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
"autocfg",
|
||||
"bytes",
|
||||
"libc",
|
||||
"mio",
|
||||
"num_cpus",
|
||||
"parking_lot 0.12.1",
|
||||
"pin-project-lite",
|
||||
"signal-hook-registry",
|
||||
"socket2 0.5.3",
|
||||
"socket2 0.4.9",
|
||||
"tokio-macros",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-epoll-uring"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"io-uring 0.6.1",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"scopeguard",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-uring",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-io-timeout"
|
||||
version = "1.2.0"
|
||||
@@ -4723,20 +4679,6 @@ dependencies = [
|
||||
"tungstenite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-uring"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0d5e02bb137e030b3a547c65a3bd2f1836d66a97369fdcc69034002b10e155ef"
|
||||
dependencies = [
|
||||
"io-uring 0.5.13",
|
||||
"libc",
|
||||
"scoped-tls",
|
||||
"slab",
|
||||
"socket2 0.4.9",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-util"
|
||||
version = "0.7.8"
|
||||
|
||||
@@ -83,7 +83,6 @@ enum-map.workspace = true
|
||||
enumset.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
tokio-epoll-uring = { path = "/home/admin/tokio-epoll-uring/tokio-epoll-uring" }
|
||||
|
||||
[dev-dependencies]
|
||||
criterion.workspace = true
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
use super::ephemeral_file::EphemeralFile;
|
||||
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ, PageWriteGuard};
|
||||
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use bytes::Bytes;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
@@ -169,7 +169,7 @@ impl FileBlockReader {
|
||||
}
|
||||
|
||||
/// Read a page from the underlying file into given buffer.
|
||||
async fn fill_buffer(&self, buf: PageWriteGuard<'static>, blkno: u32) -> Result<PageWriteGuard<'static>, std::io::Error> {
|
||||
async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
|
||||
assert!(buf.len() == PAGE_SZ);
|
||||
self.file
|
||||
.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
|
||||
@@ -198,7 +198,7 @@ impl FileBlockReader {
|
||||
ReadBufResult::Found(guard) => Ok(guard.into()),
|
||||
ReadBufResult::NotFound(mut write_guard) => {
|
||||
// Read the page from disk into the buffer
|
||||
let write_guard = self.fill_buffer(write_guard, blknum).await?;
|
||||
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
|
||||
Ok(write_guard.mark_valid().into())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -89,8 +89,10 @@ impl EphemeralFile {
|
||||
return Ok(BlockLease::PageReadGuard(guard))
|
||||
}
|
||||
page_cache::ReadBufResult::NotFound(mut write_guard) => {
|
||||
let write_guard = self.file
|
||||
.read_exact_at(write_guard, blknum as u64 * PAGE_SZ as u64)
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
self.file
|
||||
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
|
||||
.await?;
|
||||
let read_guard = write_guard.mark_valid();
|
||||
return Ok(BlockLease::PageReadGuard(read_guard));
|
||||
|
||||
@@ -11,13 +11,11 @@
|
||||
//! src/backend/storage/file/fd.c
|
||||
//!
|
||||
use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
|
||||
use crate::page_cache::PageWriteGuard;
|
||||
use crate::tenant::TENANTS_SEGMENT_NAME;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use once_cell::sync::OnceCell;
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{Error, ErrorKind, Seek, SeekFrom};
|
||||
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd};
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
@@ -372,7 +370,7 @@ impl VirtualFile {
|
||||
///
|
||||
/// We are doing it via a macro as Rust doesn't support async closures that
|
||||
/// take on parameters with lifetimes.
|
||||
async fn lock_file(&self) -> Result<FileGuard<'static>, Error> {
|
||||
async fn lock_file(&self) -> Result<FileGuard<'_>, Error> {
|
||||
let open_files = get_open_files();
|
||||
|
||||
let mut handle_guard = {
|
||||
@@ -462,59 +460,24 @@ impl VirtualFile {
|
||||
}
|
||||
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
|
||||
pub async fn read_exact_at(
|
||||
&self,
|
||||
mut write_guard: PageWriteGuard<'static>,
|
||||
mut offset: u64,
|
||||
) -> Result<PageWriteGuard<'static>, Error> {
|
||||
let file_guard: FileGuard<'static> = self.lock_file().await?;
|
||||
|
||||
let system = tokio_epoll_uring::thread_local_system().await;
|
||||
struct PageWriteGuardBuf {
|
||||
buf: PageWriteGuard<'static>,
|
||||
init_up_to: usize,
|
||||
}
|
||||
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
|
||||
fn stable_ptr(&self) -> *const u8 {
|
||||
self.buf.as_ptr()
|
||||
}
|
||||
fn bytes_init(&self) -> usize {
|
||||
self.init_up_to
|
||||
}
|
||||
fn bytes_total(&self) -> usize {
|
||||
self.buf.len()
|
||||
pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> {
|
||||
while !buf.is_empty() {
|
||||
match self.read_at(buf, offset).await {
|
||||
Ok(0) => {
|
||||
return Err(Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
"failed to fill whole buffer",
|
||||
))
|
||||
}
|
||||
Ok(n) => {
|
||||
buf = &mut buf[n..];
|
||||
offset += n as u64;
|
||||
}
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
|
||||
fn stable_mut_ptr(&mut self) -> *mut u8 {
|
||||
self.buf.as_mut_ptr()
|
||||
}
|
||||
|
||||
unsafe fn set_init(&mut self, pos: usize) {
|
||||
assert!(pos <= self.buf.len());
|
||||
self.init_up_to = pos;
|
||||
}
|
||||
}
|
||||
let buf = PageWriteGuardBuf {
|
||||
buf: write_guard,
|
||||
init_up_to: 0,
|
||||
};
|
||||
let owned_fd = unsafe { OwnedFd::from_raw_fd(file_guard.as_ref().as_raw_fd()) };
|
||||
let guard = scopeguard::guard(file_guard, |_| {
|
||||
panic!("must not drop future while operation is ongoing (todo: pass file_guard to tokio_epoll_uring to aovid this)")
|
||||
});
|
||||
let ((owned_fd, buf), res) = system.read(owned_fd, offset, buf).await;
|
||||
let _ = OwnedFd::into_raw_fd(owned_fd);
|
||||
let _ = scopeguard::ScopeGuard::into_inner(guard);
|
||||
let PageWriteGuardBuf {
|
||||
buf: write_guard,
|
||||
init_up_to,
|
||||
} = buf;
|
||||
if let Ok(num_read) = res {
|
||||
assert!(init_up_to == num_read); // TODO need to deal with short reads here
|
||||
}
|
||||
res.map(|_| write_guard)
|
||||
.map_err(|e| Error::new(ErrorKind::Other, e))
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
|
||||
@@ -564,6 +527,18 @@ impl VirtualFile {
|
||||
Ok(n)
|
||||
}
|
||||
|
||||
pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
|
||||
let result = with_file!(self, StorageIoOperation::Read, |file| file
|
||||
.as_ref()
|
||||
.read_at(buf, offset));
|
||||
if let Ok(size) = result {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
|
||||
.add(size as i64);
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
|
||||
let result = with_file!(self, StorageIoOperation::Write, |file| file
|
||||
.as_ref()
|
||||
|
||||
Reference in New Issue
Block a user