feat(pageserver): upload encrypted layer files
Signed-off-by: Alex Chi Z <chi@neon.tech>
Cargo.lock (generated)
@@ -4251,6 +4251,7 @@ dependencies = [
  "arc-swap",
  "async-compression",
  "async-stream",
  "base64 0.13.1",
+ "bincode",
  "bit_field",
  "byteorder",
@@ -4298,6 +4299,7 @@ dependencies = [
  "rand 0.8.5",
  "range-set-blaze",
  "regex",
+ "remote_keys",
  "remote_storage",
  "reqwest",
  "rpds",
@@ -256,6 +256,7 @@ postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }
 postgres_initdb = { path = "./libs/postgres_initdb" }
 pq_proto = { version = "0.1", path = "./libs/pq_proto/" }
 remote_storage = { version = "0.1", path = "./libs/remote_storage/" }
+remote_keys = { version = "0.1", path = "./libs/remote_keys/" }
 safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" }
 safekeeper_client = { path = "./safekeeper/client" }
 desim = { version = "0.1", path = "./libs/desim" }
@@ -17,6 +17,7 @@ anyhow.workspace = true
 arc-swap.workspace = true
 async-compression.workspace = true
 async-stream.workspace = true
 base64.workspace = true
 bit_field.workspace = true
+bincode.workspace = true
 byteorder.workspace = true
@@ -82,6 +83,7 @@ postgres_connection.workspace = true
 postgres_ffi.workspace = true
 pq_proto.workspace = true
 remote_storage.workspace = true
+remote_keys.workspace = true
 storage_broker.workspace = true
 tenant_size_model.workspace = true
 http-utils.workspace = true
@@ -192,11 +192,12 @@ pub(crate) use download::{
     download_index_part, download_initdb_tar_zst, download_tenant_manifest, is_temp_download_file,
     list_remote_tenant_shards, list_remote_timelines,
 };
-use index::GcCompactionState;
 pub(crate) use index::LayerFileMetadata;
+use index::{EncryptionKey, EncryptionKeyId, EncryptionKeyPair, GcCompactionState, KeyVersion};
 use pageserver_api::models::{RelSizeMigration, TimelineArchivalState, TimelineVisibilityState};
 use pageserver_api::shard::{ShardIndex, TenantShardId};
 use regex::Regex;
+use remote_keys::NaiveKms;
 use remote_storage::{
     DownloadError, GenericRemoteStorage, ListingMode, RemotePath, TimeoutOrCancel,
 };
@@ -367,6 +368,8 @@ pub(crate) struct RemoteTimelineClient {
     config: std::sync::RwLock<RemoteTimelineClientConfig>,

     cancel: CancellationToken,
+
+    kms_impl: Option<NaiveKms>,
 }

 impl Drop for RemoteTimelineClient {
@@ -411,6 +414,8 @@ impl RemoteTimelineClient {
             )),
             config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(location_conf)),
             cancel: CancellationToken::new(),
+            // TODO: make this configurable
+            kms_impl: Some(NaiveKms::new(tenant_shard_id.tenant_id.to_string())),
         }
     }
@@ -1237,10 +1242,12 @@ impl RemoteTimelineClient {
         self: &Arc<Self>,
         layer: ResidentLayer,
     ) -> Result<(), NotInitialized> {
+        let key_pair = self.schedule_generate_encryption_key()?;
+
         let mut guard = self.upload_queue.lock().unwrap();
         let upload_queue = guard.initialized_mut()?;

-        self.schedule_layer_file_upload0(upload_queue, layer);
+        self.schedule_layer_file_upload0(upload_queue, layer, key_pair);
         self.launch_queued_tasks(upload_queue);
         Ok(())
     }
@@ -1249,8 +1256,16 @@ impl RemoteTimelineClient {
         self: &Arc<Self>,
         upload_queue: &mut UploadQueueInitialized,
         layer: ResidentLayer,
+        key_pair: Option<EncryptionKeyPair>,
     ) {
-        let metadata = layer.metadata();
+        let mut metadata = layer.metadata();
+        assert!(
+            metadata.encryption_key.is_none(),
+            "layer key is set automatically in schedule_layer_file_upload, should not be set manually"
+        );
+        if let Some(ref key_pair) = key_pair {
+            metadata.encryption_key = Some(key_pair.id.clone());
+        }

         upload_queue
             .dirty
@@ -1264,7 +1279,7 @@ impl RemoteTimelineClient {
             "scheduled layer file upload {layer}",
         );

-        let op = UploadOp::UploadLayer(layer, metadata, None);
+        let op = UploadOp::UploadLayer(layer, metadata, key_pair, None);
         self.metric_begin(&op);
         upload_queue.queued_operations.push_back(op);
     }
@@ -1446,6 +1461,55 @@ impl RemoteTimelineClient {
         upload_queue.queued_operations.push_back(op);
     }

+    fn is_kms_enabled(&self) -> bool {
+        self.kms_impl.is_some()
+    }
+
+    pub(crate) fn schedule_generate_encryption_key(
+        self: &Arc<Self>,
+    ) -> Result<Option<EncryptionKeyPair>, NotInitialized> {
+        let Some(kms_impl) = self.kms_impl.as_ref() else {
+            return Ok(None);
+        };
+
+        let plain_key = rand::random::<[u8; 32]>().to_vec(); // StdRng is cryptographically secure (?)
+        let wrapped_key = kms_impl.encrypt(&plain_key).unwrap();
+
+        let mut guard = self.upload_queue.lock().unwrap();
+        let upload_queue = guard.initialized_mut()?;
+
+        let last_key = upload_queue.dirty.keys.last();
+        let this_key_version = if let Some(last_key) = last_key {
+            let key_version = EncryptionKeyId {
+                version: last_key.id.version.next(),
+                generation: self.generation,
+            };
+            assert!(key_version > last_key.id); // ensure key version is strictly increasing; no dup key versions
+            key_version
+        } else {
+            EncryptionKeyId {
+                version: KeyVersion(1),
+                generation: self.generation,
+            }
+        };
+
+        let key_pair = EncryptionKeyPair {
+            id: this_key_version.clone(),
+            plain_key: plain_key.clone(),
+            wrapped_key,
+        };
+
+        upload_queue.dirty.keys.push(EncryptionKey {
+            key: plain_key,
+            id: this_key_version,
+            created_at: Utc::now().naive_utc(),
+        });
+
+        self.schedule_index_upload(upload_queue);
+
+        Ok(Some(key_pair))
+    }
+
     /// Schedules a compaction update to the remote `index_part.json`.
     ///
     /// `compacted_from` represent the L0 names which have been `compacted_to` L1 layers.
@@ -1454,11 +1518,14 @@ impl RemoteTimelineClient {
         compacted_from: &[Layer],
         compacted_to: &[ResidentLayer],
     ) -> Result<(), NotInitialized> {
+        // Use the same key for all layers in a single compaction job
+        let key_pair = self.schedule_generate_encryption_key()?;
+
         let mut guard = self.upload_queue.lock().unwrap();
         let upload_queue = guard.initialized_mut()?;

         for layer in compacted_to {
-            self.schedule_layer_file_upload0(upload_queue, layer.clone());
+            self.schedule_layer_file_upload0(upload_queue, layer.clone(), key_pair.clone());
         }

         let names = compacted_from.iter().map(|x| x.layer_desc().layer_name());
@@ -1715,6 +1782,7 @@ impl RemoteTimelineClient {
                     uploaded.local_path(),
                     &remote_path,
                     uploaded.metadata().file_size,
+                    None, // TODO(chi): support encryption for those layer files uploaded using this interface
                     cancel,
                 )
                 .await
@@ -1757,6 +1825,8 @@ impl RemoteTimelineClient {
             adopted_as.metadata().generation,
         );

+        // TODO: support encryption for those layer files uploaded using this interface
+
         backoff::retry(
             || async {
                 upload::copy_timeline_layer(
@@ -1977,7 +2047,7 @@ impl RemoteTimelineClient {

         // Prepare upload.
         match &mut next_op {
-            UploadOp::UploadLayer(layer, meta, mode) => {
+            UploadOp::UploadLayer(layer, meta, _, mode) => {
                 if upload_queue
                     .recently_deleted
                     .remove(&(layer.layer_desc().layer_name().clone(), meta.generation))
@@ -2071,7 +2141,7 @@ impl RemoteTimelineClient {
         // Assert that we don't modify a layer that's referenced by the current index.
         if cfg!(debug_assertions) {
             let modified = match &task.op {
-                UploadOp::UploadLayer(layer, layer_metadata, _) => {
+                UploadOp::UploadLayer(layer, layer_metadata, _, _) => {
                     vec![(layer.layer_desc().layer_name(), layer_metadata)]
                 }
                 UploadOp::Delete(delete) => {
@@ -2093,7 +2163,7 @@ impl RemoteTimelineClient {
         }

         let upload_result: anyhow::Result<()> = match &task.op {
-            UploadOp::UploadLayer(layer, layer_metadata, mode) => {
+            UploadOp::UploadLayer(layer, layer_metadata, encryption_key_pair, mode) => {
                 // TODO: check if this mechanism can be removed now that can_bypass() performs
                 // conflict checks during scheduling.
                 if let Some(OpType::FlushDeletion) = mode {
@@ -2174,6 +2244,7 @@ impl RemoteTimelineClient {
                         local_path,
                         &remote_path,
                         layer_metadata.file_size,
+                        encryption_key_pair.clone(),
                         &self.cancel,
                     )
                     .measure_remote_op(
@@ -2324,7 +2395,7 @@ impl RemoteTimelineClient {
         upload_queue.inprogress_tasks.remove(&task.task_id);

         let lsn_update = match task.op {
-            UploadOp::UploadLayer(_, _, _) => None,
+            UploadOp::UploadLayer(_, _, _, _) => None,
             UploadOp::UploadMetadata { ref uploaded } => {
                 // the task id is reused as a monotonicity check for storing the "clean"
                 // IndexPart.
@@ -2403,7 +2474,7 @@ impl RemoteTimelineClient {
     )> {
         use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
         let res = match op {
-            UploadOp::UploadLayer(_, m, _) => (
+            UploadOp::UploadLayer(_, m, _, _) => (
                 RemoteOpFileKind::Layer,
                 RemoteOpKind::Upload,
                 RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size),
@@ -2840,6 +2911,7 @@ mod tests {
             )),
             config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(&location_conf)),
             cancel: CancellationToken::new(),
+            kms_impl: None,
         })
     }
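Taken together, the RemoteTimelineClient changes above implement a per-upload envelope-encryption flow: generate a random 32-byte data key, wrap it through the KMS handle, record the wrapped key under a strictly increasing key id in the dirty index, and hand the plain key down to the layer upload. The following is a minimal, dependency-free sketch of that flow, not the pageserver's code: wrap_with_kms and the fixed key bytes are placeholders standing in for remote_keys::NaiveKms::encrypt and rand::random, and the simplified types are local stand-ins.

// Minimal sketch of the envelope-encryption flow above; not the pageserver's code.
// `wrap_with_kms` is a placeholder for remote_keys::NaiveKms::encrypt and is NOT real cryptography.
struct KeyPair {
    version: u32,         // stands in for EncryptionKeyId
    plain_key: [u8; 32],  // data key used to encrypt the layer file
    wrapped_key: Vec<u8>, // KMS-wrapped copy stored in the index
}

fn wrap_with_kms(plain_key: &[u8]) -> Vec<u8> {
    // A real KMS would encrypt the data key under a tenant-scoped master key.
    plain_key.iter().rev().copied().collect()
}

fn generate_key(last_version: Option<u32>) -> KeyPair {
    // The diff draws the data key from rand::random::<[u8; 32]>(); fixed bytes keep this sketch dependency-free.
    let plain_key = [0x42u8; 32];
    KeyPair {
        version: last_version.map_or(1, |v| v + 1), // strictly increasing, like KeyVersion::next
        wrapped_key: wrap_with_kms(&plain_key),
        plain_key,
    }
}

fn main() {
    // (version, wrapped key) pairs, as persisted in index_part.json's `keys`.
    let mut index_keys: Vec<(u32, Vec<u8>)> = Vec::new();
    let key = generate_key(index_keys.last().map(|(v, _)| *v));
    index_keys.push((key.version, key.wrapped_key.clone()));
    // Only the upload path ever sees the plain key; the index stores just the id and wrapped key.
    println!(
        "uploading layer encrypted with key v{} ({} key bytes)",
        key.version,
        key.plain_key.len()
    );
}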
@@ -122,14 +122,55 @@ pub struct IndexPart {
     pub(crate) keys: Vec<EncryptionKey>,
 }

-#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
-pub struct KeyVersion(u32);
+#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Ord, PartialOrd)]
+pub struct KeyVersion(pub u32);

+impl KeyVersion {
+    pub fn next(&self) -> Self {
+        Self(self.0 + 1)
+    }
+}
+
 /// An identifier for an encryption key. The scope of the key is the timeline (TBD).
-#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
+#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Ord, PartialOrd)]
 pub struct EncryptionKeyId {
-    version: KeyVersion,
-    generation: Generation,
+    pub version: KeyVersion,
+    pub generation: Generation,
 }

+#[derive(Clone)]
+pub struct EncryptionKeyPair {
+    pub id: EncryptionKeyId,
+    pub plain_key: Vec<u8>,
+    pub wrapped_key: Vec<u8>,
+}
+
+impl EncryptionKeyPair {
+    pub fn new(id: EncryptionKeyId, plain_key: Vec<u8>, wrapped_key: Vec<u8>) -> Self {
+        Self {
+            id,
+            plain_key,
+            wrapped_key,
+        }
+    }
+}
+
+impl std::fmt::Debug for EncryptionKeyPair {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let display =
+            base64::display::Base64Display::with_config(&self.wrapped_key, base64::STANDARD);
+        struct DisplayAsDebug<T: std::fmt::Display>(T);
+        impl<T: std::fmt::Display> std::fmt::Debug for DisplayAsDebug<T> {
+            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                write!(f, "{}", self.0)
+            }
+        }
+        f.debug_struct("EncryptionKeyPair")
+            .field("id", &self.id)
+            .field("plain_key", &"<REDACTED>")
+            .field("wrapped_key", &DisplayAsDebug(&display))
+            .finish()
+    }
+}
+
 #[serde_as]
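The assert in schedule_generate_encryption_key, assert!(key_version > last_key.id), leans on the Ord/PartialOrd derives added here: derived ordering on EncryptionKeyId is lexicographic over its fields, version first and generation second, so bumping the version always yields a strictly greater id. A self-contained sketch of that property, with a plain u32 standing in for the pageserver's Generation type:

// Standalone illustration of the derived ordering; `u32` replaces the real Generation type.
#[derive(Debug, PartialEq, Eq, Clone, PartialOrd, Ord)]
struct KeyVersion(u32);

impl KeyVersion {
    fn next(&self) -> Self {
        Self(self.0 + 1)
    }
}

#[derive(Debug, PartialEq, Eq, Clone, PartialOrd, Ord)]
struct EncryptionKeyId {
    version: KeyVersion, // compared first: a higher version always wins
    generation: u32,     // only breaks ties between equal versions
}

fn main() {
    let last = EncryptionKeyId { version: KeyVersion(7), generation: 3 };
    // Even if the generation moved backwards, the next id still sorts strictly after `last`.
    let next = EncryptionKeyId { version: last.version.next(), generation: 1 };
    assert!(next > last);
    println!("{next:?} > {last:?}");
}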
@@ -17,7 +17,7 @@ use utils::id::{TenantId, TimelineId};
 use utils::{backoff, pausable_failpoint};

 use super::Generation;
-use super::index::IndexPart;
+use super::index::{EncryptionKeyPair, IndexPart};
 use super::manifest::TenantManifest;
 use crate::tenant::remote_timeline_client::{
     remote_index_path, remote_initdb_archive_path, remote_initdb_preserved_archive_path,
@@ -101,6 +101,7 @@ pub(super) async fn upload_timeline_layer<'a>(
     local_path: &'a Utf8Path,
     remote_path: &'a RemotePath,
     metadata_size: u64,
+    encryption_key_pair: Option<EncryptionKeyPair>,
     cancel: &CancellationToken,
 ) -> anyhow::Result<()> {
     fail_point!("before-upload-layer", |_| {
@@ -144,7 +145,14 @@ pub(super) async fn upload_timeline_layer<'a>(
     let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);

     storage
-        .upload(reader, fs_size, remote_path, None, cancel)
+        .upload_with_encryption(
+            reader,
+            fs_size,
+            remote_path,
+            None,
+            encryption_key_pair.as_ref().map(|k| k.plain_key.as_slice()),
+            cancel,
+        )
         .await
         .with_context(|| format!("upload layer from local path '{local_path}'"))
 }
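One detail worth noting in the call-site change above: the optional owned EncryptionKeyPair is handed to the storage layer as an optional borrowed byte slice via as_ref().map(|k| k.plain_key.as_slice()), so the key material is neither cloned nor moved. A tiny standalone sketch of that idiom; upload_with_encryption here is a local stub, not the remote_storage API:

// Local stand-ins; only the Option borrowing idiom is the point here.
struct EncryptionKeyPair {
    plain_key: Vec<u8>,
}

fn upload_with_encryption(encryption_key: Option<&[u8]>) {
    match encryption_key {
        Some(key) => println!("encrypting payload with a {}-byte data key", key.len()),
        None => println!("uploading without encryption"),
    }
}

fn main() {
    let key_pair = Some(EncryptionKeyPair { plain_key: vec![0u8; 32] });
    // Same expression shape as in the diff: Option<EncryptionKeyPair> -> Option<&[u8]>.
    upload_with_encryption(key_pair.as_ref().map(|k| k.plain_key.as_slice()));
    // `key_pair` is still owned and usable afterwards because only a borrow was passed down.
    assert!(key_pair.is_some());

    let no_key: Option<EncryptionKeyPair> = None;
    upload_with_encryption(no_key.as_ref().map(|k| k.plain_key.as_slice()));
}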
@@ -9,6 +9,7 @@ use tracing::info;
 use utils::generation::Generation;
 use utils::lsn::{AtomicLsn, Lsn};

+use super::remote_timeline_client::index::EncryptionKeyPair;
 use super::remote_timeline_client::is_same_remote_layer_path;
 use super::storage_layer::{AsLayerDesc as _, LayerName, ResidentLayer};
 use crate::tenant::metadata::TimelineMetadata;
@@ -245,7 +246,7 @@ impl UploadQueueInitialized {
     pub(crate) fn num_inprogress_layer_uploads(&self) -> usize {
         self.inprogress_tasks
             .iter()
-            .filter(|(_, t)| matches!(t.op, UploadOp::UploadLayer(_, _, _)))
+            .filter(|(_, t)| matches!(t.op, UploadOp::UploadLayer(_, _, _, _)))
             .count()
     }

@@ -461,7 +462,12 @@ pub struct Delete {
 #[derive(Clone, Debug)]
 pub enum UploadOp {
     /// Upload a layer file. The last field indicates the last operation for thie file.
-    UploadLayer(ResidentLayer, LayerFileMetadata, Option<OpType>),
+    UploadLayer(
+        ResidentLayer,
+        LayerFileMetadata,
+        Option<EncryptionKeyPair>,
+        Option<OpType>,
+    ),

     /// Upload a index_part.json file
     UploadMetadata {
@@ -483,7 +489,7 @@ pub enum UploadOp {
 impl std::fmt::Display for UploadOp {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         match self {
-            UploadOp::UploadLayer(layer, metadata, mode) => {
+            UploadOp::UploadLayer(layer, metadata, _, mode) => {
                 write!(
                     f,
                     "UploadLayer({}, size={:?}, gen={:?}, mode={:?})",
@@ -517,13 +523,13 @@ impl UploadOp {
             (UploadOp::Shutdown, _) | (_, UploadOp::Shutdown) => false,

             // Uploads and deletes can bypass each other unless they're for the same file.
-            (UploadOp::UploadLayer(a, ameta, _), UploadOp::UploadLayer(b, bmeta, _)) => {
+            (UploadOp::UploadLayer(a, ameta, _, _), UploadOp::UploadLayer(b, bmeta, _, _)) => {
                 let aname = &a.layer_desc().layer_name();
                 let bname = &b.layer_desc().layer_name();
                 !is_same_remote_layer_path(aname, ameta, bname, bmeta)
             }
-            (UploadOp::UploadLayer(u, umeta, _), UploadOp::Delete(d))
-            | (UploadOp::Delete(d), UploadOp::UploadLayer(u, umeta, _)) => {
+            (UploadOp::UploadLayer(u, umeta, _, _), UploadOp::Delete(d))
+            | (UploadOp::Delete(d), UploadOp::UploadLayer(u, umeta, _, _)) => {
                 d.layers.iter().all(|(dname, dmeta)| {
                     !is_same_remote_layer_path(&u.layer_desc().layer_name(), umeta, dname, dmeta)
                 })
@@ -539,8 +545,8 @@ impl UploadOp {
             // Similarly, index uploads can bypass uploads and deletes as long as neither the
             // uploaded index nor the active index references the file (the latter would be
             // incorrect use by the caller).
-            (UploadOp::UploadLayer(u, umeta, _), UploadOp::UploadMetadata { uploaded: i })
-            | (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, umeta, _)) => {
+            (UploadOp::UploadLayer(u, umeta, _, _), UploadOp::UploadMetadata { uploaded: i })
+            | (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, umeta, _, _)) => {
                 let uname = u.layer_desc().layer_name();
                 !i.references(&uname, umeta) && !index.references(&uname, umeta)
             }
@@ -577,7 +583,7 @@ mod tests {
     fn assert_same_op(a: &UploadOp, b: &UploadOp) {
         use UploadOp::*;
         match (a, b) {
-            (UploadLayer(a, ameta, atype), UploadLayer(b, bmeta, btype)) => {
+            (UploadLayer(a, ameta, _, atype), UploadLayer(b, bmeta, _, btype)) => {
                 assert_eq!(a.layer_desc().layer_name(), b.layer_desc().layer_name());
                 assert_eq!(ameta, bmeta);
                 assert_eq!(atype, btype);
@@ -711,7 +717,7 @@ mod tests {

         // Enqueue non-conflicting upload, delete, and index before and after a barrier.
         let ops = [
-            UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
+            UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
             UploadOp::Delete(Delete {
                 layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
             }),
@@ -719,7 +725,7 @@ mod tests {
                 uploaded: index.clone(),
             },
             UploadOp::Barrier(barrier),
-            UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
+            UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
             UploadOp::Delete(Delete {
                 layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
             }),
@@ -844,9 +850,9 @@ mod tests {
         );

         let ops = [
-            UploadOp::UploadLayer(layer0a.clone(), layer0a.metadata(), None),
-            UploadOp::UploadLayer(layer0b.clone(), layer0b.metadata(), None),
-            UploadOp::UploadLayer(layer0c.clone(), layer0c.metadata(), None),
+            UploadOp::UploadLayer(layer0a.clone(), layer0a.metadata(), None, None),
+            UploadOp::UploadLayer(layer0b.clone(), layer0b.metadata(), None, None),
+            UploadOp::UploadLayer(layer0c.clone(), layer0c.metadata(), None, None),
         ];

         queue.queued_operations.extend(ops.clone());
@@ -883,14 +889,14 @@ mod tests {
         );

         let ops = [
-            UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
+            UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
             UploadOp::Delete(Delete {
                 layers: vec![
                     (layer0.layer_desc().layer_name(), layer0.metadata()),
                     (layer1.layer_desc().layer_name(), layer1.metadata()),
                 ],
             }),
-            UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
+            UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
         ];

         queue.queued_operations.extend(ops.clone());
@@ -939,15 +945,15 @@ mod tests {
         );

         let ops = [
-            UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
+            UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
             UploadOp::Delete(Delete {
                 layers: vec![
                     (layer0.layer_desc().layer_name(), layer0.metadata()),
                     (layer1.layer_desc().layer_name(), layer1.metadata()),
                 ],
             }),
-            UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
-            UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
+            UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
+            UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
             UploadOp::Delete(Delete {
                 layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
             }),
@@ -985,9 +991,9 @@ mod tests {
         );

         let ops = [
-            UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
-            UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
-            UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
+            UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
+            UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
+            UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
         ];

         queue.queued_operations.extend(ops.clone());
@@ -1062,15 +1068,15 @@ mod tests {
         let index2 = index_with(&index1, &layer2);

         let ops = [
-            UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
+            UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
             UploadOp::UploadMetadata {
                 uploaded: index0.clone(),
             },
-            UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
+            UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
             UploadOp::UploadMetadata {
                 uploaded: index1.clone(),
             },
-            UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
+            UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
             UploadOp::UploadMetadata {
                 uploaded: index2.clone(),
             },
@@ -1129,7 +1135,7 @@ mod tests {

         let ops = [
             // Initial upload, with a barrier to prevent index coalescing.
-            UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
+            UploadOp::UploadLayer(layer.clone(), layer.metadata(), None, None),
             UploadOp::UploadMetadata {
                 uploaded: index_upload.clone(),
             },
@@ -1178,7 +1184,7 @@ mod tests {

         let ops = [
             // Initial upload, with a barrier to prevent index coalescing.
-            UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
+            UploadOp::UploadLayer(layer.clone(), layer.metadata(), None, None),
             UploadOp::UploadMetadata {
                 uploaded: index_upload.clone(),
             },
@@ -1188,7 +1194,7 @@ mod tests {
                 uploaded: index_deref.clone(),
             },
             // Replace and reference the layer.
-            UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
+            UploadOp::UploadLayer(layer.clone(), layer.metadata(), None, None),
             UploadOp::UploadMetadata {
                 uploaded: index_ref.clone(),
             },
@@ -1236,7 +1242,7 @@ mod tests {

         // Enqueue non-conflicting upload, delete, and index before and after a shutdown.
         let ops = [
-            UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
+            UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
             UploadOp::Delete(Delete {
                 layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
             }),
@@ -1244,7 +1250,7 @@ mod tests {
                 uploaded: index.clone(),
             },
             UploadOp::Shutdown,
-            UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
+            UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
             UploadOp::Delete(Delete {
                 layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
             }),
@@ -1306,10 +1312,10 @@ mod tests {
         );

         let ops = [
-            UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
-            UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
-            UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
-            UploadOp::UploadLayer(layer3.clone(), layer3.metadata(), None),
+            UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
+            UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
+            UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
+            UploadOp::UploadLayer(layer3.clone(), layer3.metadata(), None, None),
         ];

         queue.queued_operations.extend(ops.clone());
@@ -1360,7 +1366,7 @@ mod tests {
             .layer_metadata
             .insert(layer.layer_desc().layer_name(), layer.metadata());
         vec![
-            UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
+            UploadOp::UploadLayer(layer.clone(), layer.metadata(), None, None),
             UploadOp::Delete(Delete {
                 layers: vec![(layer.layer_desc().layer_name(), layer.metadata())],
             }),