From 521da324abc6fb110ec59704595d2e6a969f9812 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 20 Aug 2024 10:07:07 +0000 Subject: [PATCH] Revert "try a closure approach, also has same error "Send is not general enough"" This reverts commit ef2b384cf8be44f7433baae633031dfb039a971b. --- .../tenant/storage_layer/inmemory_layer.rs | 14 +++++++++- .../inmemory_layer/vectored_dio_read.rs | 26 +++++++++++++------ 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 2fdedc4230..288d96a0d8 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -319,10 +319,11 @@ impl InMemoryLayer { // Execute the read. vectored_dio_read::execute( - |_, _| async move { todo!() }, + &inner.file, reads .iter() .flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)), + &ctx, ) .await; @@ -359,6 +360,17 @@ impl InMemoryLayer { } } +impl vectored_dio_read::File for EphemeralFile { + async fn read_at_to_end<'a, 'b, B: tokio_epoll_uring::IoBufMut + Send>( + &'b self, + start: u32, + dst: tokio_epoll_uring::Slice, + ctx: &'a RequestContext, + ) -> std::io::Result<(tokio_epoll_uring::Slice, usize)> { + EphemeralFile::read_at_to_end(self, start, dst, ctx).await + } +} + fn inmem_layer_display(mut f: impl Write, start_lsn: Lsn, end_lsn: Lsn) -> std::fmt::Result { write!(f, "inmem-{:016X}-{:016X}", start_lsn.0, end_lsn.0) } diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs index cfff2b4097..7ffa46460d 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs @@ -8,6 +8,15 @@ use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice}; use crate::context::RequestContext; +pub trait File { + async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>( + &'b self, + start: u32, + dst: Slice, + ctx: &'a RequestContext, + ) -> std::io::Result<(Slice, usize)>; +} + trait Sealed {} pub trait Buffer: Sealed + std::ops::Deref { @@ -71,11 +80,10 @@ impl Buffer for Vec { } } -pub async fn execute<'a, 'b, I, F, Fut, B>(file: F, reads: I) +pub async fn execute<'a, 'b, 'c, I, F, B>(file: &'c F, reads: I, ctx: &'b RequestContext) where I: IntoIterator> + Send, - F: Fn(u32, Slice>) -> Fut + Send, - Fut: std::future::Future>, usize), Arc>> + Send, + F: File + Send, B: Buffer + IoBufMut + Send, { const DIO_CHUNK_SIZE: usize = 512; @@ -198,11 +206,13 @@ where if all_done { continue; } - let (merged_read_buf_slice, nread) = match file( - start_chunk_no * DIO_CHUNK_SIZE as u32, - get_chunk_buf(nchunks).slice_full(), - ) - .await + let (merged_read_buf_slice, nread) = match file + .read_at_to_end( + start_chunk_no * DIO_CHUNK_SIZE as u32, + get_chunk_buf(nchunks).slice_full(), + ctx, + ) + .await { Ok(t) => t, Err(e) => {