From e68555e6ee663b969b035b7c95884c8537c9baf0 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 20 Aug 2024 12:08:21 +0000 Subject: [PATCH] fix the compile errors & progress with testability --- Cargo.lock | 7 ++ Cargo.toml | 1 + pageserver/Cargo.toml | 1 + .../tenant/storage_layer/inmemory_layer.rs | 9 +- .../inmemory_layer/vectored_dio_read.rs | 119 ++++++++++-------- 5 files changed, 83 insertions(+), 54 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dee15b6aa7..d41f362a5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3736,6 +3736,7 @@ dependencies = [ "reqwest 0.12.4", "rpds", "scopeguard", + "send-future", "serde", "serde_json", "serde_path_to_error", @@ -5466,6 +5467,12 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed" +[[package]] +name = "send-future" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "224e328af6e080cddbab3c770b1cf50f0351ba0577091ef2410c3951d835ff87" + [[package]] name = "sentry" version = "0.32.3" diff --git a/Cargo.toml b/Cargo.toml index 963841e340..e513757e16 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -145,6 +145,7 @@ rustls-split = "0.3" scopeguard = "1.1" sysinfo = "0.29.2" sd-notify = "0.4.1" +send-future = "0.1.0" sentry = { version = "0.32", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] } serde = { version = "1.0", features = ["derive"] } serde_json = "1" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 0e748ee3db..cb368a69f0 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -52,6 +52,7 @@ rand.workspace = true range-set-blaze = { version = "0.1.16", features = ["alloc"] } regex.workspace = true scopeguard.workspace = true +send-future.workspace = true serde.workspace = true serde_json = { workspace = true, features = ["raw_value"] } serde_path_to_error.workspace = true diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 288d96a0d8..39fc05b39f 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -15,7 +15,6 @@ use crate::{l0_flush, page_cache}; use anyhow::{anyhow, Result}; use bytes::Bytes; use camino::Utf8PathBuf; -use itertools::Itertools; use pageserver_api::key::CompactKey; use pageserver_api::keyspace::KeySpace; use pageserver_api::models::InMemoryLayerInfo; @@ -23,7 +22,6 @@ use pageserver_api::shard::TenantShardId; use std::collections::{BTreeMap, HashMap}; use std::sync::{Arc, OnceLock}; use std::time::Instant; -use tokio_epoll_uring::BoundedBuf; use tracing::*; use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap}; // avoid binding to Write (conflicts with std::io::Write) @@ -318,14 +316,15 @@ impl InMemoryLayer { } // Execute the read. - vectored_dio_read::execute( + let f = vectored_dio_read::execute( &inner.file, reads .iter() .flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)), &ctx, - ) - .await; + ); + send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865 + .await; // Process results into the reconstruct state 'next_key: for (key, value_reads) in reads { diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs index 7ffa46460d..3f216e9861 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs @@ -8,7 +8,12 @@ use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice}; use crate::context::RequestContext; -pub trait File { +mod sealed { + pub trait Sealed {} +} + +/// The file interface we require. At runtime, this is a [`crate::virtual_file::VirtualFile`]. +pub trait File: Send { async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>( &'b self, start: u32, @@ -17,40 +22,14 @@ pub trait File { ) -> std::io::Result<(Slice, usize)>; } -trait Sealed {} - -pub trait Buffer: Sealed + std::ops::Deref { - fn cap(&self) -> usize; - fn len(&self) -> usize; - fn remaining(&self) -> usize { - self.cap().checked_sub(self.len()).unwrap() - } - /// Panics if the total length would exceed the initialized capacity. - fn extend_from_slice(&mut self, src: &[u8]); -} - +/// A logical read that our user wants to do. pub struct ValueRead { pos: u32, state: MutexRefCell>>, } -struct MutexRefCell(Mutex); -impl MutexRefCell { - fn new(value: T) -> Self { - Self(Mutex::new(value)) - } - fn borrow(&self) -> impl std::ops::Deref + '_ { - self.0.lock().unwrap() - } - fn borrow_mut(&self) -> impl std::ops::DerefMut + '_ { - self.0.lock().unwrap() - } - fn into_inner(self) -> T { - self.0.into_inner().unwrap() - } -} - impl ValueRead { + /// Create a new [`ValueRead`] from [`File`] of the data in the file in range `[ pos, pos + buf.cap() )`. pub fn new(pos: u32, buf: B) -> Self { Self { pos, @@ -62,25 +41,25 @@ impl ValueRead { } } -impl Sealed for Vec {} -impl Buffer for Vec { - fn cap(&self) -> usize { - self.capacity() - } - - fn len(&self) -> usize { - self.len() - } - - fn extend_from_slice(&mut self, src: &[u8]) { - if self.len() + src.len() > self.cap() { - panic!("Buffer capacity exceeded"); - } - Vec::extend_from_slice(self, src); +/// The buffer into which a [`ValueRead`] result is placed. +pub trait Buffer: sealed::Sealed + std::ops::Deref { + fn cap(&self) -> usize; + fn len(&self) -> usize; + fn remaining(&self) -> usize { + self.cap().checked_sub(self.len()).unwrap() } + /// Panics if the total length would exceed the initialized capacity. + fn extend_from_slice(&mut self, src: &[u8]); } -pub async fn execute<'a, 'b, 'c, I, F, B>(file: &'c F, reads: I, ctx: &'b RequestContext) +/// Execute the given `reads` against `file`. +/// The results are placed in the buffers of the [`ValueRead`]s. +/// Retrieve the results by calling [`ValueRead::into_result`] on each [`ValueRead`]. +/// +/// The [`ValueRead`]s must be freshly created using [`ValueRead::new`] when calling this function. +/// Otherwise, it might panic or the value resad result will be undefined. +/// TODO: prevent this through type system. +pub async fn execute<'a, I, F, B>(file: &F, reads: I, ctx: &RequestContext) where I: IntoIterator> + Send, F: File + Send, @@ -98,12 +77,20 @@ where let mut chunk_reads: BTreeMap>> = BTreeMap::new(); for value_read in reads { let ValueRead { pos, state } = value_read; - let len = state - .borrow() + let state = state.borrow(); + match state.as_ref() { + Err(_) => panic!("The `ValueRead`s that are passed in must be freshly created using `ValueRead::new`"), + Ok(buf) => { + if buf.len() != 0 { + panic!("The `ValueRead`s that are passed in must be freshly created using `ValueRead::new`"); + } + } + } + let remaining = state .as_ref() .expect("we haven't started reading, no chance it's in Err() state") - .len(); - let mut remaining = usize::try_from(len).unwrap(); + .remaining(); + let mut remaining = usize::try_from(remaining).unwrap(); let mut chunk_no = *pos / (DIO_CHUNK_SIZE as u32); let mut offset_in_chunk = usize::try_from(*pos % (DIO_CHUNK_SIZE as u32)).unwrap(); while remaining > 0 { @@ -242,3 +229,37 @@ where } } } + +struct MutexRefCell(Mutex); +impl MutexRefCell { + fn new(value: T) -> Self { + Self(Mutex::new(value)) + } + fn borrow(&self) -> impl std::ops::Deref + '_ { + self.0.lock().unwrap() + } + fn borrow_mut(&self) -> impl std::ops::DerefMut + '_ { + self.0.lock().unwrap() + } + fn into_inner(self) -> T { + self.0.into_inner().unwrap() + } +} + +impl sealed::Sealed for Vec {} +impl Buffer for Vec { + fn cap(&self) -> usize { + self.capacity() + } + + fn len(&self) -> usize { + self.len() + } + + fn extend_from_slice(&mut self, src: &[u8]) { + if self.len() + src.len() > self.cap() { + panic!("Buffer capacity exceeded"); + } + Vec::extend_from_slice(self, src); + } +}