diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index d5ad2f8633..8a10e098a1 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -164,6 +164,102 @@ impl GenericRemoteStorage {
             _ => None,
         }
     }
+
+    /// Takes storage object contents and its size and uploads to remote storage,
+    /// mapping `from_path` to the corresponding remote object id in the storage.
+    ///
+    /// The storage object does not have to be present on the `from_path`,
+    /// this path is used for the remote object id conversion only.
+    pub async fn upload_storage_object(
+        &self,
+        from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
+        from_size_bytes: usize,
+        from_path: &Path,
+    ) -> anyhow::Result<()> {
+        async fn do_upload_storage_object<P, S>(
+            storage: &S,
+            from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
+            from_size_bytes: usize,
+            from_path: &Path,
+        ) -> anyhow::Result<()>
+        where
+            P: std::fmt::Debug + Send + Sync + 'static,
+            S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
+        {
+            let target_storage_path = storage.remote_object_id(from_path).with_context(|| {
+                format!(
+                    "Failed to get the storage path for source local path '{}'",
+                    from_path.display()
+                )
+            })?;
+
+            storage
+                .upload(from, from_size_bytes, &target_storage_path, None)
+                .await
+                .with_context(|| {
+                    format!(
+                        "Failed to upload from '{}' to storage path '{:?}'",
+                        from_path.display(),
+                        target_storage_path
+                    )
+                })
+        }
+
+        match self {
+            GenericRemoteStorage::Local(storage) => {
+                do_upload_storage_object(storage, from, from_size_bytes, from_path).await
+            }
+            GenericRemoteStorage::S3(storage) => {
+                do_upload_storage_object(storage, from, from_size_bytes, from_path).await
+            }
+        }
+    }
+
+    /// Downloads the storage object into the `to_path` provided.
+    /// `byte_range` could be specified to download only a part of the file, if needed.
+    pub async fn download_storage_object(
+        &self,
+        byte_range: Option<(u64, Option<u64>)>,
+        to_path: &Path,
+    ) -> Result<Download, DownloadError> {
+        async fn do_download_storage_object<P, S>(
+            storage: &S,
+            byte_range: Option<(u64, Option<u64>)>,
+            to_path: &Path,
+        ) -> Result<Download, DownloadError>
+        where
+            P: std::fmt::Debug + Send + Sync + 'static,
+            S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
+        {
+            let remote_object_path = storage
+                .remote_object_id(to_path)
+                .with_context(|| {
+                    format!(
+                        "Failed to get the storage path for target local path '{}'",
+                        to_path.display()
+                    )
+                })
+                .map_err(DownloadError::BadInput)?;
+
+            match byte_range {
+                Some((start, end)) => {
+                    storage
+                        .download_byte_range(&remote_object_path, start, end)
+                        .await
+                }
+                None => storage.download(&remote_object_path).await,
+            }
+        }
+
+        match self {
+            GenericRemoteStorage::Local(storage) => {
+                do_download_storage_object(storage, byte_range, to_path).await
+            }
+            GenericRemoteStorage::S3(storage) => {
+                do_download_storage_object(storage, byte_range, to_path).await
+            }
+        }
+    }
 }
 
 /// Extra set of key-value pairs that contain arbitrary metadata about the storage entry.
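Both new wrappers share one dispatch shape: the per-variant work lives in an inner function that is generic over the implementation's `RemoteObjectId` type, and the public method only matches on the enum variants. As a caller-side sketch (not part of the patch; the `roundtrip_example` helper, the path, and the buffer are hypothetical), uploading an in-memory buffer and streaming the object back in full would look like this:

use std::path::Path;

use remote_storage::GenericRemoteStorage;

// Hypothetical caller of the new API: upload an in-memory buffer under a
// local-path-derived object id, then download the whole object back
// (byte_range = None) and collect its stream into memory.
async fn roundtrip_example(storage: &GenericRemoteStorage) -> anyhow::Result<()> {
    let local_path = Path::new("/data/tenant/timeline/layer_file"); // hypothetical path
    let bytes = b"layer contents".to_vec();
    let size = bytes.len();

    // upload_storage_object accepts any AsyncRead; tokio implements it for
    // std::io::Cursor, so an in-memory buffer works directly.
    storage
        .upload_storage_object(std::io::Cursor::new(bytes), size, local_path)
        .await?;

    let mut download = storage.download_storage_object(None, local_path).await?;
    let mut contents = Vec::new();
    tokio::io::copy(&mut download.download_stream, &mut contents).await?;
    Ok(())
}

Per the new doc comment, `from_path` never has to exist on disk: it is only translated into a remote object id via `remote_object_id`.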
diff --git a/pageserver/src/storage_sync/download.rs b/pageserver/src/storage_sync/download.rs
index ded4c042c4..ebc9a252b7 100644
--- a/pageserver/src/storage_sync/download.rs
+++ b/pageserver/src/storage_sync/download.rs
@@ -10,7 +10,7 @@ use std::{
 use anyhow::Context;
 use futures::stream::{FuturesUnordered, StreamExt};
 use remote_storage::{
-    path_with_suffix_extension, Download, DownloadError, GenericRemoteStorage, RemoteStorage,
+    path_with_suffix_extension, DownloadError, GenericRemoteStorage, RemoteStorage,
 };
 use tokio::{
     fs,
@@ -143,7 +143,9 @@ async fn download_index_part(
     let index_part_path = metadata_path(conf, sync_id.timeline_id, sync_id.tenant_id)
         .with_file_name(IndexPart::FILE_NAME)
         .with_extension(IndexPart::FILE_EXTENSION);
-    let mut index_part_download = download_storage_object(storage, &index_part_path).await?;
+    let mut index_part_download = storage
+        .download_storage_object(None, &index_part_path)
+        .await?;
 
     let mut index_part_bytes = Vec::new();
     io::copy(
@@ -262,7 +264,7 @@ pub(super) async fn download_timeline_layers<'a>(
                     )
                 })?;
 
-            let mut layer_download = download_storage_object(storage, &layer_destination_path)
+            let mut layer_download = storage.download_storage_object(None, &layer_destination_path)
                 .await
                 .with_context(|| {
                     format!(
@@ -365,37 +367,6 @@ pub(super) async fn download_timeline_layers<'a>(
     }
 }
 
-async fn download_storage_object(
-    storage: &GenericRemoteStorage,
-    to_path: &Path,
-) -> Result<Download, DownloadError> {
-    async fn do_download_storage_object<P, S>(
-        storage: &S,
-        to_path: &Path,
-    ) -> Result<Download, DownloadError>
-    where
-        P: std::fmt::Debug + Send + Sync + 'static,
-        S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
-    {
-        let remote_object_path = storage
-            .remote_object_id(to_path)
-            .with_context(|| {
-                format!(
-                    "Failed to get the storage path for target local path '{}'",
-                    to_path.display()
-                )
-            })
-            .map_err(DownloadError::BadInput)?;
-
-        storage.download(&remote_object_path).await
-    }
-
-    match storage {
-        GenericRemoteStorage::Local(storage) => do_download_storage_object(storage, to_path).await,
-        GenericRemoteStorage::S3(storage) => do_download_storage_object(storage, to_path).await,
-    }
-}
-
 async fn get_timeline_sync_ids(
     storage: &GenericRemoteStorage,
     tenant_path: &Path,
diff --git a/pageserver/src/storage_sync/upload.rs b/pageserver/src/storage_sync/upload.rs
index a8c768e0ae..7ef775e690 100644
--- a/pageserver/src/storage_sync/upload.rs
+++ b/pageserver/src/storage_sync/upload.rs
@@ -1,14 +1,11 @@
 //! Timeline synchronization logic to compress and upload to the remote storage all new timeline files from the checkpoints.
 
-use std::{
-    fmt::Debug,
-    path::{Path, PathBuf},
-};
+use std::{fmt::Debug, path::PathBuf};
 
 use anyhow::Context;
 use futures::stream::{FuturesUnordered, StreamExt};
 use once_cell::sync::Lazy;
-use remote_storage::{GenericRemoteStorage, RemoteStorage};
+use remote_storage::GenericRemoteStorage;
 use tokio::fs;
 use tracing::{debug, error, info, warn};
 
@@ -47,7 +44,8 @@ pub(super) async fn upload_index_part(
     let index_part_path = metadata_path(conf, sync_id.timeline_id, sync_id.tenant_id)
         .with_file_name(IndexPart::FILE_NAME)
         .with_extension(IndexPart::FILE_EXTENSION);
-    upload_storage_object(storage, index_part_bytes, index_part_size, &index_part_path)
+    storage
+        .upload_storage_object(index_part_bytes, index_part_size, &index_part_path)
         .await
         .with_context(|| format!("Failed to upload index part for '{sync_id}'"))
 }
@@ -131,7 +129,8 @@ pub(super) async fn upload_timeline_layers<'a>(
                 .map_err(UploadError::Other)?
                 .len() as usize;
 
-            match upload_storage_object(storage, source_file, source_size, &source_path)
+            match storage
+                .upload_storage_object(source_file, source_size, &source_path)
                 .await
                 .with_context(|| format!("Failed to upload layer file for {sync_id}"))
             {
@@ -193,51 +192,6 @@ pub(super) async fn upload_timeline_layers<'a>(
     }
 }
 
-async fn upload_storage_object(
-    storage: &GenericRemoteStorage,
-    from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
-    from_size_bytes: usize,
-    from_path: &Path,
-) -> anyhow::Result<()> {
-    async fn do_upload_storage_object<P, S>(
-        storage: &S,
-        from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
-        from_size_bytes: usize,
-        from_path: &Path,
-    ) -> anyhow::Result<()>
-    where
-        P: std::fmt::Debug + Send + Sync + 'static,
-        S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
-    {
-        let target_storage_path = storage.remote_object_id(from_path).with_context(|| {
-            format!(
-                "Failed to get the storage path for source local path '{}'",
-                from_path.display()
-            )
-        })?;
-
-        storage
-            .upload(from, from_size_bytes, &target_storage_path, None)
-            .await
-            .with_context(|| {
-                format!(
-                    "Failed to upload from '{}' to storage path '{:?}'",
-                    from_path.display(),
-                    target_storage_path
-                )
-            })
-    }
-
-    match storage {
-        GenericRemoteStorage::Local(storage) => {
-            do_upload_storage_object(storage, from, from_size_bytes, from_path).await
-        }
-        GenericRemoteStorage::S3(storage) => {
-            do_upload_storage_object(storage, from, from_size_bytes, from_path).await
-        }
-    }
-}
-
 enum UploadError {
     MissingLocalFile(PathBuf, anyhow::Error),
     Other(anyhow::Error),
diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs
index 3552452470..a15ba02863 100644
--- a/safekeeper/src/wal_backup.rs
+++ b/safekeeper/src/wal_backup.rs
@@ -13,7 +13,7 @@ use std::time::Duration;
 
 use postgres_ffi::v14::xlog_utils::{XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr};
 use postgres_ffi::PG_TLI;
-use remote_storage::{GenericRemoteStorage, RemoteStorage};
+use remote_storage::GenericRemoteStorage;
 use tokio::fs::File;
 use tokio::runtime::Builder;
 
@@ -419,73 +419,37 @@ static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();
 
 async fn backup_object(source_file: &Path, size: usize) -> Result<()> {
     let storage = REMOTE_STORAGE.get().expect("failed to get remote storage");
-    let file = File::open(&source_file).await?;
+    let file = tokio::io::BufReader::new(File::open(&source_file).await.with_context(|| {
+        format!(
+            "Failed to open file {} for wal backup",
+            source_file.display()
+        )
+    })?);
 
-    // Storage is initialized by launcher at this point.
-    match storage.as_ref().unwrap() {
-        GenericRemoteStorage::Local(local_storage) => {
-            let destination = local_storage.remote_object_id(source_file)?;
-
-            debug!(
-                "local upload about to start from {} to {}",
-                source_file.display(),
-                destination.display()
-            );
-            local_storage.upload(file, size, &destination, None).await
-        }
-        GenericRemoteStorage::S3(s3_storage) => {
-            let s3key = s3_storage.remote_object_id(source_file)?;
-
-            debug!(
-                "S3 upload about to start from {} to {:?}",
-                source_file.display(),
-                s3key
-            );
-            s3_storage.upload(file, size, &s3key, None).await
-        }
-    }?;
-
-    Ok(())
+    storage
+        .as_ref()
+        .expect("Storage should be initialized by launcher at this point.")
+        .upload_storage_object(file, size, source_file)
+        .await
 }
 
 pub async fn read_object(
     file_path: PathBuf,
     offset: u64,
 ) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>> {
-    let download = match REMOTE_STORAGE
+    let download = REMOTE_STORAGE
         .get()
         .context("Failed to get remote storage")?
         .as_ref()
         .context("No remote storage configured")?
-    {
-        GenericRemoteStorage::Local(local_storage) => {
-            let source = local_storage.remote_object_id(&file_path)?;
-
-            info!(
-                "local download about to start from {} at offset {}",
-                source.display(),
-                offset
-            );
-            local_storage
-                .download_byte_range(&source, offset, None)
-                .await
-        }
-        GenericRemoteStorage::S3(s3_storage) => {
-            let s3key = s3_storage.remote_object_id(&file_path)?;
-
-            info!(
-                "S3 download about to start from {:?} at offset {}",
-                s3key, offset
-            );
-            s3_storage.download_byte_range(&s3key, offset, None).await
-        }
-    }
-    .with_context(|| {
-        format!(
-            "Failed to open WAL segment download stream for local storage path {}",
-            file_path.display()
-        )
-    })?;
+        .download_storage_object(Some((offset, None)), &file_path)
+        .await
+        .with_context(|| {
+            format!(
+                "Failed to open WAL segment download stream for local storage path {}",
+                file_path.display()
+            )
+        })?;
 
     Ok(download.download_stream)
 }
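Taken together, the two safekeeper call sites now reduce to small, variant-agnostic patterns: a buffered whole-file upload in `backup_object` and an open-ended byte-range download in `read_object`. The following caller-side sketch mirrors both; the helper names, the path argument, and the collect-into-`Vec` step are hypothetical, while the `BufReader` wrapping and the `Some((offset, None))` range come straight from the diff above:

use std::path::Path;

use anyhow::Context;
use remote_storage::GenericRemoteStorage;
use tokio::{
    fs::File,
    io::{AsyncReadExt, BufReader},
};

// Sketch mirroring backup_object: the BufReader smooths out the small reads
// the uploader may issue against the segment file.
async fn upload_file(storage: &GenericRemoteStorage, path: &Path) -> anyhow::Result<()> {
    let file = File::open(path)
        .await
        .with_context(|| format!("Failed to open {}", path.display()))?;
    let size = file.metadata().await?.len() as usize;
    storage
        .upload_storage_object(BufReader::new(file), size, path)
        .await
}

// Sketch mirroring read_object: Some((offset, None)) requests everything from
// `offset` to the end of the object; here the stream is drained into memory.
async fn read_from_offset(
    storage: &GenericRemoteStorage,
    path: &Path,
    offset: u64,
) -> anyhow::Result<Vec<u8>> {
    let download = storage
        .download_storage_object(Some((offset, None)), path)
        .await?;
    let mut stream = download.download_stream;
    let mut buf = Vec::new();
    stream.read_to_end(&mut buf).await?;
    Ok(buf)
}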