fix the compile errors & progress with testability

This commit is contained in:
Christian Schwarz
2024-08-20 12:08:21 +00:00
parent 521da324ab
commit e68555e6ee
5 changed files with 83 additions and 54 deletions

7
Cargo.lock generated
View File

@@ -3736,6 +3736,7 @@ dependencies = [
"reqwest 0.12.4",
"rpds",
"scopeguard",
"send-future",
"serde",
"serde_json",
"serde_path_to_error",
@@ -5466,6 +5467,12 @@ version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed"
[[package]]
name = "send-future"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "224e328af6e080cddbab3c770b1cf50f0351ba0577091ef2410c3951d835ff87"
[[package]]
name = "sentry"
version = "0.32.3"

View File

@@ -145,6 +145,7 @@ rustls-split = "0.3"
scopeguard = "1.1"
sysinfo = "0.29.2"
sd-notify = "0.4.1"
send-future = "0.1.0"
sentry = { version = "0.32", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"

View File

@@ -52,6 +52,7 @@ rand.workspace = true
range-set-blaze = { version = "0.1.16", features = ["alloc"] }
regex.workspace = true
scopeguard.workspace = true
send-future.workspace = true
serde.workspace = true
serde_json = { workspace = true, features = ["raw_value"] }
serde_path_to_error.workspace = true

View File

@@ -15,7 +15,6 @@ use crate::{l0_flush, page_cache};
use anyhow::{anyhow, Result};
use bytes::Bytes;
use camino::Utf8PathBuf;
use itertools::Itertools;
use pageserver_api::key::CompactKey;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::InMemoryLayerInfo;
@@ -23,7 +22,6 @@ use pageserver_api::shard::TenantShardId;
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, OnceLock};
use std::time::Instant;
use tokio_epoll_uring::BoundedBuf;
use tracing::*;
use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
// avoid binding to Write (conflicts with std::io::Write)
@@ -318,14 +316,15 @@ impl InMemoryLayer {
}
// Execute the read.
vectored_dio_read::execute(
let f = vectored_dio_read::execute(
&inner.file,
reads
.iter()
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
&ctx,
)
.await;
);
send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865
.await;
// Process results into the reconstruct state
'next_key: for (key, value_reads) in reads {

View File

@@ -8,7 +8,12 @@ use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};
use crate::context::RequestContext;
pub trait File {
mod sealed {
pub trait Sealed {}
}
/// The file interface we require. At runtime, this is a [`crate::virtual_file::VirtualFile`].
pub trait File: Send {
async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>(
&'b self,
start: u32,
@@ -17,40 +22,14 @@ pub trait File {
) -> std::io::Result<(Slice<B>, usize)>;
}
trait Sealed {}
pub trait Buffer: Sealed + std::ops::Deref<Target = [u8]> {
fn cap(&self) -> usize;
fn len(&self) -> usize;
fn remaining(&self) -> usize {
self.cap().checked_sub(self.len()).unwrap()
}
/// Panics if the total length would exceed the initialized capacity.
fn extend_from_slice(&mut self, src: &[u8]);
}
/// A logical read that our user wants to do.
pub struct ValueRead<B: Buffer> {
pos: u32,
state: MutexRefCell<Result<B, Arc<std::io::Error>>>,
}
struct MutexRefCell<T>(Mutex<T>);
impl<T> MutexRefCell<T> {
fn new(value: T) -> Self {
Self(Mutex::new(value))
}
fn borrow(&self) -> impl std::ops::Deref<Target = T> + '_ {
self.0.lock().unwrap()
}
fn borrow_mut(&self) -> impl std::ops::DerefMut<Target = T> + '_ {
self.0.lock().unwrap()
}
fn into_inner(self) -> T {
self.0.into_inner().unwrap()
}
}
impl<B: Buffer> ValueRead<B> {
/// Create a new [`ValueRead`] from [`File`] of the data in the file in range `[ pos, pos + buf.cap() )`.
pub fn new(pos: u32, buf: B) -> Self {
Self {
pos,
@@ -62,25 +41,25 @@ impl<B: Buffer> ValueRead<B> {
}
}
impl Sealed for Vec<u8> {}
impl Buffer for Vec<u8> {
fn cap(&self) -> usize {
self.capacity()
}
fn len(&self) -> usize {
self.len()
}
fn extend_from_slice(&mut self, src: &[u8]) {
if self.len() + src.len() > self.cap() {
panic!("Buffer capacity exceeded");
}
Vec::extend_from_slice(self, src);
/// The buffer into which a [`ValueRead`] result is placed.
pub trait Buffer: sealed::Sealed + std::ops::Deref<Target = [u8]> {
fn cap(&self) -> usize;
fn len(&self) -> usize;
fn remaining(&self) -> usize {
self.cap().checked_sub(self.len()).unwrap()
}
/// Panics if the total length would exceed the initialized capacity.
fn extend_from_slice(&mut self, src: &[u8]);
}
pub async fn execute<'a, 'b, 'c, I, F, B>(file: &'c F, reads: I, ctx: &'b RequestContext)
/// Execute the given `reads` against `file`.
/// The results are placed in the buffers of the [`ValueRead`]s.
/// Retrieve the results by calling [`ValueRead::into_result`] on each [`ValueRead`].
///
/// The [`ValueRead`]s must be freshly created using [`ValueRead::new`] when calling this function.
/// Otherwise, it might panic or the value resad result will be undefined.
/// TODO: prevent this through type system.
pub async fn execute<'a, I, F, B>(file: &F, reads: I, ctx: &RequestContext)
where
I: IntoIterator<Item = &'a ValueRead<B>> + Send,
F: File + Send,
@@ -98,12 +77,20 @@ where
let mut chunk_reads: BTreeMap<u32, Vec<ChunkReadDestination<B>>> = BTreeMap::new();
for value_read in reads {
let ValueRead { pos, state } = value_read;
let len = state
.borrow()
let state = state.borrow();
match state.as_ref() {
Err(_) => panic!("The `ValueRead`s that are passed in must be freshly created using `ValueRead::new`"),
Ok(buf) => {
if buf.len() != 0 {
panic!("The `ValueRead`s that are passed in must be freshly created using `ValueRead::new`");
}
}
}
let remaining = state
.as_ref()
.expect("we haven't started reading, no chance it's in Err() state")
.len();
let mut remaining = usize::try_from(len).unwrap();
.remaining();
let mut remaining = usize::try_from(remaining).unwrap();
let mut chunk_no = *pos / (DIO_CHUNK_SIZE as u32);
let mut offset_in_chunk = usize::try_from(*pos % (DIO_CHUNK_SIZE as u32)).unwrap();
while remaining > 0 {
@@ -242,3 +229,37 @@ where
}
}
}
struct MutexRefCell<T>(Mutex<T>);
impl<T> MutexRefCell<T> {
fn new(value: T) -> Self {
Self(Mutex::new(value))
}
fn borrow(&self) -> impl std::ops::Deref<Target = T> + '_ {
self.0.lock().unwrap()
}
fn borrow_mut(&self) -> impl std::ops::DerefMut<Target = T> + '_ {
self.0.lock().unwrap()
}
fn into_inner(self) -> T {
self.0.into_inner().unwrap()
}
}
impl sealed::Sealed for Vec<u8> {}
impl Buffer for Vec<u8> {
fn cap(&self) -> usize {
self.capacity()
}
fn len(&self) -> usize {
self.len()
}
fn extend_from_slice(&mut self, src: &[u8]) {
if self.len() + src.len() > self.cap() {
panic!("Buffer capacity exceeded");
}
Vec::extend_from_slice(self, src);
}
}