diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index 548bde02f6..7ea1103eb2 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -117,6 +117,8 @@ impl AzureBlobStorage { ) -> Result { let mut response = builder.into_stream(); + let mut etag = None; + let mut last_modified = None; let mut metadata = HashMap::new(); // TODO give proper streaming response instead of buffering into RAM // https://github.com/neondatabase/neon/issues/5563 @@ -124,6 +126,13 @@ impl AzureBlobStorage { let mut bufs = Vec::new(); while let Some(part) = response.next().await { let part = part.map_err(to_download_error)?; + let etag_str: &str = part.blob.properties.etag.as_ref(); + if etag.is_none() { + etag = Some(etag.unwrap_or_else(|| etag_str.to_owned())); + } + if last_modified.is_none() { + last_modified = Some(part.blob.properties.last_modified.into()); + } if let Some(blob_meta) = part.blob.metadata { metadata.extend(blob_meta.iter().map(|(k, v)| (k.to_owned(), v.to_owned()))); } @@ -136,6 +145,8 @@ impl AzureBlobStorage { } Ok(Download { download_stream: Box::pin(futures::stream::iter(bufs.into_iter().map(Ok))), + etag, + last_modified, metadata: Some(StorageMetadata(metadata)), }) } diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index e77c54e1e7..3e408e3119 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -14,7 +14,9 @@ mod local_fs; mod s3_bucket; mod simulate_failures; -use std::{collections::HashMap, fmt::Debug, num::NonZeroUsize, pin::Pin, sync::Arc}; +use std::{ + collections::HashMap, fmt::Debug, num::NonZeroUsize, pin::Pin, sync::Arc, time::SystemTime, +}; use anyhow::{bail, Context}; use camino::{Utf8Path, Utf8PathBuf}; @@ -207,8 +209,13 @@ pub trait RemoteStorage: Send + Sync + 'static { async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()>; } +pub type DownloadStream = Pin> + Unpin + Send + Sync>>; pub struct Download { - pub download_stream: Pin> + Unpin + Send + Sync>>, + pub download_stream: DownloadStream, + /// The last time the file was modified (`last-modified` HTTP header) + pub last_modified: Option, + /// A way to identify this specific version of the resource (`etag` HTTP header) + pub etag: Option, /// Extra key-value data, associated with the current remote file. pub metadata: Option, } diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 03b98e5ea2..d1e7d325b9 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -18,7 +18,7 @@ use tokio_util::io::ReaderStream; use tracing::*; use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty}; -use crate::{Download, DownloadError, Listing, ListingMode, RemotePath}; +use crate::{Download, DownloadError, DownloadStream, Listing, ListingMode, RemotePath}; use super::{RemoteStorage, StorageMetadata}; @@ -331,6 +331,8 @@ impl RemoteStorage for LocalFs { .map_err(DownloadError::Other)?; Ok(Download { metadata, + last_modified: None, + etag: None, download_stream: Box::pin(source), }) } else { @@ -372,17 +374,17 @@ impl RemoteStorage for LocalFs { .await .map_err(DownloadError::Other)?; - Ok(match end_exclusive { - Some(end_exclusive) => Download { - metadata, - download_stream: Box::pin(ReaderStream::new( - source.take(end_exclusive - start_inclusive), - )), - }, - None => Download { - metadata, - download_stream: Box::pin(ReaderStream::new(source)), - }, + let download_stream: DownloadStream = match end_exclusive { + Some(end_exclusive) => Box::pin(ReaderStream::new( + source.take(end_exclusive - start_inclusive), + )), + None => Box::pin(ReaderStream::new(source)), + }; + Ok(Download { + metadata, + last_modified: None, + etag: None, + download_stream, }) } else { Err(DownloadError::NotFound) diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 98be6f0637..0f95458ad1 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -231,6 +231,8 @@ impl S3Bucket { match get_object { Ok(object_output) => { let metadata = object_output.metadata().cloned().map(StorageMetadata); + let etag = object_output.e_tag.clone(); + let last_modified = object_output.last_modified.and_then(|t| t.try_into().ok()); let body = object_output.body; let body = ByteStreamAsStream::from(body); @@ -239,6 +241,8 @@ impl S3Bucket { Ok(Download { metadata, + etag, + last_modified, download_stream: Box::pin(body), }) }