From 76a044d1fad81f52d6013786facb20bb90c2beca Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Tue, 15 Apr 2025 15:46:41 -0400 Subject: [PATCH] add encrypt support for s3 Signed-off-by: Alex Chi Z --- Cargo.lock | 2 + libs/remote_storage/Cargo.toml | 2 + libs/remote_storage/src/azure_blob.rs | 24 +++++ libs/remote_storage/src/lib.rs | 108 ++++++++++++++++++- libs/remote_storage/src/local_fs.rs | 24 +++++ libs/remote_storage/src/s3_bucket.rs | 54 +++++++++- libs/remote_storage/src/simulate_failures.rs | 24 +++++ libs/remote_storage/tests/test_real_s3.rs | 39 ++++++- 8 files changed, 268 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5d2cdcea27..b5d173d6d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5510,6 +5510,7 @@ dependencies = [ "azure_identity", "azure_storage", "azure_storage_blobs", + "base64 0.13.1", "bytes", "camino", "camino-tempfile", @@ -5520,6 +5521,7 @@ dependencies = [ "humantime-serde", "hyper 1.4.1", "itertools 0.10.5", + "md5", "metrics", "once_cell", "pin-project-lite", diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml index ad0f12074f..b4150cbf87 100644 --- a/libs/remote_storage/Cargo.toml +++ b/libs/remote_storage/Cargo.toml @@ -13,6 +13,7 @@ aws-smithy-async.workspace = true aws-smithy-types.workspace = true aws-config.workspace = true aws-sdk-s3.workspace = true +base64.workspace = true bytes.workspace = true camino = { workspace = true, features = ["serde1"] } humantime-serde.workspace = true @@ -28,6 +29,7 @@ tokio-util = { workspace = true, features = ["compat"] } toml_edit.workspace = true tracing.workspace = true scopeguard.workspace = true +md5.workspace = true metrics.workspace = true utils = { path = "../utils", default-features = false } pin-project-lite.workspace = true diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index 18146c5464..fbb4e1d52b 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -550,6 +550,30 @@ impl RemoteStorage for AzureBlobStorage { self.download_for_builder(builder, timeout, cancel).await } + #[allow(unused_variables)] + async fn download_with_encryption( + &self, + from: &RemotePath, + opts: &DownloadOpts, + encryption_key: Option<&[u8]>, + cancel: &CancellationToken, + ) -> Result { + unimplemented!() + } + + #[allow(unused_variables)] + async fn upload_with_encryption( + &self, + from: impl Stream> + Send + Sync + 'static, + data_size_bytes: usize, + to: &RemotePath, + metadata: Option, + encryption_key: Option<&[u8]>, + cancel: &CancellationToken, + ) -> anyhow::Result<()> { + unimplemented!() + } + async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> { self.delete_objects(std::array::from_ref(path), cancel) .await diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 27049de786..d2d27630b6 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -12,12 +12,12 @@ mod azure_blob; mod config; mod error; +mod kms; mod local_fs; mod metrics; mod s3_bucket; mod simulate_failures; mod support; -mod kms; use std::collections::HashMap; use std::fmt::Debug; @@ -332,6 +332,28 @@ pub trait RemoteStorage: Send + Sync + 'static { cancel: &CancellationToken, ) -> Result; + /// Same as download, but with SSE-C encryption if the backend supports it. + async fn download_with_encryption( + &self, + from: &RemotePath, + opts: &DownloadOpts, + encryption_key: Option<&[u8]>, + cancel: &CancellationToken, + ) -> Result; + + /// Same as upload, but with SSE-C encryption if the backend supports it. + async fn upload_with_encryption( + &self, + from: impl Stream> + Send + Sync + 'static, + // S3 PUT request requires the content length to be specified, + // otherwise it starts to fail with the concurrent connection count increasing. + data_size_bytes: usize, + to: &RemotePath, + metadata: Option, + encryption_key: Option<&[u8]>, + cancel: &CancellationToken, + ) -> anyhow::Result<()>; + /// Delete a single path from remote storage. /// /// If the operation fails because of timeout or cancellation, the root cause of the error will be @@ -616,6 +638,90 @@ impl GenericRemoteStorage> { } } } + + pub async fn download_with_encryption( + &self, + from: &RemotePath, + opts: &DownloadOpts, + encryption_key: Option<&[u8]>, + cancel: &CancellationToken, + ) -> Result { + match self { + Self::LocalFs(s) => { + s.download_with_encryption(from, opts, encryption_key, cancel) + .await + } + Self::AwsS3(s) => { + s.download_with_encryption(from, opts, encryption_key, cancel) + .await + } + Self::AzureBlob(s) => { + s.download_with_encryption(from, opts, encryption_key, cancel) + .await + } + Self::Unreliable(s) => { + s.download_with_encryption(from, opts, encryption_key, cancel) + .await + } + } + } + + pub async fn upload_with_encryption( + &self, + from: impl Stream> + Send + Sync + 'static, + data_size_bytes: usize, + to: &RemotePath, + metadata: Option, + encryption_key: Option<&[u8]>, + cancel: &CancellationToken, + ) -> anyhow::Result<()> { + match self { + Self::LocalFs(s) => { + s.upload_with_encryption( + from, + data_size_bytes, + to, + metadata, + encryption_key, + cancel, + ) + .await + } + Self::AwsS3(s) => { + s.upload_with_encryption( + from, + data_size_bytes, + to, + metadata, + encryption_key, + cancel, + ) + .await + } + Self::AzureBlob(s) => { + s.upload_with_encryption( + from, + data_size_bytes, + to, + metadata, + encryption_key, + cancel, + ) + .await + } + Self::Unreliable(s) => { + s.upload_with_encryption( + from, + data_size_bytes, + to, + metadata, + encryption_key, + cancel, + ) + .await + } + } + } } impl GenericRemoteStorage { diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index f03d6ac8ee..97a1834d8b 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -560,6 +560,30 @@ impl RemoteStorage for LocalFs { } } + #[allow(unused_variables)] + async fn download_with_encryption( + &self, + from: &RemotePath, + opts: &DownloadOpts, + encryption_key: Option<&[u8]>, + cancel: &CancellationToken, + ) -> Result { + unimplemented!() + } + + #[allow(unused_variables)] + async fn upload_with_encryption( + &self, + from: impl Stream> + Send + Sync + 'static, + data_size_bytes: usize, + to: &RemotePath, + metadata: Option, + encryption_key: Option<&[u8]>, + cancel: &CancellationToken, + ) -> anyhow::Result<()> { + unimplemented!() + } + async fn delete_objects( &self, paths: &[RemotePath], diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index ba7ce9e1e7..34f31bcf8e 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -66,7 +66,10 @@ struct GetObjectRequest { key: String, etag: Option, range: Option, + /// Base64 encoded SSE-C key for server-side encryption. + sse_c_key: Option>, } + impl S3Bucket { /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided. pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result { @@ -257,6 +260,13 @@ impl S3Bucket { builder = builder.if_none_match(etag); } + if let Some(encryption_key) = request.sse_c_key { + builder = builder.sse_customer_algorithm("AES256"); + builder = builder.sse_customer_key(base64::encode(&encryption_key)); + builder = builder + .sse_customer_key_md5(base64::encode(md5::compute(&encryption_key).as_slice())); + } + let get_object = builder.send(); let get_object = tokio::select! { @@ -693,12 +703,13 @@ impl RemoteStorage for S3Bucket { }) } - async fn upload( + async fn upload_with_encryption( &self, from: impl Stream> + Send + Sync + 'static, from_size_bytes: usize, to: &RemotePath, metadata: Option, + encryption_key: Option<&[u8]>, cancel: &CancellationToken, ) -> anyhow::Result<()> { let kind = RequestKind::Put; @@ -709,7 +720,7 @@ impl RemoteStorage for S3Bucket { let body = StreamBody::new(from.map(|x| x.map(Frame::data))); let bytes_stream = ByteStream::new(SdkBody::from_body_1_x(body)); - let upload = self + let mut upload = self .client .put_object() .bucket(self.bucket_name.clone()) @@ -717,8 +728,17 @@ impl RemoteStorage for S3Bucket { .set_metadata(metadata.map(|m| m.0)) .set_storage_class(self.upload_storage_class.clone()) .content_length(from_size_bytes.try_into()?) - .body(bytes_stream) - .send(); + .body(bytes_stream); + + if let Some(encryption_key) = encryption_key { + upload = upload.sse_customer_algorithm("AES256"); + let base64_key = base64::encode(&encryption_key); + upload = upload.sse_customer_key(&base64_key); + upload = upload + .sse_customer_key_md5(base64::encode(md5::compute(&encryption_key).as_slice())); + } + + let upload = upload.send(); let upload = tokio::time::timeout(self.timeout, upload); @@ -742,6 +762,18 @@ impl RemoteStorage for S3Bucket { } } + async fn upload( + &self, + from: impl Stream> + Send + Sync + 'static, + data_size_bytes: usize, + to: &RemotePath, + metadata: Option, + cancel: &CancellationToken, + ) -> anyhow::Result<()> { + self.upload_with_encryption(from, data_size_bytes, to, metadata, None, cancel) + .await + } + async fn copy( &self, from: &RemotePath, @@ -787,10 +819,11 @@ impl RemoteStorage for S3Bucket { Ok(()) } - async fn download( + async fn download_with_encryption( &self, from: &RemotePath, opts: &DownloadOpts, + encryption_key: Option<&[u8]>, cancel: &CancellationToken, ) -> Result { // if prefix is not none then download file `prefix/from` @@ -801,12 +834,23 @@ impl RemoteStorage for S3Bucket { key: self.relative_path_to_s3_object(from), etag: opts.etag.as_ref().map(|e| e.to_string()), range: opts.byte_range_header(), + sse_c_key: encryption_key.map(|k| k.to_vec()), }, cancel, ) .await } + async fn download( + &self, + from: &RemotePath, + opts: &DownloadOpts, + cancel: &CancellationToken, + ) -> Result { + self.download_with_encryption(from, opts, None, cancel) + .await + } + async fn delete_objects( &self, paths: &[RemotePath], diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index f56be873c4..2ac394201c 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -178,6 +178,30 @@ impl RemoteStorage for UnreliableWrapper { self.inner.download(from, opts, cancel).await } + #[allow(unused_variables)] + async fn download_with_encryption( + &self, + from: &RemotePath, + opts: &DownloadOpts, + encryption_key: Option<&[u8]>, + cancel: &CancellationToken, + ) -> Result { + unimplemented!() + } + + #[allow(unused_variables)] + async fn upload_with_encryption( + &self, + from: impl Stream> + Send + Sync + 'static, + data_size_bytes: usize, + to: &RemotePath, + metadata: Option, + encryption_key: Option<&[u8]>, + cancel: &CancellationToken, + ) -> anyhow::Result<()> { + unimplemented!() + } + async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> { self.delete_inner(path, true, cancel).await } diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs index d38e13fd05..7e1c67f1bf 100644 --- a/libs/remote_storage/tests/test_real_s3.rs +++ b/libs/remote_storage/tests/test_real_s3.rs @@ -421,7 +421,7 @@ async fn download_is_timeouted(ctx: &mut MaybeEnabledStorage) { )) .unwrap(); - let len = upload_large_enough_file(&ctx.client, &path, &cancel).await; + let len = upload_large_enough_file(&ctx.client, &path, &cancel, None).await; let timeout = std::time::Duration::from_secs(5); @@ -500,7 +500,7 @@ async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) { )) .unwrap(); - let file_len = upload_large_enough_file(&ctx.client, &path, &cancel).await; + let file_len = upload_large_enough_file(&ctx.client, &path, &cancel, None).await; { let stream = ctx @@ -555,6 +555,7 @@ async fn upload_large_enough_file( client: &GenericRemoteStorage, path: &RemotePath, cancel: &CancellationToken, + encryption_key: Option<&[u8]>, ) -> usize { let header = bytes::Bytes::from_static("remote blob data content".as_bytes()); let body = bytes::Bytes::from(vec![0u8; 1024]); @@ -565,9 +566,41 @@ async fn upload_large_enough_file( let contents = futures::stream::iter(contents.map(std::io::Result::Ok)); client - .upload(contents, len, path, None, cancel) + .upload_with_encryption(contents, len, path, None, encryption_key, cancel) .await .expect("upload succeeds"); len } + +#[test_context(MaybeEnabledStorage)] +#[tokio::test] +async fn encryption_works(ctx: &mut MaybeEnabledStorage) { + let MaybeEnabledStorage::Enabled(ctx) = ctx else { + return; + }; + + let cancel = CancellationToken::new(); + + let path = RemotePath::new(Utf8Path::new( + format!("{}/file_to_copy", ctx.base_prefix).as_str(), + )) + .unwrap(); + + let key = rand::random::<[u8; 32]>(); + let file_len = upload_large_enough_file(&ctx.client, &path, &cancel, Some(&key)).await; + + { + let download = ctx + .client + .download_with_encryption(&path, &DownloadOpts::default(), Some(&key), &cancel) + .await + .expect("should succeed"); + let vec = download_to_vec(download).await.expect("should succeed"); + assert_eq!(vec.len(), file_len); + } + + let cancel = CancellationToken::new(); + + ctx.client.delete_objects(&[path], &cancel).await.unwrap(); +}