diff --git a/pageserver/src/relish_storage.rs b/pageserver/src/relish_storage.rs index 70d75c34bf..885ca9581f 100644 --- a/pageserver/src/relish_storage.rs +++ b/pageserver/src/relish_storage.rs @@ -55,15 +55,21 @@ pub trait RelishStorage: Send + Sync { async fn list_relishes(&self) -> anyhow::Result>; - async fn download_relish( + async fn download_relish( &self, from: &Self::RelishStoragePath, - to: &Path, - ) -> anyhow::Result<()>; + // rust_s3 `get_object_stream` method requires `std::io::BufWriter` for some reason, not the async counterpart + // that forces us to consume and return the writer to satisfy the blocking operation async wrapper requirements + to: std::io::BufWriter, + ) -> anyhow::Result>; async fn delete_relish(&self, path: &Self::RelishStoragePath) -> anyhow::Result<()>; - async fn upload_relish(&self, from: &Path, to: &Self::RelishStoragePath) -> anyhow::Result<()>; + async fn upload_relish( + &self, + from: &mut tokio::io::BufReader, + to: &Self::RelishStoragePath, + ) -> anyhow::Result<()>; } fn strip_workspace_prefix<'a>( diff --git a/pageserver/src/relish_storage/local_fs.rs b/pageserver/src/relish_storage/local_fs.rs index 78ee858a5b..49d656d5a6 100644 --- a/pageserver/src/relish_storage/local_fs.rs +++ b/pageserver/src/relish_storage/local_fs.rs @@ -9,11 +9,13 @@ use std::{ future::Future, + io::Write, path::{Path, PathBuf}, pin::Pin, }; use anyhow::{bail, Context}; +use tokio::{fs, io}; use super::{strip_workspace_prefix, RelishStorage}; @@ -64,16 +66,33 @@ impl RelishStorage for LocalFs { Ok(get_all_files(&self.root).await?.into_iter().collect()) } - async fn download_relish( + async fn download_relish( &self, from: &Self::RelishStoragePath, - to: &Path, - ) -> anyhow::Result<()> { + mut to: std::io::BufWriter, + ) -> anyhow::Result> { let file_path = self.resolve_in_storage(from)?; if file_path.exists() && file_path.is_file() { - create_target_directory(to).await?; - tokio::fs::copy(file_path, to).await?; - Ok(()) + let updated_buffer = 
tokio::task::spawn_blocking(move || { + let mut source = std::io::BufReader::new( + std::fs::OpenOptions::new() + .read(true) + .open(&file_path) + .with_context(|| { + format!( + "Failed to open source file '{}' to use in the download", + file_path.display() + ) + })?, + ); + std::io::copy(&mut source, &mut to) + .context("Failed to download the relish file")?; + to.flush().context("Failed to flush the download buffer")?; + Ok::<_, anyhow::Error>(to) + }) + .await + .context("Failed to spawn a blocking task")??; + Ok(updated_buffer) } else { bail!( "File '{}' either does not exist or is not a file", @@ -94,18 +113,32 @@ impl RelishStorage for LocalFs { } } - async fn upload_relish(&self, from: &Path, to: &Self::RelishStoragePath) -> anyhow::Result<()> { + async fn upload_relish( + &self, + from: &mut io::BufReader, + to: &Self::RelishStoragePath, + ) -> anyhow::Result<()> { let target_file_path = self.resolve_in_storage(to)?; create_target_directory(&target_file_path).await?; + let mut destination = io::BufWriter::new( + fs::OpenOptions::new() + .write(true) + .create(true) + // Truncate so that re-uploading over a longer existing file cannot leave stale trailing bytes + .truncate(true) + .open(&target_file_path) + .await + .with_context(|| { + format!( + "Failed to open target fs destination at '{}'", + target_file_path.display() + ) + })?, + ); - tokio::fs::copy(&from, &target_file_path) + io::copy_buf(from, &mut destination) .await - .with_context(|| { - format!( - "Failed to upload relish '{}' to local storage", - from.display(), - ) - })?; + .context("Failed to upload relish to local storage")?; Ok(()) } } diff --git a/pageserver/src/relish_storage/rust_s3.rs b/pageserver/src/relish_storage/rust_s3.rs index dc29752e99..d32d357b27 100644 --- a/pageserver/src/relish_storage/rust_s3.rs +++ b/pageserver/src/relish_storage/rust_s3.rs @@ -1,5 +1,6 @@ //! A wrapper around AWS S3 client library `rust_s3` to be used a relish storage. 
+use std::io::Write; use std::path::Path; use anyhow::Context; @@ -83,18 +84,14 @@ impl RelishStorage for RustS3 { .collect()) } - async fn download_relish( + async fn download_relish( &self, from: &Self::RelishStoragePath, - to: &Path, - ) -> anyhow::Result<()> { - let mut target_file = std::fs::OpenOptions::new() - .write(true) - .open(to) - .with_context(|| format!("Failed to open target s3 destination at {}", to.display()))?; + mut to: std::io::BufWriter, + ) -> anyhow::Result> { let code = self .bucket - .get_object_stream(from.key(), &mut target_file) + .get_object_stream(from.key(), &mut to) .await .with_context(|| format!("Failed to download s3 object with key {}", from.key()))?; if code != 200 { @@ -103,7 +100,12 @@ impl RelishStorage for RustS3 { code )) } else { - Ok(()) + tokio::task::spawn_blocking(move || { + to.flush().context("Failed to flush the download buffer")?; + Ok::<_, anyhow::Error>(to) + }) + .await + .context("Failed to join the download buffer flush task")? } } @@ -124,12 +126,14 @@ impl RelishStorage for RustS3 { } } - async fn upload_relish(&self, from: &Path, to: &Self::RelishStoragePath) -> anyhow::Result<()> { - let mut local_file = tokio::fs::OpenOptions::new().read(true).open(from).await?; - + async fn upload_relish( + &self, + from: &mut tokio::io::BufReader, + to: &Self::RelishStoragePath, + ) -> anyhow::Result<()> { let code = self .bucket - .put_object_stream(&mut local_file, to.key()) + .put_object_stream(from, to.key()) .await .with_context(|| format!("Failed to create s3 object with key {}", to.key()))?; if code != 200 {