Compare commits

...

7 Commits

Author SHA1 Message Date
Alex Chi Z
c32c26d741 add key repo
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-21 18:10:18 -04:00
Alex Chi Z
6da5c189b5 full impl of local fs encryption
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-18 15:02:11 -04:00
Alex Chi Z
66c01e46ca disable index_part upload check
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-18 14:51:00 -04:00
Alex Chi Z
94fe9d00f3 read path
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-17 16:29:36 -04:00
Alex Chi Z
dc19960cbf feat(pageserver): upload encrypted layer files
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-17 16:08:18 -04:00
Alex Chi Z
d3b654f6db feat(pageserver): support SSE-C for s3
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-17 15:24:10 -04:00
Alex Chi Z
4a898cfa2c feat(pageserver): add initial encrypt key fields
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-17 15:23:55 -04:00
22 changed files with 836 additions and 124 deletions

14
Cargo.lock generated
View File

@@ -4251,6 +4251,7 @@ dependencies = [
"arc-swap",
"async-compression",
"async-stream",
"base64 0.13.1",
"bincode",
"bit_field",
"byteorder",
@@ -4298,6 +4299,7 @@ dependencies = [
"rand 0.8.5",
"range-set-blaze",
"regex",
"remote_keys",
"remote_storage",
"reqwest",
"rpds",
@@ -5503,6 +5505,16 @@ version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c707298afce11da2efef2f600116fa93ffa7a032b5d7b628aa17711ec81383ca"
[[package]]
name = "remote_keys"
version = "0.1.0"
dependencies = [
"anyhow",
"rand 0.8.5",
"utils",
"workspace_hack",
]
[[package]]
name = "remote_storage"
version = "0.1.0"
@@ -5518,6 +5530,7 @@ dependencies = [
"azure_identity",
"azure_storage",
"azure_storage_blobs",
"base64 0.13.1",
"bytes",
"camino",
"camino-tempfile",
@@ -5528,6 +5541,7 @@ dependencies = [
"humantime-serde",
"hyper 1.4.1",
"itertools 0.10.5",
"md5",
"metrics",
"once_cell",
"pin-project-lite",

View File

@@ -30,6 +30,7 @@ members = [
"libs/tenant_size_model",
"libs/metrics",
"libs/postgres_connection",
"libs/remote_keys",
"libs/remote_storage",
"libs/tracing-utils",
"libs/postgres_ffi/wal_craft",
@@ -255,6 +256,7 @@ postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }
postgres_initdb = { path = "./libs/postgres_initdb" }
pq_proto = { version = "0.1", path = "./libs/pq_proto/" }
remote_storage = { version = "0.1", path = "./libs/remote_storage/" }
remote_keys = { version = "0.1", path = "./libs/remote_keys/" }
safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" }
safekeeper_client = { path = "./safekeeper/client" }
desim = { version = "0.1", path = "./libs/desim" }

View File

@@ -0,0 +1,13 @@
[package]
name = "remote_keys"
version = "0.1.0"
edition = "2024"
license.workspace = true
[dependencies]
anyhow.workspace = true
utils.workspace = true
workspace_hack.workspace = true
[dev-dependencies]
rand.workspace = true

View File

@@ -0,0 +1,42 @@
//! A module that provides a KMS implementation that generates and unwraps the keys.
//!
/// A KMS implementation that does static wrapping and unwrapping of the keys.
pub struct NaiveKms {
account_id: String,
}
impl NaiveKms {
pub fn new(account_id: String) -> Self {
Self { account_id }
}
pub fn encrypt(&self, plain: &[u8]) -> anyhow::Result<Vec<u8>> {
let wrapped = [self.account_id.as_bytes(), "-wrapped-".as_bytes(), plain].concat();
Ok(wrapped)
}
pub fn decrypt(&self, wrapped: &[u8]) -> anyhow::Result<Vec<u8>> {
let Some(wrapped) = wrapped.strip_prefix(self.account_id.as_bytes()) else {
return Err(anyhow::anyhow!("invalid key"));
};
let Some(plain) = wrapped.strip_prefix(b"-wrapped-") else {
return Err(anyhow::anyhow!("invalid key"));
};
Ok(plain.to_vec())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_generate_key() {
let kms = NaiveKms::new("test-tenant".to_string());
let data = rand::random::<[u8; 32]>().to_vec();
let encrypted = kms.encrypt(&data).unwrap();
let decrypted = kms.decrypt(&encrypted).unwrap();
assert_eq!(data, decrypted);
}
}

View File

@@ -13,6 +13,7 @@ aws-smithy-async.workspace = true
aws-smithy-types.workspace = true
aws-config.workspace = true
aws-sdk-s3.workspace = true
base64.workspace = true
bytes.workspace = true
camino = { workspace = true, features = ["serde1"] }
humantime-serde.workspace = true
@@ -27,6 +28,7 @@ tokio-util = { workspace = true, features = ["compat"] }
toml_edit.workspace = true
tracing.workspace = true
scopeguard.workspace = true
md5.workspace = true
metrics.workspace = true
utils = { path = "../utils", default-features = false }
pin-project-lite.workspace = true

View File

@@ -550,6 +550,19 @@ impl RemoteStorage for AzureBlobStorage {
self.download_for_builder(builder, timeout, cancel).await
}
#[allow(unused_variables)]
async fn upload_with_encryption(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
unimplemented!()
}
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
self.delete_objects(std::array::from_ref(path), cancel)
.await

View File

@@ -190,6 +190,8 @@ pub struct DownloadOpts {
/// timeouts: for something like an index/manifest/heatmap, we should time out faster than
/// for layer files
pub kind: DownloadKind,
/// The encryption key to use for the download.
pub encryption_key: Option<Vec<u8>>,
}
pub enum DownloadKind {
@@ -204,6 +206,7 @@ impl Default for DownloadOpts {
byte_start: Bound::Unbounded,
byte_end: Bound::Unbounded,
kind: DownloadKind::Large,
encryption_key: None,
}
}
}
@@ -241,6 +244,15 @@ impl DownloadOpts {
None => format!("bytes={start}-"),
})
}
pub fn with_encryption_key(mut self, encryption_key: Option<impl AsRef<[u8]>>) -> Self {
self.encryption_key = encryption_key.map(|k| k.as_ref().to_vec());
self
}
pub fn encryption_key(&self) -> Option<&[u8]> {
self.encryption_key.as_deref()
}
}
/// Storage (potentially remote) API to manage its state.
@@ -331,6 +343,19 @@ pub trait RemoteStorage: Send + Sync + 'static {
cancel: &CancellationToken,
) -> Result<Download, DownloadError>;
/// Same as upload, but with remote encryption if the backend supports it (e.g. SSE-C on AWS).
async fn upload_with_encryption(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
// S3 PUT request requires the content length to be specified,
// otherwise it starts to fail with the concurrent connection count increasing.
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()>;
/// Delete a single path from remote storage.
///
/// If the operation fails because of timeout or cancellation, the root cause of the error will be
@@ -615,6 +640,63 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
}
}
}
pub async fn upload_with_encryption(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
match self {
Self::LocalFs(s) => {
s.upload_with_encryption(
from,
data_size_bytes,
to,
metadata,
encryption_key,
cancel,
)
.await
}
Self::AwsS3(s) => {
s.upload_with_encryption(
from,
data_size_bytes,
to,
metadata,
encryption_key,
cancel,
)
.await
}
Self::AzureBlob(s) => {
s.upload_with_encryption(
from,
data_size_bytes,
to,
metadata,
encryption_key,
cancel,
)
.await
}
Self::Unreliable(s) => {
s.upload_with_encryption(
from,
data_size_bytes,
to,
metadata,
encryption_key,
cancel,
)
.await
}
}
}
}
impl GenericRemoteStorage {

View File

@@ -198,6 +198,10 @@ impl LocalFs {
let mut entries = cur_folder.read_dir_utf8()?;
while let Some(Ok(entry)) = entries.next() {
let file_name = entry.file_name();
if file_name.ends_with(".metadata") || file_name.ends_with(".enc") {
// ignore metadata and encryption key files
continue;
}
let full_file_name = cur_folder.join(file_name);
if full_file_name.as_str().starts_with(prefix) {
let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
@@ -218,6 +222,7 @@ impl LocalFs {
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
enctyption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let target_file_path = to.with_base(&self.storage_root);
@@ -306,6 +311,8 @@ impl LocalFs {
)
})?;
// TODO: we might need to make the following writes atomic with the file write operation above
if let Some(storage_metadata) = metadata {
// FIXME: we must not be using metadata much, since this would forget the old metadata
// for new writes? or perhaps metadata is sticky; could consider removing if it's never
@@ -324,6 +331,15 @@ impl LocalFs {
})?;
}
if let Some(encryption_key) = enctyption_key {
let encryption_key_path = storage_encryption_key_path(&target_file_path);
fs::write(&encryption_key_path, encryption_key).await.with_context(|| {
format!(
"Failed to write encryption key to the local storage at '{encryption_key_path}'",
)
})?;
}
Ok(())
}
}
@@ -450,6 +466,7 @@ impl RemoteStorage for LocalFs {
key: &RemotePath,
_cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError> {
// TODO: check encryption key
let target_file_path = key.with_base(&self.storage_root);
let metadata = file_metadata(&target_file_path).await?;
Ok(ListingObject {
@@ -461,34 +478,14 @@ impl RemoteStorage for LocalFs {
async fn upload(
&self,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let cancel = cancel.child_token();
let op = self.upload0(data, data_size_bytes, to, metadata, &cancel);
let mut op = std::pin::pin!(op);
// race the upload0 to the timeout; if it goes over, do a graceful shutdown
let (res, timeout) = tokio::select! {
res = &mut op => (res, false),
_ = tokio::time::sleep(self.timeout) => {
cancel.cancel();
(op.await, true)
}
};
match res {
Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => {
// we caused this cancel (or they happened simultaneously) -- swap it out to
// Timeout
Err(TimeoutOrCancel::Timeout.into())
}
res => res,
}
self.upload_with_encryption(data, data_size_bytes, to, metadata, None, cancel)
.await
}
async fn download(
@@ -506,6 +503,22 @@ impl RemoteStorage for LocalFs {
return Err(DownloadError::Unmodified);
}
let key = match fs::read(storage_encryption_key_path(&target_path)).await {
Ok(key) => Some(key),
Err(e) if e.kind() == ErrorKind::NotFound => None,
Err(e) => {
return Err(DownloadError::Other(
anyhow::anyhow!(e).context("cannot read encryption key"),
));
}
};
if key != opts.encryption_key {
return Err(DownloadError::Other(anyhow::anyhow!(
"encryption key mismatch"
)));
}
let mut file = fs::OpenOptions::new()
.read(true)
.open(&target_path)
@@ -551,12 +564,53 @@ impl RemoteStorage for LocalFs {
async fn delete(&self, path: &RemotePath, _cancel: &CancellationToken) -> anyhow::Result<()> {
let file_path = path.with_base(&self.storage_root);
match fs::remove_file(&file_path).await {
Ok(()) => Ok(()),
Ok(()) => {}
// The file doesn't exist. This shouldn't yield an error to mirror S3's behaviour.
// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
// > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
Err(e) => Err(anyhow::anyhow!(e)),
Err(e) if e.kind() == ErrorKind::NotFound => {}
Err(e) => return Err(anyhow::anyhow!(e)),
};
fs::remove_file(&storage_metadata_path(&file_path))
.await
.ok();
fs::remove_file(&storage_encryption_key_path(&file_path))
.await
.ok();
Ok(())
}
#[allow(unused_variables)]
async fn upload_with_encryption(
&self,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let cancel = cancel.child_token();
let op = self.upload0(data, data_size_bytes, to, metadata, encryption_key, &cancel);
let mut op = std::pin::pin!(op);
// race the upload0 to the timeout; if it goes over, do a graceful shutdown
let (res, timeout) = tokio::select! {
res = &mut op => (res, false),
_ = tokio::time::sleep(self.timeout) => {
cancel.cancel();
(op.await, true)
}
};
match res {
Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => {
// we caused this cancel (or they happened simultaneously) -- swap it out to
// Timeout
Err(TimeoutOrCancel::Timeout.into())
}
res => res,
}
}
@@ -591,6 +645,7 @@ impl RemoteStorage for LocalFs {
to_path = to_path
)
})?;
// TODO: copy metadata and encryption key
Ok(())
}
@@ -609,6 +664,10 @@ fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
path_with_suffix_extension(original_path, "metadata")
}
fn storage_encryption_key_path(original_path: &Utf8Path) -> Utf8PathBuf {
path_with_suffix_extension(original_path, "enc")
}
async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
let target_dir = match target_file_path.parent() {
Some(parent_dir) => parent_dir,

View File

@@ -66,7 +66,10 @@ struct GetObjectRequest {
key: String,
etag: Option<String>,
range: Option<String>,
/// Base64 encoded SSE-C key for server-side encryption.
sse_c_key: Option<Vec<u8>>,
}
impl S3Bucket {
/// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
@@ -257,6 +260,13 @@ impl S3Bucket {
builder = builder.if_none_match(etag);
}
if let Some(encryption_key) = request.sse_c_key {
builder = builder.sse_customer_algorithm("AES256");
builder = builder.sse_customer_key(base64::encode(&encryption_key));
builder = builder
.sse_customer_key_md5(base64::encode(md5::compute(&encryption_key).as_slice()));
}
let get_object = builder.send();
let get_object = tokio::select! {
@@ -693,12 +703,13 @@ impl RemoteStorage for S3Bucket {
})
}
async fn upload(
async fn upload_with_encryption(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
from_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let kind = RequestKind::Put;
@@ -709,7 +720,7 @@ impl RemoteStorage for S3Bucket {
let body = StreamBody::new(from.map(|x| x.map(Frame::data)));
let bytes_stream = ByteStream::new(SdkBody::from_body_1_x(body));
let upload = self
let mut upload = self
.client
.put_object()
.bucket(self.bucket_name.clone())
@@ -717,8 +728,17 @@ impl RemoteStorage for S3Bucket {
.set_metadata(metadata.map(|m| m.0))
.set_storage_class(self.upload_storage_class.clone())
.content_length(from_size_bytes.try_into()?)
.body(bytes_stream)
.send();
.body(bytes_stream);
if let Some(encryption_key) = encryption_key {
upload = upload.sse_customer_algorithm("AES256");
let base64_key = base64::encode(encryption_key);
upload = upload.sse_customer_key(&base64_key);
upload = upload
.sse_customer_key_md5(base64::encode(md5::compute(encryption_key).as_slice()));
}
let upload = upload.send();
let upload = tokio::time::timeout(self.timeout, upload);
@@ -742,6 +762,18 @@ impl RemoteStorage for S3Bucket {
}
}
async fn upload(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
self.upload_with_encryption(from, data_size_bytes, to, metadata, None, cancel)
.await
}
async fn copy(
&self,
from: &RemotePath,
@@ -801,6 +833,7 @@ impl RemoteStorage for S3Bucket {
key: self.relative_path_to_s3_object(from),
etag: opts.etag.as_ref().map(|e| e.to_string()),
range: opts.byte_range_header(),
sse_c_key: opts.encryption_key.clone(),
},
cancel,
)

View File

@@ -178,6 +178,19 @@ impl RemoteStorage for UnreliableWrapper {
self.inner.download(from, opts, cancel).await
}
#[allow(unused_variables)]
async fn upload_with_encryption(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
unimplemented!()
}
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
self.delete_inner(path, true, cancel).await
}

View File

@@ -421,7 +421,7 @@ async fn download_is_timeouted(ctx: &mut MaybeEnabledStorage) {
))
.unwrap();
let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
let len = upload_large_enough_file(&ctx.client, &path, &cancel, None).await;
let timeout = std::time::Duration::from_secs(5);
@@ -500,7 +500,7 @@ async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
))
.unwrap();
let file_len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
let file_len = upload_large_enough_file(&ctx.client, &path, &cancel, None).await;
{
let stream = ctx
@@ -555,6 +555,7 @@ async fn upload_large_enough_file(
client: &GenericRemoteStorage,
path: &RemotePath,
cancel: &CancellationToken,
encryption_key: Option<&[u8]>,
) -> usize {
let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
let body = bytes::Bytes::from(vec![0u8; 1024]);
@@ -565,9 +566,54 @@ async fn upload_large_enough_file(
let contents = futures::stream::iter(contents.map(std::io::Result::Ok));
client
.upload(contents, len, path, None, cancel)
.upload_with_encryption(contents, len, path, None, encryption_key, cancel)
.await
.expect("upload succeeds");
len
}
#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn encryption_works(ctx: &mut MaybeEnabledStorage) {
let MaybeEnabledStorage::Enabled(ctx) = ctx else {
return;
};
let cancel = CancellationToken::new();
let path = RemotePath::new(Utf8Path::new(
format!("{}/file_to_copy", ctx.base_prefix).as_str(),
))
.unwrap();
let key = rand::random::<[u8; 32]>();
let file_len = upload_large_enough_file(&ctx.client, &path, &cancel, Some(&key)).await;
{
let download = ctx
.client
.download(
&path,
&DownloadOpts::default().with_encryption_key(Some(&key)),
&cancel,
)
.await
.expect("should succeed");
let vec = download_to_vec(download).await.expect("should succeed");
assert_eq!(vec.len(), file_len);
}
{
// Download without encryption key should fail
let download = ctx
.client
.download(&path, &DownloadOpts::default(), &cancel)
.await;
assert!(download.is_err());
}
let cancel = CancellationToken::new();
ctx.client.delete_objects(&[path], &cancel).await.unwrap();
}

View File

@@ -17,6 +17,7 @@ anyhow.workspace = true
arc-swap.workspace = true
async-compression.workspace = true
async-stream.workspace = true
base64.workspace = true
bit_field.workspace = true
bincode.workspace = true
byteorder.workspace = true
@@ -82,6 +83,7 @@ postgres_connection.workspace = true
postgres_ffi.workspace = true
pq_proto.workspace = true
remote_storage.workspace = true
remote_keys.workspace = true
storage_broker.workspace = true
tenant_size_model.workspace = true
http-utils.workspace = true

View File

@@ -45,6 +45,7 @@ fn bench_upload_queue_next_ready(c: &mut Criterion) {
shard: ShardIndex::new(ShardNumber(1), ShardCount(2)),
generation: Generation::Valid(1),
file_size: 0,
encryption_key: None,
};
// Construct the (initial and uploaded) index with layer0.

View File

@@ -192,11 +192,12 @@ pub(crate) use download::{
download_index_part, download_initdb_tar_zst, download_tenant_manifest, is_temp_download_file,
list_remote_tenant_shards, list_remote_timelines,
};
use index::GcCompactionState;
pub(crate) use index::LayerFileMetadata;
use index::{EncryptionKey, EncryptionKeyId, EncryptionKeyPair, GcCompactionState, KeyVersion};
use pageserver_api::models::{RelSizeMigration, TimelineArchivalState, TimelineVisibilityState};
use pageserver_api::shard::{ShardIndex, TenantShardId};
use regex::Regex;
use remote_keys::NaiveKms;
use remote_storage::{
DownloadError, GenericRemoteStorage, ListingMode, RemotePath, TimeoutOrCancel,
};
@@ -367,6 +368,10 @@ pub(crate) struct RemoteTimelineClient {
config: std::sync::RwLock<RemoteTimelineClientConfig>,
cancel: CancellationToken,
kms_impl: Option<NaiveKms>,
key_repo: std::sync::Mutex<HashMap<EncryptionKeyId, EncryptionKeyPair>>,
}
impl Drop for RemoteTimelineClient {
@@ -411,6 +416,9 @@ impl RemoteTimelineClient {
)),
config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(location_conf)),
cancel: CancellationToken::new(),
// TODO: make this configurable
kms_impl: Some(NaiveKms::new(tenant_shard_id.tenant_id.to_string())),
key_repo: std::sync::Mutex::new(HashMap::new()),
}
}
@@ -727,9 +735,43 @@ impl RemoteTimelineClient {
reason: "no need for a downloads gauge",
},
);
let key_pair = if let Some(ref key_id) = layer_metadata.encryption_key {
let wrapped_key = {
let mut queue = self.upload_queue.lock().unwrap();
let upload_queue = queue.initialized_mut().unwrap();
let encryption_key_pair =
upload_queue.dirty.keys.iter().find(|key| &key.id == key_id);
if let Some(encryption_key_pair) = encryption_key_pair {
// TODO: also check if we have uploaded the key yet; we should never use a key that is not persisted
encryption_key_pair.clone()
} else {
return Err(DownloadError::Other(anyhow::anyhow!(
"Encryption key pair not found in index_part.json"
)));
}
};
let Some(kms) = self.kms_impl.as_ref() else {
return Err(DownloadError::Other(anyhow::anyhow!(
"KMS not configured when downloading encrypted layer file"
)));
};
let plain_key = kms
.decrypt(&wrapped_key.key)
.context("failed to decrypt encryption key")
.map_err(DownloadError::Other)?;
Some(EncryptionKeyPair::new(
wrapped_key.id,
plain_key,
wrapped_key.key,
))
} else {
None
};
download::download_layer_file(
self.conf,
&self.storage_impl,
key_pair.as_ref(),
self.tenant_shard_id,
self.timeline_id,
layer_file_name,
@@ -1250,6 +1292,14 @@ impl RemoteTimelineClient {
upload_queue: &mut UploadQueueInitialized,
layer: ResidentLayer,
) {
let key_pair = {
if let Some(key_id) = layer.metadata().encryption_key {
let guard = self.key_repo.lock().unwrap();
Some(guard.get(&key_id).cloned().unwrap())
} else {
None
}
};
let metadata = layer.metadata();
upload_queue
@@ -1264,7 +1314,7 @@ impl RemoteTimelineClient {
"scheduled layer file upload {layer}",
);
let op = UploadOp::UploadLayer(layer, metadata, None);
let op = UploadOp::UploadLayer(layer, metadata, key_pair, None);
self.metric_begin(&op);
upload_queue.queued_operations.push_back(op);
}
@@ -1446,6 +1496,58 @@ impl RemoteTimelineClient {
upload_queue.queued_operations.push_back(op);
}
#[allow(dead_code)]
fn is_kms_enabled(&self) -> bool {
self.kms_impl.is_some()
}
pub(crate) fn schedule_generate_encryption_key(
self: &Arc<Self>,
) -> Result<Option<EncryptionKeyPair>, NotInitialized> {
let Some(kms_impl) = self.kms_impl.as_ref() else {
return Ok(None);
};
let plain_key = rand::random::<[u8; 32]>().to_vec(); // StdRng is cryptographically secure (?)
let wrapped_key = kms_impl.encrypt(&plain_key).unwrap();
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
let last_key = upload_queue.dirty.keys.last();
let this_key_version = if let Some(last_key) = last_key {
let key_version = EncryptionKeyId {
version: last_key.id.version.next(),
generation: self.generation,
};
assert!(key_version > last_key.id); // ensure key version is strictly increasing; no dup key versions
key_version
} else {
EncryptionKeyId {
version: KeyVersion(1),
generation: self.generation,
}
};
let key_pair = EncryptionKeyPair {
id: this_key_version.clone(),
plain_key: plain_key.clone(),
wrapped_key,
};
upload_queue.dirty.keys.push(EncryptionKey {
key: plain_key,
id: this_key_version,
created_at: Utc::now().naive_utc(),
});
self.key_repo.lock().unwrap().insert(this_key_version, key_pair);
self.schedule_index_upload(upload_queue);
Ok(Some(key_pair))
}
/// Schedules a compaction update to the remote `index_part.json`.
///
/// `compacted_from` represent the L0 names which have been `compacted_to` L1 layers.
@@ -1454,6 +1556,7 @@ impl RemoteTimelineClient {
compacted_from: &[Layer],
compacted_to: &[ResidentLayer],
) -> Result<(), NotInitialized> {
// Use the same key for all layers in a single compaction job
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
@@ -1715,6 +1818,7 @@ impl RemoteTimelineClient {
uploaded.local_path(),
&remote_path,
uploaded.metadata().file_size,
None, // TODO(chi): support encryption for those layer files uploaded using this interface
cancel,
)
.await
@@ -1757,6 +1861,8 @@ impl RemoteTimelineClient {
adopted_as.metadata().generation,
);
// TODO: support encryption for those layer files uploaded using this interface
backoff::retry(
|| async {
upload::copy_timeline_layer(
@@ -1977,7 +2083,7 @@ impl RemoteTimelineClient {
// Prepare upload.
match &mut next_op {
UploadOp::UploadLayer(layer, meta, mode) => {
UploadOp::UploadLayer(layer, meta, _, mode) => {
if upload_queue
.recently_deleted
.remove(&(layer.layer_desc().layer_name().clone(), meta.generation))
@@ -2071,7 +2177,7 @@ impl RemoteTimelineClient {
// Assert that we don't modify a layer that's referenced by the current index.
if cfg!(debug_assertions) {
let modified = match &task.op {
UploadOp::UploadLayer(layer, layer_metadata, _) => {
UploadOp::UploadLayer(layer, layer_metadata, _, _) => {
vec![(layer.layer_desc().layer_name(), layer_metadata)]
}
UploadOp::Delete(delete) => {
@@ -2093,7 +2199,7 @@ impl RemoteTimelineClient {
}
let upload_result: anyhow::Result<()> = match &task.op {
UploadOp::UploadLayer(layer, layer_metadata, mode) => {
UploadOp::UploadLayer(layer, layer_metadata, encryption_key_pair, mode) => {
// TODO: check if this mechanism can be removed now that can_bypass() performs
// conflict checks during scheduling.
if let Some(OpType::FlushDeletion) = mode {
@@ -2174,6 +2280,7 @@ impl RemoteTimelineClient {
local_path,
&remote_path,
layer_metadata.file_size,
encryption_key_pair.clone(),
&self.cancel,
)
.measure_remote_op(
@@ -2324,7 +2431,7 @@ impl RemoteTimelineClient {
upload_queue.inprogress_tasks.remove(&task.task_id);
let lsn_update = match task.op {
UploadOp::UploadLayer(_, _, _) => None,
UploadOp::UploadLayer(_, _, _, _) => None,
UploadOp::UploadMetadata { ref uploaded } => {
// the task id is reused as a monotonicity check for storing the "clean"
// IndexPart.
@@ -2403,7 +2510,7 @@ impl RemoteTimelineClient {
)> {
use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
let res = match op {
UploadOp::UploadLayer(_, m, _) => (
UploadOp::UploadLayer(_, m, _, _) => (
RemoteOpFileKind::Layer,
RemoteOpKind::Upload,
RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size),
@@ -2787,6 +2894,10 @@ mod tests {
for entry in std::fs::read_dir(remote_path).unwrap().flatten() {
let entry_name = entry.file_name();
let fname = entry_name.to_str().unwrap();
if fname.ends_with(".metadata") || fname.ends_with(".enc") {
// ignore metadata and encryption key files; should use local_fs APIs instead in the future
continue;
}
found.push(String::from(fname));
}
found.sort();
@@ -2840,6 +2951,8 @@ mod tests {
)),
config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(&location_conf)),
cancel: CancellationToken::new(),
kms_impl: None,
key_repo: std::sync::Mutex::new(HashMap::new()),
})
}

View File

@@ -23,7 +23,7 @@ use utils::crashsafe::path_with_suffix_extension;
use utils::id::{TenantId, TimelineId};
use utils::{backoff, pausable_failpoint};
use super::index::{IndexPart, LayerFileMetadata};
use super::index::{EncryptionKeyPair, IndexPart, LayerFileMetadata};
use super::manifest::TenantManifest;
use super::{
FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, INITDB_PATH, parse_remote_index_path,
@@ -51,6 +51,7 @@ use crate::virtual_file::{MaybeFatalIo, VirtualFile, on_fatal_io_error};
pub async fn download_layer_file<'a>(
conf: &'static PageServerConf,
storage: &'a GenericRemoteStorage,
key_pair: Option<&'a EncryptionKeyPair>,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
layer_file_name: &'a LayerName,
@@ -86,7 +87,16 @@ pub async fn download_layer_file<'a>(
let bytes_amount = download_retry(
|| async {
download_object(storage, &remote_path, &temp_file_path, gate, cancel, ctx).await
download_object(
storage,
key_pair,
&remote_path,
&temp_file_path,
gate,
cancel,
ctx,
)
.await
},
&format!("download {remote_path:?}"),
cancel,
@@ -145,6 +155,7 @@ pub async fn download_layer_file<'a>(
/// The unlinking has _not_ been made durable.
async fn download_object(
storage: &GenericRemoteStorage,
encryption_key_pair: Option<&EncryptionKeyPair>,
src_path: &RemotePath,
dst_path: &Utf8PathBuf,
#[cfg_attr(target_os = "macos", allow(unused_variables))] gate: &utils::sync::gate::Gate,
@@ -160,9 +171,12 @@ async fn download_object(
.with_context(|| format!("create a destination file for layer '{dst_path}'"))
.map_err(DownloadError::Other)?;
let download = storage
.download(src_path, &DownloadOpts::default(), cancel)
.await?;
let mut opts = DownloadOpts::default();
if let Some(encryption_key_pair) = encryption_key_pair {
opts.encryption_key = Some(encryption_key_pair.plain_key.to_vec());
}
let download = storage.download(src_path, &opts, cancel).await?;
pausable_failpoint!("before-downloading-layer-stream-pausable");

View File

@@ -10,6 +10,8 @@ use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::RelSizeMigration;
use pageserver_api::shard::ShardIndex;
use serde::{Deserialize, Serialize};
use serde_with::base64::Base64;
use serde_with::serde_as;
use utils::id::TimelineId;
use utils::lsn::Lsn;
@@ -114,6 +116,70 @@ pub struct IndexPart {
/// The timestamp when the timeline was marked invisible in synthetic size calculations.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) marked_invisible_at: Option<NaiveDateTime>,
/// The encryption key used to encrypt the timeline layer files.
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub(crate) keys: Vec<EncryptionKey>,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Ord, PartialOrd, Hash)]
pub struct KeyVersion(pub u32);
impl KeyVersion {
pub fn next(&self) -> Self {
Self(self.0 + 1)
}
}
/// An identifier for an encryption key. The scope of the key is the timeline (TBD).
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Ord, PartialOrd, Hash)]
pub struct EncryptionKeyId {
pub version: KeyVersion,
pub generation: Generation,
}
#[derive(Clone)]
pub struct EncryptionKeyPair {
pub id: EncryptionKeyId,
pub plain_key: Vec<u8>,
pub wrapped_key: Vec<u8>,
}
impl EncryptionKeyPair {
pub fn new(id: EncryptionKeyId, plain_key: Vec<u8>, wrapped_key: Vec<u8>) -> Self {
Self {
id,
plain_key,
wrapped_key,
}
}
}
impl std::fmt::Debug for EncryptionKeyPair {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let display =
base64::display::Base64Display::with_config(&self.wrapped_key, base64::STANDARD);
struct DisplayAsDebug<T: std::fmt::Display>(T);
impl<T: std::fmt::Display> std::fmt::Debug for DisplayAsDebug<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
f.debug_struct("EncryptionKeyPair")
.field("id", &self.id)
.field("plain_key", &"<REDACTED>")
.field("wrapped_key", &DisplayAsDebug(&display))
.finish()
}
}
#[serde_as]
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct EncryptionKey {
#[serde_as(as = "Base64")]
pub key: Vec<u8>,
pub id: EncryptionKeyId,
pub created_at: NaiveDateTime,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
@@ -142,10 +208,12 @@ impl IndexPart {
/// - 12: +l2_lsn
/// - 13: +gc_compaction
/// - 14: +marked_invisible_at
const LATEST_VERSION: usize = 14;
/// - 15: +keys and encryption_key in layer_metadata
const LATEST_VERSION: usize = 15;
// Versions we may see when reading from a bucket.
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14];
pub const KNOWN_VERSIONS: &'static [usize] =
&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
pub const FILE_NAME: &'static str = "index_part.json";
@@ -165,6 +233,7 @@ impl IndexPart {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
}
}
@@ -205,14 +274,16 @@ impl IndexPart {
/// Check for invariants in the index: this is useful when uploading an index to ensure that if
/// we encounter a bug, we do not persist buggy metadata.
pub(crate) fn validate(&self) -> Result<(), String> {
if self.import_pgdata.is_none()
&& self.metadata.ancestor_timeline().is_none()
&& self.layer_metadata.is_empty()
{
// Unless we're in the middle of a raw pgdata import, or this is a child timeline,the index must
// always have at least one layer.
return Err("Index has no ancestor and no layers".to_string());
}
// We have to disable this check: we might need to upload an empty index part with new keys, or new `reldirv2` flag.
// if self.import_pgdata.is_none()
// && self.metadata.ancestor_timeline().is_none()
// && self.layer_metadata.is_empty()
// {
// // Unless we're in the middle of a raw pgdata import, or this is a child timeline,the index must
// // always have at least one layer.
// return Err("Index has no ancestor and no layers".to_string());
// }
Ok(())
}
@@ -222,7 +293,7 @@ impl IndexPart {
///
/// Fields have to be `Option`s because remote [`IndexPart`]'s can be from different version, which
/// might have less or more metadata depending if upgrading or rolling back an upgrade.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LayerFileMetadata {
pub file_size: u64,
@@ -233,6 +304,9 @@ pub struct LayerFileMetadata {
#[serde(default = "ShardIndex::unsharded")]
#[serde(skip_serializing_if = "ShardIndex::is_unsharded")]
pub shard: ShardIndex,
#[serde(skip_serializing_if = "Option::is_none", default)]
pub encryption_key: Option<EncryptionKeyId>,
}
impl LayerFileMetadata {
@@ -241,6 +315,7 @@ impl LayerFileMetadata {
file_size,
generation,
shard,
encryption_key: None,
}
}
/// Helper to get both generation and file size in a tuple
@@ -453,14 +528,16 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -475,6 +552,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -502,14 +580,16 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -524,6 +604,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -552,14 +633,16 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -574,6 +657,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -627,6 +711,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let empty_layers_parsed = IndexPart::from_json_bytes(empty_layers_json.as_bytes()).unwrap();
@@ -653,14 +738,16 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -675,6 +762,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -703,11 +791,13 @@ mod tests {
file_size: 23289856,
generation: Generation::new(1),
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014EF499-00000000015A7619".parse().unwrap(), LayerFileMetadata {
file_size: 1015808,
generation: Generation::new(1),
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: Lsn::from_str("0/15A7618").unwrap(),
@@ -726,6 +816,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -756,14 +847,16 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -782,6 +875,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -815,12 +909,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -843,6 +939,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -877,12 +974,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -905,6 +1004,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -941,12 +1041,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -972,6 +1074,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1017,12 +1120,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -1052,6 +1157,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1098,12 +1204,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -1133,6 +1241,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1183,12 +1292,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -1220,6 +1331,7 @@ mod tests {
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
}),
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1271,12 +1383,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -1308,6 +1422,139 @@ mod tests {
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
}),
marked_invisible_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
#[test]
fn v15_keys_are_parsed() {
let example = r#"{
"version": 15,
"layer_metadata":{
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000, "encryption_key": { "version": 1, "generation": 5 } },
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001, "encryption_key": { "version": 2, "generation": 6 } }
},
"disk_consistent_lsn":"0/16960E8",
"metadata": {
"disk_consistent_lsn": "0/16960E8",
"prev_record_lsn": "0/1696070",
"ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e",
"ancestor_lsn": "0/0",
"latest_gc_cutoff_lsn": "0/1696070",
"initdb_lsn": "0/1696070",
"pg_version": 14
},
"gc_blocking": {
"started_at": "2024-07-19T09:00:00.123",
"reasons": ["DetachAncestor"]
},
"import_pgdata": {
"V1": {
"Done": {
"idempotency_key": "specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5",
"started_at": "2024-11-13T09:23:42.123",
"finished_at": "2024-11-13T09:42:23.123"
}
}
},
"rel_size_migration": "legacy",
"l2_lsn": "0/16960E8",
"gc_compaction": {
"last_completed_lsn": "0/16960E8"
},
"marked_invisible_at": "2023-07-31T09:00:00.123",
"keys": [
{
"key": "dGVzdF9rZXk=",
"id": {
"version": 1,
"generation": 5
},
"created_at": "2024-07-19T09:00:00.123"
},
{
"key": "dGVzdF9rZXlfMg==",
"id": {
"version": 2,
"generation": 6
},
"created_at": "2024-07-19T10:00:00.123"
}
]
}"#;
let expected = IndexPart {
version: 15,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: Some(EncryptionKeyId {
version: KeyVersion(1),
generation: Generation::Valid(5),
}),
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: Some(EncryptionKeyId {
version: KeyVersion(2),
generation: Generation::Valid(6),
}),
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata: TimelineMetadata::new(
Lsn::from_str("0/16960E8").unwrap(),
Some(Lsn::from_str("0/1696070").unwrap()),
Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()),
Lsn::INVALID,
Lsn::from_str("0/1696070").unwrap(),
Lsn::from_str("0/1696070").unwrap(),
14,
).with_recalculated_checksum().unwrap(),
deleted_at: None,
lineage: Default::default(),
gc_blocking: Some(GcBlocking {
started_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"),
reasons: enumset::EnumSet::from_iter([GcBlockingReason::DetachAncestor]),
}),
last_aux_file_policy: Default::default(),
archived_at: None,
import_pgdata: Some(import_pgdata::index_part_format::Root::V1(import_pgdata::index_part_format::V1::Done(import_pgdata::index_part_format::Done{
started_at: parse_naive_datetime("2024-11-13T09:23:42.123000000"),
finished_at: parse_naive_datetime("2024-11-13T09:42:23.123000000"),
idempotency_key: import_pgdata::index_part_format::IdempotencyKey::new("specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5".to_string()),
}))),
rel_size_migration: Some(RelSizeMigration::Legacy),
l2_lsn: Some("0/16960E8".parse::<Lsn>().unwrap()),
gc_compaction: Some(GcCompactionState {
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
}),
marked_invisible_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
keys: vec![
EncryptionKey {
key: "test_key".as_bytes().to_vec(),
id: EncryptionKeyId {
version: KeyVersion(1),
generation: Generation::Valid(5),
},
created_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"),
},
EncryptionKey {
key: "test_key_2".as_bytes().to_vec(),
id: EncryptionKeyId {
version: KeyVersion(2),
generation: Generation::Valid(6),
},
created_at: parse_naive_datetime("2024-07-19T10:00:00.123000000"),
}
],
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();

View File

@@ -17,7 +17,7 @@ use utils::id::{TenantId, TimelineId};
use utils::{backoff, pausable_failpoint};
use super::Generation;
use super::index::IndexPart;
use super::index::{EncryptionKeyPair, IndexPart};
use super::manifest::TenantManifest;
use crate::tenant::remote_timeline_client::{
remote_index_path, remote_initdb_archive_path, remote_initdb_preserved_archive_path,
@@ -101,6 +101,7 @@ pub(super) async fn upload_timeline_layer<'a>(
local_path: &'a Utf8Path,
remote_path: &'a RemotePath,
metadata_size: u64,
encryption_key_pair: Option<EncryptionKeyPair>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
fail_point!("before-upload-layer", |_| {
@@ -144,7 +145,14 @@ pub(super) async fn upload_timeline_layer<'a>(
let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);
storage
.upload(reader, fs_size, remote_path, None, cancel)
.upload_with_encryption(
reader,
fs_size,
remote_path,
None,
encryption_key_pair.as_ref().map(|k| k.plain_key.as_slice()),
cancel,
)
.await
.with_context(|| format!("upload layer from local path '{local_path}'"))
}

View File

@@ -1310,6 +1310,7 @@ impl<'a> TenantDownloader<'a> {
let downloaded_bytes = download_layer_file(
self.conf,
self.remote_storage,
None, // TODO: add encryption key pair
*tenant_shard_id,
*timeline_id,
&layer.name,

View File

@@ -4864,6 +4864,7 @@ impl Timeline {
else {
panic!("delta layer cannot be empty if no filter is applied");
};
(
// FIXME: even though we have a single image and single delta layer assumption
// we push them to vec
@@ -6932,9 +6933,7 @@ impl Timeline {
}
// Update remote_timeline_client state to reflect existence of this layer
self.remote_client
.schedule_layer_file_upload(image_layer)
.unwrap();
self.remote_client.schedule_layer_file_upload(image_layer)?;
Ok(())
}
@@ -6995,9 +6994,7 @@ impl Timeline {
}
// Update remote_timeline_client state to reflect existence of this layer
self.remote_client
.schedule_layer_file_upload(delta_layer)
.unwrap();
self.remote_client.schedule_layer_file_upload(delta_layer)?;
Ok(())
}

View File

@@ -1291,6 +1291,7 @@ impl Timeline {
.parts
.extend(sparse_partitioning.into_dense().parts);
// 3. Create new image layers for partitions that have been modified "enough".
let (image_layers, outcome) = self
.create_image_layers(

View File

@@ -244,7 +244,8 @@ impl RemoteStorageWrapper {
kind: DownloadKind::Large,
etag: None,
byte_start: Bound::Included(start_inclusive),
byte_end: Bound::Excluded(end_exclusive)
byte_end: Bound::Excluded(end_exclusive),
encryption_key: None,
},
&self.cancel)
.await?;

View File

@@ -9,6 +9,7 @@ use tracing::info;
use utils::generation::Generation;
use utils::lsn::{AtomicLsn, Lsn};
use super::remote_timeline_client::index::EncryptionKeyPair;
use super::remote_timeline_client::is_same_remote_layer_path;
use super::storage_layer::{AsLayerDesc as _, LayerName, ResidentLayer};
use crate::tenant::metadata::TimelineMetadata;
@@ -245,7 +246,7 @@ impl UploadQueueInitialized {
pub(crate) fn num_inprogress_layer_uploads(&self) -> usize {
self.inprogress_tasks
.iter()
.filter(|(_, t)| matches!(t.op, UploadOp::UploadLayer(_, _, _)))
.filter(|(_, t)| matches!(t.op, UploadOp::UploadLayer(_, _, _, _)))
.count()
}
@@ -461,7 +462,12 @@ pub struct Delete {
#[derive(Clone, Debug)]
pub enum UploadOp {
/// Upload a layer file. The last field indicates the last operation for thie file.
UploadLayer(ResidentLayer, LayerFileMetadata, Option<OpType>),
UploadLayer(
ResidentLayer,
LayerFileMetadata,
Option<EncryptionKeyPair>,
Option<OpType>,
),
/// Upload a index_part.json file
UploadMetadata {
@@ -483,7 +489,7 @@ pub enum UploadOp {
impl std::fmt::Display for UploadOp {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
UploadOp::UploadLayer(layer, metadata, mode) => {
UploadOp::UploadLayer(layer, metadata, _, mode) => {
write!(
f,
"UploadLayer({}, size={:?}, gen={:?}, mode={:?})",
@@ -517,13 +523,13 @@ impl UploadOp {
(UploadOp::Shutdown, _) | (_, UploadOp::Shutdown) => false,
// Uploads and deletes can bypass each other unless they're for the same file.
(UploadOp::UploadLayer(a, ameta, _), UploadOp::UploadLayer(b, bmeta, _)) => {
(UploadOp::UploadLayer(a, ameta, _, _), UploadOp::UploadLayer(b, bmeta, _, _)) => {
let aname = &a.layer_desc().layer_name();
let bname = &b.layer_desc().layer_name();
!is_same_remote_layer_path(aname, ameta, bname, bmeta)
}
(UploadOp::UploadLayer(u, umeta, _), UploadOp::Delete(d))
| (UploadOp::Delete(d), UploadOp::UploadLayer(u, umeta, _)) => {
(UploadOp::UploadLayer(u, umeta, _, _), UploadOp::Delete(d))
| (UploadOp::Delete(d), UploadOp::UploadLayer(u, umeta, _, _)) => {
d.layers.iter().all(|(dname, dmeta)| {
!is_same_remote_layer_path(&u.layer_desc().layer_name(), umeta, dname, dmeta)
})
@@ -539,8 +545,8 @@ impl UploadOp {
// Similarly, index uploads can bypass uploads and deletes as long as neither the
// uploaded index nor the active index references the file (the latter would be
// incorrect use by the caller).
(UploadOp::UploadLayer(u, umeta, _), UploadOp::UploadMetadata { uploaded: i })
| (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, umeta, _)) => {
(UploadOp::UploadLayer(u, umeta, _, _), UploadOp::UploadMetadata { uploaded: i })
| (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, umeta, _, _)) => {
let uname = u.layer_desc().layer_name();
!i.references(&uname, umeta) && !index.references(&uname, umeta)
}
@@ -577,7 +583,7 @@ mod tests {
fn assert_same_op(a: &UploadOp, b: &UploadOp) {
use UploadOp::*;
match (a, b) {
(UploadLayer(a, ameta, atype), UploadLayer(b, bmeta, btype)) => {
(UploadLayer(a, ameta, _, atype), UploadLayer(b, bmeta, _, btype)) => {
assert_eq!(a.layer_desc().layer_name(), b.layer_desc().layer_name());
assert_eq!(ameta, bmeta);
assert_eq!(atype, btype);
@@ -641,6 +647,7 @@ mod tests {
generation: timeline.generation,
shard: timeline.get_shard_index(),
file_size: size as u64,
encryption_key: None,
};
make_layer_with_metadata(timeline, name, metadata)
}
@@ -710,7 +717,7 @@ mod tests {
// Enqueue non-conflicting upload, delete, and index before and after a barrier.
let ops = [
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
UploadOp::Delete(Delete {
layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
}),
@@ -718,7 +725,7 @@ mod tests {
uploaded: index.clone(),
},
UploadOp::Barrier(barrier),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
UploadOp::Delete(Delete {
layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
}),
@@ -843,9 +850,9 @@ mod tests {
);
let ops = [
UploadOp::UploadLayer(layer0a.clone(), layer0a.metadata(), None),
UploadOp::UploadLayer(layer0b.clone(), layer0b.metadata(), None),
UploadOp::UploadLayer(layer0c.clone(), layer0c.metadata(), None),
UploadOp::UploadLayer(layer0a.clone(), layer0a.metadata(), None, None),
UploadOp::UploadLayer(layer0b.clone(), layer0b.metadata(), None, None),
UploadOp::UploadLayer(layer0c.clone(), layer0c.metadata(), None, None),
];
queue.queued_operations.extend(ops.clone());
@@ -882,14 +889,14 @@ mod tests {
);
let ops = [
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
UploadOp::Delete(Delete {
layers: vec![
(layer0.layer_desc().layer_name(), layer0.metadata()),
(layer1.layer_desc().layer_name(), layer1.metadata()),
],
}),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
];
queue.queued_operations.extend(ops.clone());
@@ -938,15 +945,15 @@ mod tests {
);
let ops = [
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
UploadOp::Delete(Delete {
layers: vec![
(layer0.layer_desc().layer_name(), layer0.metadata()),
(layer1.layer_desc().layer_name(), layer1.metadata()),
],
}),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
UploadOp::Delete(Delete {
layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
}),
@@ -984,9 +991,9 @@ mod tests {
);
let ops = [
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
];
queue.queued_operations.extend(ops.clone());
@@ -1061,15 +1068,15 @@ mod tests {
let index2 = index_with(&index1, &layer2);
let ops = [
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
UploadOp::UploadMetadata {
uploaded: index0.clone(),
},
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
UploadOp::UploadMetadata {
uploaded: index1.clone(),
},
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
UploadOp::UploadMetadata {
uploaded: index2.clone(),
},
@@ -1128,7 +1135,7 @@ mod tests {
let ops = [
// Initial upload, with a barrier to prevent index coalescing.
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None, None),
UploadOp::UploadMetadata {
uploaded: index_upload.clone(),
},
@@ -1177,7 +1184,7 @@ mod tests {
let ops = [
// Initial upload, with a barrier to prevent index coalescing.
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None, None),
UploadOp::UploadMetadata {
uploaded: index_upload.clone(),
},
@@ -1187,7 +1194,7 @@ mod tests {
uploaded: index_deref.clone(),
},
// Replace and reference the layer.
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None, None),
UploadOp::UploadMetadata {
uploaded: index_ref.clone(),
},
@@ -1235,7 +1242,7 @@ mod tests {
// Enqueue non-conflicting upload, delete, and index before and after a shutdown.
let ops = [
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
UploadOp::Delete(Delete {
layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
}),
@@ -1243,7 +1250,7 @@ mod tests {
uploaded: index.clone(),
},
UploadOp::Shutdown,
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
UploadOp::Delete(Delete {
layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
}),
@@ -1305,10 +1312,10 @@ mod tests {
);
let ops = [
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
UploadOp::UploadLayer(layer3.clone(), layer3.metadata(), None),
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
UploadOp::UploadLayer(layer3.clone(), layer3.metadata(), None, None),
];
queue.queued_operations.extend(ops.clone());
@@ -1359,7 +1366,7 @@ mod tests {
.layer_metadata
.insert(layer.layer_desc().layer_name(), layer.metadata());
vec![
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None, None),
UploadOp::Delete(Delete {
layers: vec![(layer.layer_desc().layer_name(), layer.metadata())],
}),
@@ -1378,6 +1385,7 @@ mod tests {
shard,
generation: Generation::Valid(generation),
file_size: 0,
encryption_key: None,
};
make_layer_with_metadata(&tli, name, metadata)
};