From f0efc908d7793b8f52150831b7a72310c42fc38e Mon Sep 17 00:00:00 2001 From: Yuchen Liang Date: Fri, 8 Nov 2024 15:19:36 +0000 Subject: [PATCH] use Arc around W: OwnedAsyncWriter Signed-off-by: Yuchen Liang --- pageserver/src/tenant/ephemeral_file.rs | 21 +++++++++------- .../tenant/remote_timeline_client/download.rs | 13 +++++++--- .../virtual_file/owned_buffers_io/write.rs | 25 ++++++++++++------- 3 files changed, 37 insertions(+), 22 deletions(-) diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index f1ce768c39..9a82c131f2 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -19,6 +19,7 @@ use tracing::error; use std::io; use std::sync::atomic::AtomicU64; +use std::sync::Arc; use utils::id::TimelineId; pub struct EphemeralFile { @@ -51,15 +52,17 @@ impl EphemeralFile { "ephemeral-{filename_disambiguator}" ))); - let file = VirtualFile::open_with_options( - &filename, - virtual_file::OpenOptions::new() - .read(true) - .write(true) - .create(true), - ctx, - ) - .await?; + let file = Arc::new( + VirtualFile::open_with_options( + &filename, + virtual_file::OpenOptions::new() + .read(true) + .write(true) + .create(true), + ctx, + ) + .await?, + ); let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 05275ef852..d291f4302d 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -6,6 +6,7 @@ use std::collections::HashSet; use std::future::Future; use std::str::FromStr; +use std::sync::Arc; use std::time::SystemTime; use anyhow::{anyhow, Context}; @@ -206,10 +207,14 @@ async fn download_object<'a>( use crate::virtual_file::owned_buffers_io; use bytes::BytesMut; async { - let destination_file = VirtualFile::create(dst_path, ctx) - .await - .with_context(|| format!("create a destination file for layer '{dst_path}'")) - .map_err(DownloadError::Other)?; + let destination_file = Arc::new( + VirtualFile::create(dst_path, ctx) + .await + .with_context(|| { + format!("create a destination file for layer '{dst_path}'") + }) + .map_err(DownloadError::Other)?, + ); let mut download = storage .download(src_path, &DownloadOpts::default(), cancel) diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index 44f40bf846..3bd3cac09d 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use bytes::BytesMut; use tokio_epoll_uring::IoBuf; @@ -32,7 +34,7 @@ pub trait OwnedAsyncWriter { /// In such cases, a different implementation that always buffers in memory /// may be preferable. pub struct BufferedWriter { - writer: W, + writer: Arc, /// invariant: always remains Some(buf) except /// - while IO is ongoing => goes back to Some() once the IO completed successfully /// - after an IO error => stays `None` forever @@ -48,7 +50,7 @@ where Buf: IoBuf + Send, W: OwnedAsyncWriter, { - pub fn new(writer: W, buf: B) -> Self { + pub fn new(writer: Arc, buf: B) -> Self { Self { writer, buf: Some(buf), @@ -70,7 +72,10 @@ where } #[cfg_attr(target_os = "macos", allow(dead_code))] - pub async fn flush_and_into_inner(mut self, ctx: &RequestContext) -> std::io::Result<(u64, W)> { + pub async fn flush_and_into_inner( + mut self, + ctx: &RequestContext, + ) -> std::io::Result<(u64, Arc)> { self.flush(ctx).await?; let Self { @@ -290,7 +295,7 @@ mod tests { #[tokio::test] async fn test_buffered_writes_only() -> std::io::Result<()> { - let recorder = RecorderWriter::default(); + let recorder = Arc::new(RecorderWriter::default()); let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2)); write!(writer, b"a"); write!(writer, b"b"); @@ -307,7 +312,7 @@ mod tests { #[tokio::test] async fn test_passthrough_writes_only() -> std::io::Result<()> { - let recorder = RecorderWriter::default(); + let recorder = Arc::new(RecorderWriter::default()); let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2)); write!(writer, b"abc"); write!(writer, b"de"); @@ -323,8 +328,9 @@ mod tests { #[tokio::test] async fn test_passthrough_write_with_nonempty_buffer() -> std::io::Result<()> { - let recorder = RecorderWriter::default(); - let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2)); + let recorder = Arc::new(RecorderWriter::default()); + let mut writer = + BufferedWriter::<_, RecorderWriter>::new(recorder, BytesMut::with_capacity(2)); write!(writer, b"a"); write!(writer, b"bc"); write!(writer, b"d"); @@ -341,8 +347,9 @@ mod tests { async fn test_write_all_borrowed_always_goes_through_buffer() -> std::io::Result<()> { let ctx = test_ctx(); let ctx = &ctx; - let recorder = RecorderWriter::default(); - let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2)); + let recorder = Arc::new(RecorderWriter::default()); + let mut writer = + BufferedWriter::<_, RecorderWriter>::new(recorder, BytesMut::with_capacity(2)); writer.write_buffered_borrowed(b"abc", ctx).await?; writer.write_buffered_borrowed(b"d", ctx).await?;