full impl of local fs encryption

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
Alex Chi Z
2025-04-18 15:02:11 -04:00
parent 66c01e46ca
commit 6da5c189b5

View File

@@ -218,6 +218,7 @@ impl LocalFs {
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
enctyption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let target_file_path = to.with_base(&self.storage_root);
@@ -306,6 +307,8 @@ impl LocalFs {
)
})?;
// TODO: we might need to make the following writes atomic with the file write operation above
if let Some(storage_metadata) = metadata {
// FIXME: we must not be using metadata much, since this would forget the old metadata
// for new writes? or perhaps metadata is sticky; could consider removing if it's never
@@ -324,6 +327,15 @@ impl LocalFs {
})?;
}
if let Some(encryption_key) = enctyption_key {
let encryption_key_path = storage_encryption_key_path(&target_file_path);
fs::write(&encryption_key_path, encryption_key).await.with_context(|| {
format!(
"Failed to write encryption key to the local storage at '{encryption_key_path}'",
)
})?;
}
Ok(())
}
}
@@ -450,6 +462,7 @@ impl RemoteStorage for LocalFs {
key: &RemotePath,
_cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError> {
// TODO: check encryption key
let target_file_path = key.with_base(&self.storage_root);
let metadata = file_metadata(&target_file_path).await?;
Ok(ListingObject {
@@ -461,34 +474,14 @@ impl RemoteStorage for LocalFs {
async fn upload(
&self,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let cancel = cancel.child_token();
let op = self.upload0(data, data_size_bytes, to, metadata, &cancel);
let mut op = std::pin::pin!(op);
// race the upload0 to the timeout; if it goes over, do a graceful shutdown
let (res, timeout) = tokio::select! {
res = &mut op => (res, false),
_ = tokio::time::sleep(self.timeout) => {
cancel.cancel();
(op.await, true)
}
};
match res {
Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => {
// we caused this cancel (or they happened simultaneously) -- swap it out to
// Timeout
Err(TimeoutOrCancel::Timeout.into())
}
res => res,
}
self.upload_with_encryption(data, data_size_bytes, to, metadata, None, cancel)
.await
}
async fn download(
@@ -506,6 +499,22 @@ impl RemoteStorage for LocalFs {
return Err(DownloadError::Unmodified);
}
let key = match fs::read(storage_encryption_key_path(&target_path)).await {
Ok(key) => Some(key),
Err(e) if e.kind() == ErrorKind::NotFound => None,
Err(e) => {
return Err(DownloadError::Other(
anyhow::anyhow!(e).context("cannot read encryption key"),
));
}
};
if key != opts.encryption_key {
return Err(DownloadError::Other(anyhow::anyhow!(
"encryption key mismatch"
)));
}
let mut file = fs::OpenOptions::new()
.read(true)
.open(&target_path)
@@ -551,27 +560,54 @@ impl RemoteStorage for LocalFs {
async fn delete(&self, path: &RemotePath, _cancel: &CancellationToken) -> anyhow::Result<()> {
let file_path = path.with_base(&self.storage_root);
match fs::remove_file(&file_path).await {
Ok(()) => Ok(()),
Ok(()) => {}
// The file doesn't exist. This shouldn't yield an error to mirror S3's behaviour.
// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
// > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
Err(e) => Err(anyhow::anyhow!(e)),
}
Err(e) if e.kind() == ErrorKind::NotFound => {}
Err(e) => return Err(anyhow::anyhow!(e)),
};
fs::remove_file(&storage_metadata_path(&file_path))
.await
.ok();
fs::remove_file(&storage_encryption_key_path(&file_path))
.await
.ok();
Ok(())
}
#[allow(unused_variables)]
async fn upload_with_encryption(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
self.upload(from, data_size_bytes, to, metadata, cancel)
.await
let cancel = cancel.child_token();
let op = self.upload0(data, data_size_bytes, to, metadata, encryption_key, &cancel);
let mut op = std::pin::pin!(op);
// race the upload0 to the timeout; if it goes over, do a graceful shutdown
let (res, timeout) = tokio::select! {
res = &mut op => (res, false),
_ = tokio::time::sleep(self.timeout) => {
cancel.cancel();
(op.await, true)
}
};
match res {
Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => {
// we caused this cancel (or they happened simultaneously) -- swap it out to
// Timeout
Err(TimeoutOrCancel::Timeout.into())
}
res => res,
}
}
async fn delete_objects(
@@ -605,6 +641,7 @@ impl RemoteStorage for LocalFs {
to_path = to_path
)
})?;
// TODO: copy metadata and encryption key
Ok(())
}
@@ -623,6 +660,10 @@ fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
path_with_suffix_extension(original_path, "metadata")
}
fn storage_encryption_key_path(original_path: &Utf8Path) -> Utf8PathBuf {
path_with_suffix_extension(original_path, "enc")
}
async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
let target_dir = match target_file_path.parent() {
Some(parent_dir) => parent_dir,