diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index ae08e9b171..608b3e8609 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -311,4 +311,8 @@ impl RemoteStorage for AzureBlobStorage { } Ok(()) } + + async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> { + unimplemented!() + } } diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 7054610d9e..764412e8d6 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -114,7 +114,7 @@ impl RemotePath { self.0.file_name() } - pub fn join(&self, segment: &Utf8Path) -> Self { + pub fn join>(&self, segment: P) -> Self { Self(self.0.join(segment)) } @@ -215,6 +215,8 @@ pub trait RemoteStorage: Send + Sync + 'static { async fn delete(&self, path: &RemotePath) -> anyhow::Result<()>; async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()>; + + async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()>; } pub struct Download { @@ -377,6 +379,15 @@ impl GenericRemoteStorage { Self::Unreliable(s) => s.delete_objects(paths).await, } } + + pub async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> { + match self { + Self::LocalFs(s) => s.copy_object(src, dst).await, + Self::AwsS3(s) => s.copy_object(src, dst).await, + Self::AzureBlob(s) => s.copy_object(src, dst).await, + Self::Unreliable(s) => s.copy_object(src, dst).await, + } + } } impl GenericRemoteStorage { diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 1be50ce565..ae397bf4bb 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -393,6 +393,27 @@ impl RemoteStorage for LocalFs { } Ok(()) } + + async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> { + let src_path = src.with_base(&self.storage_root); + let dst_path = dst.with_base(&self.storage_root); + + // If the destination file already exists, we need to delete it first. + if dst_path.exists() { + fs::remove_file(&dst_path).await?; + } + + // Copy the file. + fs::copy(&src_path, &dst_path).await?; + + // Copy the metadata. + let metadata_path = storage_metadata_path(&src_path); + if metadata_path.exists() { + fs::copy(&metadata_path, storage_metadata_path(&dst_path)).await?; + } + + Ok(()) + } } fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf { diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 560a2c14e9..eb136e58e3 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -221,6 +221,8 @@ impl S3Bucket { )), } } + + } pin_project_lite::pin_project! { @@ -515,6 +517,11 @@ impl RemoteStorage for S3Bucket { let paths = std::array::from_ref(path); self.delete_objects(paths).await } + + async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> { + unimplemented!() + } + } /// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`]. @@ -597,4 +604,6 @@ mod tests { } } } + + } diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index cd13db1923..8e10fcb098 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -160,4 +160,8 @@ impl RemoteStorage for UnreliableWrapper { } Ok(()) } + + async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> { + unimplemented!() + } }