diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index 152929ecd3..119d4c9e55 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -353,4 +353,8 @@ impl RemoteStorage for AzureBlobStorage { } Ok(()) } + + async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> { + unimplemented!() + } } diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 435364d83a..bd8ead4566 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -112,7 +112,7 @@ impl RemotePath { self.0.file_name() } - pub fn join(&self, segment: &Utf8Path) -> Self { + pub fn join>(&self, segment: P) -> Self { Self(self.0.join(segment)) } @@ -183,6 +183,8 @@ pub trait RemoteStorage: Send + Sync + 'static { async fn delete(&self, path: &RemotePath) -> anyhow::Result<()>; async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()>; + + async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()>; } pub struct Download { @@ -328,6 +330,15 @@ impl GenericRemoteStorage { Self::Unreliable(s) => s.delete_objects(paths).await, } } + + pub async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> { + match self { + Self::LocalFs(s) => s.copy_object(src, dst).await, + Self::AwsS3(s) => s.copy_object(src, dst).await, + Self::AzureBlob(s) => s.copy_object(src, dst).await, + Self::Unreliable(s) => s.copy_object(src, dst).await, + } + } } impl GenericRemoteStorage { diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 3d32b6b631..f224b78fcd 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -371,6 +371,27 @@ impl RemoteStorage for LocalFs { } Ok(()) } + + async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> { + let src_path = src.with_base(&self.storage_root); + let dst_path = dst.with_base(&self.storage_root); + + // If the destination file already exists, we need to delete it first. + if dst_path.exists() { + fs::remove_file(&dst_path).await?; + } + + // Copy the file. + fs::copy(&src_path, &dst_path).await?; + + // Copy the metadata. + let metadata_path = storage_metadata_path(&src_path); + if metadata_path.exists() { + fs::copy(&metadata_path, storage_metadata_path(&dst_path)).await?; + } + + Ok(()) + } } fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf { diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index fc94281666..d78b6f46cf 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -221,6 +221,8 @@ impl S3Bucket { )), } } + + } pin_project_lite::pin_project! { @@ -546,6 +548,11 @@ impl RemoteStorage for S3Bucket { let paths = std::array::from_ref(path); self.delete_objects(paths).await } + + async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> { + unimplemented!() + } + } /// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`]. @@ -628,4 +635,6 @@ mod tests { } } } + + } diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index 6d6a5c1d24..7e60aa5f34 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -149,4 +149,8 @@ impl RemoteStorage for UnreliableWrapper { } Ok(()) } + + async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> { + unimplemented!() + } }