From 50821c0a3cf1328e44c3023ae0ea17d835235c6a Mon Sep 17 00:00:00 2001
From: Kirill Bulatov
Date: Mon, 4 Jul 2022 17:53:14 +0300
Subject: [PATCH] Return download stream directly from the remote storage API

---
 libs/remote_storage/src/lib.rs          |  51 ++++-
 libs/remote_storage/src/local_fs.rs     | 291 ++++++++++++------------
 libs/remote_storage/src/s3_bucket.rs    | 117 +++++-----
 pageserver/src/storage_sync/download.rs |  34 ++-
 safekeeper/src/wal_backup.rs            |  78 +++----
 safekeeper/src/wal_storage.rs           |   3 +-
 6 files changed, 298 insertions(+), 276 deletions(-)

diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index 0889cb720c..6d47d070c1 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -12,8 +12,10 @@ use std::{
     borrow::Cow,
     collections::HashMap,
     ffi::OsStr,
+    fmt::Debug,
     num::{NonZeroU32, NonZeroUsize},
     path::{Path, PathBuf},
+    pin::Pin,
 };
 
 use anyhow::{bail, Context};
@@ -70,11 +72,7 @@ pub trait RemoteStorage: Send + Sync {
 
     /// Streams the remote storage entry contents into the buffered writer given, returns the filled writer.
     /// Returns the metadata, if any was stored with the file previously.
-    async fn download(
-        &self,
-        from: &Self::RemoteObjectId,
-        to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
-    ) -> anyhow::Result<Option<StorageMetadata>>;
+    async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError>;
 
     /// Streams a given byte range of the remote storage entry contents into the buffered writer given, returns the filled writer.
     /// Returns the metadata, if any was stored with the file previously.
@@ -83,12 +81,49 @@ pub trait RemoteStorage: Send + Sync {
         from: &Self::RemoteObjectId,
         start_inclusive: u64,
         end_exclusive: Option<u64>,
-        to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
-    ) -> anyhow::Result<Option<StorageMetadata>>;
+    ) -> Result<Download, DownloadError>;
 
     async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()>;
 }
 
+pub struct Download {
+    pub download_stream: Pin<Box<dyn io::AsyncRead + Send + Sync>>,
+    /// Extra key-value data, associated with the current remote file.
+    pub metadata: Option<StorageMetadata>,
+}
+
+impl Debug for Download {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Download")
+            .field("metadata", &self.metadata)
+            .finish()
+    }
+}
+
+#[derive(Debug)]
+pub enum DownloadError {
+    /// Validation or other error happened due to user input.
+    BadInput(anyhow::Error),
+    /// The file was not found in the remote storage.
+    NotFound,
+    /// The file was found in the remote storage, but the download failed.
+    Other(anyhow::Error),
+}
+
+impl std::fmt::Display for DownloadError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            DownloadError::BadInput(e) => {
+                write!(f, "Failed to download a remote file due to user input: {e}")
+            }
+            DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
+            DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e}"),
+        }
+    }
+}
+
+impl std::error::Error for DownloadError {}
+
 /// Every storage, currently supported.
 /// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
 pub enum GenericRemoteStorage {
@@ -180,7 +215,7 @@ pub struct S3Config {
     pub concurrency_limit: NonZeroUsize,
 }
 
-impl std::fmt::Debug for S3Config {
+impl Debug for S3Config {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("S3Config")
             .field("bucket_name", &self.bucket_name)
diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs
index 50243352ee..25235200b2 100644
--- a/libs/remote_storage/src/local_fs.rs
+++ b/libs/remote_storage/src/local_fs.rs
@@ -17,7 +17,7 @@ use tokio::{
 };
 use tracing::*;
 
-use crate::path_with_suffix_extension;
+use crate::{path_with_suffix_extension, Download, DownloadError};
 
 use super::{strip_path_prefix, RemoteStorage, StorageMetadata};
 
@@ -192,15 +192,12 @@ impl RemoteStorage for LocalFs {
         Ok(())
     }
 
-    async fn download(
-        &self,
-        from: &Self::RemoteObjectId,
-        to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
-    ) -> anyhow::Result<Option<StorageMetadata>> {
-        let file_path = self.resolve_in_storage(from)?;
-
-        if file_path.exists() && file_path.is_file() {
-            let mut source = io::BufReader::new(
+    async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError> {
+        let file_path = self
+            .resolve_in_storage(from)
+            .map_err(DownloadError::BadInput)?;
+        if file_exists(&file_path).map_err(DownloadError::BadInput)? {
+            let source = io::BufReader::new(
                 fs::OpenOptions::new()
                     .read(true)
                     .open(&file_path)
                     .await
                     .with_context(|| {
                         format!(
                             "Failed to open source file '{}' to use in the download",
                             file_path.display()
                         )
-                    })?,
+                    })
+                    .map_err(DownloadError::Other)?,
             );
-            io::copy(&mut source, to).await.with_context(|| {
-                format!(
-                    "Failed to download file '{}' from the local storage",
-                    file_path.display()
-                )
-            })?;
-            source.flush().await?;
-            self.read_storage_metadata(&file_path).await
+            let metadata = self
+                .read_storage_metadata(&file_path)
+                .await
+                .map_err(DownloadError::Other)?;
+            Ok(Download {
+                metadata,
+                download_stream: Box::pin(source),
+            })
         } else {
-            bail!(
-                "File '{}' either does not exist or is not a file",
-                file_path.display()
-            )
+            Err(DownloadError::NotFound)
         }
     }
 
@@ -234,22 +229,19 @@
         from: &Self::RemoteObjectId,
         start_inclusive: u64,
         end_exclusive: Option<u64>,
-        to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
-    ) -> anyhow::Result<Option<StorageMetadata>> {
+    ) -> Result<Download, DownloadError> {
         if let Some(end_exclusive) = end_exclusive {
-            ensure!(
-                end_exclusive > start_inclusive,
-                "Invalid range, start ({}) is bigger then end ({:?})",
-                start_inclusive,
-                end_exclusive
-            );
+            if end_exclusive <= start_inclusive {
+                return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
+            };
             if start_inclusive == end_exclusive.saturating_sub(1) {
-                return Ok(None);
+                return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
             }
         }
-        let file_path = self.resolve_in_storage(from)?;
-
-        if file_path.exists() && file_path.is_file() {
+        let file_path = self
+            .resolve_in_storage(from)
+            .map_err(DownloadError::BadInput)?;
+        if file_exists(&file_path).map_err(DownloadError::BadInput)? {
             let mut source = io::BufReader::new(
                 fs::OpenOptions::new()
                     .read(true)
                     .open(&file_path)
                     .await
                     .with_context(|| {
                         format!(
                             "Failed to open source file '{}' to use in the download",
                             file_path.display()
                         )
-                    })?,
+                    })
+                    .map_err(DownloadError::Other)?,
             );
             source
                 .seek(io::SeekFrom::Start(start_inclusive))
                 .await
-                .context("Failed to seek to the range start in a local storage file")?;
-            match end_exclusive {
-                Some(end_exclusive) => {
-                    io::copy(&mut source.take(end_exclusive - start_inclusive), to).await
-                }
-                None => io::copy(&mut source, to).await,
-            }
-            .with_context(|| {
-                format!(
-                    "Failed to download file '{}' range from the local storage",
-                    file_path.display()
-                )
-            })?;
+                .context("Failed to seek to the range start in a local storage file")
+                .map_err(DownloadError::Other)?;
+            let metadata = self
+                .read_storage_metadata(&file_path)
+                .await
+                .map_err(DownloadError::Other)?;
 
-            self.read_storage_metadata(&file_path).await
+            Ok(match end_exclusive {
+                Some(end_exclusive) => Download {
+                    metadata,
+                    download_stream: Box::pin(source.take(end_exclusive - start_inclusive)),
+                },
+                None => Download {
+                    metadata,
+                    download_stream: Box::pin(source),
+                },
+            })
         } else {
-            bail!(
-                "File '{}' either does not exist or is not a file",
-                file_path.display()
-            )
+            Err(DownloadError::NotFound)
         }
     }
 
@@ -352,6 +344,19 @@ async fn create_target_directory(target_file_path: &Path) -> anyhow::Result<()>
     Ok(())
 }
 
+fn file_exists(file_path: &Path) -> anyhow::Result<bool> {
+    if file_path.exists() {
+        ensure!(
+            file_path.is_file(),
+            "file path '{}' is not a file",
+            file_path.display()
+        );
+        Ok(true)
+    } else {
+        Ok(false)
+    }
+}
+
 #[cfg(test)]
 mod pure_tests {
     use tempfile::tempdir;
@@ -518,6 +523,31 @@ mod fs_tests {
     use std::{collections::HashMap, io::Write};
     use tempfile::tempdir;
 
+    async fn read_and_assert_remote_file_contents(
+        storage: &LocalFs,
+        #[allow(clippy::ptr_arg)]
+        // have to use &PathBuf due to `storage.local_path` parameter requirements
+        remote_storage_path: &PathBuf,
+        expected_metadata: Option<&StorageMetadata>,
+    ) -> anyhow::Result<String> {
+        let mut download = storage
+            .download(remote_storage_path)
+            .await
+            .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
+        ensure!(
+            download.metadata.as_ref() == expected_metadata,
+            "Unexpected metadata returned for the downloaded file"
+        );
+
+        let mut contents = String::new();
+        download
+            .download_stream
+            .read_to_string(&mut contents)
+            .await
+            .context("Failed to read remote file contents into string")?;
+        Ok(contents)
+    }
+
     #[tokio::test]
     async fn upload_file() -> anyhow::Result<()> {
         let workdir = tempdir()?.path().to_owned();
@@ -568,15 +598,7 @@ mod fs_tests {
         let upload_name = "upload_1";
         let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;
 
-        let mut content_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
-        let metadata = storage.download(&upload_target, &mut content_bytes).await?;
-        assert!(
-            metadata.is_none(),
-            "No metadata should be returned for no metadata upload"
-        );
-
-        content_bytes.flush().await?;
-        let contents = String::from_utf8(content_bytes.into_inner().into_inner())?;
+        let contents = read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
         assert_eq!(
             dummy_contents(upload_name),
             contents,
         );
 
         let non_existing_path = PathBuf::from("somewhere").join("else");
-        match storage.download(&non_existing_path, &mut io::sink()).await {
-            Ok(_) => panic!("Should not allow downloading non-existing storage files"),
-            Err(e) => {
-                let error_string = e.to_string();
-                assert!(error_string.contains("does not exist"));
-                assert!(error_string.contains(&non_existing_path.display().to_string()));
-            }
+        match storage.download(&non_existing_path).await {
+            Err(DownloadError::NotFound) => {} // Should get NotFound for non-existing keys
+            other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
         }
         Ok(())
     }
 
@@ -603,58 +621,31 @@ mod fs_tests {
         let upload_name = "upload_1";
         let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;
 
-        let mut full_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
-        let metadata = storage
-            .download_byte_range(&upload_target, 0, None, &mut full_range_bytes)
-            .await?;
-        assert!(
-            metadata.is_none(),
-            "No metadata should be returned for no metadata upload"
-        );
-        full_range_bytes.flush().await?;
+        let full_range_download_contents =
+            read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
         assert_eq!(
             dummy_contents(upload_name),
-            String::from_utf8(full_range_bytes.into_inner().into_inner())?,
+            full_range_download_contents,
             "Download full range should return the whole upload"
         );
 
-        let mut zero_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
-        let same_byte = 1_000_000_000;
-        let metadata = storage
-            .download_byte_range(
-                &upload_target,
-                same_byte,
-                Some(same_byte + 1), // exclusive end
-                &mut zero_range_bytes,
-            )
-            .await?;
-        assert!(
-            metadata.is_none(),
-            "No metadata should be returned for no metadata upload"
-        );
-        zero_range_bytes.flush().await?;
-        assert!(
-            zero_range_bytes.into_inner().into_inner().is_empty(),
-            "Zero byte range should not download any part of the file"
-        );
-
         let uploaded_bytes = dummy_contents(upload_name).into_bytes();
         let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
 
-        let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
-        let metadata = storage
-            .download_byte_range(
-                &upload_target,
-                0,
-                Some(first_part_local.len() as u64),
-                &mut first_part_remote,
-            )
+        let mut first_part_download = storage
+            .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
            .await?;
         assert!(
-            metadata.is_none(),
+            first_part_download.metadata.is_none(),
             "No metadata should be returned for no metadata upload"
         );
+        let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
+        io::copy(
+            &mut first_part_download.download_stream,
+            &mut first_part_remote,
+        )
+        .await?;
         first_part_remote.flush().await?;
         let first_part_remote = first_part_remote.into_inner().into_inner();
         assert_eq!(
             first_part_local,
             first_part_remote,
             "First part bytes should be returned when requested"
         );
 
-        let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
-        let metadata = storage
+        let mut second_part_download = storage
             .download_byte_range(
                 &upload_target,
                 first_part_local.len() as u64,
                 Some((first_part_local.len() + second_part_local.len()) as u64),
-                &mut second_part_remote,
             )
             .await?;
         assert!(
-            metadata.is_none(),
+            second_part_download.metadata.is_none(),
             "No metadata should be returned for no metadata upload"
         );
+        let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
+        io::copy(
+            &mut second_part_download.download_stream,
+            &mut second_part_remote,
+        )
+        .await?;
         second_part_remote.flush().await?;
         let second_part_remote = second_part_remote.into_inner().into_inner();
         assert_eq!(
@@ -696,11 +691,30 @@ mod fs_tests {
         let upload_name = "upload_1";
         let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;
 
+        let start = 1_000_000_000;
+        let end = start + 1;
+        match storage
+            .download_byte_range(
+                &upload_target,
+                start,
+                Some(end), // exclusive end
+            )
+            .await
+        {
+            Ok(_) => panic!("Should not allow downloading wrong ranges"),
+            Err(e) => {
+                let error_string = e.to_string();
+                assert!(error_string.contains("zero bytes"));
+                assert!(error_string.contains(&start.to_string()));
+                assert!(error_string.contains(&end.to_string()));
+            }
+        }
+
         let start = 10000;
         let end = 234;
         assert!(start > end, "Should test an incorrect range");
 
         match storage
-            .download_byte_range(&upload_target, start, Some(end), &mut io::sink())
+            .download_byte_range(&upload_target, start, Some(end))
             .await
         {
             Ok(_) => panic!("Should not allow downloading wrong ranges"),
@@ -712,18 +726,6 @@ mod fs_tests {
             }
         }
 
-        let non_existing_path = PathBuf::from("somewhere").join("else");
-        match storage
-            .download_byte_range(&non_existing_path, 1, Some(3), &mut io::sink())
-            .await
-        {
-            Ok(_) => panic!("Should not allow downloading non-existing storage file ranges"),
-            Err(e) => {
-                let error_string = e.to_string();
-                assert!(error_string.contains("does not exist"));
-                assert!(error_string.contains(&non_existing_path.display().to_string()));
-            }
-        }
         Ok(())
     }
 
@@ -762,35 +764,26 @@ mod fs_tests {
         let upload_target =
             upload_dummy_file(&workdir, &storage, upload_name, Some(metadata.clone())).await?;
 
-        let mut content_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
-        let full_download_metadata = storage.download(&upload_target, &mut content_bytes).await?;
-
-        content_bytes.flush().await?;
-        let contents = String::from_utf8(content_bytes.into_inner().into_inner())?;
+        let full_range_download_contents =
+            read_and_assert_remote_file_contents(&storage, &upload_target, Some(&metadata)).await?;
         assert_eq!(
             dummy_contents(upload_name),
-            contents,
+            full_range_download_contents,
             "We should upload and download the same contents"
         );
 
-        assert_eq!(
-            full_download_metadata.as_ref(),
-            Some(&metadata),
-            "We should get the same metadata back for full download"
-        );
-
         let uploaded_bytes = dummy_contents(upload_name).into_bytes();
         let (first_part_local, _) = uploaded_bytes.split_at(3);
 
-        let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
-        let partial_download_metadata = storage
-            .download_byte_range(
-                &upload_target,
-                0,
-                Some(first_part_local.len() as u64),
-                &mut first_part_remote,
-            )
+        let mut partial_download_with_metadata = storage
+            .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
             .await?;
+        let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
+        io::copy(
+            &mut partial_download_with_metadata.download_stream,
+            &mut first_part_remote,
+        )
+        .await?;
         first_part_remote.flush().await?;
         let first_part_remote = first_part_remote.into_inner().into_inner();
         assert_eq!(
@@ -800,8 +793,8 @@ mod fs_tests {
         );
 
         assert_eq!(
-            partial_download_metadata.as_ref(),
-            Some(&metadata),
+            partial_download_with_metadata.metadata,
+            Some(metadata),
             "We should get the same metadata back for partial download"
         );
 
@@ -843,7 +836,7 @@ mod fs_tests {
     }
 
     fn dummy_contents(name: &str) -> String {
-        format!("contents for {}", name)
+        format!("contents for {name}")
    }
 
     async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<PathBuf>> {
diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs
index 80d6966494..5269d63d09 100644
--- a/libs/remote_storage/src/s3_bucket.rs
+++ b/libs/remote_storage/src/s3_bucket.rs
@@ -9,17 +9,17 @@ use std::path::{Path, PathBuf};
 
 use anyhow::Context;
 use rusoto_core::{
     credential::{InstanceMetadataProvider, StaticProvider},
-    HttpClient, Region,
+    HttpClient, Region, RusotoError,
 };
 use rusoto_s3::{
-    DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client,
-    StreamingBody, S3,
+    DeleteObjectRequest, GetObjectError, GetObjectRequest, ListObjectsV2Request, PutObjectRequest,
+    S3Client, StreamingBody, S3,
 };
 use tokio::{io, sync::Semaphore};
 use tokio_util::io::ReaderStream;
 use tracing::debug;
 
-use crate::{strip_path_prefix, RemoteStorage, S3Config};
+use crate::{strip_path_prefix, Download, DownloadError, RemoteStorage, S3Config};
 
 use super::StorageMetadata;
 
@@ -187,6 +187,39 @@ impl S3Bucket {
             concurrency_limiter: Semaphore::new(aws_config.concurrency_limit.get()),
         })
     }
+
+    async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
+        let _guard = self
+            .concurrency_limiter
+            .acquire()
+            .await
+            .context("Concurrency limiter semaphore got closed during S3 download")
+            .map_err(DownloadError::Other)?;
+
+        metrics::inc_get_object();
+
+        match self.client.get_object(request).await {
+            Ok(object_output) => match object_output.body {
+                None => {
+                    metrics::inc_get_object_fail();
+                    Err(DownloadError::Other(anyhow::anyhow!(
+                        "Got no body for the S3 object given"
+                    )))
+                }
+                Some(body) => Ok(Download {
+                    metadata: object_output.metadata.map(StorageMetadata),
+                    download_stream: Box::pin(io::BufReader::new(body.into_async_read())),
+                }),
+            },
+            Err(RusotoError::Service(GetObjectError::NoSuchKey(_))) => Err(DownloadError::NotFound),
+            Err(e) => {
+                metrics::inc_get_object_fail();
+                Err(DownloadError::Other(anyhow::anyhow!(
+                    "Failed to download S3 object: {e}"
+                )))
+            }
+        }
+    }
 }
 
 #[async_trait::async_trait]
@@ -283,38 +316,13 @@ impl RemoteStorage for S3Bucket {
         Ok(())
     }
 
-    async fn download(
-        &self,
-        from: &Self::RemoteObjectId,
-        to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
-    ) -> anyhow::Result<Option<StorageMetadata>> {
-        let _guard = self
-            .concurrency_limiter
-            .acquire()
-            .await
-            .context("Concurrency limiter semaphore got closed during S3 download")?;
-
-        metrics::inc_get_object();
-
-        let object_output = self
-            .client
-            .get_object(GetObjectRequest {
-                bucket: self.bucket_name.clone(),
-                key: from.key().to_owned(),
-                ..GetObjectRequest::default()
-            })
-            .await
-            .map_err(|e| {
-                metrics::inc_get_object_fail();
-                e
-            })?;
-
-        if let Some(body) = object_output.body {
-            let mut from = io::BufReader::new(body.into_async_read());
-            io::copy(&mut from, to).await?;
-        }
-
-        Ok(object_output.metadata.map(StorageMetadata))
+    async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError> {
+        self.download_object(GetObjectRequest {
+            bucket: self.bucket_name.clone(),
+            key: from.key().to_owned(),
+            ..GetObjectRequest::default()
+        })
+        .await
     }
 
     async fn download_byte_range(
@@ -322,8 +330,7 @@
         from: &Self::RemoteObjectId,
         start_inclusive: u64,
         end_exclusive: Option<u64>,
-        to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
-    ) -> anyhow::Result<Option<StorageMetadata>> {
+    ) -> Result<Download, DownloadError> {
         // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
         // and needs both ends to be exclusive
         let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
@@ -331,34 +338,14 @@
             Some(end_inclusive) => format!("bytes={}-{}", start_inclusive, end_inclusive),
             None => format!("bytes={}-", start_inclusive),
         });
-        let _guard = self
-            .concurrency_limiter
-            .acquire()
-            .await
-            .context("Concurrency limiter semaphore got closed during S3 range download")?;
-
-        metrics::inc_get_object();
-
-        let object_output = self
-            .client
-            .get_object(GetObjectRequest {
-                bucket: self.bucket_name.clone(),
-                key: from.key().to_owned(),
-                range,
-                ..GetObjectRequest::default()
-            })
-            .await
-            .map_err(|e| {
-                metrics::inc_get_object_fail();
-                e
-            })?;
-
-        if let Some(body) = object_output.body {
-            let mut from = io::BufReader::new(body.into_async_read());
-            io::copy(&mut from, to).await?;
-        }
-
-        Ok(object_output.metadata.map(StorageMetadata))
+        self.download_object(GetObjectRequest {
+            bucket: self.bucket_name.clone(),
+            key: from.key().to_owned(),
+            range,
+            ..GetObjectRequest::default()
+        })
+        .await
     }
 
     async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()> {
diff --git a/pageserver/src/storage_sync/download.rs b/pageserver/src/storage_sync/download.rs
index 99ccf27e1c..b51826fa1e 100644
--- a/pageserver/src/storage_sync/download.rs
+++ b/pageserver/src/storage_sync/download.rs
@@ -44,13 +44,23 @@ where
             index_part_path.display()
         )
     })?;
+
+    let mut index_part_download =
+        storage
+            .download(&part_storage_path)
+            .await
+            .with_context(|| {
+                format!("Failed to open download stream for storage path {part_storage_path:?}")
+            })?;
     let mut index_part_bytes = Vec::new();
-    storage
-        .download(&part_storage_path, &mut index_part_bytes)
-        .await
-        .with_context(|| {
-            format!("Failed to download an index part from storage path {part_storage_path:?}")
-        })?;
+    io::copy(
+        &mut index_part_download.download_stream,
+        &mut index_part_bytes,
+    )
+    .await
+    .with_context(|| {
+        format!("Failed to download an index part from storage path {part_storage_path:?}")
+    })?;
 
     let index_part: IndexPart = serde_json::from_slice(&index_part_bytes).with_context(|| {
         format!("Failed to deserialize index part file from storage path '{part_storage_path:?}'")
@@ -162,15 +172,19 @@ where
                 temp_file_path.display()
             )
         })?;
-
-    storage
-        .download(&layer_storage_path, &mut destination_file)
+    let mut download = storage
+        .download(&layer_storage_path)
         .await
         .with_context(|| {
             format!(
-                "Failed to download a layer from storage path '{layer_storage_path:?}'"
+                "Failed to open a download stream for layer with remote storage path '{layer_storage_path:?}'"
             )
         })?;
+    io::copy(&mut download.download_stream, &mut destination_file).await.with_context(|| {
+        format!(
+            "Failed to download layer with remote storage path '{layer_storage_path:?}' into file '{}'", temp_file_path.display()
+        )
+    })?;
 
     // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
     // A file will not be closed immediately when it goes out of scope if there are any IO operations
diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs
index 8fada70e8b..b2f9d8d4f3 100644
--- a/safekeeper/src/wal_backup.rs
+++ b/safekeeper/src/wal_backup.rs
@@ -2,18 +2,16 @@ use anyhow::{Context, Result};
 use etcd_broker::subscription_key::{
     NodeKind, OperationKind, SkOperationKind, SubscriptionKey, SubscriptionKind,
 };
-use tokio::io::AsyncRead;
 use tokio::task::JoinHandle;
 
 use std::cmp::min;
 use std::collections::HashMap;
 use std::path::{Path, PathBuf};
+use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Duration;
 
-use postgres_ffi::xlog_utils::{
-    XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, MAX_SEND_SIZE, PG_TLI,
-};
+use postgres_ffi::xlog_utils::{XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, PG_TLI};
 use remote_storage::{GenericRemoteStorage, RemoteStorage};
 use tokio::fs::File;
 use tokio::runtime::Builder;
@@ -452,45 +450,41 @@ async fn backup_object(source_file: &Path, size: usize) -> Result<()> {
 pub async fn read_object(
     file_path: PathBuf,
     offset: u64,
-) -> (impl AsyncRead, JoinHandle<Result<()>>) {
-    let storage = REMOTE_STORAGE.get().expect("failed to get remote storage");
+) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>> {
+    let download = match REMOTE_STORAGE
+        .get()
+        .context("Failed to get remote storage")?
+        .as_ref()
+        .context("No remote storage configured")?
+    {
+        GenericRemoteStorage::Local(local_storage) => {
+            let source = local_storage.remote_object_id(&file_path)?;
 
-    let (mut pipe_writer, pipe_reader) = tokio::io::duplex(MAX_SEND_SIZE);
-
-    let copy_result = tokio::spawn(async move {
-        let res = match storage.as_ref().unwrap() {
-            GenericRemoteStorage::Local(local_storage) => {
-                let source = local_storage.remote_object_id(&file_path)?;
-
-                info!(
-                    "local download about to start from {} at offset {}",
-                    source.display(),
-                    offset
-                );
-                local_storage
-                    .download_byte_range(&source, offset, None, &mut pipe_writer)
-                    .await
-            }
-            GenericRemoteStorage::S3(s3_storage) => {
-                let s3key = s3_storage.remote_object_id(&file_path)?;
-
-                info!(
-                    "S3 download about to start from {:?} at offset {}",
-                    s3key, offset
-                );
-                s3_storage
-                    .download_byte_range(&s3key, offset, None, &mut pipe_writer)
-                    .await
-            }
-        };
-
-        if let Err(e) = res {
-            error!("failed to download WAL segment from remote storage: {}", e);
-            Err(e)
-        } else {
-            Ok(())
+            info!(
+                "local download about to start from {} at offset {}",
+                source.display(),
+                offset
+            );
+            local_storage
+                .download_byte_range(&source, offset, None)
+                .await
         }
-    });
+        GenericRemoteStorage::S3(s3_storage) => {
+            let s3key = s3_storage.remote_object_id(&file_path)?;
 
-    (pipe_reader, copy_result)
+            info!(
+                "S3 download about to start from {:?} at offset {}",
+                s3key, offset
+            );
+            s3_storage.download_byte_range(&s3key, offset, None).await
+        }
+    }
+    .with_context(|| {
+        format!(
+            "Failed to open WAL segment download stream for local storage path {}",
+            file_path.display()
+        )
+    })?;
+
+    Ok(download.download_stream)
 }
diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs
index 5cb7a8c758..9b23e2189c 100644
--- a/safekeeper/src/wal_storage.rs
+++ b/safekeeper/src/wal_storage.rs
@@ -604,8 +604,7 @@ impl WalReader {
 
         // Try to open remote file, if remote reads are enabled
         if self.enable_remote_read {
-            let (reader, _) = read_object(wal_file_path, xlogoff as u64).await;
-            return Ok(Box::pin(reader));
+            return read_object(wal_file_path, xlogoff as u64).await;
         }
 
         bail!("WAL segment is not found")
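
[Editorial note, not part of the patch] For readers coming from the old
"pass an AsyncWrite in" API, the sketch below shows how a caller is expected
to consume the new `Download` value. It is a minimal illustration only: the
function name, `storage`, and `remote_path` are hypothetical placeholders,
not code from this patch.

    use std::path::PathBuf;
    use remote_storage::{RemoteStorage, StorageMetadata};

    // Hypothetical helper: download a whole object into memory and return
    // its bytes together with the metadata, if any was stored with it.
    async fn fetch_to_memory(
        storage: &impl RemoteStorage<RemoteObjectId = PathBuf>,
        remote_path: &PathBuf,
    ) -> anyhow::Result<(Vec<u8>, Option<StorageMetadata>)> {
        // `download` now returns the stream itself, so the caller decides
        // where (and whether) to copy the bytes.
        let mut download = storage.download(remote_path).await?;
        let mut contents = Vec::new();
        tokio::io::copy(&mut download.download_stream, &mut contents).await?;
        Ok((contents, download.metadata))
    }

A side benefit visible in the local_fs tests above: callers can now match on
`Err(DownloadError::NotFound)` to detect missing objects instead of
string-matching error messages.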