mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
WIP
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
use page_cache::PageWriteGuard;
|
||||
use virtual_file::VirtualFile;
|
||||
|
||||
mod page_cache;
|
||||
@@ -16,6 +17,7 @@ fn main() {
|
||||
match res {
|
||||
page_cache::ReadBufResult::Found(found) => todo!(),
|
||||
page_cache::ReadBufResult::NotFound(write_guard) => {
|
||||
let write_guard: PageWriteGuard<'static> = write_guard;
|
||||
let file = VirtualFile::open(camino::Utf8Path::new("foo"))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -13,14 +13,42 @@
|
||||
use crate::page_cache::PageWriteGuard;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use once_cell::sync::OnceCell;
|
||||
use tokio_epoll_uring::IoFd;
|
||||
use std::fs::{self, File};
|
||||
use std::io::{Error, ErrorKind, Seek, SeekFrom};
|
||||
use std::marker::PhantomData;
|
||||
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use utils::fs_ext;
|
||||
|
||||
struct PageWriteGuardBuf {
|
||||
buf: PageWriteGuard<'static>,
|
||||
init_up_to: usize,
|
||||
}
|
||||
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
|
||||
fn stable_ptr(&self) -> *const u8 {
|
||||
self.buf.as_ptr()
|
||||
}
|
||||
fn bytes_init(&self) -> usize {
|
||||
self.init_up_to
|
||||
}
|
||||
fn bytes_total(&self) -> usize {
|
||||
self.buf.len()
|
||||
}
|
||||
}
|
||||
unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
|
||||
fn stable_mut_ptr(&mut self) -> *mut u8 {
|
||||
self.buf.as_mut_ptr()
|
||||
}
|
||||
|
||||
unsafe fn set_init(&mut self, pos: usize) {
|
||||
assert!(pos <= self.buf.len());
|
||||
self.init_up_to = pos;
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
|
||||
/// the underlying file is closed if the system is low on file descriptors,
|
||||
@@ -561,52 +589,39 @@ impl VirtualFile {
|
||||
}
|
||||
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
|
||||
// pub async fn read_exact_at(
|
||||
// &self,
|
||||
// page: PageWriteGuard<'static>,
|
||||
// offset: u64,
|
||||
// ) -> Result<PageWriteGuard<'static>, Error> {
|
||||
// with_file!(self, StorageIoOperation::Read, |file_guard| {
|
||||
// self.read_exact_at0(file_guard, page, offset).await
|
||||
// })
|
||||
// }
|
||||
pub async fn read_exact_at(
|
||||
&self,
|
||||
page: PageWriteGuard<'static>,
|
||||
offset: u64,
|
||||
) -> Result<PageWriteGuard<'static>, Error> {
|
||||
|
||||
// https://github.com/rust-lang/rust/issues/102211#issuecomment-1367900125
|
||||
use std::future::Future;
|
||||
fn assert_send<'u, R>(
|
||||
fut: impl 'u + Send + Future<Output = R>,
|
||||
) -> impl 'u + Send + Future<Output = R> {
|
||||
fut
|
||||
}
|
||||
|
||||
with_file!(self, StorageIoOperation::Read, |file_guard| {
|
||||
assert_send(Self::read_exact_at0(unsafe {
|
||||
OwnedFd::from_raw_fd(file_guard.as_fd())
|
||||
}, page, offset)).await
|
||||
})
|
||||
}
|
||||
async fn read_exact_at0(
|
||||
file_guard: OwnedFd,
|
||||
write_guard: PageWriteGuard<'static>,
|
||||
offset: u64,
|
||||
) -> Result<PageWriteGuard<'static>, Error> {
|
||||
let write_guard: PageWriteGuard<'static> = write_guard;
|
||||
let file_guard: FileGuard<'static> = self.lock_file().await?;
|
||||
// let write_guard: PageWriteGuard<'static> = write_guard;
|
||||
// let file_guard: FileGuard<'static> = self.lock_file().await?;
|
||||
let system = tokio_epoll_uring::thread_local_system().await;
|
||||
struct PageWriteGuardBuf {
|
||||
buf: PageWriteGuard<'static>,
|
||||
init_up_to: usize,
|
||||
}
|
||||
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
|
||||
fn stable_ptr(&self) -> *const u8 {
|
||||
self.buf.as_ptr()
|
||||
}
|
||||
fn bytes_init(&self) -> usize {
|
||||
self.init_up_to
|
||||
}
|
||||
fn bytes_total(&self) -> usize {
|
||||
self.buf.len()
|
||||
}
|
||||
}
|
||||
unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
|
||||
fn stable_mut_ptr(&mut self) -> *mut u8 {
|
||||
self.buf.as_mut_ptr()
|
||||
}
|
||||
|
||||
unsafe fn set_init(&mut self, pos: usize) {
|
||||
assert!(pos <= self.buf.len());
|
||||
self.init_up_to = pos;
|
||||
}
|
||||
}
|
||||
let buf = PageWriteGuardBuf {
|
||||
buf: write_guard,
|
||||
init_up_to: 0,
|
||||
};
|
||||
// let _ = system.read(file_guard, offset, buf);
|
||||
let ((_, buf), res) = system.read(file_guard, offset, buf).await;
|
||||
let PageWriteGuardBuf {
|
||||
buf: write_guard,
|
||||
@@ -678,8 +693,6 @@ struct FileGuard<'a> {
|
||||
slot_guard: RwLockReadGuard<'a, SlotInner>,
|
||||
}
|
||||
|
||||
unsafe impl<'a> Send for FileGuard<'a> {}
|
||||
|
||||
impl AsRef<OwnedFd> for FileGuard<'static> {
|
||||
fn as_ref(&self) -> &OwnedFd {
|
||||
// This unwrap is safe because we only create `FileGuard`s
|
||||
|
||||
Reference in New Issue
Block a user