diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml index 15f3cd3b80..4a53f485ca 100644 --- a/libs/remote_storage/Cargo.toml +++ b/libs/remote_storage/Cargo.toml @@ -18,6 +18,7 @@ camino.workspace = true humantime.workspace = true hyper = { workspace = true, features = ["stream"] } futures.workspace = true +rand.workspace = true serde.workspace = true serde_json.workspace = true tokio = { workspace = true, features = ["sync", "fs", "io-util"] } diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index 12ec680cb6..1e337bc1e8 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -174,6 +174,15 @@ impl AzureBlobStorage { .map_err(|e| DownloadError::Other(e.into()))?; bufs.push(data); } + + if bufs.is_empty() { + return Err(DownloadError::Other(anyhow::anyhow!( + "Azure GET response contained no buffers" + ))); + } + let etag = etag.unwrap(); + let last_modified = last_modified.unwrap(); + Ok(Download { download_stream: Box::pin(futures::stream::iter(bufs.into_iter().map(Ok))), etag, diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index b0b69f9155..fd832eb94f 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -291,9 +291,9 @@ pub type DownloadStream = pub struct Download { pub download_stream: DownloadStream, /// The last time the file was modified (`last-modified` HTTP header) - pub last_modified: Option<SystemTime>, + pub last_modified: SystemTime, /// A way to identify this specific version of the resource (`etag` HTTP header) - pub etag: Option<String>, + pub etag: String, /// Extra key-value data, associated with the current remote file.
pub metadata: Option<StorageMetadata>, } diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 478ad81dc1..ea0756541b 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -10,7 +10,7 @@ use std::{ io::ErrorKind, num::NonZeroU32, pin::Pin, - time::{Duration, SystemTime}, + time::{Duration, SystemTime, UNIX_EPOCH}, }; use anyhow::{bail, ensure, Context}; @@ -406,35 +406,37 @@ impl RemoteStorage for LocalFs { cancel: &CancellationToken, ) -> Result<Download, DownloadError> { let target_path = from.with_base(&self.storage_root); - if file_exists(&target_path).map_err(DownloadError::BadInput)? { - let source = ReaderStream::new( - fs::OpenOptions::new() - .read(true) - .open(&target_path) - .await - .with_context(|| { - format!("Failed to open source file {target_path:?} to use in the download") - }) - .map_err(DownloadError::Other)?, - ); - let metadata = self - .read_storage_metadata(&target_path) + let file_metadata = file_metadata(&target_path).await?; + + let source = ReaderStream::new( + fs::OpenOptions::new() + .read(true) + .open(&target_path) .await - .map_err(DownloadError::Other)?; + .with_context(|| { + format!("Failed to open source file {target_path:?} to use in the download") + }) + .map_err(DownloadError::Other)?, + ); - let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone()); - let source = crate::support::DownloadStream::new(cancel_or_timeout, source); + let metadata = self + .read_storage_metadata(&target_path) + .await + .map_err(DownloadError::Other)?; - Ok(Download { - metadata, - last_modified: None, - etag: None, - download_stream: Box::pin(source), - }) - } else { - Err(DownloadError::NotFound) - } + let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone()); + let source = crate::support::DownloadStream::new(cancel_or_timeout, source); + + let etag = mock_etag(&file_metadata); + Ok(Download { + metadata, + last_modified: file_metadata +
.modified() + .map_err(|e| DownloadError::Other(anyhow::anyhow!(e).context("Reading mtime")))?, + etag, + download_stream: Box::pin(source), + }) } async fn download_byte_range( @@ -452,50 +454,51 @@ impl RemoteStorage for LocalFs { return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes"))); } } + let target_path = from.with_base(&self.storage_root); - if file_exists(&target_path).map_err(DownloadError::BadInput)? { - let mut source = tokio::fs::OpenOptions::new() - .read(true) - .open(&target_path) - .await - .with_context(|| { - format!("Failed to open source file {target_path:?} to use in the download") - }) - .map_err(DownloadError::Other)?; - - let len = source - .metadata() - .await - .context("query file length") - .map_err(DownloadError::Other)? - .len(); - - source - .seek(io::SeekFrom::Start(start_inclusive)) - .await - .context("Failed to seek to the range start in a local storage file") - .map_err(DownloadError::Other)?; - - let metadata = self - .read_storage_metadata(&target_path) - .await - .map_err(DownloadError::Other)?; - - let source = source.take(end_exclusive.unwrap_or(len) - start_inclusive); - let source = ReaderStream::new(source); - - let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone()); - let source = crate::support::DownloadStream::new(cancel_or_timeout, source); - - Ok(Download { - metadata, - last_modified: None, - etag: None, - download_stream: Box::pin(source), + let file_metadata = file_metadata(&target_path).await?; + let mut source = tokio::fs::OpenOptions::new() + .read(true) + .open(&target_path) + .await + .with_context(|| { + format!("Failed to open source file {target_path:?} to use in the download") }) - } else { - Err(DownloadError::NotFound) - } + .map_err(DownloadError::Other)?; + + let len = source + .metadata() + .await + .context("query file length") + .map_err(DownloadError::Other)? 
+ .len(); + + source + .seek(io::SeekFrom::Start(start_inclusive)) + .await + .context("Failed to seek to the range start in a local storage file") + .map_err(DownloadError::Other)?; + + let metadata = self + .read_storage_metadata(&target_path) + .await + .map_err(DownloadError::Other)?; + + let source = source.take(end_exclusive.unwrap_or(len) - start_inclusive); + let source = ReaderStream::new(source); + + let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone()); + let source = crate::support::DownloadStream::new(cancel_or_timeout, source); + + let etag = mock_etag(&file_metadata); + Ok(Download { + metadata, + last_modified: file_metadata + .modified() + .map_err(|e| DownloadError::Other(anyhow::anyhow!(e).context("Reading mtime")))?, + etag, + download_stream: Box::pin(source), + }) } async fn delete(&self, path: &RemotePath, _cancel: &CancellationToken) -> anyhow::Result<()> { @@ -610,13 +613,22 @@ async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result< Ok(()) } -fn file_exists(file_path: &Utf8Path) -> anyhow::Result<bool> { - if file_path.exists() { - ensure!(file_path.is_file(), "file path '{file_path}' is not a file"); - Ok(true) - } else { - Ok(false) - } +async fn file_metadata(file_path: &Utf8Path) -> Result<std::fs::Metadata, DownloadError> { + tokio::fs::metadata(&file_path).await.map_err(|e| { + if e.kind() == ErrorKind::NotFound { + DownloadError::NotFound + } else { + DownloadError::BadInput(e.into()) + } + }) +} + +// Use mtime as stand-in for ETag. We could calculate a meaningful one by md5'ing the contents of files we +// read, but that's expensive and the local_fs test helper's whole reason for existence is to run small tests +// quickly, with less overhead than using a mock S3 server.
+fn mock_etag(meta: &std::fs::Metadata) -> String { + let mtime = meta.modified().expect("Filesystem mtime missing"); + format!("{}", mtime.duration_since(UNIX_EPOCH).unwrap().as_millis()) } #[cfg(test)] diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 438f45fbde..56bc32ebdd 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -35,8 +35,8 @@ use aws_sdk_s3::{ }; use aws_smithy_async::rt::sleep::TokioSleep; -use aws_smithy_types::byte_stream::ByteStream; use aws_smithy_types::{body::SdkBody, DateTime}; +use aws_smithy_types::{byte_stream::ByteStream, date_time::ConversionError}; use bytes::Bytes; use futures::stream::Stream; use hyper::Body; @@ -287,8 +287,16 @@ impl S3Bucket { let remaining = self.timeout.saturating_sub(started_at.elapsed()); let metadata = object_output.metadata().cloned().map(StorageMetadata); - let etag = object_output.e_tag; - let last_modified = object_output.last_modified.and_then(|t| t.try_into().ok()); + let etag = object_output + .e_tag + .ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))?; + let last_modified = object_output + .last_modified + .ok_or(DownloadError::Other(anyhow::anyhow!( + "Missing LastModified header" + )))? 
+ .try_into() + .map_err(|e: ConversionError| DownloadError::Other(e.into()))?; let body = object_output.body; let body = ByteStreamAsStream::from(body); diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs index d8b9824d99..bc5e40e70f 100644 --- a/libs/remote_storage/tests/test_real_s3.rs +++ b/libs/remote_storage/tests/test_real_s3.rs @@ -118,7 +118,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow: // A little check to ensure that our clock is not too far off from the S3 clock { let dl = retry(|| ctx.client.download(&path2, &cancel)).await?; - let last_modified = dl.last_modified.unwrap(); + let last_modified = dl.last_modified; let half_wt = WAIT_TIME.mul_f32(0.5); let t0_hwt = t0 + half_wt; let t1_hwt = t1 - half_wt;