add key repo

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
Alex Chi Z
2025-04-21 18:10:18 -04:00
parent 6da5c189b5
commit c32c26d741
5 changed files with 31 additions and 23 deletions

View File

@@ -198,6 +198,10 @@ impl LocalFs {
let mut entries = cur_folder.read_dir_utf8()?;
while let Some(Ok(entry)) = entries.next() {
let file_name = entry.file_name();
if file_name.ends_with(".metadata") || file_name.ends_with(".enc") {
// ignore metadata and encryption key files
continue;
}
let full_file_name = cur_folder.join(file_name);
if full_file_name.as_str().starts_with(prefix) {
let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());

View File

@@ -370,6 +370,8 @@ pub(crate) struct RemoteTimelineClient {
cancel: CancellationToken,
kms_impl: Option<NaiveKms>,
key_repo: std::sync::Mutex<HashMap<EncryptionKeyId, EncryptionKeyPair>>,
}
impl Drop for RemoteTimelineClient {
@@ -416,6 +418,7 @@ impl RemoteTimelineClient {
cancel: CancellationToken::new(),
// TODO: make this configurable
kms_impl: Some(NaiveKms::new(tenant_shard_id.tenant_id.to_string())),
key_repo: std::sync::Mutex::new(HashMap::new()),
}
}
@@ -1276,12 +1279,10 @@ impl RemoteTimelineClient {
self: &Arc<Self>,
layer: ResidentLayer,
) -> Result<(), NotInitialized> {
let key_pair = self.schedule_generate_encryption_key()?;
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
self.schedule_layer_file_upload0(upload_queue, layer, key_pair);
self.schedule_layer_file_upload0(upload_queue, layer);
self.launch_queued_tasks(upload_queue);
Ok(())
}
@@ -1290,16 +1291,16 @@ impl RemoteTimelineClient {
self: &Arc<Self>,
upload_queue: &mut UploadQueueInitialized,
layer: ResidentLayer,
key_pair: Option<EncryptionKeyPair>,
) {
let mut metadata = layer.metadata();
assert!(
metadata.encryption_key.is_none(),
"layer key is set automatically in schedule_layer_file_upload, should not be set manually"
);
if let Some(ref key_pair) = key_pair {
metadata.encryption_key = Some(key_pair.id.clone());
}
let key_pair = {
if let Some(key_id) = layer.metadata().encryption_key {
let guard = self.key_repo.lock().unwrap();
Some(guard.get(&key_id).cloned().unwrap())
} else {
None
}
};
let metadata = layer.metadata();
upload_queue
.dirty
@@ -1540,6 +1541,8 @@ impl RemoteTimelineClient {
created_at: Utc::now().naive_utc(),
});
self.key_repo.lock().unwrap().insert(this_key_version, key_pair);
self.schedule_index_upload(upload_queue);
Ok(Some(key_pair))
@@ -1554,13 +1557,11 @@ impl RemoteTimelineClient {
compacted_to: &[ResidentLayer],
) -> Result<(), NotInitialized> {
// Use the same key for all layers in a single compaction job
let key_pair = self.schedule_generate_encryption_key()?;
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
for layer in compacted_to {
self.schedule_layer_file_upload0(upload_queue, layer.clone(), key_pair.clone());
self.schedule_layer_file_upload0(upload_queue, layer.clone());
}
let names = compacted_from.iter().map(|x| x.layer_desc().layer_name());
@@ -2893,6 +2894,10 @@ mod tests {
for entry in std::fs::read_dir(remote_path).unwrap().flatten() {
let entry_name = entry.file_name();
let fname = entry_name.to_str().unwrap();
if fname.ends_with(".metadata") || fname.ends_with(".enc") {
// ignore metadata and encryption key files; should use local_fs APIs instead in the future
continue;
}
found.push(String::from(fname));
}
found.sort();
@@ -2947,6 +2952,7 @@ mod tests {
config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(&location_conf)),
cancel: CancellationToken::new(),
kms_impl: None,
key_repo: std::sync::Mutex::new(HashMap::new()),
})
}

View File

@@ -122,7 +122,7 @@ pub struct IndexPart {
pub(crate) keys: Vec<EncryptionKey>,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Ord, PartialOrd)]
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Ord, PartialOrd, Hash)]
pub struct KeyVersion(pub u32);
impl KeyVersion {
@@ -132,7 +132,7 @@ impl KeyVersion {
}
/// An identifier for an encryption key. The scope of the key is the timeline (TBD).
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Ord, PartialOrd)]
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Ord, PartialOrd, Hash)]
pub struct EncryptionKeyId {
pub version: KeyVersion,
pub generation: Generation,

View File

@@ -4864,6 +4864,7 @@ impl Timeline {
else {
panic!("delta layer cannot be empty if no filter is applied");
};
(
// FIXME: even though we have a single image and single delta layer assumption
// we push them to vec
@@ -6932,9 +6933,7 @@ impl Timeline {
}
// Update remote_timeline_client state to reflect existence of this layer
self.remote_client
.schedule_layer_file_upload(image_layer)
.unwrap();
self.remote_client.schedule_layer_file_upload(image_layer)?;
Ok(())
}
@@ -6995,9 +6994,7 @@ impl Timeline {
}
// Update remote_timeline_client state to reflect existence of this layer
self.remote_client
.schedule_layer_file_upload(delta_layer)
.unwrap();
self.remote_client.schedule_layer_file_upload(delta_layer)?;
Ok(())
}

View File

@@ -1291,6 +1291,7 @@ impl Timeline {
.parts
.extend(sparse_partitioning.into_dense().parts);
// 3. Create new image layers for partitions that have been modified "enough".
let (image_layers, outcome) = self
.create_image_layers(