Refactor upload/download_relish function signatures.

This makes them more generic, by taking any Read / Write trait
implementation, instead of operating directly on a a file.
This commit is contained in:
Kirill Bulatov
2021-10-13 21:34:33 +03:00
committed by Kirill Bulatov
parent 100da024b6
commit 4ade0bb41c
3 changed files with 72 additions and 31 deletions

View File

@@ -55,15 +55,21 @@ pub trait RelishStorage: Send + Sync {
async fn list_relishes(&self) -> anyhow::Result<Vec<Self::RelishStoragePath>>;
async fn download_relish(
async fn download_relish<W: 'static + std::io::Write + Send>(
&self,
from: &Self::RelishStoragePath,
to: &Path,
) -> anyhow::Result<()>;
// rust_s3 `get_object_stream` method requires `std::io::BufWriter` for some reason, not the async counterpart
// that forces us to consume and return the writer to satisfy the blocking operation async wrapper requirements
to: std::io::BufWriter<W>,
) -> anyhow::Result<std::io::BufWriter<W>>;
async fn delete_relish(&self, path: &Self::RelishStoragePath) -> anyhow::Result<()>;
async fn upload_relish(&self, from: &Path, to: &Self::RelishStoragePath) -> anyhow::Result<()>;
async fn upload_relish<R: tokio::io::AsyncRead + std::marker::Unpin + Send>(
&self,
from: &mut tokio::io::BufReader<R>,
to: &Self::RelishStoragePath,
) -> anyhow::Result<()>;
}
fn strip_workspace_prefix<'a>(

View File

@@ -9,11 +9,13 @@
use std::{
future::Future,
io::Write,
path::{Path, PathBuf},
pin::Pin,
};
use anyhow::{bail, Context};
use tokio::{fs, io};
use super::{strip_workspace_prefix, RelishStorage};
@@ -64,16 +66,33 @@ impl RelishStorage for LocalFs {
Ok(get_all_files(&self.root).await?.into_iter().collect())
}
async fn download_relish(
async fn download_relish<W: 'static + std::io::Write + Send>(
&self,
from: &Self::RelishStoragePath,
to: &Path,
) -> anyhow::Result<()> {
mut to: std::io::BufWriter<W>,
) -> anyhow::Result<std::io::BufWriter<W>> {
let file_path = self.resolve_in_storage(from)?;
if file_path.exists() && file_path.is_file() {
create_target_directory(to).await?;
tokio::fs::copy(file_path, to).await?;
Ok(())
let updated_buffer = tokio::task::spawn_blocking(move || {
let mut source = std::io::BufReader::new(
std::fs::OpenOptions::new()
.read(true)
.open(&file_path)
.with_context(|| {
format!(
"Failed to open source file '{}' to use in the download",
file_path.display()
)
})?,
);
std::io::copy(&mut source, &mut to)
.context("Failed to download the relish file")?;
to.flush().context("Failed to flush the download buffer")?;
Ok::<_, anyhow::Error>(to)
})
.await
.context("Failed to spawn a blocking task")??;
Ok(updated_buffer)
} else {
bail!(
"File '{}' either does not exist or is not a file",
@@ -94,18 +113,30 @@ impl RelishStorage for LocalFs {
}
}
async fn upload_relish(&self, from: &Path, to: &Self::RelishStoragePath) -> anyhow::Result<()> {
async fn upload_relish<R: io::AsyncRead + std::marker::Unpin + Send>(
&self,
from: &mut io::BufReader<R>,
to: &Self::RelishStoragePath,
) -> anyhow::Result<()> {
let target_file_path = self.resolve_in_storage(to)?;
create_target_directory(&target_file_path).await?;
let mut destination = io::BufWriter::new(
fs::OpenOptions::new()
.write(true)
.create(true)
.open(&target_file_path)
.await
.with_context(|| {
format!(
"Failed to open target fs destination at '{}'",
target_file_path.display()
)
})?,
);
tokio::fs::copy(&from, &target_file_path)
io::copy_buf(from, &mut destination)
.await
.with_context(|| {
format!(
"Failed to upload relish '{}' to local storage",
from.display(),
)
})?;
.context("Failed to upload relish to local storage")?;
Ok(())
}
}

View File

@@ -1,5 +1,6 @@
//! A wrapper around AWS S3 client library `rust_s3` to be used a relish storage.
use std::io::Write;
use std::path::Path;
use anyhow::Context;
@@ -83,18 +84,14 @@ impl RelishStorage for RustS3 {
.collect())
}
async fn download_relish(
async fn download_relish<W: 'static + std::io::Write + Send>(
&self,
from: &Self::RelishStoragePath,
to: &Path,
) -> anyhow::Result<()> {
let mut target_file = std::fs::OpenOptions::new()
.write(true)
.open(to)
.with_context(|| format!("Failed to open target s3 destination at {}", to.display()))?;
mut to: std::io::BufWriter<W>,
) -> anyhow::Result<std::io::BufWriter<W>> {
let code = self
.bucket
.get_object_stream(from.key(), &mut target_file)
.get_object_stream(from.key(), &mut to)
.await
.with_context(|| format!("Failed to download s3 object with key {}", from.key()))?;
if code != 200 {
@@ -103,7 +100,12 @@ impl RelishStorage for RustS3 {
code
))
} else {
Ok(())
tokio::task::spawn_blocking(move || {
to.flush().context("Failed to fluch the downoad buffer")?;
Ok::<_, anyhow::Error>(to)
})
.await
.context("Failed to joim the download buffer flush task")?
}
}
@@ -124,12 +126,14 @@ impl RelishStorage for RustS3 {
}
}
async fn upload_relish(&self, from: &Path, to: &Self::RelishStoragePath) -> anyhow::Result<()> {
let mut local_file = tokio::fs::OpenOptions::new().read(true).open(from).await?;
async fn upload_relish<R: tokio::io::AsyncRead + std::marker::Unpin + Send>(
&self,
from: &mut tokio::io::BufReader<R>,
to: &Self::RelishStoragePath,
) -> anyhow::Result<()> {
let code = self
.bucket
.put_object_stream(&mut local_file, to.key())
.put_object_stream(from, to.key())
.await
.with_context(|| format!("Failed to create s3 object with key {}", to.key()))?;
if code != 200 {