add encryption support for s3

Signed-off-by: Alex Chi Z <chi@neon.tech>
author Alex Chi Z
date 2025-04-15 15:46:41 -04:00
parent 77c9154564
commit 76a044d1fa
8 changed files with 268 additions and 9 deletions
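The commit threads S3 server-side encryption with customer-provided keys (SSE-C) through the remote_storage crate: callers pass a raw 256-bit key, and the S3 backend attaches it to PUT and GET requests. The base64 and md5 dependencies added below exist for exactly this: SSE-C wants the key base64-encoded, plus a base64-encoded MD5 digest of the raw key as an integrity check, alongside the fixed algorithm name "AES256". As a minimal standalone sketch of that derivation (the helper name is ours, for illustration; the real builder calls appear in the S3Bucket hunks below):

/// Derive the three SSE-C request values from a raw key, mirroring the
/// builder calls this commit adds to the S3 backend.
fn sse_c_values(key: &[u8]) -> (&'static str, String, String) {
    // Algorithm: "AES256" is the only value S3 accepts for SSE-C.
    let algorithm = "AES256";
    // Key: the raw key bytes, base64-encoded (base64 0.13 API, per Cargo.lock).
    let encoded_key = base64::encode(key);
    // Key-MD5: base64 of the MD5 digest of the *raw* (unencoded) key, which
    // S3 uses to verify the key arrived intact.
    let key_md5 = base64::encode(md5::compute(key).as_slice());
    (algorithm, encoded_key, key_md5)
}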

Cargo.lock

@@ -5510,6 +5510,7 @@ dependencies = [
"azure_identity",
"azure_storage",
"azure_storage_blobs",
"base64 0.13.1",
"bytes",
"camino",
"camino-tempfile",
@@ -5520,6 +5521,7 @@ dependencies = [
"humantime-serde",
"hyper 1.4.1",
"itertools 0.10.5",
"md5",
"metrics",
"once_cell",
"pin-project-lite",


@@ -13,6 +13,7 @@ aws-smithy-async.workspace = true
aws-smithy-types.workspace = true
aws-config.workspace = true
aws-sdk-s3.workspace = true
base64.workspace = true
bytes.workspace = true
camino = { workspace = true, features = ["serde1"] }
humantime-serde.workspace = true
@@ -28,6 +29,7 @@ tokio-util = { workspace = true, features = ["compat"] }
toml_edit.workspace = true
tracing.workspace = true
scopeguard.workspace = true
md5.workspace = true
metrics.workspace = true
utils = { path = "../utils", default-features = false }
pin-project-lite.workspace = true


@@ -550,6 +550,30 @@ impl RemoteStorage for AzureBlobStorage {
self.download_for_builder(builder, timeout, cancel).await
}
#[allow(unused_variables)]
async fn download_with_encryption(
&self,
from: &RemotePath,
opts: &DownloadOpts,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
unimplemented!()
}
#[allow(unused_variables)]
async fn upload_with_encryption(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
unimplemented!()
}
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
self.delete_objects(std::array::from_ref(path), cancel)
.await


@@ -12,12 +12,12 @@
mod azure_blob;
mod config;
mod error;
mod kms;
mod local_fs;
mod metrics;
mod s3_bucket;
mod simulate_failures;
mod support;
use std::collections::HashMap;
use std::fmt::Debug;
@@ -332,6 +332,28 @@ pub trait RemoteStorage: Send + Sync + 'static {
cancel: &CancellationToken,
) -> Result<Download, DownloadError>;
/// Same as download, but with SSE-C encryption if the backend supports it.
async fn download_with_encryption(
&self,
from: &RemotePath,
opts: &DownloadOpts,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> Result<Download, DownloadError>;
/// Same as upload, but with SSE-C encryption if the backend supports it.
async fn upload_with_encryption(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
// S3 PUT requests require the content length to be specified,
// otherwise they start to fail as the number of concurrent connections grows.
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()>;
/// Delete a single path from remote storage.
///
/// If the operation fails because of timeout or cancellation, the root cause of the error will be
@@ -616,6 +638,90 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
}
}
}
pub async fn download_with_encryption(
&self,
from: &RemotePath,
opts: &DownloadOpts,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
match self {
Self::LocalFs(s) => {
s.download_with_encryption(from, opts, encryption_key, cancel)
.await
}
Self::AwsS3(s) => {
s.download_with_encryption(from, opts, encryption_key, cancel)
.await
}
Self::AzureBlob(s) => {
s.download_with_encryption(from, opts, encryption_key, cancel)
.await
}
Self::Unreliable(s) => {
s.download_with_encryption(from, opts, encryption_key, cancel)
.await
}
}
}
pub async fn upload_with_encryption(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
match self {
Self::LocalFs(s) => {
s.upload_with_encryption(
from,
data_size_bytes,
to,
metadata,
encryption_key,
cancel,
)
.await
}
Self::AwsS3(s) => {
s.upload_with_encryption(
from,
data_size_bytes,
to,
metadata,
encryption_key,
cancel,
)
.await
}
Self::AzureBlob(s) => {
s.upload_with_encryption(
from,
data_size_bytes,
to,
metadata,
encryption_key,
cancel,
)
.await
}
Self::Unreliable(s) => {
s.upload_with_encryption(
from,
data_size_bytes,
to,
metadata,
encryption_key,
cancel,
)
.await
}
}
}
}
impl GenericRemoteStorage {

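For orientation, a sketch of a round trip through the two methods just added to GenericRemoteStorage, assuming an already-configured storage handle and path (the helper sse_c_roundtrip is hypothetical; the real exercise of this path is the encryption_works test at the end of this diff). Note that only the S3 backend implements the new methods so far: LocalFs, AzureBlob, and the UnreliableWrapper stub them with unimplemented!().

use remote_storage::{DownloadOpts, GenericRemoteStorage, RemotePath};
use tokio_util::sync::CancellationToken;

// Hypothetical caller; `storage` and `path` are assumed to be set up elsewhere.
async fn sse_c_roundtrip(
    storage: &GenericRemoteStorage,
    path: &RemotePath,
) -> anyhow::Result<()> {
    let cancel = CancellationToken::new();
    // SSE-C keys are raw 256-bit values; S3 applies them per request and
    // never stores them.
    let key = rand::random::<[u8; 32]>();
    let body = bytes::Bytes::from_static(b"remote blob data content");
    let len = body.len();
    let contents = futures::stream::iter([std::io::Result::Ok(body)]);
    storage
        .upload_with_encryption(contents, len, path, None, Some(&key), &cancel)
        .await?;
    // The same key must be supplied on download; S3 rejects a mismatched key.
    let _download = storage
        .download_with_encryption(path, &DownloadOpts::default(), Some(&key), &cancel)
        .await?;
    Ok(())
}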

@@ -560,6 +560,30 @@ impl RemoteStorage for LocalFs {
}
}
#[allow(unused_variables)]
async fn download_with_encryption(
&self,
from: &RemotePath,
opts: &DownloadOpts,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
unimplemented!()
}
#[allow(unused_variables)]
async fn upload_with_encryption(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
unimplemented!()
}
async fn delete_objects(
&self,
paths: &[RemotePath],


@@ -66,7 +66,10 @@ struct GetObjectRequest {
key: String,
etag: Option<String>,
range: Option<String>,
/// Raw SSE-C key for server-side encryption; base64-encoded when the request is built.
sse_c_key: Option<Vec<u8>>,
}
impl S3Bucket {
/// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
@@ -257,6 +260,13 @@ impl S3Bucket {
builder = builder.if_none_match(etag);
}
if let Some(encryption_key) = request.sse_c_key {
builder = builder.sse_customer_algorithm("AES256");
builder = builder.sse_customer_key(base64::encode(&encryption_key));
builder = builder
.sse_customer_key_md5(base64::encode(md5::compute(&encryption_key).as_slice()));
}
let get_object = builder.send();
let get_object = tokio::select! {
@@ -693,12 +703,13 @@ impl RemoteStorage for S3Bucket {
})
}
async fn upload_with_encryption(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
from_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let kind = RequestKind::Put;
@@ -709,7 +720,7 @@ impl RemoteStorage for S3Bucket {
let body = StreamBody::new(from.map(|x| x.map(Frame::data)));
let bytes_stream = ByteStream::new(SdkBody::from_body_1_x(body));
let mut upload = self
.client
.put_object()
.bucket(self.bucket_name.clone())
@@ -717,8 +728,17 @@ impl RemoteStorage for S3Bucket {
.set_metadata(metadata.map(|m| m.0))
.set_storage_class(self.upload_storage_class.clone())
.content_length(from_size_bytes.try_into()?)
.body(bytes_stream);
if let Some(encryption_key) = encryption_key {
upload = upload.sse_customer_algorithm("AES256");
let base64_key = base64::encode(&encryption_key);
upload = upload.sse_customer_key(&base64_key);
upload = upload
.sse_customer_key_md5(base64::encode(md5::compute(&encryption_key).as_slice()));
}
let upload = upload.send();
let upload = tokio::time::timeout(self.timeout, upload);
@@ -742,6 +762,18 @@ impl RemoteStorage for S3Bucket {
}
}
async fn upload(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
self.upload_with_encryption(from, data_size_bytes, to, metadata, None, cancel)
.await
}
async fn copy(
&self,
from: &RemotePath,
@@ -787,10 +819,11 @@ impl RemoteStorage for S3Bucket {
Ok(())
}
async fn download_with_encryption(
&self,
from: &RemotePath,
opts: &DownloadOpts,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
// If the prefix is not none, download the file `prefix/from`.
@@ -801,12 +834,23 @@ impl RemoteStorage for S3Bucket {
key: self.relative_path_to_s3_object(from),
etag: opts.etag.as_ref().map(|e| e.to_string()),
range: opts.byte_range_header(),
sse_c_key: encryption_key.map(|k| k.to_vec()),
},
cancel,
)
.await
}
async fn download(
&self,
from: &RemotePath,
opts: &DownloadOpts,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
self.download_with_encryption(from, opts, None, cancel)
.await
}
async fn delete_objects(
&self,
paths: &[RemotePath],


@@ -178,6 +178,30 @@ impl RemoteStorage for UnreliableWrapper {
self.inner.download(from, opts, cancel).await
}
#[allow(unused_variables)]
async fn download_with_encryption(
&self,
from: &RemotePath,
opts: &DownloadOpts,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
unimplemented!()
}
#[allow(unused_variables)]
async fn upload_with_encryption(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
unimplemented!()
}
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
self.delete_inner(path, true, cancel).await
}


@@ -421,7 +421,7 @@ async fn download_is_timeouted(ctx: &mut MaybeEnabledStorage) {
))
.unwrap();
let len = upload_large_enough_file(&ctx.client, &path, &cancel, None).await;
let timeout = std::time::Duration::from_secs(5);
@@ -500,7 +500,7 @@ async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
))
.unwrap();
let file_len = upload_large_enough_file(&ctx.client, &path, &cancel, None).await;
{
let stream = ctx
@@ -555,6 +555,7 @@ async fn upload_large_enough_file(
client: &GenericRemoteStorage,
path: &RemotePath,
cancel: &CancellationToken,
encryption_key: Option<&[u8]>,
) -> usize {
let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
let body = bytes::Bytes::from(vec![0u8; 1024]);
@@ -565,9 +566,41 @@ async fn upload_large_enough_file(
let contents = futures::stream::iter(contents.map(std::io::Result::Ok));
client
.upload_with_encryption(contents, len, path, None, encryption_key, cancel)
.await
.expect("upload succeeds");
len
}
#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn encryption_works(ctx: &mut MaybeEnabledStorage) {
let MaybeEnabledStorage::Enabled(ctx) = ctx else {
return;
};
let cancel = CancellationToken::new();
let path = RemotePath::new(Utf8Path::new(
format!("{}/file_to_copy", ctx.base_prefix).as_str(),
))
.unwrap();
let key = rand::random::<[u8; 32]>();
let file_len = upload_large_enough_file(&ctx.client, &path, &cancel, Some(&key)).await;
{
let download = ctx
.client
.download_with_encryption(&path, &DownloadOpts::default(), Some(&key), &cancel)
.await
.expect("should succeed");
let vec = download_to_vec(download).await.expect("should succeed");
assert_eq!(vec.len(), file_len);
}
let cancel = CancellationToken::new();
ctx.client.delete_objects(&[path], &cancel).await.unwrap();
}