diff --git a/pageserver/src/remote_storage.rs b/pageserver/src/remote_storage.rs
index aebd74af5a..8167830347 100644
--- a/pageserver/src/remote_storage.rs
+++ b/pageserver/src/remote_storage.rs
@@ -324,6 +324,9 @@ trait RemoteStorage: Send + Sync {
     async fn upload(
         &self,
         from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
+        // S3 PUT request requires the content length to be specified,
+        // otherwise it starts to fail as the concurrent connection count increases.
+        from_size_kb: usize,
         to: &Self::StoragePath,
         metadata: Option<StorageMetadata>,
     ) -> anyhow::Result<()>;
diff --git a/pageserver/src/remote_storage/local_fs.rs b/pageserver/src/remote_storage/local_fs.rs
index b40089d53c..15c69beebb 100644
--- a/pageserver/src/remote_storage/local_fs.rs
+++ b/pageserver/src/remote_storage/local_fs.rs
@@ -104,7 +104,8 @@ impl RemoteStorage for LocalFs {
 
     async fn upload(
         &self,
-        mut from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
+        from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
+        from_size_kb: usize,
         to: &Self::StoragePath,
         metadata: Option<StorageMetadata>,
     ) -> anyhow::Result<()> {
@@ -128,7 +129,7 @@
             })?,
         );
 
-        io::copy(&mut from, &mut destination)
+        io::copy(&mut from.take(from_size_kb as u64), &mut destination)
            .await
            .with_context(|| {
                format!(
@@ -509,13 +510,13 @@ mod fs_tests {
         let repo_harness = RepoHarness::create("upload_file")?;
         let storage = create_storage()?;
 
-        let source = create_file_for_upload(
+        let (file, size) = create_file_for_upload(
             &storage.pageserver_workdir.join("whatever"),
             "whatever_contents",
         )
         .await?;
         let target_path = PathBuf::from("/").join("somewhere").join("else");
-        match storage.upload(source, &target_path, None).await {
+        match storage.upload(file, size, &target_path, None).await {
             Ok(()) => panic!("Should not allow storing files with wrong target path"),
             Err(e) => {
                 let message = format!("{:?}", e);
@@ -800,24 +801,17 @@ mod fs_tests {
         let timeline_path = harness.timeline_path(&TIMELINE_ID);
         let relative_timeline_path = timeline_path.strip_prefix(&harness.conf.workdir)?;
         let storage_path = storage.root.join(relative_timeline_path).join(name);
-        storage
-            .upload(
-                create_file_for_upload(
-                    &storage.pageserver_workdir.join(name),
-                    &dummy_contents(name),
-                )
-                .await?,
-                &storage_path,
-                metadata,
-            )
-            .await?;
+
+        let from_path = storage.pageserver_workdir.join(name);
+        let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;
+        storage.upload(file, size, &storage_path, metadata).await?;
         Ok(storage_path)
     }
 
     async fn create_file_for_upload(
         path: &Path,
         contents: &str,
-    ) -> anyhow::Result<io::BufReader<fs::File>> {
+    ) -> anyhow::Result<(io::BufReader<fs::File>, usize)> {
         std::fs::create_dir_all(path.parent().unwrap())?;
         let mut file_for_writing = std::fs::OpenOptions::new()
             .write(true)
@@ -825,8 +819,10 @@ mod fs_tests {
             .open(path)?;
         write!(file_for_writing, "{}", contents)?;
         drop(file_for_writing);
-        Ok(io::BufReader::new(
-            fs::OpenOptions::new().read(true).open(&path).await?,
+        let file_size = path.metadata()?.len() as usize;
+        Ok((
+            io::BufReader::new(fs::OpenOptions::new().read(true).open(&path).await?),
+            file_size,
         ))
     }
 
diff --git a/pageserver/src/remote_storage/s3_bucket.rs b/pageserver/src/remote_storage/s3_bucket.rs
index bfd28168f4..b99fa478c4 100644
--- a/pageserver/src/remote_storage/s3_bucket.rs
+++ b/pageserver/src/remote_storage/s3_bucket.rs
@@ -180,12 +180,16 @@ impl RemoteStorage for S3Bucket {
     async fn upload(
         &self,
         from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
+        from_size_kb: usize,
         to: &Self::StoragePath,
         metadata: Option<StorageMetadata>,
     ) -> anyhow::Result<()> {
         self.client
             .put_object(PutObjectRequest {
-                body: Some(StreamingBody::new(ReaderStream::new(from))),
+                body: Some(StreamingBody::new_with_size(
+                    ReaderStream::new(from),
+                    from_size_kb,
+                )),
                 bucket: self.bucket_name.clone(),
                 key: to.key().to_owned(),
                 metadata: metadata.map(|m| m.0),
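
For reference, a minimal caller-side sketch (not part of this diff) of how the new size argument might be produced before calling upload. The helper name upload_local_file and its generic signature are illustrative assumptions, not code from the repository:

// Hypothetical call site: compute the size hint from filesystem metadata,
// then hand both the reader and the size to the storage backend.
async fn upload_local_file<S: RemoteStorage>(
    storage: &S,
    source: &std::path::Path,
    target: &S::StoragePath,
) -> anyhow::Result<()> {
    // As in the diff's create_file_for_upload, the value passed here is the
    // file length in bytes taken from filesystem metadata.
    let from_size_kb = tokio::fs::metadata(source).await?.len() as usize;
    let file = tokio::io::BufReader::new(tokio::fs::File::open(source).await?);
    storage.upload(file, from_size_kb, target, None).await
}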