Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-24 05:40:36 +00:00)

Compare commits: release-pr ... skyzh/uplo (7 commits)
| SHA1 |
|---|
| c32c26d741 |
| 6da5c189b5 |
| 66c01e46ca |
| 94fe9d00f3 |
| dc19960cbf |
| d3b654f6db |
| 4a898cfa2c |
Cargo.lock (generated): 14 lines changed
@@ -4251,6 +4251,7 @@ dependencies = [
"arc-swap",
"async-compression",
"async-stream",
"base64 0.13.1",
"bincode",
"bit_field",
"byteorder",
@@ -4298,6 +4299,7 @@ dependencies = [
"rand 0.8.5",
"range-set-blaze",
"regex",
"remote_keys",
"remote_storage",
"reqwest",
"rpds",
@@ -5503,6 +5505,16 @@ version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c707298afce11da2efef2f600116fa93ffa7a032b5d7b628aa17711ec81383ca"

[[package]]
name = "remote_keys"
version = "0.1.0"
dependencies = [
"anyhow",
"rand 0.8.5",
"utils",
"workspace_hack",
]

[[package]]
name = "remote_storage"
version = "0.1.0"
@@ -5518,6 +5530,7 @@ dependencies = [
"azure_identity",
"azure_storage",
"azure_storage_blobs",
"base64 0.13.1",
"bytes",
"camino",
"camino-tempfile",
@@ -5528,6 +5541,7 @@ dependencies = [
"humantime-serde",
"hyper 1.4.1",
"itertools 0.10.5",
"md5",
"metrics",
"once_cell",
"pin-project-lite",

@@ -30,6 +30,7 @@ members = [
"libs/tenant_size_model",
"libs/metrics",
"libs/postgres_connection",
"libs/remote_keys",
"libs/remote_storage",
"libs/tracing-utils",
"libs/postgres_ffi/wal_craft",
@@ -255,6 +256,7 @@ postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }
postgres_initdb = { path = "./libs/postgres_initdb" }
pq_proto = { version = "0.1", path = "./libs/pq_proto/" }
remote_storage = { version = "0.1", path = "./libs/remote_storage/" }
remote_keys = { version = "0.1", path = "./libs/remote_keys/" }
safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" }
safekeeper_client = { path = "./safekeeper/client" }
desim = { version = "0.1", path = "./libs/desim" }

libs/remote_keys/Cargo.toml (new file, 13 lines)
@@ -0,0 +1,13 @@
[package]
name = "remote_keys"
version = "0.1.0"
edition = "2024"
license.workspace = true

[dependencies]
anyhow.workspace = true
utils.workspace = true
workspace_hack.workspace = true

[dev-dependencies]
rand.workspace = true
libs/remote_keys/src/lib.rs (new file, 42 lines)
@@ -0,0 +1,42 @@
//! A module that provides a KMS implementation that wraps and unwraps keys.
//!

/// A KMS implementation that does static wrapping and unwrapping of the keys.
pub struct NaiveKms {
    account_id: String,
}

impl NaiveKms {
    pub fn new(account_id: String) -> Self {
        Self { account_id }
    }

    pub fn encrypt(&self, plain: &[u8]) -> anyhow::Result<Vec<u8>> {
        let wrapped = [self.account_id.as_bytes(), "-wrapped-".as_bytes(), plain].concat();
        Ok(wrapped)
    }

    pub fn decrypt(&self, wrapped: &[u8]) -> anyhow::Result<Vec<u8>> {
        let Some(wrapped) = wrapped.strip_prefix(self.account_id.as_bytes()) else {
            return Err(anyhow::anyhow!("invalid key"));
        };
        let Some(plain) = wrapped.strip_prefix(b"-wrapped-") else {
            return Err(anyhow::anyhow!("invalid key"));
        };
        Ok(plain.to_vec())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_generate_key() {
        let kms = NaiveKms::new("test-tenant".to_string());
        let data = rand::random::<[u8; 32]>().to_vec();
        let encrypted = kms.encrypt(&data).unwrap();
        let decrypted = kms.decrypt(&encrypted).unwrap();
        assert_eq!(data, decrypted);
    }
}
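As a point of reference, here is a minimal sketch of how a caller might drive `NaiveKms` to wrap a freshly generated data key and later unwrap it. The helper names are illustrative only and not part of the diff, and `rand` is only a dev-dependency of the crate:

```rust
use remote_keys::NaiveKms;

/// Hypothetical helper: generate a random data key, return (plaintext, wrapped) forms.
fn wrap_new_key(kms: &NaiveKms) -> anyhow::Result<(Vec<u8>, Vec<u8>)> {
    // Only the wrapped form is meant to be persisted.
    let plain_key = rand::random::<[u8; 32]>().to_vec();
    let wrapped_key = kms.encrypt(&plain_key)?;
    Ok((plain_key, wrapped_key))
}

/// Hypothetical helper: recover the plaintext key from its wrapped form.
fn unwrap_key(kms: &NaiveKms, wrapped_key: &[u8]) -> anyhow::Result<Vec<u8>> {
    kms.decrypt(wrapped_key)
}
```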
@@ -13,6 +13,7 @@ aws-smithy-async.workspace = true
aws-smithy-types.workspace = true
aws-config.workspace = true
aws-sdk-s3.workspace = true
base64.workspace = true
bytes.workspace = true
camino = { workspace = true, features = ["serde1"] }
humantime-serde.workspace = true
@@ -27,6 +28,7 @@ tokio-util = { workspace = true, features = ["compat"] }
toml_edit.workspace = true
tracing.workspace = true
scopeguard.workspace = true
md5.workspace = true
metrics.workspace = true
utils = { path = "../utils", default-features = false }
pin-project-lite.workspace = true

@@ -550,6 +550,19 @@ impl RemoteStorage for AzureBlobStorage {
self.download_for_builder(builder, timeout, cancel).await
}

#[allow(unused_variables)]
async fn upload_with_encryption(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
unimplemented!()
}

async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
self.delete_objects(std::array::from_ref(path), cancel)
.await

@@ -190,6 +190,8 @@ pub struct DownloadOpts {
/// timeouts: for something like an index/manifest/heatmap, we should time out faster than
/// for layer files
pub kind: DownloadKind,
/// The encryption key to use for the download.
pub encryption_key: Option<Vec<u8>>,
}

pub enum DownloadKind {
@@ -204,6 +206,7 @@ impl Default for DownloadOpts {
byte_start: Bound::Unbounded,
byte_end: Bound::Unbounded,
kind: DownloadKind::Large,
encryption_key: None,
}
}
}
@@ -241,6 +244,15 @@ impl DownloadOpts {
None => format!("bytes={start}-"),
})
}

pub fn with_encryption_key(mut self, encryption_key: Option<impl AsRef<[u8]>>) -> Self {
self.encryption_key = encryption_key.map(|k| k.as_ref().to_vec());
self
}

pub fn encryption_key(&self) -> Option<&[u8]> {
self.encryption_key.as_deref()
}
}

/// Storage (potentially remote) API to manage its state.
@@ -331,6 +343,19 @@ pub trait RemoteStorage: Send + Sync + 'static {
cancel: &CancellationToken,
) -> Result<Download, DownloadError>;

/// Same as upload, but with remote encryption if the backend supports it (e.g. SSE-C on AWS).
async fn upload_with_encryption(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
// S3 PUT request requires the content length to be specified,
// otherwise it starts to fail with the concurrent connection count increasing.
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()>;

/// Delete a single path from remote storage.
///
/// If the operation fails because of timeout or cancellation, the root cause of the error will be
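Taken together with the new `DownloadOpts::with_encryption_key`, a caller that already holds a plaintext key could round-trip an object roughly like this. This is a sketch only: `storage`, `path`, and `key` are assumed to come from elsewhere, and the stream construction mirrors the existing tests in this crate:

```rust
use bytes::Bytes;
use remote_storage::{DownloadOpts, GenericRemoteStorage, RemotePath};
use tokio_util::sync::CancellationToken;

async fn roundtrip(
    storage: &GenericRemoteStorage,
    path: &RemotePath,
    key: &[u8],
) -> anyhow::Result<()> {
    let cancel = CancellationToken::new();
    let body = Bytes::from_static(b"hello");
    let len = body.len();
    let contents = futures::stream::iter([std::io::Result::Ok(body)]);

    // Upload with a per-object key (SSE-C on S3, a sidecar ".enc" file on LocalFs).
    storage
        .upload_with_encryption(contents, len, path, None, Some(key), &cancel)
        .await?;

    // A download must present the same key, otherwise the backend rejects it.
    let opts = DownloadOpts::default().with_encryption_key(Some(key));
    let _download = storage
        .download(path, &opts, &cancel)
        .await
        .map_err(|e| anyhow::anyhow!("download failed: {e}"))?;
    Ok(())
}
```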
@@ -615,6 +640,63 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
}
}
}

pub async fn upload_with_encryption(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
match self {
Self::LocalFs(s) => {
s.upload_with_encryption(
from,
data_size_bytes,
to,
metadata,
encryption_key,
cancel,
)
.await
}
Self::AwsS3(s) => {
s.upload_with_encryption(
from,
data_size_bytes,
to,
metadata,
encryption_key,
cancel,
)
.await
}
Self::AzureBlob(s) => {
s.upload_with_encryption(
from,
data_size_bytes,
to,
metadata,
encryption_key,
cancel,
)
.await
}
Self::Unreliable(s) => {
s.upload_with_encryption(
from,
data_size_bytes,
to,
metadata,
encryption_key,
cancel,
)
.await
}
}
}
}

impl GenericRemoteStorage {

@@ -198,6 +198,10 @@ impl LocalFs {
let mut entries = cur_folder.read_dir_utf8()?;
while let Some(Ok(entry)) = entries.next() {
let file_name = entry.file_name();
if file_name.ends_with(".metadata") || file_name.ends_with(".enc") {
// ignore metadata and encryption key files
continue;
}
let full_file_name = cur_folder.join(file_name);
if full_file_name.as_str().starts_with(prefix) {
let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
@@ -218,6 +222,7 @@ impl LocalFs {
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let target_file_path = to.with_base(&self.storage_root);
@@ -306,6 +311,8 @@ impl LocalFs {
)
})?;

// TODO: we might need to make the following writes atomic with the file write operation above

if let Some(storage_metadata) = metadata {
// FIXME: we must not be using metadata much, since this would forget the old metadata
// for new writes? or perhaps metadata is sticky; could consider removing if it's never
@@ -324,6 +331,15 @@ impl LocalFs {
})?;
}

if let Some(encryption_key) = encryption_key {
let encryption_key_path = storage_encryption_key_path(&target_file_path);
fs::write(&encryption_key_path, encryption_key).await.with_context(|| {
format!(
"Failed to write encryption key to the local storage at '{encryption_key_path}'",
)
})?;
}

Ok(())
}
}
@@ -450,6 +466,7 @@ impl RemoteStorage for LocalFs {
key: &RemotePath,
_cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError> {
// TODO: check encryption key
let target_file_path = key.with_base(&self.storage_root);
let metadata = file_metadata(&target_file_path).await?;
Ok(ListingObject {
@@ -461,34 +478,14 @@ impl RemoteStorage for LocalFs {

async fn upload(
&self,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let cancel = cancel.child_token();

let op = self.upload0(data, data_size_bytes, to, metadata, &cancel);
let mut op = std::pin::pin!(op);

// race the upload0 to the timeout; if it goes over, do a graceful shutdown
let (res, timeout) = tokio::select! {
res = &mut op => (res, false),
_ = tokio::time::sleep(self.timeout) => {
cancel.cancel();
(op.await, true)
}
};

match res {
Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => {
// we caused this cancel (or they happened simultaneously) -- swap it out to
// Timeout
Err(TimeoutOrCancel::Timeout.into())
}
res => res,
}
self.upload_with_encryption(data, data_size_bytes, to, metadata, None, cancel)
.await
}

async fn download(
@@ -506,6 +503,22 @@ impl RemoteStorage for LocalFs {
return Err(DownloadError::Unmodified);
}

let key = match fs::read(storage_encryption_key_path(&target_path)).await {
Ok(key) => Some(key),
Err(e) if e.kind() == ErrorKind::NotFound => None,
Err(e) => {
return Err(DownloadError::Other(
anyhow::anyhow!(e).context("cannot read encryption key"),
));
}
};

if key != opts.encryption_key {
return Err(DownloadError::Other(anyhow::anyhow!(
"encryption key mismatch"
)));
}

let mut file = fs::OpenOptions::new()
.read(true)
.open(&target_path)
@@ -551,12 +564,53 @@ impl RemoteStorage for LocalFs {
async fn delete(&self, path: &RemotePath, _cancel: &CancellationToken) -> anyhow::Result<()> {
let file_path = path.with_base(&self.storage_root);
match fs::remove_file(&file_path).await {
Ok(()) => Ok(()),
Ok(()) => {}
// The file doesn't exist. This shouldn't yield an error to mirror S3's behaviour.
// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
// > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
Err(e) => Err(anyhow::anyhow!(e)),
Err(e) if e.kind() == ErrorKind::NotFound => {}
Err(e) => return Err(anyhow::anyhow!(e)),
};
fs::remove_file(&storage_metadata_path(&file_path))
.await
.ok();
fs::remove_file(&storage_encryption_key_path(&file_path))
.await
.ok();
Ok(())
}

#[allow(unused_variables)]
async fn upload_with_encryption(
&self,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let cancel = cancel.child_token();

let op = self.upload0(data, data_size_bytes, to, metadata, encryption_key, &cancel);
let mut op = std::pin::pin!(op);

// race the upload0 to the timeout; if it goes over, do a graceful shutdown
let (res, timeout) = tokio::select! {
res = &mut op => (res, false),
_ = tokio::time::sleep(self.timeout) => {
cancel.cancel();
(op.await, true)
}
};

match res {
Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => {
// we caused this cancel (or they happened simultaneously) -- swap it out to
// Timeout
Err(TimeoutOrCancel::Timeout.into())
}
res => res,
}
}

@@ -591,6 +645,7 @@ impl RemoteStorage for LocalFs {
to_path = to_path
)
})?;
// TODO: copy metadata and encryption key
Ok(())
}

@@ -609,6 +664,10 @@ fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
path_with_suffix_extension(original_path, "metadata")
}

fn storage_encryption_key_path(original_path: &Utf8Path) -> Utf8PathBuf {
path_with_suffix_extension(original_path, "enc")
}

async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
let target_dir = match target_file_path.parent() {
Some(parent_dir) => parent_dir,

@@ -66,7 +66,10 @@ struct GetObjectRequest {
key: String,
etag: Option<String>,
range: Option<String>,
/// Base64 encoded SSE-C key for server-side encryption.
sse_c_key: Option<Vec<u8>>,
}

impl S3Bucket {
/// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
@@ -257,6 +260,13 @@ impl S3Bucket {
builder = builder.if_none_match(etag);
}

if let Some(encryption_key) = request.sse_c_key {
builder = builder.sse_customer_algorithm("AES256");
builder = builder.sse_customer_key(base64::encode(&encryption_key));
builder = builder
.sse_customer_key_md5(base64::encode(md5::compute(&encryption_key).as_slice()));
}

let get_object = builder.send();

let get_object = tokio::select! {
@@ -693,12 +703,13 @@ impl RemoteStorage for S3Bucket {
})
}

async fn upload(
async fn upload_with_encryption(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
from_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let kind = RequestKind::Put;
@@ -709,7 +720,7 @@ impl RemoteStorage for S3Bucket {
let body = StreamBody::new(from.map(|x| x.map(Frame::data)));
let bytes_stream = ByteStream::new(SdkBody::from_body_1_x(body));

let upload = self
let mut upload = self
.client
.put_object()
.bucket(self.bucket_name.clone())
@@ -717,8 +728,17 @@ impl RemoteStorage for S3Bucket {
.set_metadata(metadata.map(|m| m.0))
.set_storage_class(self.upload_storage_class.clone())
.content_length(from_size_bytes.try_into()?)
.body(bytes_stream)
.send();
.body(bytes_stream);

if let Some(encryption_key) = encryption_key {
upload = upload.sse_customer_algorithm("AES256");
let base64_key = base64::encode(encryption_key);
upload = upload.sse_customer_key(&base64_key);
upload = upload
.sse_customer_key_md5(base64::encode(md5::compute(encryption_key).as_slice()));
}

let upload = upload.send();

let upload = tokio::time::timeout(self.timeout, upload);

@@ -742,6 +762,18 @@ impl RemoteStorage for S3Bucket {
}
}

async fn upload(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
self.upload_with_encryption(from, data_size_bytes, to, metadata, None, cancel)
.await
}

async fn copy(
&self,
from: &RemotePath,
@@ -801,6 +833,7 @@ impl RemoteStorage for S3Bucket {
key: self.relative_path_to_s3_object(from),
etag: opts.etag.as_ref().map(|e| e.to_string()),
range: opts.byte_range_header(),
sse_c_key: opts.encryption_key.clone(),
},
cancel,
)

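For context on the S3 side: an SSE-C request carries three values derived from the raw key, and both the GET and PUT paths above compute them the same way. A standalone sketch of that derivation, using the same `base64` 0.13 and `md5` crates this PR adds as dependencies (the helper itself is illustrative, not part of the diff):

```rust
/// Compute the SSE-C triple (algorithm, base64 key, base64 MD5-of-key) for a raw key.
/// Sketch only; the diff sets these values directly on the AWS SDK request builders.
fn sse_c_values(key: &[u8]) -> (&'static str, String, String) {
    let algorithm = "AES256";
    let key_b64 = base64::encode(key);
    let key_md5_b64 = base64::encode(md5::compute(key).as_slice());
    (algorithm, key_b64, key_md5_b64)
}
```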
@@ -178,6 +178,19 @@ impl RemoteStorage for UnreliableWrapper {
self.inner.download(from, opts, cancel).await
}

#[allow(unused_variables)]
async fn upload_with_encryption(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
unimplemented!()
}

async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
self.delete_inner(path, true, cancel).await
}

@@ -421,7 +421,7 @@ async fn download_is_timeouted(ctx: &mut MaybeEnabledStorage) {
))
.unwrap();

let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
let len = upload_large_enough_file(&ctx.client, &path, &cancel, None).await;

let timeout = std::time::Duration::from_secs(5);

@@ -500,7 +500,7 @@ async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
))
.unwrap();

let file_len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
let file_len = upload_large_enough_file(&ctx.client, &path, &cancel, None).await;

{
let stream = ctx
@@ -555,6 +555,7 @@ async fn upload_large_enough_file(
client: &GenericRemoteStorage,
path: &RemotePath,
cancel: &CancellationToken,
encryption_key: Option<&[u8]>,
) -> usize {
let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
let body = bytes::Bytes::from(vec![0u8; 1024]);
@@ -565,9 +566,54 @@ async fn upload_large_enough_file(
let contents = futures::stream::iter(contents.map(std::io::Result::Ok));

client
.upload(contents, len, path, None, cancel)
.upload_with_encryption(contents, len, path, None, encryption_key, cancel)
.await
.expect("upload succeeds");

len
}

#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn encryption_works(ctx: &mut MaybeEnabledStorage) {
let MaybeEnabledStorage::Enabled(ctx) = ctx else {
return;
};

let cancel = CancellationToken::new();

let path = RemotePath::new(Utf8Path::new(
format!("{}/file_to_copy", ctx.base_prefix).as_str(),
))
.unwrap();

let key = rand::random::<[u8; 32]>();
let file_len = upload_large_enough_file(&ctx.client, &path, &cancel, Some(&key)).await;

{
let download = ctx
.client
.download(
&path,
&DownloadOpts::default().with_encryption_key(Some(&key)),
&cancel,
)
.await
.expect("should succeed");
let vec = download_to_vec(download).await.expect("should succeed");
assert_eq!(vec.len(), file_len);
}

{
// Download without encryption key should fail
let download = ctx
.client
.download(&path, &DownloadOpts::default(), &cancel)
.await;
assert!(download.is_err());
}

let cancel = CancellationToken::new();

ctx.client.delete_objects(&[path], &cancel).await.unwrap();
}

@@ -17,6 +17,7 @@ anyhow.workspace = true
arc-swap.workspace = true
async-compression.workspace = true
async-stream.workspace = true
base64.workspace = true
bit_field.workspace = true
bincode.workspace = true
byteorder.workspace = true
@@ -82,6 +83,7 @@ postgres_connection.workspace = true
postgres_ffi.workspace = true
pq_proto.workspace = true
remote_storage.workspace = true
remote_keys.workspace = true
storage_broker.workspace = true
tenant_size_model.workspace = true
http-utils.workspace = true

@@ -45,6 +45,7 @@ fn bench_upload_queue_next_ready(c: &mut Criterion) {
shard: ShardIndex::new(ShardNumber(1), ShardCount(2)),
generation: Generation::Valid(1),
file_size: 0,
encryption_key: None,
};

// Construct the (initial and uploaded) index with layer0.

@@ -192,11 +192,12 @@ pub(crate) use download::{
download_index_part, download_initdb_tar_zst, download_tenant_manifest, is_temp_download_file,
list_remote_tenant_shards, list_remote_timelines,
};
use index::GcCompactionState;
pub(crate) use index::LayerFileMetadata;
use index::{EncryptionKey, EncryptionKeyId, EncryptionKeyPair, GcCompactionState, KeyVersion};
use pageserver_api::models::{RelSizeMigration, TimelineArchivalState, TimelineVisibilityState};
use pageserver_api::shard::{ShardIndex, TenantShardId};
use regex::Regex;
use remote_keys::NaiveKms;
use remote_storage::{
DownloadError, GenericRemoteStorage, ListingMode, RemotePath, TimeoutOrCancel,
};
@@ -367,6 +368,10 @@ pub(crate) struct RemoteTimelineClient {
config: std::sync::RwLock<RemoteTimelineClientConfig>,

cancel: CancellationToken,

kms_impl: Option<NaiveKms>,

key_repo: std::sync::Mutex<HashMap<EncryptionKeyId, EncryptionKeyPair>>,
}

impl Drop for RemoteTimelineClient {
@@ -411,6 +416,9 @@ impl RemoteTimelineClient {
)),
config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(location_conf)),
cancel: CancellationToken::new(),
// TODO: make this configurable
kms_impl: Some(NaiveKms::new(tenant_shard_id.tenant_id.to_string())),
key_repo: std::sync::Mutex::new(HashMap::new()),
}
}

@@ -727,9 +735,43 @@ impl RemoteTimelineClient {
reason: "no need for a downloads gauge",
},
);
let key_pair = if let Some(ref key_id) = layer_metadata.encryption_key {
let wrapped_key = {
let mut queue = self.upload_queue.lock().unwrap();
let upload_queue = queue.initialized_mut().unwrap();
let encryption_key_pair =
upload_queue.dirty.keys.iter().find(|key| &key.id == key_id);
if let Some(encryption_key_pair) = encryption_key_pair {
// TODO: also check if we have uploaded the key yet; we should never use a key that is not persisted
encryption_key_pair.clone()
} else {
return Err(DownloadError::Other(anyhow::anyhow!(
"Encryption key pair not found in index_part.json"
)));
}
};
let Some(kms) = self.kms_impl.as_ref() else {
return Err(DownloadError::Other(anyhow::anyhow!(
"KMS not configured when downloading encrypted layer file"
)));
};
let plain_key = kms
.decrypt(&wrapped_key.key)
.context("failed to decrypt encryption key")
.map_err(DownloadError::Other)?;
Some(EncryptionKeyPair::new(
wrapped_key.id,
plain_key,
wrapped_key.key,
))
} else {
None
};

download::download_layer_file(
self.conf,
&self.storage_impl,
key_pair.as_ref(),
self.tenant_shard_id,
self.timeline_id,
layer_file_name,
@@ -1250,6 +1292,14 @@ impl RemoteTimelineClient {
upload_queue: &mut UploadQueueInitialized,
layer: ResidentLayer,
) {
let key_pair = {
if let Some(key_id) = layer.metadata().encryption_key {
let guard = self.key_repo.lock().unwrap();
Some(guard.get(&key_id).cloned().unwrap())
} else {
None
}
};
let metadata = layer.metadata();

upload_queue
@@ -1264,7 +1314,7 @@ impl RemoteTimelineClient {
"scheduled layer file upload {layer}",
);

let op = UploadOp::UploadLayer(layer, metadata, None);
let op = UploadOp::UploadLayer(layer, metadata, key_pair, None);
self.metric_begin(&op);
upload_queue.queued_operations.push_back(op);
}
@@ -1446,6 +1496,58 @@ impl RemoteTimelineClient {
upload_queue.queued_operations.push_back(op);
}

#[allow(dead_code)]
fn is_kms_enabled(&self) -> bool {
self.kms_impl.is_some()
}

pub(crate) fn schedule_generate_encryption_key(
self: &Arc<Self>,
) -> Result<Option<EncryptionKeyPair>, NotInitialized> {
let Some(kms_impl) = self.kms_impl.as_ref() else {
return Ok(None);
};

let plain_key = rand::random::<[u8; 32]>().to_vec(); // StdRng is cryptographically secure (?)
let wrapped_key = kms_impl.encrypt(&plain_key).unwrap();

let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;

let last_key = upload_queue.dirty.keys.last();
let this_key_version = if let Some(last_key) = last_key {
let key_version = EncryptionKeyId {
version: last_key.id.version.next(),
generation: self.generation,
};
assert!(key_version > last_key.id); // ensure key version is strictly increasing; no dup key versions
key_version
} else {
EncryptionKeyId {
version: KeyVersion(1),
generation: self.generation,
}
};

let key_pair = EncryptionKeyPair {
id: this_key_version.clone(),
plain_key: plain_key.clone(),
wrapped_key,
};

upload_queue.dirty.keys.push(EncryptionKey {
key: plain_key,
id: this_key_version.clone(),
created_at: Utc::now().naive_utc(),
});

self.key_repo.lock().unwrap().insert(this_key_version, key_pair.clone());

self.schedule_index_upload(upload_queue);

Ok(Some(key_pair))
}

/// Schedules a compaction update to the remote `index_part.json`.
///
/// `compacted_from` represent the L0 names which have been `compacted_to` L1 layers.
@@ -1454,6 +1556,7 @@ impl RemoteTimelineClient {
compacted_from: &[Layer],
compacted_to: &[ResidentLayer],
) -> Result<(), NotInitialized> {
// Use the same key for all layers in a single compaction job
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
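Read together with the download path above, the key lifecycle this draft appears to intend is: generate a plaintext data key per compaction job, wrap it via the KMS, persist only the wrapped form in the `keys` list of `index_part.json`, keep the plaintext in the in-memory `key_repo`, and unwrap the persisted form again on download. A compressed, illustrative sketch of that round trip using only the `NaiveKms` API from `libs/remote_keys` (the tenant id string is a placeholder):

```rust
use remote_keys::NaiveKms;

fn key_lifecycle() -> anyhow::Result<()> {
    let kms = NaiveKms::new("tenant-id".to_string());

    // Upload side: a fresh data key is generated; only the wrapped form is persisted.
    let plain_key = rand::random::<[u8; 32]>().to_vec();
    let wrapped_key = kms.encrypt(&plain_key)?; // recorded in index_part.json `keys`

    // Download side: the wrapped key read back from the index is unwrapped,
    // and the plaintext is then handed to remote storage as the SSE-C key.
    let recovered = kms.decrypt(&wrapped_key)?;
    assert_eq!(recovered, plain_key);
    Ok(())
}
```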
@@ -1715,6 +1818,7 @@ impl RemoteTimelineClient {
uploaded.local_path(),
&remote_path,
uploaded.metadata().file_size,
None, // TODO(chi): support encryption for those layer files uploaded using this interface
cancel,
)
.await
@@ -1757,6 +1861,8 @@ impl RemoteTimelineClient {
adopted_as.metadata().generation,
);

// TODO: support encryption for those layer files uploaded using this interface

backoff::retry(
|| async {
upload::copy_timeline_layer(
@@ -1977,7 +2083,7 @@ impl RemoteTimelineClient {

// Prepare upload.
match &mut next_op {
UploadOp::UploadLayer(layer, meta, mode) => {
UploadOp::UploadLayer(layer, meta, _, mode) => {
if upload_queue
.recently_deleted
.remove(&(layer.layer_desc().layer_name().clone(), meta.generation))
@@ -2071,7 +2177,7 @@ impl RemoteTimelineClient {
// Assert that we don't modify a layer that's referenced by the current index.
if cfg!(debug_assertions) {
let modified = match &task.op {
UploadOp::UploadLayer(layer, layer_metadata, _) => {
UploadOp::UploadLayer(layer, layer_metadata, _, _) => {
vec![(layer.layer_desc().layer_name(), layer_metadata)]
}
UploadOp::Delete(delete) => {
@@ -2093,7 +2199,7 @@ impl RemoteTimelineClient {
}

let upload_result: anyhow::Result<()> = match &task.op {
UploadOp::UploadLayer(layer, layer_metadata, mode) => {
UploadOp::UploadLayer(layer, layer_metadata, encryption_key_pair, mode) => {
// TODO: check if this mechanism can be removed now that can_bypass() performs
// conflict checks during scheduling.
if let Some(OpType::FlushDeletion) = mode {
@@ -2174,6 +2280,7 @@ impl RemoteTimelineClient {
local_path,
&remote_path,
layer_metadata.file_size,
encryption_key_pair.clone(),
&self.cancel,
)
.measure_remote_op(
@@ -2324,7 +2431,7 @@ impl RemoteTimelineClient {
upload_queue.inprogress_tasks.remove(&task.task_id);

let lsn_update = match task.op {
UploadOp::UploadLayer(_, _, _) => None,
UploadOp::UploadLayer(_, _, _, _) => None,
UploadOp::UploadMetadata { ref uploaded } => {
// the task id is reused as a monotonicity check for storing the "clean"
// IndexPart.
@@ -2403,7 +2510,7 @@ impl RemoteTimelineClient {
)> {
use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
let res = match op {
UploadOp::UploadLayer(_, m, _) => (
UploadOp::UploadLayer(_, m, _, _) => (
RemoteOpFileKind::Layer,
RemoteOpKind::Upload,
RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size),
@@ -2787,6 +2894,10 @@ mod tests {
for entry in std::fs::read_dir(remote_path).unwrap().flatten() {
let entry_name = entry.file_name();
let fname = entry_name.to_str().unwrap();
if fname.ends_with(".metadata") || fname.ends_with(".enc") {
// ignore metadata and encryption key files; should use local_fs APIs instead in the future
continue;
}
found.push(String::from(fname));
}
found.sort();
@@ -2840,6 +2951,8 @@ mod tests {
)),
config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(&location_conf)),
cancel: CancellationToken::new(),
kms_impl: None,
key_repo: std::sync::Mutex::new(HashMap::new()),
})
}

@@ -23,7 +23,7 @@ use utils::crashsafe::path_with_suffix_extension;
use utils::id::{TenantId, TimelineId};
use utils::{backoff, pausable_failpoint};

use super::index::{IndexPart, LayerFileMetadata};
use super::index::{EncryptionKeyPair, IndexPart, LayerFileMetadata};
use super::manifest::TenantManifest;
use super::{
FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, INITDB_PATH, parse_remote_index_path,
@@ -51,6 +51,7 @@ use crate::virtual_file::{MaybeFatalIo, VirtualFile, on_fatal_io_error};
pub async fn download_layer_file<'a>(
conf: &'static PageServerConf,
storage: &'a GenericRemoteStorage,
key_pair: Option<&'a EncryptionKeyPair>,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
layer_file_name: &'a LayerName,
@@ -86,7 +87,16 @@ pub async fn download_layer_file<'a>(

let bytes_amount = download_retry(
|| async {
download_object(storage, &remote_path, &temp_file_path, gate, cancel, ctx).await
download_object(
storage,
key_pair,
&remote_path,
&temp_file_path,
gate,
cancel,
ctx,
)
.await
},
&format!("download {remote_path:?}"),
cancel,
@@ -145,6 +155,7 @@ pub async fn download_layer_file<'a>(
/// The unlinking has _not_ been made durable.
async fn download_object(
storage: &GenericRemoteStorage,
encryption_key_pair: Option<&EncryptionKeyPair>,
src_path: &RemotePath,
dst_path: &Utf8PathBuf,
#[cfg_attr(target_os = "macos", allow(unused_variables))] gate: &utils::sync::gate::Gate,
@@ -160,9 +171,12 @@ async fn download_object(
.with_context(|| format!("create a destination file for layer '{dst_path}'"))
.map_err(DownloadError::Other)?;

let download = storage
.download(src_path, &DownloadOpts::default(), cancel)
.await?;
let mut opts = DownloadOpts::default();
if let Some(encryption_key_pair) = encryption_key_pair {
opts.encryption_key = Some(encryption_key_pair.plain_key.to_vec());
}

let download = storage.download(src_path, &opts, cancel).await?;

pausable_failpoint!("before-downloading-layer-stream-pausable");

@@ -10,6 +10,8 @@ use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::RelSizeMigration;
use pageserver_api::shard::ShardIndex;
use serde::{Deserialize, Serialize};
use serde_with::base64::Base64;
use serde_with::serde_as;
use utils::id::TimelineId;
use utils::lsn::Lsn;

@@ -114,6 +116,70 @@ pub struct IndexPart {
/// The timestamp when the timeline was marked invisible in synthetic size calculations.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) marked_invisible_at: Option<NaiveDateTime>,

/// The encryption keys used to encrypt the timeline layer files.
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub(crate) keys: Vec<EncryptionKey>,
}

#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Ord, PartialOrd, Hash)]
pub struct KeyVersion(pub u32);

impl KeyVersion {
pub fn next(&self) -> Self {
Self(self.0 + 1)
}
}

/// An identifier for an encryption key. The scope of the key is the timeline (TBD).
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Ord, PartialOrd, Hash)]
pub struct EncryptionKeyId {
pub version: KeyVersion,
pub generation: Generation,
}

#[derive(Clone)]
pub struct EncryptionKeyPair {
pub id: EncryptionKeyId,
pub plain_key: Vec<u8>,
pub wrapped_key: Vec<u8>,
}

impl EncryptionKeyPair {
pub fn new(id: EncryptionKeyId, plain_key: Vec<u8>, wrapped_key: Vec<u8>) -> Self {
Self {
id,
plain_key,
wrapped_key,
}
}
}

impl std::fmt::Debug for EncryptionKeyPair {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let display =
base64::display::Base64Display::with_config(&self.wrapped_key, base64::STANDARD);
struct DisplayAsDebug<T: std::fmt::Display>(T);
impl<T: std::fmt::Display> std::fmt::Debug for DisplayAsDebug<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
f.debug_struct("EncryptionKeyPair")
.field("id", &self.id)
.field("plain_key", &"<REDACTED>")
.field("wrapped_key", &DisplayAsDebug(&display))
.finish()
}
}

#[serde_as]
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct EncryptionKey {
#[serde_as(as = "Base64")]
pub key: Vec<u8>,
pub id: EncryptionKeyId,
pub created_at: NaiveDateTime,
}

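Note that `#[serde_as(as = "Base64")]` is what makes the raw key bytes appear as base64 strings in `index_part.json`; the `"dGVzdF9rZXk="` fixture in the v15 test further down decodes to `test_key`. A minimal standalone sketch of the same serde pattern (the `KeyRecord` type here is hypothetical, not part of the diff):

```rust
use serde::{Deserialize, Serialize};
use serde_with::{base64::Base64, serde_as};

#[serde_as]
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct KeyRecord {
    #[serde_as(as = "Base64")]
    key: Vec<u8>,
}

fn main() {
    let record = KeyRecord { key: b"test_key".to_vec() };
    // Serializes the raw bytes as base64, matching the fixture used by the v15 test.
    let json = serde_json::to_string(&record).unwrap();
    assert_eq!(json, r#"{"key":"dGVzdF9rZXk="}"#);
}
```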
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
@@ -142,10 +208,12 @@ impl IndexPart {
/// - 12: +l2_lsn
/// - 13: +gc_compaction
/// - 14: +marked_invisible_at
const LATEST_VERSION: usize = 14;
/// - 15: +keys and encryption_key in layer_metadata
const LATEST_VERSION: usize = 15;

// Versions we may see when reading from a bucket.
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14];
pub const KNOWN_VERSIONS: &'static [usize] =
&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15];

pub const FILE_NAME: &'static str = "index_part.json";

@@ -165,6 +233,7 @@ impl IndexPart {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
}
}

@@ -205,14 +274,16 @@ impl IndexPart {
/// Check for invariants in the index: this is useful when uploading an index to ensure that if
/// we encounter a bug, we do not persist buggy metadata.
pub(crate) fn validate(&self) -> Result<(), String> {
if self.import_pgdata.is_none()
&& self.metadata.ancestor_timeline().is_none()
&& self.layer_metadata.is_empty()
{
// Unless we're in the middle of a raw pgdata import, or this is a child timeline,the index must
// always have at least one layer.
return Err("Index has no ancestor and no layers".to_string());
}
// We have to disable this check: we might need to upload an empty index part with new keys, or new `reldirv2` flag.

// if self.import_pgdata.is_none()
// && self.metadata.ancestor_timeline().is_none()
// && self.layer_metadata.is_empty()
// {
// // Unless we're in the middle of a raw pgdata import, or this is a child timeline,the index must
// // always have at least one layer.
// return Err("Index has no ancestor and no layers".to_string());
// }

Ok(())
}
@@ -222,7 +293,7 @@ impl IndexPart {
///
/// Fields have to be `Option`s because remote [`IndexPart`]'s can be from different version, which
/// might have less or more metadata depending if upgrading or rolling back an upgrade.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LayerFileMetadata {
pub file_size: u64,

@@ -233,6 +304,9 @@ pub struct LayerFileMetadata {
#[serde(default = "ShardIndex::unsharded")]
#[serde(skip_serializing_if = "ShardIndex::is_unsharded")]
pub shard: ShardIndex,

#[serde(skip_serializing_if = "Option::is_none", default)]
pub encryption_key: Option<EncryptionKeyId>,
}

impl LayerFileMetadata {
@@ -241,6 +315,7 @@ impl LayerFileMetadata {
file_size,
generation,
shard,
encryption_key: None,
}
}
/// Helper to get both generation and file size in a tuple
@@ -453,14 +528,16 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -475,6 +552,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};

let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -502,14 +580,16 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -524,6 +604,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};

let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -552,14 +633,16 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -574,6 +657,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};

let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -627,6 +711,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};

let empty_layers_parsed = IndexPart::from_json_bytes(empty_layers_json.as_bytes()).unwrap();
@@ -653,14 +738,16 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -675,6 +762,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};

let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -703,11 +791,13 @@ mod tests {
file_size: 23289856,
generation: Generation::new(1),
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014EF499-00000000015A7619".parse().unwrap(), LayerFileMetadata {
file_size: 1015808,
generation: Generation::new(1),
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: Lsn::from_str("0/15A7618").unwrap(),
@@ -726,6 +816,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};

let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -756,14 +847,16 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -782,6 +875,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};

let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -815,12 +909,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -843,6 +939,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};

let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -877,12 +974,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -905,6 +1004,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};

let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -941,12 +1041,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -972,6 +1074,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};

let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1017,12 +1120,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -1052,6 +1157,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};

let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1098,12 +1204,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -1133,6 +1241,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};

let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1183,12 +1292,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -1220,6 +1331,7 @@ mod tests {
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
}),
marked_invisible_at: None,
keys: Vec::new(),
};

let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1271,12 +1383,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -1308,6 +1422,139 @@ mod tests {
|
||||
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
}),
|
||||
marked_invisible_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
assert_eq!(part, expected);
|
||||
}
|
||||
|
||||
#[test]
fn v15_keys_are_parsed() {
let example = r#"{
"version": 15,
"layer_metadata":{
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000, "encryption_key": { "version": 1, "generation": 5 } },
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001, "encryption_key": { "version": 2, "generation": 6 } }
},
"disk_consistent_lsn":"0/16960E8",
"metadata": {
"disk_consistent_lsn": "0/16960E8",
"prev_record_lsn": "0/1696070",
"ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e",
"ancestor_lsn": "0/0",
"latest_gc_cutoff_lsn": "0/1696070",
"initdb_lsn": "0/1696070",
"pg_version": 14
},
"gc_blocking": {
"started_at": "2024-07-19T09:00:00.123",
"reasons": ["DetachAncestor"]
},
"import_pgdata": {
"V1": {
"Done": {
"idempotency_key": "specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5",
"started_at": "2024-11-13T09:23:42.123",
"finished_at": "2024-11-13T09:42:23.123"
}
}
},
"rel_size_migration": "legacy",
"l2_lsn": "0/16960E8",
"gc_compaction": {
"last_completed_lsn": "0/16960E8"
},
"marked_invisible_at": "2023-07-31T09:00:00.123",
"keys": [
{
"key": "dGVzdF9rZXk=",
"id": {
"version": 1,
"generation": 5
},
"created_at": "2024-07-19T09:00:00.123"
},
{
"key": "dGVzdF9rZXlfMg==",
"id": {
"version": 2,
"generation": 6
},
"created_at": "2024-07-19T10:00:00.123"
}
]
}"#;

let expected = IndexPart {
version: 15,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: Some(EncryptionKeyId {
version: KeyVersion(1),
generation: Generation::Valid(5),
}),
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: Some(EncryptionKeyId {
version: KeyVersion(2),
generation: Generation::Valid(6),
}),
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata: TimelineMetadata::new(
Lsn::from_str("0/16960E8").unwrap(),
Some(Lsn::from_str("0/1696070").unwrap()),
Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()),
Lsn::INVALID,
Lsn::from_str("0/1696070").unwrap(),
Lsn::from_str("0/1696070").unwrap(),
14,
).with_recalculated_checksum().unwrap(),
deleted_at: None,
lineage: Default::default(),
gc_blocking: Some(GcBlocking {
started_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"),
reasons: enumset::EnumSet::from_iter([GcBlockingReason::DetachAncestor]),
}),
last_aux_file_policy: Default::default(),
archived_at: None,
import_pgdata: Some(import_pgdata::index_part_format::Root::V1(import_pgdata::index_part_format::V1::Done(import_pgdata::index_part_format::Done{
started_at: parse_naive_datetime("2024-11-13T09:23:42.123000000"),
finished_at: parse_naive_datetime("2024-11-13T09:42:23.123000000"),
idempotency_key: import_pgdata::index_part_format::IdempotencyKey::new("specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5".to_string()),
}))),
rel_size_migration: Some(RelSizeMigration::Legacy),
l2_lsn: Some("0/16960E8".parse::<Lsn>().unwrap()),
gc_compaction: Some(GcCompactionState {
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
}),
marked_invisible_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
keys: vec![
EncryptionKey {
key: "test_key".as_bytes().to_vec(),
id: EncryptionKeyId {
version: KeyVersion(1),
generation: Generation::Valid(5),
},
created_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"),
},
EncryptionKey {
key: "test_key_2".as_bytes().to_vec(),
id: EncryptionKeyId {
version: KeyVersion(2),
generation: Generation::Valid(6),
},
created_at: parse_naive_datetime("2024-07-19T10:00:00.123000000"),
}
],
};

let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
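The "key" strings in the v15 example above are base64-encoded raw key bytes that correspond to the expected EncryptionKey entries (for instance "dGVzdF9rZXk=" decodes to "test_key"). A minimal sketch of that decoding, assuming the base64 0.13 crate is available; this is illustrative only and not part of the diff:

// Sketch: the JSON "key" field is base64; decoding yields the raw bytes
// stored in EncryptionKey::key ("dGVzdF9rZXk=" -> b"test_key").
fn main() {
    assert_eq!(base64::decode("dGVzdF9rZXk=").unwrap(), b"test_key".to_vec());
    assert_eq!(base64::decode("dGVzdF9rZXlfMg==").unwrap(), b"test_key_2".to_vec());
}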

@@ -17,7 +17,7 @@ use utils::id::{TenantId, TimelineId};
use utils::{backoff, pausable_failpoint};

use super::Generation;
use super::index::IndexPart;
use super::index::{EncryptionKeyPair, IndexPart};
use super::manifest::TenantManifest;
use crate::tenant::remote_timeline_client::{
remote_index_path, remote_initdb_archive_path, remote_initdb_preserved_archive_path,
@@ -101,6 +101,7 @@ pub(super) async fn upload_timeline_layer<'a>(
local_path: &'a Utf8Path,
remote_path: &'a RemotePath,
metadata_size: u64,
encryption_key_pair: Option<EncryptionKeyPair>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
fail_point!("before-upload-layer", |_| {
@@ -144,7 +145,14 @@ pub(super) async fn upload_timeline_layer<'a>(
let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);

storage
.upload(reader, fs_size, remote_path, None, cancel)
.upload_with_encryption(
reader,
fs_size,
remote_path,
None,
encryption_key_pair.as_ref().map(|k| k.plain_key.as_slice()),
cancel,
)
.await
.with_context(|| format!("upload layer from local path '{local_path}'"))
}

@@ -1310,6 +1310,7 @@ impl<'a> TenantDownloader<'a> {
let downloaded_bytes = download_layer_file(
self.conf,
self.remote_storage,
None, // TODO: add encryption key pair
*tenant_shard_id,
*timeline_id,
&layer.name,

@@ -4864,6 +4864,7 @@ impl Timeline {
else {
panic!("delta layer cannot be empty if no filter is applied");
};

(
// FIXME: even though we have a single image and single delta layer assumption
// we push them to vec
@@ -6932,9 +6933,7 @@ impl Timeline {
}

// Update remote_timeline_client state to reflect existence of this layer
self.remote_client
.schedule_layer_file_upload(image_layer)
.unwrap();
self.remote_client.schedule_layer_file_upload(image_layer)?;

Ok(())
}
@@ -6995,9 +6994,7 @@ impl Timeline {
}

// Update remote_timeline_client state to reflect existence of this layer
self.remote_client
.schedule_layer_file_upload(delta_layer)
.unwrap();
self.remote_client.schedule_layer_file_upload(delta_layer)?;

Ok(())
}

@@ -1291,6 +1291,7 @@ impl Timeline {
.parts
.extend(sparse_partitioning.into_dense().parts);


// 3. Create new image layers for partitions that have been modified "enough".
let (image_layers, outcome) = self
.create_image_layers(

@@ -244,7 +244,8 @@ impl RemoteStorageWrapper {
kind: DownloadKind::Large,
etag: None,
byte_start: Bound::Included(start_inclusive),
byte_end: Bound::Excluded(end_exclusive)
byte_end: Bound::Excluded(end_exclusive),
encryption_key: None,
},
&self.cancel)
.await?;

@@ -9,6 +9,7 @@ use tracing::info;
use utils::generation::Generation;
use utils::lsn::{AtomicLsn, Lsn};

use super::remote_timeline_client::index::EncryptionKeyPair;
use super::remote_timeline_client::is_same_remote_layer_path;
use super::storage_layer::{AsLayerDesc as _, LayerName, ResidentLayer};
use crate::tenant::metadata::TimelineMetadata;
@@ -245,7 +246,7 @@ impl UploadQueueInitialized {
pub(crate) fn num_inprogress_layer_uploads(&self) -> usize {
self.inprogress_tasks
.iter()
.filter(|(_, t)| matches!(t.op, UploadOp::UploadLayer(_, _, _)))
.filter(|(_, t)| matches!(t.op, UploadOp::UploadLayer(_, _, _, _)))
.count()
}

@@ -461,7 +462,12 @@ pub struct Delete {
#[derive(Clone, Debug)]
pub enum UploadOp {
/// Upload a layer file. The last field indicates the last operation for this file.
UploadLayer(ResidentLayer, LayerFileMetadata, Option<OpType>),
UploadLayer(
ResidentLayer,
LayerFileMetadata,
Option<EncryptionKeyPair>,
Option<OpType>,
),

/// Upload an index_part.json file
UploadMetadata {
@@ -483,7 +489,7 @@ pub enum UploadOp {
impl std::fmt::Display for UploadOp {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
UploadOp::UploadLayer(layer, metadata, mode) => {
UploadOp::UploadLayer(layer, metadata, _, mode) => {
write!(
f,
"UploadLayer({}, size={:?}, gen={:?}, mode={:?})",
@@ -517,13 +523,13 @@ impl UploadOp {
(UploadOp::Shutdown, _) | (_, UploadOp::Shutdown) => false,

// Uploads and deletes can bypass each other unless they're for the same file.
(UploadOp::UploadLayer(a, ameta, _), UploadOp::UploadLayer(b, bmeta, _)) => {
(UploadOp::UploadLayer(a, ameta, _, _), UploadOp::UploadLayer(b, bmeta, _, _)) => {
let aname = &a.layer_desc().layer_name();
let bname = &b.layer_desc().layer_name();
!is_same_remote_layer_path(aname, ameta, bname, bmeta)
}
(UploadOp::UploadLayer(u, umeta, _), UploadOp::Delete(d))
| (UploadOp::Delete(d), UploadOp::UploadLayer(u, umeta, _)) => {
(UploadOp::UploadLayer(u, umeta, _, _), UploadOp::Delete(d))
| (UploadOp::Delete(d), UploadOp::UploadLayer(u, umeta, _, _)) => {
d.layers.iter().all(|(dname, dmeta)| {
!is_same_remote_layer_path(&u.layer_desc().layer_name(), umeta, dname, dmeta)
})
@@ -539,8 +545,8 @@ impl UploadOp {
// Similarly, index uploads can bypass uploads and deletes as long as neither the
// uploaded index nor the active index references the file (the latter would be
// incorrect use by the caller).
(UploadOp::UploadLayer(u, umeta, _), UploadOp::UploadMetadata { uploaded: i })
| (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, umeta, _)) => {
(UploadOp::UploadLayer(u, umeta, _, _), UploadOp::UploadMetadata { uploaded: i })
| (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, umeta, _, _)) => {
let uname = u.layer_desc().layer_name();
!i.references(&uname, umeta) && !index.references(&uname, umeta)
}
@@ -577,7 +583,7 @@ mod tests {
fn assert_same_op(a: &UploadOp, b: &UploadOp) {
use UploadOp::*;
match (a, b) {
(UploadLayer(a, ameta, atype), UploadLayer(b, bmeta, btype)) => {
(UploadLayer(a, ameta, _, atype), UploadLayer(b, bmeta, _, btype)) => {
assert_eq!(a.layer_desc().layer_name(), b.layer_desc().layer_name());
assert_eq!(ameta, bmeta);
assert_eq!(atype, btype);
@@ -641,6 +647,7 @@ mod tests {
generation: timeline.generation,
shard: timeline.get_shard_index(),
file_size: size as u64,
encryption_key: None,
};
make_layer_with_metadata(timeline, name, metadata)
}
@@ -710,7 +717,7 @@ mod tests {

// Enqueue non-conflicting upload, delete, and index before and after a barrier.
let ops = [
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
UploadOp::Delete(Delete {
layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
}),
@@ -718,7 +725,7 @@ mod tests {
uploaded: index.clone(),
},
UploadOp::Barrier(barrier),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
UploadOp::Delete(Delete {
layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
}),
@@ -843,9 +850,9 @@ mod tests {
);

let ops = [
UploadOp::UploadLayer(layer0a.clone(), layer0a.metadata(), None),
UploadOp::UploadLayer(layer0b.clone(), layer0b.metadata(), None),
UploadOp::UploadLayer(layer0c.clone(), layer0c.metadata(), None),
UploadOp::UploadLayer(layer0a.clone(), layer0a.metadata(), None, None),
UploadOp::UploadLayer(layer0b.clone(), layer0b.metadata(), None, None),
UploadOp::UploadLayer(layer0c.clone(), layer0c.metadata(), None, None),
];

queue.queued_operations.extend(ops.clone());
@@ -882,14 +889,14 @@ mod tests {
);

let ops = [
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
UploadOp::Delete(Delete {
layers: vec![
(layer0.layer_desc().layer_name(), layer0.metadata()),
(layer1.layer_desc().layer_name(), layer1.metadata()),
],
}),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
];

queue.queued_operations.extend(ops.clone());
@@ -938,15 +945,15 @@ mod tests {
);

let ops = [
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
UploadOp::Delete(Delete {
layers: vec![
(layer0.layer_desc().layer_name(), layer0.metadata()),
(layer1.layer_desc().layer_name(), layer1.metadata()),
],
}),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
UploadOp::Delete(Delete {
layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
}),
@@ -984,9 +991,9 @@ mod tests {
);

let ops = [
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
];

queue.queued_operations.extend(ops.clone());
@@ -1061,15 +1068,15 @@ mod tests {
let index2 = index_with(&index1, &layer2);

let ops = [
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
UploadOp::UploadMetadata {
uploaded: index0.clone(),
},
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
UploadOp::UploadMetadata {
uploaded: index1.clone(),
},
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
UploadOp::UploadMetadata {
uploaded: index2.clone(),
},
@@ -1128,7 +1135,7 @@ mod tests {

let ops = [
// Initial upload, with a barrier to prevent index coalescing.
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None, None),
UploadOp::UploadMetadata {
uploaded: index_upload.clone(),
},
@@ -1177,7 +1184,7 @@ mod tests {

let ops = [
// Initial upload, with a barrier to prevent index coalescing.
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None, None),
UploadOp::UploadMetadata {
uploaded: index_upload.clone(),
},
@@ -1187,7 +1194,7 @@ mod tests {
uploaded: index_deref.clone(),
},
// Replace and reference the layer.
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None, None),
UploadOp::UploadMetadata {
uploaded: index_ref.clone(),
},
@@ -1235,7 +1242,7 @@ mod tests {

// Enqueue non-conflicting upload, delete, and index before and after a shutdown.
let ops = [
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
UploadOp::Delete(Delete {
layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
}),
@@ -1243,7 +1250,7 @@ mod tests {
uploaded: index.clone(),
},
UploadOp::Shutdown,
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
UploadOp::Delete(Delete {
layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
}),
@@ -1305,10 +1312,10 @@ mod tests {
);

let ops = [
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
UploadOp::UploadLayer(layer3.clone(), layer3.metadata(), None),
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
UploadOp::UploadLayer(layer3.clone(), layer3.metadata(), None, None),
];

queue.queued_operations.extend(ops.clone());
@@ -1359,7 +1366,7 @@ mod tests {
.layer_metadata
.insert(layer.layer_desc().layer_name(), layer.metadata());
vec![
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None, None),
UploadOp::Delete(Delete {
layers: vec![(layer.layer_desc().layer_name(), layer.metadata())],
}),
@@ -1378,6 +1385,7 @@ mod tests {
shard,
generation: Generation::Valid(generation),
file_size: 0,
encryption_key: None,
};
make_layer_with_metadata(&tli, name, metadata)
};