Allow downloading remote files partially

This commit is contained in:
Kirill Bulatov
2021-11-05 14:56:40 +02:00
committed by Kirill Bulatov
parent e7ca8ef5a8
commit 99dbbe5f18
3 changed files with 216 additions and 20 deletions

View File

@@ -147,6 +147,15 @@ trait RemoteStorage: Send + Sync {
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<()>;
/// Streams a given byte range of the remote storage entry contents into the buffered writer given, returns the filled writer.
async fn download_range(
&self,
from: &Self::StoragePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<()>;
async fn delete(&self, path: &Self::StoragePath) -> anyhow::Result<()>;
}

View File

@@ -11,16 +11,15 @@ use std::{
pin::Pin,
};
use anyhow::{bail, Context};
use anyhow::{bail, ensure, Context};
use tokio::{
fs,
io::{self, AsyncWriteExt},
io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
};
use tracing::*;
use crate::layered_repository::metadata::METADATA_FILE_NAME;
use super::{parse_ids_from_path, strip_path_prefix, RemoteFileInfo, RemoteStorage};
use crate::layered_repository::metadata::METADATA_FILE_NAME;
pub struct LocalFs {
pageserver_workdir: &'static Path,
@@ -154,6 +153,64 @@ impl RemoteStorage for LocalFs {
}
}
async fn download_range(
&self,
from: &Self::StoragePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<()> {
if let Some(end_exclusive) = end_exclusive {
ensure!(
end_exclusive > start_inclusive,
"Invalid range, start ({}) is bigger then end ({:?})",
start_inclusive,
end_exclusive
);
if start_inclusive == end_exclusive.saturating_sub(1) {
return Ok(());
}
}
let file_path = self.resolve_in_storage(from)?;
if file_path.exists() && file_path.is_file() {
let mut source = io::BufReader::new(
fs::OpenOptions::new()
.read(true)
.open(&file_path)
.await
.with_context(|| {
format!(
"Failed to open source file '{}' to use in the download",
file_path.display()
)
})?,
);
source
.seek(io::SeekFrom::Start(start_inclusive))
.await
.context("Failed to seek to the range start in a local storage file")?;
match end_exclusive {
Some(end_exclusive) => {
io::copy(&mut source.take(end_exclusive - start_inclusive), to).await
}
None => io::copy(&mut source, to).await,
}
.with_context(|| {
format!(
"Failed to download file '{}' range from the local storage",
file_path.display()
)
})?;
Ok(())
} else {
bail!(
"File '{}' either does not exist or is not a file",
file_path.display()
)
}
}
async fn delete(&self, path: &Self::StoragePath) -> anyhow::Result<()> {
let file_path = self.resolve_in_storage(path)?;
if file_path.exists() && file_path.is_file() {
@@ -450,20 +507,6 @@ mod fs_tests {
"Should list a two different files after second upload"
);
// match storage.upload_file(&mut source, &target_path_1).await {
// Ok(()) => panic!("Should not allow reuploading storage files"),
// Err(e) => {
// let message = format!("{:?}", e);
// assert!(message.contains(&target_path_1.display().to_string()));
// assert!(message.contains("File exists"));
// }
// }
assert_eq!(
list_files_sorted(&storage).await?,
vec![target_path_1, target_path_2],
"Should list a two different files after all upload attempts"
);
Ok(())
}
@@ -503,6 +546,118 @@ mod fs_tests {
Ok(())
}
#[tokio::test]
async fn download_file_range_positive() -> anyhow::Result<()> {
let repo_harness = RepoHarness::create("download_file_range_positive")?;
let storage = create_storage()?;
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name).await?;
let mut full_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
storage
.download_range(&upload_target, 0, None, &mut full_range_bytes)
.await?;
full_range_bytes.flush().await?;
assert_eq!(
dummy_contents(upload_name),
String::from_utf8(full_range_bytes.into_inner().into_inner())?,
"Download full range should return the whole upload"
);
let mut zero_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let same_byte = 1_000_000_000;
storage
.download_range(
&upload_target,
same_byte,
Some(same_byte + 1), // exclusive end
&mut zero_range_bytes,
)
.await?;
zero_range_bytes.flush().await?;
assert!(
zero_range_bytes.into_inner().into_inner().is_empty(),
"Zero byte range should not download any part of the file"
);
let uploaded_bytes = dummy_contents(upload_name).into_bytes();
let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
storage
.download_range(
&upload_target,
0,
Some(first_part_local.len() as u64),
&mut first_part_remote,
)
.await?;
first_part_remote.flush().await?;
let first_part_remote = first_part_remote.into_inner().into_inner();
assert_eq!(
first_part_local,
first_part_remote.as_slice(),
"First part bytes should be returned when requrested"
);
let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
storage
.download_range(
&upload_target,
first_part_local.len() as u64,
Some((first_part_local.len() + second_part_local.len()) as u64),
&mut second_part_remote,
)
.await?;
second_part_remote.flush().await?;
let second_part_remote = second_part_remote.into_inner().into_inner();
assert_eq!(
second_part_local,
second_part_remote.as_slice(),
"Second part bytes should be returned when requrested"
);
Ok(())
}
#[tokio::test]
async fn download_file_range_negative() -> anyhow::Result<()> {
let repo_harness = RepoHarness::create("download_file_range_negative")?;
let storage = create_storage()?;
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name).await?;
let start = 10000;
let end = 234;
assert!(start > end, "Should test an incorrect range");
match storage
.download_range(&upload_target, start, Some(end), &mut io::sink())
.await
{
Ok(_) => panic!("Should not allow downloading wrong ranges"),
Err(e) => {
let error_string = e.to_string();
assert!(error_string.contains("Invalid range"));
assert!(error_string.contains(&start.to_string()));
assert!(error_string.contains(&end.to_string()));
}
}
let non_existing_path = PathBuf::from("somewhere").join("else");
match storage
.download_range(&non_existing_path, 1, Some(3), &mut io::sink())
.await
{
Ok(_) => panic!("Should not allow downloading non-existing storage file ranges"),
Err(e) => {
let error_string = e.to_string();
assert!(error_string.contains("does not exist"));
assert!(error_string.contains(&non_existing_path.display().to_string()));
}
}
Ok(())
}
#[tokio::test]
async fn delete_file() -> anyhow::Result<()> {
let repo_harness = RepoHarness::create("delete_file")?;

View File

@@ -148,17 +148,49 @@ impl RemoteStorage for S3 {
.with_context(|| format!("Failed to download s3 object with key {}", from.key()))?;
if code != 200 {
Err(anyhow::format_err!(
"Received non-200 exit code during downloading object from directory, code: {}",
"Received non-200 exit code during downloading object, code: {}",
code
))
} else {
io::copy(&mut io::BufReader::new(data.as_slice()), to)
// we don't have to write vector into the destination this way, `to_write_all` would be enough.
// but we want to prepare for migration on `rusoto`, that has a streaming HTTP body instead here, with
// which it makes more sense to use `io::copy`.
io::copy(&mut data.as_slice(), to)
.await
.context("Failed to write downloaded data into the destination buffer")?;
Ok(())
}
}
async fn download_range(
&self,
from: &Self::StoragePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<()> {
// S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
// and needs both ends to be exclusive
let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
let (data, code) = self
.bucket
.get_object_range(from.key(), start_inclusive, end_inclusive)
.await
.with_context(|| format!("Failed to download s3 object with key {}", from.key()))?;
if code != 206 {
Err(anyhow::format_err!(
"Received non-206 exit code during downloading object range, code: {}",
code
))
} else {
// see `download` function above for the comment on why `Vec<u8>` buffer is copied this way
io::copy(&mut data.as_slice(), to)
.await
.context("Failed to write downloaded range into the destination buffer")?;
Ok(())
}
}
async fn delete(&self, path: &Self::StoragePath) -> anyhow::Result<()> {
let (_, code) = self
.bucket