diff --git a/pageserver/src/remote_storage.rs b/pageserver/src/remote_storage.rs
index 3756390523..21ab65a896 100644
--- a/pageserver/src/remote_storage.rs
+++ b/pageserver/src/remote_storage.rs
@@ -147,6 +147,16 @@ trait RemoteStorage: Send + Sync {
         to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
     ) -> anyhow::Result<()>;
 
+    /// Streams the `[start_inclusive, end_exclusive)` byte range of the remote storage entry contents
+    /// into the writer given, `None` as the end means that everything until the end of the entry is streamed.
+    async fn download_range(
+        &self,
+        from: &Self::StoragePath,
+        start_inclusive: u64,
+        end_exclusive: Option<u64>,
+        to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
+    ) -> anyhow::Result<()>;
+
     async fn delete(&self, path: &Self::StoragePath) -> anyhow::Result<()>;
 }
 
diff --git a/pageserver/src/remote_storage/local_fs.rs b/pageserver/src/remote_storage/local_fs.rs
index d3c7f8218c..d7cdcacb98 100644
--- a/pageserver/src/remote_storage/local_fs.rs
+++ b/pageserver/src/remote_storage/local_fs.rs
@@ -11,16 +11,15 @@ use std::{
     pin::Pin,
 };
 
-use anyhow::{bail, Context};
+use anyhow::{bail, ensure, Context};
 use tokio::{
     fs,
-    io::{self, AsyncWriteExt},
+    io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
 };
 use tracing::*;
 
-use crate::layered_repository::metadata::METADATA_FILE_NAME;
-
 use super::{parse_ids_from_path, strip_path_prefix, RemoteFileInfo, RemoteStorage};
+use crate::layered_repository::metadata::METADATA_FILE_NAME;
 
 pub struct LocalFs {
     pageserver_workdir: &'static Path,
@@ -154,6 +153,65 @@ impl RemoteStorage for LocalFs {
         }
     }
 
