Require specifying the upload size in remote storage

This commit is contained in:
Kirill Bulatov
2022-04-19 22:06:02 +03:00
committed by Kirill Bulatov
parent ef72eb84cf
commit 44bfc529f6
3 changed files with 22 additions and 19 deletions

View File

@@ -324,6 +324,9 @@ trait RemoteStorage: Send + Sync {
async fn upload(
&self,
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
/// S3 PUT request requires the content length to be specified,
/// otherwise it starts to fail with the concurrent connection count increasing.
from_size_kb: usize,
to: &Self::StoragePath,
metadata: Option<StorageMetadata>,
) -> anyhow::Result<()>;

View File

@@ -104,7 +104,8 @@ impl RemoteStorage for LocalFs {
async fn upload(
&self,
mut from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
from_size_kb: usize,
to: &Self::StoragePath,
metadata: Option<StorageMetadata>,
) -> anyhow::Result<()> {
@@ -128,7 +129,7 @@ impl RemoteStorage for LocalFs {
})?,
);
io::copy(&mut from, &mut destination)
io::copy(&mut from.take(from_size_kb as u64), &mut destination)
.await
.with_context(|| {
format!(
@@ -509,13 +510,13 @@ mod fs_tests {
let repo_harness = RepoHarness::create("upload_file")?;
let storage = create_storage()?;
let source = create_file_for_upload(
let (file, size) = create_file_for_upload(
&storage.pageserver_workdir.join("whatever"),
"whatever_contents",
)
.await?;
let target_path = PathBuf::from("/").join("somewhere").join("else");
match storage.upload(source, &target_path, None).await {
match storage.upload(file, size, &target_path, None).await {
Ok(()) => panic!("Should not allow storing files with wrong target path"),
Err(e) => {
let message = format!("{:?}", e);
@@ -800,24 +801,17 @@ mod fs_tests {
let timeline_path = harness.timeline_path(&TIMELINE_ID);
let relative_timeline_path = timeline_path.strip_prefix(&harness.conf.workdir)?;
let storage_path = storage.root.join(relative_timeline_path).join(name);
storage
.upload(
create_file_for_upload(
&storage.pageserver_workdir.join(name),
&dummy_contents(name),
)
.await?,
&storage_path,
metadata,
)
.await?;
let from_path = storage.pageserver_workdir.join(name);
let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;
storage.upload(file, size, &storage_path, metadata).await?;
Ok(storage_path)
}
async fn create_file_for_upload(
path: &Path,
contents: &str,
) -> anyhow::Result<io::BufReader<fs::File>> {
) -> anyhow::Result<(io::BufReader<fs::File>, usize)> {
std::fs::create_dir_all(path.parent().unwrap())?;
let mut file_for_writing = std::fs::OpenOptions::new()
.write(true)
@@ -825,8 +819,10 @@ mod fs_tests {
.open(path)?;
write!(file_for_writing, "{}", contents)?;
drop(file_for_writing);
Ok(io::BufReader::new(
fs::OpenOptions::new().read(true).open(&path).await?,
let file_size = path.metadata()?.len() as usize;
Ok((
io::BufReader::new(fs::OpenOptions::new().read(true).open(&path).await?),
file_size,
))
}

View File

@@ -180,12 +180,16 @@ impl RemoteStorage for S3Bucket {
async fn upload(
&self,
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
from_size_kb: usize,
to: &Self::StoragePath,
metadata: Option<StorageMetadata>,
) -> anyhow::Result<()> {
self.client
.put_object(PutObjectRequest {
body: Some(StreamingBody::new(ReaderStream::new(from))),
body: Some(StreamingBody::new_with_size(
ReaderStream::new(from),
from_size_kb,
)),
bucket: self.bucket_name.clone(),
key: to.key().to_owned(),
metadata: metadata.map(|m| m.0),