From 6da5c189b50d20429e43ae7d1288c6f257aa7a87 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Fri, 18 Apr 2025 15:02:11 -0400 Subject: [PATCH] full impl of local fs encryption Signed-off-by: Alex Chi Z --- libs/remote_storage/src/local_fs.rs | 101 +++++++++++++++++++--------- 1 file changed, 71 insertions(+), 30 deletions(-) diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 6468bcbf73..03fa1cb36e 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -218,6 +218,7 @@ impl LocalFs { data_size_bytes: usize, to: &RemotePath, metadata: Option, + enctyption_key: Option<&[u8]>, cancel: &CancellationToken, ) -> anyhow::Result<()> { let target_file_path = to.with_base(&self.storage_root); @@ -306,6 +307,8 @@ impl LocalFs { ) })?; + // TODO: we might need to make the following writes atomic with the file write operation above + if let Some(storage_metadata) = metadata { // FIXME: we must not be using metadata much, since this would forget the old metadata // for new writes? or perhaps metadata is sticky; could consider removing if it's never @@ -324,6 +327,15 @@ impl LocalFs { })?; } + if let Some(encryption_key) = enctyption_key { + let encryption_key_path = storage_encryption_key_path(&target_file_path); + fs::write(&encryption_key_path, encryption_key).await.with_context(|| { + format!( + "Failed to write encryption key to the local storage at '{encryption_key_path}'", + ) + })?; + } + Ok(()) } } @@ -450,6 +462,7 @@ impl RemoteStorage for LocalFs { key: &RemotePath, _cancel: &CancellationToken, ) -> Result { + // TODO: check encryption key let target_file_path = key.with_base(&self.storage_root); let metadata = file_metadata(&target_file_path).await?; Ok(ListingObject { @@ -461,34 +474,14 @@ impl RemoteStorage for LocalFs { async fn upload( &self, - data: impl Stream> + Send + Sync, + data: impl Stream> + Send + Sync + 'static, data_size_bytes: usize, to: &RemotePath, metadata: Option, cancel: &CancellationToken, ) -> anyhow::Result<()> { - let cancel = cancel.child_token(); - - let op = self.upload0(data, data_size_bytes, to, metadata, &cancel); - let mut op = std::pin::pin!(op); - - // race the upload0 to the timeout; if it goes over, do a graceful shutdown - let (res, timeout) = tokio::select! { - res = &mut op => (res, false), - _ = tokio::time::sleep(self.timeout) => { - cancel.cancel(); - (op.await, true) - } - }; - - match res { - Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => { - // we caused this cancel (or they happened simultaneously) -- swap it out to - // Timeout - Err(TimeoutOrCancel::Timeout.into()) - } - res => res, - } + self.upload_with_encryption(data, data_size_bytes, to, metadata, None, cancel) + .await } async fn download( @@ -506,6 +499,22 @@ impl RemoteStorage for LocalFs { return Err(DownloadError::Unmodified); } + let key = match fs::read(storage_encryption_key_path(&target_path)).await { + Ok(key) => Some(key), + Err(e) if e.kind() == ErrorKind::NotFound => None, + Err(e) => { + return Err(DownloadError::Other( + anyhow::anyhow!(e).context("cannot read encryption key"), + )); + } + }; + + if key != opts.encryption_key { + return Err(DownloadError::Other(anyhow::anyhow!( + "encryption key mismatch" + ))); + } + let mut file = fs::OpenOptions::new() .read(true) .open(&target_path) @@ -551,27 +560,54 @@ impl RemoteStorage for LocalFs { async fn delete(&self, path: &RemotePath, _cancel: &CancellationToken) -> anyhow::Result<()> { let file_path = path.with_base(&self.storage_root); match fs::remove_file(&file_path).await { - Ok(()) => Ok(()), + Ok(()) => {} // The file doesn't exist. This shouldn't yield an error to mirror S3's behaviour. // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html // > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful. - Err(e) if e.kind() == ErrorKind::NotFound => Ok(()), - Err(e) => Err(anyhow::anyhow!(e)), - } + Err(e) if e.kind() == ErrorKind::NotFound => {} + Err(e) => return Err(anyhow::anyhow!(e)), + }; + fs::remove_file(&storage_metadata_path(&file_path)) + .await + .ok(); + fs::remove_file(&storage_encryption_key_path(&file_path)) + .await + .ok(); + Ok(()) } #[allow(unused_variables)] async fn upload_with_encryption( &self, - from: impl Stream> + Send + Sync + 'static, + data: impl Stream> + Send + Sync + 'static, data_size_bytes: usize, to: &RemotePath, metadata: Option, encryption_key: Option<&[u8]>, cancel: &CancellationToken, ) -> anyhow::Result<()> { - self.upload(from, data_size_bytes, to, metadata, cancel) - .await + let cancel = cancel.child_token(); + + let op = self.upload0(data, data_size_bytes, to, metadata, encryption_key, &cancel); + let mut op = std::pin::pin!(op); + + // race the upload0 to the timeout; if it goes over, do a graceful shutdown + let (res, timeout) = tokio::select! { + res = &mut op => (res, false), + _ = tokio::time::sleep(self.timeout) => { + cancel.cancel(); + (op.await, true) + } + }; + + match res { + Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => { + // we caused this cancel (or they happened simultaneously) -- swap it out to + // Timeout + Err(TimeoutOrCancel::Timeout.into()) + } + res => res, + } } async fn delete_objects( @@ -605,6 +641,7 @@ impl RemoteStorage for LocalFs { to_path = to_path ) })?; + // TODO: copy metadata and encryption key Ok(()) } @@ -623,6 +660,10 @@ fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf { path_with_suffix_extension(original_path, "metadata") } +fn storage_encryption_key_path(original_path: &Utf8Path) -> Utf8PathBuf { + path_with_suffix_extension(original_path, "enc") +} + async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> { let target_dir = match target_file_path.parent() { Some(parent_dir) => parent_dir,