From 54db1f5d8a6b86ba0c7cd9d23c4e482913906829 Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 16 Aug 2023 10:17:15 +0100 Subject: [PATCH] remote_storage: add a helper for downloading full objects This is only for use with small objects that we will deserialize in a non-streaming way. Also add a strip_prefix method to RemotePath. --- libs/remote_storage/src/lib.rs | 18 +++++++++++++++++- .../tenant/remote_timeline_client/download.rs | 16 +--------------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index a810f09787..334ca8a5b2 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -13,7 +13,7 @@ use std::{ collections::HashMap, fmt::Debug, num::{NonZeroU32, NonZeroUsize}, - path::{Path, PathBuf}, + path::{Path, PathBuf, StripPrefixError}, pin::Pin, sync::Arc, }; @@ -108,6 +108,10 @@ impl RemotePath { pub fn extension(&self) -> Option<&str> { self.0.extension()?.to_str() } + + pub fn strip_prefix(&self, p: &RemotePath) -> Result<&Path, StripPrefixError> { + self.0.strip_prefix(&p.0) + } } /// Storage (potentially remote) API to manage its state. @@ -261,6 +265,18 @@ impl GenericRemoteStorage { } } + /// For small, simple downloads where caller doesn't want to handle the streaming: return the full body + pub async fn download_all(&self, from: &RemotePath) -> Result, DownloadError> { + let mut download = self.download(from).await?; + + let mut bytes = Vec::new(); + tokio::io::copy(&mut download.download_stream, &mut bytes) + .await + .with_context(|| format!("Failed to download body from {from}")) + .map_err(DownloadError::Other)?; + Ok(bytes) + } + pub async fn download_byte_range( &self, from: &RemotePath, diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 7426ae10e9..831dc62f33 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -234,21 +234,7 @@ pub(super) async fn download_index_part( .map_err(DownloadError::BadInput)?; let index_part_bytes = download_retry( - || async { - let mut index_part_download = storage.download(&part_storage_path).await?; - - let mut index_part_bytes = Vec::new(); - tokio::io::copy( - &mut index_part_download.download_stream, - &mut index_part_bytes, - ) - .await - .with_context(|| { - format!("Failed to download an index part into file {index_part_path:?}") - }) - .map_err(DownloadError::Other)?; - Ok(index_part_bytes) - }, + || storage.download_all(&part_storage_path), &format!("download {part_storage_path:?}"), ) .await?;