From 211970f0e0ce1f1a85b165309efdb8121bc0f1ce Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 9 Oct 2024 10:29:06 +0100 Subject: [PATCH] remote_storage: add `DownloadOpts::byte_(start|end)` (#9293) `download_byte_range()` is basically a copy of `download()` with an additional option passed to the backend SDKs. This can cause these code paths to diverge, and prevents combining various options. This patch adds `DownloadOpts::byte_(start|end)` and move byte range handling into `download()`. --- libs/remote_storage/src/azure_blob.rs | 26 +-- libs/remote_storage/src/lib.rs | 180 ++++++++++++------ libs/remote_storage/src/local_fs.rs | 167 ++++++---------- libs/remote_storage/src/s3_bucket.rs | 29 +-- libs/remote_storage/src/simulate_failures.rs | 19 +- libs/remote_storage/tests/common/tests.rs | 49 ++++- pageserver/src/tenant/secondary/downloader.rs | 1 + safekeeper/src/wal_backup.rs | 10 +- 8 files changed, 240 insertions(+), 241 deletions(-) diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index e113a987a5..f98d16789c 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -496,26 +496,12 @@ impl RemoteStorage for AzureBlobStorage { builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string())) } - self.download_for_builder(builder, cancel).await - } - - async fn download_byte_range( - &self, - from: &RemotePath, - start_inclusive: u64, - end_exclusive: Option, - cancel: &CancellationToken, - ) -> Result { - let blob_client = self.client.blob_client(self.relative_path_to_name(from)); - - let mut builder = blob_client.get(); - - let range: Range = if let Some(end_exclusive) = end_exclusive { - (start_inclusive..end_exclusive).into() - } else { - (start_inclusive..).into() - }; - builder = builder.range(range); + if let Some((start, end)) = opts.byte_range() { + builder = builder.range(match end { + Some(end) => Range::Range(start..end), + None => Range::RangeFrom(start..), + }); + } self.download_for_builder(builder, cancel).await } diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 0ff0f1c878..c6466237bf 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -19,7 +19,8 @@ mod simulate_failures; mod support; use std::{ - collections::HashMap, fmt::Debug, num::NonZeroU32, pin::Pin, sync::Arc, time::SystemTime, + collections::HashMap, fmt::Debug, num::NonZeroU32, ops::Bound, pin::Pin, sync::Arc, + time::SystemTime, }; use anyhow::Context; @@ -162,11 +163,60 @@ pub struct Listing { } /// Options for downloads. The default value is a plain GET. -#[derive(Default)] pub struct DownloadOpts { /// If given, returns [`DownloadError::Unmodified`] if the object still has /// the same ETag (using If-None-Match). pub etag: Option, + /// The start of the byte range to download, or unbounded. + pub byte_start: Bound, + /// The end of the byte range to download, or unbounded. Must be after the + /// start bound. + pub byte_end: Bound, +} + +impl Default for DownloadOpts { + fn default() -> Self { + Self { + etag: Default::default(), + byte_start: Bound::Unbounded, + byte_end: Bound::Unbounded, + } + } +} + +impl DownloadOpts { + /// Returns the byte range with inclusive start and exclusive end, or None + /// if unbounded. + pub fn byte_range(&self) -> Option<(u64, Option)> { + if self.byte_start == Bound::Unbounded && self.byte_end == Bound::Unbounded { + return None; + } + let start = match self.byte_start { + Bound::Excluded(i) => i + 1, + Bound::Included(i) => i, + Bound::Unbounded => 0, + }; + let end = match self.byte_end { + Bound::Excluded(i) => Some(i), + Bound::Included(i) => Some(i + 1), + Bound::Unbounded => None, + }; + if let Some(end) = end { + assert!(start < end, "range end {end} at or before start {start}"); + } + Some((start, end)) + } + + /// Returns the byte range as an RFC 2616 Range header value with inclusive + /// bounds, or None if unbounded. + pub fn byte_range_header(&self) -> Option { + self.byte_range() + .map(|(start, end)| (start, end.map(|end| end - 1))) // make end inclusive + .map(|(start, end)| match end { + Some(end) => format!("bytes={start}-{end}"), + None => format!("bytes={start}-"), + }) + } } /// Storage (potentially remote) API to manage its state. @@ -257,21 +307,6 @@ pub trait RemoteStorage: Send + Sync + 'static { cancel: &CancellationToken, ) -> Result; - /// Streams a given byte range of the remote storage entry contents. - /// - /// The returned download stream will obey initial timeout and cancellation signal by erroring - /// on whichever happens first. Only one of the reasons will fail the stream, which is usually - /// enough for `tokio::io::copy_buf` usage. If needed the error can be filtered out. - /// - /// Returns the metadata, if any was stored with the file previously. - async fn download_byte_range( - &self, - from: &RemotePath, - start_inclusive: u64, - end_exclusive: Option, - cancel: &CancellationToken, - ) -> Result; - /// Delete a single path from remote storage. /// /// If the operation fails because of timeout or cancellation, the root cause of the error will be @@ -425,33 +460,6 @@ impl GenericRemoteStorage> { } } - pub async fn download_byte_range( - &self, - from: &RemotePath, - start_inclusive: u64, - end_exclusive: Option, - cancel: &CancellationToken, - ) -> Result { - match self { - Self::LocalFs(s) => { - s.download_byte_range(from, start_inclusive, end_exclusive, cancel) - .await - } - Self::AwsS3(s) => { - s.download_byte_range(from, start_inclusive, end_exclusive, cancel) - .await - } - Self::AzureBlob(s) => { - s.download_byte_range(from, start_inclusive, end_exclusive, cancel) - .await - } - Self::Unreliable(s) => { - s.download_byte_range(from, start_inclusive, end_exclusive, cancel) - .await - } - } - } - /// See [`RemoteStorage::delete`] pub async fn delete( &self, @@ -573,20 +581,6 @@ impl GenericRemoteStorage { }) } - /// Downloads the storage object into the `to_path` provided. - /// `byte_range` could be specified to dowload only a part of the file, if needed. - pub async fn download_storage_object( - &self, - byte_range: Option<(u64, Option)>, - from: &RemotePath, - cancel: &CancellationToken, - ) -> Result { - match byte_range { - Some((start, end)) => self.download_byte_range(from, start, end, cancel).await, - None => self.download(from, &DownloadOpts::default(), cancel).await, - } - } - /// The name of the bucket/container/etc. pub fn bucket_name(&self) -> Option<&str> { match self { @@ -660,6 +654,76 @@ impl ConcurrencyLimiter { mod tests { use super::*; + /// DownloadOpts::byte_range() should generate (inclusive, exclusive) ranges + /// with optional end bound, or None when unbounded. + #[test] + fn download_opts_byte_range() { + // Consider using test_case or a similar table-driven test framework. + let cases = [ + // (byte_start, byte_end, expected) + (Bound::Unbounded, Bound::Unbounded, None), + (Bound::Unbounded, Bound::Included(7), Some((0, Some(8)))), + (Bound::Unbounded, Bound::Excluded(7), Some((0, Some(7)))), + (Bound::Included(3), Bound::Unbounded, Some((3, None))), + (Bound::Included(3), Bound::Included(7), Some((3, Some(8)))), + (Bound::Included(3), Bound::Excluded(7), Some((3, Some(7)))), + (Bound::Excluded(3), Bound::Unbounded, Some((4, None))), + (Bound::Excluded(3), Bound::Included(7), Some((4, Some(8)))), + (Bound::Excluded(3), Bound::Excluded(7), Some((4, Some(7)))), + // 1-sized ranges are fine, 0 aren't and will panic (separate test). + (Bound::Included(3), Bound::Included(3), Some((3, Some(4)))), + (Bound::Included(3), Bound::Excluded(4), Some((3, Some(4)))), + ]; + + for (byte_start, byte_end, expect) in cases { + let opts = DownloadOpts { + byte_start, + byte_end, + ..Default::default() + }; + let result = opts.byte_range(); + assert_eq!( + result, expect, + "byte_start={byte_start:?} byte_end={byte_end:?}" + ); + + // Check generated HTTP header, which uses an inclusive range. + let expect_header = expect.map(|(start, end)| match end { + Some(end) => format!("bytes={start}-{}", end - 1), // inclusive end + None => format!("bytes={start}-"), + }); + assert_eq!( + opts.byte_range_header(), + expect_header, + "byte_start={byte_start:?} byte_end={byte_end:?}" + ); + } + } + + /// DownloadOpts::byte_range() zero-sized byte range should panic. + #[test] + #[should_panic] + fn download_opts_byte_range_zero() { + DownloadOpts { + byte_start: Bound::Included(3), + byte_end: Bound::Excluded(3), + ..Default::default() + } + .byte_range(); + } + + /// DownloadOpts::byte_range() negative byte range should panic. + #[test] + #[should_panic] + fn download_opts_byte_range_negative() { + DownloadOpts { + byte_start: Bound::Included(3), + byte_end: Bound::Included(2), + ..Default::default() + } + .byte_range(); + } + #[test] fn test_object_name() { let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap(); diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index d912b94c74..93a052139b 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -506,54 +506,7 @@ impl RemoteStorage for LocalFs { return Err(DownloadError::Unmodified); } - let source = ReaderStream::new( - fs::OpenOptions::new() - .read(true) - .open(&target_path) - .await - .with_context(|| { - format!("Failed to open source file {target_path:?} to use in the download") - }) - .map_err(DownloadError::Other)?, - ); - - let metadata = self - .read_storage_metadata(&target_path) - .await - .map_err(DownloadError::Other)?; - - let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone()); - let source = crate::support::DownloadStream::new(cancel_or_timeout, source); - - Ok(Download { - metadata, - last_modified: file_metadata - .modified() - .map_err(|e| DownloadError::Other(anyhow::anyhow!(e).context("Reading mtime")))?, - etag, - download_stream: Box::pin(source), - }) - } - - async fn download_byte_range( - &self, - from: &RemotePath, - start_inclusive: u64, - end_exclusive: Option, - cancel: &CancellationToken, - ) -> Result { - if let Some(end_exclusive) = end_exclusive { - if end_exclusive <= start_inclusive { - return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})"))); - }; - if start_inclusive == end_exclusive.saturating_sub(1) { - return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes"))); - } - } - - let target_path = from.with_base(&self.storage_root); - let file_metadata = file_metadata(&target_path).await?; - let mut source = tokio::fs::OpenOptions::new() + let mut file = fs::OpenOptions::new() .read(true) .open(&target_path) .await @@ -562,31 +515,29 @@ impl RemoteStorage for LocalFs { }) .map_err(DownloadError::Other)?; - let len = source - .metadata() - .await - .context("query file length") - .map_err(DownloadError::Other)? - .len(); + let mut take = file_metadata.len(); + if let Some((start, end)) = opts.byte_range() { + if start > 0 { + file.seek(io::SeekFrom::Start(start)) + .await + .context("Failed to seek to the range start in a local storage file") + .map_err(DownloadError::Other)?; + } + if let Some(end) = end { + take = end - start; + } + } - source - .seek(io::SeekFrom::Start(start_inclusive)) - .await - .context("Failed to seek to the range start in a local storage file") - .map_err(DownloadError::Other)?; + let source = ReaderStream::new(file.take(take)); let metadata = self .read_storage_metadata(&target_path) .await .map_err(DownloadError::Other)?; - let source = source.take(end_exclusive.unwrap_or(len) - start_inclusive); - let source = ReaderStream::new(source); - let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone()); let source = crate::support::DownloadStream::new(cancel_or_timeout, source); - let etag = mock_etag(&file_metadata); Ok(Download { metadata, last_modified: file_metadata @@ -688,7 +639,7 @@ mod fs_tests { use super::*; use camino_tempfile::tempdir; - use std::{collections::HashMap, io::Write}; + use std::{collections::HashMap, io::Write, ops::Bound}; async fn read_and_check_metadata( storage: &LocalFs, @@ -804,10 +755,12 @@ mod fs_tests { let (first_part_local, second_part_local) = uploaded_bytes.split_at(3); let first_part_download = storage - .download_byte_range( + .download( &upload_target, - 0, - Some(first_part_local.len() as u64), + &DownloadOpts { + byte_end: Bound::Excluded(first_part_local.len() as u64), + ..Default::default() + }, &cancel, ) .await?; @@ -823,10 +776,15 @@ mod fs_tests { ); let second_part_download = storage - .download_byte_range( + .download( &upload_target, - first_part_local.len() as u64, - Some((first_part_local.len() + second_part_local.len()) as u64), + &DownloadOpts { + byte_start: Bound::Included(first_part_local.len() as u64), + byte_end: Bound::Excluded( + (first_part_local.len() + second_part_local.len()) as u64, + ), + ..Default::default() + }, &cancel, ) .await?; @@ -842,7 +800,14 @@ mod fs_tests { ); let suffix_bytes = storage - .download_byte_range(&upload_target, 13, None, &cancel) + .download( + &upload_target, + &DownloadOpts { + byte_start: Bound::Included(13), + ..Default::default() + }, + &cancel, + ) .await? .download_stream; let suffix_bytes = aggregate(suffix_bytes).await?; @@ -850,7 +815,7 @@ mod fs_tests { assert_eq!(upload_name, suffix); let all_bytes = storage - .download_byte_range(&upload_target, 0, None, &cancel) + .download(&upload_target, &DownloadOpts::default(), &cancel) .await? .download_stream; let all_bytes = aggregate(all_bytes).await?; @@ -861,48 +826,26 @@ mod fs_tests { } #[tokio::test] - async fn download_file_range_negative() -> anyhow::Result<()> { - let (storage, cancel) = create_storage()?; + #[should_panic(expected = "at or before start")] + async fn download_file_range_negative() { + let (storage, cancel) = create_storage().unwrap(); let upload_name = "upload_1"; - let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?; + let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel) + .await + .unwrap(); - let start = 1_000_000_000; - let end = start + 1; - match storage - .download_byte_range( + storage + .download( &upload_target, - start, - Some(end), // exclusive end + &DownloadOpts { + byte_start: Bound::Included(10), + byte_end: Bound::Excluded(10), + ..Default::default() + }, &cancel, ) .await - { - Ok(_) => panic!("Should not allow downloading wrong ranges"), - Err(e) => { - let error_string = e.to_string(); - assert!(error_string.contains("zero bytes")); - assert!(error_string.contains(&start.to_string())); - assert!(error_string.contains(&end.to_string())); - } - } - - let start = 10000; - let end = 234; - assert!(start > end, "Should test an incorrect range"); - match storage - .download_byte_range(&upload_target, start, Some(end), &cancel) - .await - { - Ok(_) => panic!("Should not allow downloading wrong ranges"), - Err(e) => { - let error_string = e.to_string(); - assert!(error_string.contains("Invalid range")); - assert!(error_string.contains(&start.to_string())); - assert!(error_string.contains(&end.to_string())); - } - } - - Ok(()) + .unwrap(); } #[tokio::test] @@ -945,10 +888,12 @@ mod fs_tests { let (first_part_local, _) = uploaded_bytes.split_at(3); let partial_download_with_metadata = storage - .download_byte_range( + .download( &upload_target, - 0, - Some(first_part_local.len() as u64), + &DownloadOpts { + byte_end: Bound::Excluded(first_part_local.len() as u64), + ..Default::default() + }, &cancel, ) .await?; diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index ec7c047565..f950f2886c 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -804,34 +804,7 @@ impl RemoteStorage for S3Bucket { bucket: self.bucket_name.clone(), key: self.relative_path_to_s3_object(from), etag: opts.etag.as_ref().map(|e| e.to_string()), - range: None, - }, - cancel, - ) - .await - } - - async fn download_byte_range( - &self, - from: &RemotePath, - start_inclusive: u64, - end_exclusive: Option, - cancel: &CancellationToken, - ) -> Result { - // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35 - // and needs both ends to be exclusive - let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1)); - let range = Some(match end_inclusive { - Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"), - None => format!("bytes={start_inclusive}-"), - }); - - self.download_object( - GetObjectRequest { - bucket: self.bucket_name.clone(), - key: self.relative_path_to_s3_object(from), - etag: None, - range, + range: opts.byte_range_header(), }, cancel, ) diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index 05f82b5a5a..10db53971c 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -170,28 +170,13 @@ impl RemoteStorage for UnreliableWrapper { opts: &DownloadOpts, cancel: &CancellationToken, ) -> Result { + // Note: We treat any byte range as an "attempt" of the same operation. + // We don't pay attention to the ranges. That's good enough for now. self.attempt(RemoteOp::Download(from.clone())) .map_err(DownloadError::Other)?; self.inner.download(from, opts, cancel).await } - async fn download_byte_range( - &self, - from: &RemotePath, - start_inclusive: u64, - end_exclusive: Option, - cancel: &CancellationToken, - ) -> Result { - // Note: We treat any download_byte_range as an "attempt" of the same - // operation. We don't pay attention to the ranges. That's good enough - // for now. - self.attempt(RemoteOp::Download(from.clone())) - .map_err(DownloadError::Other)?; - self.inner - .download_byte_range(from, start_inclusive, end_exclusive, cancel) - .await - } - async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> { self.delete_inner(path, true, cancel).await } diff --git a/libs/remote_storage/tests/common/tests.rs b/libs/remote_storage/tests/common/tests.rs index e38cfb3ef0..e6f33fc3f8 100644 --- a/libs/remote_storage/tests/common/tests.rs +++ b/libs/remote_storage/tests/common/tests.rs @@ -2,6 +2,7 @@ use anyhow::Context; use camino::Utf8Path; use futures::StreamExt; use remote_storage::{DownloadError, DownloadOpts, ListingMode, ListingObject, RemotePath}; +use std::ops::Bound; use std::sync::Arc; use std::{collections::HashSet, num::NonZeroU32}; use test_context::test_context; @@ -293,7 +294,15 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result< // Full range (end specified) let dl = ctx .client - .download_byte_range(&path, 0, Some(len as u64), &cancel) + .download( + &path, + &DownloadOpts { + byte_start: Bound::Included(0), + byte_end: Bound::Excluded(len as u64), + ..Default::default() + }, + &cancel, + ) .await?; let buf = download_to_vec(dl).await?; assert_eq!(&buf, &orig); @@ -301,7 +310,15 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result< // partial range (end specified) let dl = ctx .client - .download_byte_range(&path, 4, Some(10), &cancel) + .download( + &path, + &DownloadOpts { + byte_start: Bound::Included(4), + byte_end: Bound::Excluded(10), + ..Default::default() + }, + &cancel, + ) .await?; let buf = download_to_vec(dl).await?; assert_eq!(&buf, &orig[4..10]); @@ -309,7 +326,15 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result< // partial range (end beyond real end) let dl = ctx .client - .download_byte_range(&path, 8, Some(len as u64 * 100), &cancel) + .download( + &path, + &DownloadOpts { + byte_start: Bound::Included(8), + byte_end: Bound::Excluded(len as u64 * 100), + ..Default::default() + }, + &cancel, + ) .await?; let buf = download_to_vec(dl).await?; assert_eq!(&buf, &orig[8..]); @@ -317,7 +342,14 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result< // Partial range (end unspecified) let dl = ctx .client - .download_byte_range(&path, 4, None, &cancel) + .download( + &path, + &DownloadOpts { + byte_start: Bound::Included(4), + ..Default::default() + }, + &cancel, + ) .await?; let buf = download_to_vec(dl).await?; assert_eq!(&buf, &orig[4..]); @@ -325,7 +357,14 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result< // Full range (end unspecified) let dl = ctx .client - .download_byte_range(&path, 0, None, &cancel) + .download( + &path, + &DownloadOpts { + byte_start: Bound::Included(0), + ..Default::default() + }, + &cancel, + ) .await?; let buf = download_to_vec(dl).await?; assert_eq!(&buf, &orig); diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 9f7447a9ac..82c5702686 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -950,6 +950,7 @@ impl<'a> TenantDownloader<'a> { let cancel = &self.secondary_state.cancel; let opts = DownloadOpts { etag: prev_etag.cloned(), + ..Default::default() }; backoff::retry( diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index ef26ac99c5..6c87e5a926 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -17,7 +17,9 @@ use std::time::Duration; use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr; use postgres_ffi::XLogFileName; use postgres_ffi::{XLogSegNo, PG_TLI}; -use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath, StorageMetadata}; +use remote_storage::{ + DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath, StorageMetadata, +}; use tokio::fs::File; use tokio::select; @@ -503,8 +505,12 @@ pub async fn read_object( let cancel = CancellationToken::new(); + let opts = DownloadOpts { + byte_start: std::ops::Bound::Included(offset), + ..Default::default() + }; let download = storage - .download_storage_object(Some((offset, None)), file_path, &cancel) + .download(file_path, &opts, &cancel) .await .with_context(|| { format!("Failed to open WAL segment download stream for remote path {file_path:?}")