+    /// Copies the `[start_inclusive, end_exclusive)` byte range of a storage file into `to`,
+    /// `None` as the end means that everything until the end of the file is copied.
+    async fn download_range(
+        &self,
+        from: &Self::StoragePath,
+        start_inclusive: u64,
+        end_exclusive: Option<u64>,
+        to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
+    ) -> anyhow::Result<()> {
+        if let Some(end_exclusive) = end_exclusive {
+            // The end is exclusive, so every valid range holds at least one byte:
+            // `[n, n + 1)` is exactly the byte `n` and must not be short-circuited as empty.
+            ensure!(
+                end_exclusive > start_inclusive,
+                "Invalid range, start ({}) is not less than end ({})",
+                start_inclusive,
+                end_exclusive
+            );
+        }
+        let file_path = self.resolve_in_storage(from)?;
+
+        if file_path.exists() && file_path.is_file() {
+            let mut source = io::BufReader::new(
+                fs::OpenOptions::new()
+                    .read(true)
+                    .open(&file_path)
+                    .await
+                    .with_context(|| {
+                        format!(
+                            "Failed to open source file '{}' to use in the download",
+                            file_path.display()
+                        )
+                    })?,
+            );
+            source
+                .seek(io::SeekFrom::Start(start_inclusive))
+                .await
+                .context("Failed to seek to the range start in a local storage file")?;
+            match end_exclusive {
+                Some(end_exclusive) => {
+                    io::copy(&mut source.take(end_exclusive - start_inclusive), to).await
+                }
+                None => io::copy(&mut source, to).await,
+            }
+            .with_context(|| {
+                format!(
+                    "Failed to download file '{}' range from the local storage",
+                    file_path.display()
+                )
+            })?;
+            Ok(())
+        } else {
+            bail!(
+                "File '{}' either does not exist or is not a file",
+                file_path.display()
+            )
+        }
+    }
+
     async fn delete(&self, path: &Self::StoragePath) -> anyhow::Result<()> {
         let file_path = self.resolve_in_storage(path)?;
         if file_path.exists() && file_path.is_file() {
@@ -450,20 +508,6 @@ mod fs_tests {
             "Should list a two different files after second upload"
         );
 
-        // match storage.upload_file(&mut source, &target_path_1).await {
-        //     Ok(()) => panic!("Should not allow reuploading storage files"),
-        //     Err(e) => {
-        //         let message = format!("{:?}", e);
-        //         assert!(message.contains(&target_path_1.display().to_string()));
-        //         assert!(message.contains("File exists"));
-        //     }
-        // }
-        assert_eq!(
-            list_files_sorted(&storage).await?,
-            vec![target_path_1, target_path_2],
-            "Should list a two different files after all upload attempts"
-        );
-
         Ok(())
     }
 
@@ -503,6 +547,132 @@ mod fs_tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn download_file_range_positive() -> anyhow::Result<()> {
+        let repo_harness = RepoHarness::create("download_file_range_positive")?;
+        let storage = create_storage()?;
+        let upload_name = "upload_1";
+        let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name).await?;
+
+        let mut full_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
+        storage
+            .download_range(&upload_target, 0, None, &mut full_range_bytes)
+            .await?;
+        full_range_bytes.flush().await?;
+        assert_eq!(
+            dummy_contents(upload_name),
+            String::from_utf8(full_range_bytes.into_inner().into_inner())?,
+            "Download full range should return the whole upload"
+        );
+
+        // The range is expected to start far past the end of the (small) dummy upload.
+        let mut past_eof_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
+        let past_eof_byte = 1_000_000_000;
+        storage
+            .download_range(
+                &upload_target,
+                past_eof_byte,
+                Some(past_eof_byte + 1), // exclusive end
+                &mut past_eof_bytes,
+            )
+            .await?;
+        past_eof_bytes.flush().await?;
+        assert!(
+            past_eof_bytes.into_inner().into_inner().is_empty(),
+            "Range past the end of the file should not download any part of the file"
+        );
+
+        let uploaded_bytes = dummy_contents(upload_name).into_bytes();
+
+        let mut single_byte_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
+        storage
+            .download_range(&upload_target, 0, Some(1), &mut single_byte_remote)
+            .await?;
+        single_byte_remote.flush().await?;
+        let single_byte_remote = single_byte_remote.into_inner().into_inner();
+        assert_eq!(
+            &uploaded_bytes[..1],
+            single_byte_remote.as_slice(),
+            "One byte range should return exactly the requested byte"
+        );
+
+        let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
+
+        let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
+        storage
+            .download_range(
+                &upload_target,
+                0,
+                Some(first_part_local.len() as u64),
+                &mut first_part_remote,
+            )
+            .await?;
+        first_part_remote.flush().await?;
+        let first_part_remote = first_part_remote.into_inner().into_inner();
+        assert_eq!(
+            first_part_local,
+            first_part_remote.as_slice(),
+            "First part bytes should be returned when requrested"
+        );
+
+        let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
+        storage
+            .download_range(
+                &upload_target,
+                first_part_local.len() as u64,
+                Some((first_part_local.len() + second_part_local.len()) as u64),
+                &mut second_part_remote,
+            )
+            .await?;
+        second_part_remote.flush().await?;
+        let second_part_remote = second_part_remote.into_inner().into_inner();
+        assert_eq!(
+            second_part_local,
+            second_part_remote.as_slice(),
+            "Second part bytes should be returned when requrested"
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn download_file_range_negative() -> anyhow::Result<()> {
+        let repo_harness = RepoHarness::create("download_file_range_negative")?;
+        let storage = create_storage()?;
+        let upload_name = "upload_1";
+        let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name).await?;
+
+        let start = 10000;
+        let end = 234;
+        assert!(start > end, "Should test an incorrect range");
+        match storage
+            .download_range(&upload_target, start, Some(end), &mut io::sink())
+            .await
+        {
+            Ok(_) => panic!("Should not allow downloading wrong ranges"),
+            Err(e) => {
+                let error_string = e.to_string();
+                assert!(error_string.contains("Invalid range"));
+                assert!(error_string.contains(&start.to_string()));
+                assert!(error_string.contains(&end.to_string()));
+            }
+        }
+
+        let non_existing_path = PathBuf::from("somewhere").join("else");
+        match storage
+            .download_range(&non_existing_path, 1, Some(3), &mut io::sink())
+            .await
+        {
+            Ok(_) => panic!("Should not allow downloading non-existing storage file ranges"),
+            Err(e) => {
+                let error_string = e.to_string();
+                assert!(error_string.contains("does not exist"));
+                assert!(error_string.contains(&non_existing_path.display().to_string()));
+            }
+        }
+        Ok(())
+    }
+
     #[tokio::test]
     async fn delete_file() -> anyhow::Result<()> {
         let repo_harness = RepoHarness::create("delete_file")?;
diff --git a/pageserver/src/remote_storage/rust_s3.rs b/pageserver/src/remote_storage/rust_s3.rs
index a755dc6c99..05e8617b82 100644
--- a/pageserver/src/remote_storage/rust_s3.rs
+++ b/pageserver/src/remote_storage/rust_s3.rs
@@ -148,17 +148,60 @@ impl RemoteStorage for S3 {
             .with_context(|| format!("Failed to download s3 object with key {}", from.key()))?;
         if code != 200 {
             Err(anyhow::format_err!(
-                "Received non-200 exit code during downloading object from directory, code: {}",
+                "Received non-200 exit code during downloading object, code: {}",
                 code
             ))
         } else {
-            io::copy(&mut io::BufReader::new(data.as_slice()), to)
+            // we don't have to write the vector into the destination this way, `to.write_all` would be enough,
+            // but we want to prepare for the migration to `rusoto`, that has a streaming HTTP body here instead,
+            // with which it makes more sense to use `io::copy`.
+            io::copy(&mut data.as_slice(), to)
                 .await
                 .context("Failed to write downloaded data into the destination buffer")?;
             Ok(())
         }
     }
 
+    /// Downloads the `[start_inclusive, end_exclusive)` byte range of an S3 object into `to`,
+    /// `None` as the end means that the range is requested without an upper bound.
+    async fn download_range(
+        &self,
+        from: &Self::StoragePath,
+        start_inclusive: u64,
+        end_exclusive: Option<u64>,
+        to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
+    ) -> anyhow::Result<()> {
+        if let Some(end_exclusive) = end_exclusive {
+            anyhow::ensure!(
+                end_exclusive > start_inclusive,
+                "Invalid range, start ({}) is not less than end ({})",
+                start_inclusive,
+                end_exclusive
+            );
+        }
+        // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
+        // where both ends are inclusive, so the exclusive end has to be shifted back by one byte.
+        // The check above guarantees that the end is at least 1, so the subtraction cannot underflow.
+        let end_inclusive = end_exclusive.map(|end| end - 1);
+        let (data, code) = self
+            .bucket
+            .get_object_range(from.key(), start_inclusive, end_inclusive)
+            .await
+            .with_context(|| format!("Failed to download s3 object with key {}", from.key()))?;
+        if code != 206 {
+            Err(anyhow::format_err!(
+                "Received non-206 exit code during downloading object range, code: {}",
+                code
+            ))
+        } else {
+            // see `download` function above for the comment on why `Vec<u8>` buffer is copied this way
+            io::copy(&mut data.as_slice(), to)
+                .await
+                .context("Failed to write downloaded range into the destination buffer")?;
+            Ok(())
+        }
+    }
+
     async fn delete(&self, path: &Self::StoragePath) -> anyhow::Result<()> {
         let (_, code) = self
             .bucket