From 9f119d6f648bc338e5839a67fddce06fdfd39d8c Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Thu, 2 Jan 2025 17:37:22 +0100
Subject: [PATCH] pageserver: simplify `block_deletions` handling

---
 .../src/tenant/remote_timeline_client.rs | 174 ++++--------------
 pageserver/src/tenant/timeline.rs        |  25 ---
 pageserver/src/tenant/upload_queue.rs    | 132 ++++++-------
 3 files changed, 101 insertions(+), 230 deletions(-)

diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs
index 713efbb9a4..aea04ec102 100644
--- a/pageserver/src/tenant/remote_timeline_client.rs
+++ b/pageserver/src/tenant/remote_timeline_client.rs
@@ -195,7 +195,7 @@ use utils::backoff::{
 use utils::pausable_failpoint;
 use utils::shard::ShardNumber;
 
-use std::collections::{HashMap, HashSet, VecDeque};
+use std::collections::{HashMap, VecDeque};
 use std::sync::atomic::{AtomicU32, Ordering};
 use std::sync::{Arc, Mutex, OnceLock};
 use std::time::Duration;
@@ -219,7 +219,7 @@ use crate::task_mgr::shutdown_token;
 use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
 use crate::tenant::remote_timeline_client::download::download_retry;
 use crate::tenant::storage_layer::AsLayerDesc;
-use crate::tenant::upload_queue::{Delete, OpType, UploadQueueStoppedDeletable};
+use crate::tenant::upload_queue::{Delete, UploadQueueStoppedDeletable};
 use crate::tenant::TIMELINES_SEGMENT_NAME;
 use crate::{
     config::PageServerConf,
@@ -499,37 +499,21 @@ impl RemoteTimelineClient {
 
     /// Notify this client of a change to its parent tenant's config, as this may cause us to
     /// take action (unblocking deletions when transitioning from AttachedMulti to AttachedSingle)
-    pub(super) fn update_config(&self, location_conf: &AttachedLocationConfig) {
+    pub(super) fn update_config(self: &Arc<Self>, location_conf: &AttachedLocationConfig) {
         let new_conf = RemoteTimelineClientConfig::from(location_conf);
-        let unblocked = !new_conf.block_deletions;
 
-        // Update config before draining deletions, so that we don't race with more being
-        // inserted. This can result in deletions happening our of order, but that does not
-        // violate any invariants: deletions only need to be ordered relative to upload of the index
-        // that dereferences the deleted objects, and we are not changing that order.
+        let mut block_deletions_below = Generation::None;
+        if new_conf.block_deletions {
+            block_deletions_below = self.generation;
+        }
+
         *self.config.write().unwrap() = new_conf;
 
-        if unblocked {
-            // If we may now delete layers, drain any that were blocked in our old
-            // configuration state
-            let mut queue_locked = self.upload_queue.lock().unwrap();
-
-            if let Ok(queue) = queue_locked.initialized_mut() {
-                let blocked_deletions = std::mem::take(&mut queue.blocked_deletions);
-                for d in blocked_deletions {
-                    if let Err(e) = self.deletion_queue_client.push_layers(
-                        self.tenant_shard_id,
-                        self.timeline_id,
-                        self.generation,
-                        d.layers,
-                    ) {
-                        // This could happen if the pageserver is shut down while a tenant
-                        // is transitioning from a deletion-blocked state: we will leak some
-                        // S3 objects in this case.
-                        warn!("Failed to drain blocked deletions: {}", e);
-                        break;
-                    }
-                }
+        // If the conf change may unblock any deletions, try to launch them.
+        if let Ok(queue) = self.upload_queue.lock().unwrap().initialized_mut() {
+            if block_deletions_below != queue.block_deletions_below {
+                queue.block_deletions_below = block_deletions_below;
+                self.launch_queued_tasks(queue);
             }
         }
     }
@@ -1185,7 +1169,7 @@ impl RemoteTimelineClient {
             "scheduled layer file upload {layer}",
         );
 
-        let op = UploadOp::UploadLayer(layer, metadata, None);
+        let op = UploadOp::UploadLayer(layer, metadata);
         self.metric_begin(&op);
         upload_queue.queued_operations.push_back(op);
     }
@@ -1411,13 +1395,6 @@ impl RemoteTimelineClient {
         Ok(())
     }
 
-    pub(crate) fn schedule_barrier(self: &Arc<Self>) -> anyhow::Result<()> {
-        let mut guard = self.upload_queue.lock().unwrap();
-        let upload_queue = guard.initialized_mut()?;
-        self.schedule_barrier0(upload_queue);
-        Ok(())
-    }
-
     fn schedule_barrier0(
         self: &Arc<Self>,
         upload_queue: &mut UploadQueueInitialized,
@@ -1892,31 +1869,16 @@ impl RemoteTimelineClient {
         while let Some((mut next_op, coalesced_ops)) = upload_queue.next_ready() {
             debug!("starting op: {next_op}");
 
-            // Prepare upload.
+            // Handle barriers and shutdowns.
             match &mut next_op {
-                UploadOp::UploadLayer(layer, meta, mode) => {
-                    if upload_queue
-                        .recently_deleted
-                        .remove(&(layer.layer_desc().layer_name().clone(), meta.generation))
-                    {
-                        *mode = Some(OpType::FlushDeletion);
-                    } else {
-                        *mode = Some(OpType::MayReorder)
-                    }
-                }
-                UploadOp::UploadMetadata { .. } => {}
-                UploadOp::Delete(Delete { layers }) => {
-                    for (name, meta) in layers {
-                        upload_queue
-                            .recently_deleted
-                            .insert((name.clone(), meta.generation));
-                    }
-                }
                 UploadOp::Barrier(sender) => {
                     sender.send_replace(());
                     continue;
                 }
                 UploadOp::Shutdown => unreachable!("shutdown is intentionally never popped off"),
+                UploadOp::UploadLayer(..) => {}
+                UploadOp::UploadMetadata { .. } => {}
+                UploadOp::Delete(Delete { .. }) => {}
             };
 
             // Assign unique ID to this task
@@ -1988,7 +1950,7 @@ impl RemoteTimelineClient {
         // Assert that we don't modify a layer that's referenced by the current index.
         if cfg!(debug_assertions) {
             let modified = match &task.op {
-                UploadOp::UploadLayer(layer, layer_metadata, _) => {
+                UploadOp::UploadLayer(layer, layer_metadata) => {
                     vec![(layer.layer_desc().layer_name(), layer_metadata)]
                 }
                 UploadOp::Delete(delete) => {
@@ -2010,68 +1972,7 @@ impl RemoteTimelineClient {
         }
 
         let upload_result: anyhow::Result<()> = match &task.op {
-            UploadOp::UploadLayer(ref layer, ref layer_metadata, mode) => {
-                // TODO: check if this mechanism can be removed now that can_bypass() performs
-                // conflict checks during scheduling.
-                if let Some(OpType::FlushDeletion) = mode {
-                    if self.config.read().unwrap().block_deletions {
-                        // Of course, this is not efficient... but usually the queue should be empty.
-                        let mut queue_locked = self.upload_queue.lock().unwrap();
-                        let mut detected = false;
-                        if let Ok(queue) = queue_locked.initialized_mut() {
-                            for list in queue.blocked_deletions.iter_mut() {
-                                list.layers.retain(|(name, meta)| {
-                                    if name == &layer.layer_desc().layer_name()
-                                        && meta.generation == layer_metadata.generation
-                                    {
-                                        detected = true;
-                                        // remove the layer from deletion queue
-                                        false
-                                    } else {
-                                        // keep the layer
-                                        true
-                                    }
-                                });
-                            }
-                        }
-                        if detected {
-                            info!(
-                                "cancelled blocked deletion of layer {} at gen {:?}",
-                                layer.layer_desc().layer_name(),
-                                layer_metadata.generation
-                            );
-                        }
-                    } else {
-                        // TODO: we did not guarantee that upload task starts after deletion task, so there could be possibly race conditions
-                        // that we still get the layer deleted. But this only happens if someone creates a layer immediately after it's deleted,
-                        // which is not possible in the current system.
-                        info!(
-                            "waiting for deletion queue flush to complete before uploading layer {} at gen {:?}",
-                            layer.layer_desc().layer_name(),
-                            layer_metadata.generation
-                        );
-                        {
-                            // We are going to flush, we can clean up the recently deleted list.
-                            let mut queue_locked = self.upload_queue.lock().unwrap();
-                            if let Ok(queue) = queue_locked.initialized_mut() {
-                                queue.recently_deleted.clear();
-                            }
-                        }
-                        if let Err(e) = self.deletion_queue_client.flush_execute().await {
-                            warn!(
-                                "failed to flush the deletion queue before uploading layer {} at gen {:?}, still proceeding to upload: {e:#} ",
-                                layer.layer_desc().layer_name(),
-                                layer_metadata.generation
-                            );
-                        } else {
-                            info!(
-                                "done flushing deletion queue before uploading layer {} at gen {:?}",
-                                layer.layer_desc().layer_name(),
-                                layer_metadata.generation
-                            );
-                        }
-                    }
-                }
+            UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
                 let local_path = layer.local_path();
 
                 // We should only be uploading layers created by this `Tenant`'s lifetime, so
@@ -2136,23 +2037,17 @@ impl RemoteTimelineClient {
                 res
             }
             UploadOp::Delete(delete) => {
-                if self.config.read().unwrap().block_deletions {
-                    let mut queue_locked = self.upload_queue.lock().unwrap();
-                    if let Ok(queue) = queue_locked.initialized_mut() {
-                        queue.blocked_deletions.push(delete.clone());
-                    }
-                    Ok(())
-                } else {
-                    pausable_failpoint!("before-delete-layer-pausable");
-                    self.deletion_queue_client
-                        .push_layers(
-                            self.tenant_shard_id,
-                            self.timeline_id,
-                            self.generation,
-                            delete.layers.clone(),
-                        )
-                        .map_err(|e| anyhow::anyhow!(e))
-                }
+                pausable_failpoint!("before-delete-layer-pausable");
+                // TODO: these can't go through the deletion queue, since it's asynchronous and
+                // may clobber a later write to this layer.
+                self.deletion_queue_client
+                    .push_layers(
+                        self.tenant_shard_id,
+                        self.timeline_id,
+                        self.generation,
+                        delete.layers.clone(),
+                    )
+                    .map_err(|e| anyhow::anyhow!(e))
             }
             unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
                 // unreachable. Barrier operations are handled synchronously in
@@ -2234,7 +2129,7 @@ impl RemoteTimelineClient {
         upload_queue.inprogress_tasks.remove(&task.task_id);
 
         let lsn_update = match task.op {
-            UploadOp::UploadLayer(_, _, _) => None,
+            UploadOp::UploadLayer(_, _) => None,
             UploadOp::UploadMetadata { ref uploaded } => {
                 // the task id is reused as a monotonicity check for storing the "clean"
                 // IndexPart.
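
Note on the replacement mechanism: the removed recently_deleted/FlushDeletion machinery is subsumed by two things, the can_bypass() conflict checks at scheduling time and the new block_deletions_below watermark checked in UploadQueueInitialized::is_ready() (see the upload_queue.rs hunks below). A minimal standalone sketch of the watermark rule, with a hypothetical Gen type standing in for utils::generation::Generation:

// Illustrative only: Gen stands in for utils::generation::Generation, and
// the real check lives in UploadQueueInitialized::is_ready(). While
// block_deletions is set, the queue holds back any Delete touching a layer
// from a generation below the attachment's own; everything else proceeds.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
enum Gen {
    None,       // no watermark: nothing is blocked
    Valid(u32), // block deletions of layers with generation < this
}

fn delete_is_blocked(watermark: Gen, layer_gens: &[Gen]) -> bool {
    match watermark {
        Gen::None => false,
        w => layer_gens.iter().any(|&g| g < w),
    }
}

fn main() {
    let w = Gen::Valid(5); // e.g. AttachedMulti at generation 5
    assert!(delete_is_blocked(w, &[Gen::Valid(4)])); // older generation: held
    assert!(!delete_is_blocked(w, &[Gen::Valid(5)])); // our own writes: proceed
    assert!(!delete_is_blocked(Gen::None, &[Gen::Valid(4)])); // unblocked: proceed
}

Only deletions that touch objects from earlier generations are held back; the attachment's own writes remain deletable.
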
@@ -2309,7 +2204,7 @@ impl RemoteTimelineClient {
     )> {
         use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
         let res = match op {
-            UploadOp::UploadLayer(_, m, _) => (
+            UploadOp::UploadLayer(_, m) => (
                 RemoteOpFileKind::Layer,
                 RemoteOpKind::Upload,
                 RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size),
@@ -2399,12 +2294,11 @@ impl RemoteTimelineClient {
                 .clone(),
             inprogress_tasks: HashMap::default(),
             queued_operations: VecDeque::default(),
+            block_deletions_below: Generation::None,
             #[cfg(feature = "testing")]
             dangling_files: HashMap::default(),
-            blocked_deletions: Vec::new(),
             shutting_down: false,
             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
-            recently_deleted: HashSet::new(),
         };
 
         let upload_queue = std::mem::replace(
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index afa8efa453..4eee84dc53 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -2951,31 +2951,6 @@ impl Timeline {
             .schedule_layer_file_deletion(&needs_cleanup)?;
         self.remote_client
             .schedule_index_upload_for_file_changes()?;
-        // This barrier orders above DELETEs before any later operations.
-        // This is critical because code executing after the barrier might
-        // create again objects with the same key that we just scheduled for deletion.
-        // For example, if we just scheduled deletion of an image layer "from the future",
-        // later compaction might run again and re-create the same image layer.
-        // "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn.
-        // "same" here means same key range and LSN.
-        //
-        // Without a barrier between above DELETEs and the re-creation's PUTs,
-        // the upload queue may execute the PUT first, then the DELETE.
-        // In our example, we will end up with an IndexPart referencing a non-existent object.
-        //
-        // 1. a future image layer is created and uploaded
-        // 2. ps restart
-        // 3. the future layer from (1) is deleted during load layer map
-        // 4. image layer is re-created and uploaded
-        // 5. deletion queue would like to delete (1) but actually deletes (4)
-        // 6. delete by name works as expected, but it now deletes the wrong (later) version
-        //
-        // See https://github.com/neondatabase/neon/issues/5878
-        //
-        // NB: generation numbers naturally protect against this because they disambiguate
-        // (1) and (4)
-        // TODO: this is basically a no-op now, should we remove it?
-        self.remote_client.schedule_barrier()?;
         // Tenant::create_timeline will wait for these uploads to happen before returning, or
         // on retry.
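
Note on the removed barrier (the timeline.rs hunk above): as the deleted comment itself says, generation numbers already disambiguate a deleted layer from its re-created successor, which is what made the barrier a no-op. A hedged sketch of that argument, with a simplified path scheme (the helper name and the exact format, including tenant/timeline prefixes, differ in the real code):

// Illustrative only: why DELETE/PUT reordering cannot clobber a re-created
// layer. Remote layer paths carry a generation suffix, so the "same" layer
// re-uploaded in a newer generation is a different remote object.
fn remote_layer_path(layer_name: &str, generation: u32) -> String {
    format!("{layer_name}-{generation:08x}")
}

fn main() {
    // Step (1) of the removed comment: a "future" layer uploaded in gen 3.
    let deleted = remote_layer_path("000000..0000__016B59D8-016B5A51", 3);
    // Step (4): the "same" layer re-created and uploaded in gen 4.
    let recreated = remote_layer_path("000000..0000__016B59D8-016B5A51", 4);
    // Distinct keys: deleting the first cannot delete the second.
    assert_ne!(deleted, recreated);
}

Because the two targets are distinct keys, index consistency no longer depends on a queue-order barrier, only on the index upload ordering the queue already enforces.
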
diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs
index d302205ffe..0334a31838 100644
--- a/pageserver/src/tenant/upload_queue.rs
+++ b/pageserver/src/tenant/upload_queue.rs
@@ -1,23 +1,21 @@
-use std::collections::{HashMap, HashSet, VecDeque};
+use std::collections::{HashMap, VecDeque};
 use std::fmt::Debug;
 use std::sync::atomic::AtomicU32;
 use std::sync::Arc;
 
-use super::remote_timeline_client::is_same_remote_layer_path;
-use super::storage_layer::AsLayerDesc as _;
-use super::storage_layer::LayerName;
-use super::storage_layer::ResidentLayer;
-use crate::tenant::metadata::TimelineMetadata;
-use crate::tenant::remote_timeline_client::index::IndexPart;
-use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
-use utils::generation::Generation;
-use utils::lsn::{AtomicLsn, Lsn};
-
 use chrono::NaiveDateTime;
 use once_cell::sync::Lazy;
 use tracing::info;
 
+use super::remote_timeline_client::is_same_remote_layer_path;
+use super::storage_layer::{AsLayerDesc as _, LayerName, ResidentLayer};
+use crate::tenant::metadata::TimelineMetadata;
+use crate::tenant::remote_timeline_client::index::{IndexPart, LayerFileMetadata};
+use utils::generation::Generation;
+use utils::lsn::{AtomicLsn, Lsn};
+
 /// Kill switch for upload queue reordering in case it causes problems.
+/// NB: enabling this will block all uploads in AttachedMulti tenants with a blocked deletion.
 /// TODO: remove this once we have confidence in it.
 static DISABLE_UPLOAD_QUEUE_REORDERING: Lazy<bool> =
     Lazy::new(|| std::env::var("DISABLE_UPLOAD_QUEUE_REORDERING").as_deref() == Ok("true"));
@@ -48,12 +46,6 @@ impl UploadQueue {
     }
 }
 
-#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
-pub enum OpType {
-    MayReorder,
-    FlushDeletion,
-}
-
 /// This keeps track of queued and in-progress tasks.
 pub struct UploadQueueInitialized {
     /// Maximum number of inprogress tasks to schedule. 0 is no limit.
@@ -94,6 +86,9 @@ pub struct UploadQueueInitialized {
     /// preceding layer file uploads have completed.
     pub queued_operations: VecDeque<UploadOp>,
 
+    /// If set to a valid generation, deletions at generations below this are blocked.
+    pub(crate) block_deletions_below: Generation,
+
     /// Files which have been unlinked but not yet had scheduled a deletion for. Only kept around
     /// for error logging.
     ///
@@ -102,12 +97,6 @@ pub struct UploadQueueInitialized {
     #[cfg(feature = "testing")]
     pub(crate) dangling_files: HashMap<LayerName, Generation>,
 
-    /// Ensure we order file operations correctly.
-    pub(crate) recently_deleted: HashSet<(LayerName, Generation)>,
-
-    /// Deletions that are blocked by the tenant configuration
-    pub(crate) blocked_deletions: Vec<Delete>,
-
     /// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`.
     pub(crate) shutting_down: bool,
 
@@ -210,6 +199,21 @@ impl UploadQueueInitialized {
     /// The position must be valid for the queue size.
     fn is_ready(&self, pos: usize) -> bool {
         let candidate = self.queued_operations.get(pos).expect("invalid position");
+
+        // Don't allow deletions for past generations if blocked.
+        // TODO: document motivation.
+        if self.block_deletions_below != Generation::None {
+            if let UploadOp::Delete(delete) = candidate {
+                if delete
+                    .layers
+                    .iter()
+                    .any(|(_, metadata)| metadata.generation < self.block_deletions_below)
+                {
+                    return false;
+                }
+            }
+        }
+
         self
             // Look at in-progress operations, in random order.
             .inprogress_tasks
@@ -248,7 +252,7 @@ impl UploadQueueInitialized {
     pub(crate) fn num_inprogress_layer_uploads(&self) -> usize {
         self.inprogress_tasks
             .iter()
-            .filter(|(_, t)| matches!(t.op, UploadOp::UploadLayer(_, _, _)))
+            .filter(|(_, t)| matches!(t.op, UploadOp::UploadLayer(_, _)))
             .count()
     }
 
@@ -358,10 +362,9 @@ impl UploadQueue {
             task_counter: 0,
             inprogress_tasks: HashMap::new(),
             queued_operations: VecDeque::new(),
+            block_deletions_below: Generation::None,
             #[cfg(feature = "testing")]
             dangling_files: HashMap::new(),
-            recently_deleted: HashSet::new(),
-            blocked_deletions: Vec::new(),
             shutting_down: false,
             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
         };
@@ -399,10 +402,9 @@ impl UploadQueue {
             task_counter: 0,
             inprogress_tasks: HashMap::new(),
             queued_operations: VecDeque::new(),
+            block_deletions_below: Generation::None,
             #[cfg(feature = "testing")]
             dangling_files: HashMap::new(),
-            recently_deleted: HashSet::new(),
-            blocked_deletions: Vec::new(),
             shutting_down: false,
             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
         };
@@ -463,7 +465,7 @@ pub struct Delete {
 
 #[derive(Clone, Debug)]
 pub enum UploadOp {
-    /// Upload a layer file. The last field indicates the last operation for thie file.
-    UploadLayer(ResidentLayer, LayerFileMetadata, Option<OpType>),
+    /// Upload a layer file.
+    UploadLayer(ResidentLayer, LayerFileMetadata),
 
     /// Upload a index_part.json file
     UploadMetadata {
@@ -485,11 +487,11 @@ impl std::fmt::Display for UploadOp {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         match self {
-            UploadOp::UploadLayer(layer, metadata, mode) => {
+            UploadOp::UploadLayer(layer, metadata) => {
                 write!(
                     f,
-                    "UploadLayer({}, size={:?}, gen={:?}, mode={:?})",
-                    layer, metadata.file_size, metadata.generation, mode
+                    "UploadLayer({}, size={:?}, gen={:?})",
+                    layer, metadata.file_size, metadata.generation
                 )
             }
             UploadOp::UploadMetadata { uploaded, .. } => {
@@ -519,13 +521,13 @@ impl UploadOp {
             (UploadOp::Shutdown, _) | (_, UploadOp::Shutdown) => false,
 
             // Uploads and deletes can bypass each other unless they're for the same file.
-            (UploadOp::UploadLayer(a, ameta, _), UploadOp::UploadLayer(b, bmeta, _)) => {
+            (UploadOp::UploadLayer(a, ameta), UploadOp::UploadLayer(b, bmeta)) => {
                 let aname = &a.layer_desc().layer_name();
                 let bname = &b.layer_desc().layer_name();
                 !is_same_remote_layer_path(aname, ameta, bname, bmeta)
             }
-            (UploadOp::UploadLayer(u, umeta, _), UploadOp::Delete(d))
-            | (UploadOp::Delete(d), UploadOp::UploadLayer(u, umeta, _)) => {
+            (UploadOp::UploadLayer(u, umeta), UploadOp::Delete(d))
+            | (UploadOp::Delete(d), UploadOp::UploadLayer(u, umeta)) => {
                 d.layers.iter().all(|(dname, dmeta)| {
                     !is_same_remote_layer_path(&u.layer_desc().layer_name(), umeta, dname, dmeta)
                 })
@@ -541,8 +543,8 @@ impl UploadOp {
             // Similarly, index uploads can bypass uploads and deletes as long as neither the
             // uploaded index nor the active index references the file (the latter would be
             // incorrect use by the caller).
-            (UploadOp::UploadLayer(u, umeta, _), UploadOp::UploadMetadata { uploaded: i })
-            | (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, umeta, _)) => {
+            (UploadOp::UploadLayer(u, umeta), UploadOp::UploadMetadata { uploaded: i })
+            | (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, umeta)) => {
                 let uname = u.layer_desc().layer_name();
                 !i.references(&uname, umeta) && !index.references(&uname, umeta)
             }
@@ -570,6 +572,7 @@ mod tests {
     use crate::DEFAULT_PG_VERSION;
     use itertools::Itertools as _;
    use std::str::FromStr as _;
+    use utils::generation::Generation;
     use utils::shard::{ShardCount, ShardIndex, ShardNumber};
 
     /// Test helper which asserts that two operations are the same, in lieu of UploadOp PartialEq.
     #[track_caller]
     fn assert_same_op(a: &UploadOp, b: &UploadOp) {
         use UploadOp::*;
         match (a, b) {
-            (UploadLayer(a, ameta, atype), UploadLayer(b, bmeta, btype)) => {
+            (UploadLayer(a, ameta), UploadLayer(b, bmeta)) => {
                 assert_eq!(a.layer_desc().layer_name(), b.layer_desc().layer_name());
                 assert_eq!(ameta, bmeta);
-                assert_eq!(atype, btype);
             }
             (Delete(a), Delete(b)) => assert_eq!(a.layers, b.layers),
             (UploadMetadata { uploaded: a }, UploadMetadata { uploaded: b }) => assert_eq!(a, b),
@@ -698,7 +700,7 @@ mod tests {
 
         // Enqueue non-conflicting upload, delete, and index before and after a barrier.
         let ops = [
-            UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
+            UploadOp::UploadLayer(layer0.clone(), layer0.metadata()),
             UploadOp::Delete(Delete {
                 layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
             }),
             UploadOp::UploadMetadata {
                 uploaded: index.clone(),
             },
             UploadOp::Barrier(barrier),
-            UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
+            UploadOp::UploadLayer(layer2.clone(), layer2.metadata()),
             UploadOp::Delete(Delete {
                 layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
             }),
@@ -807,9 +809,9 @@ mod tests {
         let layer0c = make_layer_with_size(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51", 3);
 
         let ops = [
-            UploadOp::UploadLayer(layer0a.clone(), layer0a.metadata(), None),
-            UploadOp::UploadLayer(layer0b.clone(), layer0b.metadata(), None),
-            UploadOp::UploadLayer(layer0c.clone(), layer0c.metadata(), None),
+            UploadOp::UploadLayer(layer0a.clone(), layer0a.metadata()),
+            UploadOp::UploadLayer(layer0b.clone(), layer0b.metadata()),
+            UploadOp::UploadLayer(layer0c.clone(), layer0c.metadata()),
         ];
         queue.queued_operations.extend(ops.clone());
@@ -840,14 +842,14 @@ mod tests {
         let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
 
         let ops = [
-            UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
+            UploadOp::UploadLayer(layer0.clone(), layer0.metadata()),
             UploadOp::Delete(Delete {
                 layers: vec![
                     (layer0.layer_desc().layer_name(), layer0.metadata()),
                     (layer1.layer_desc().layer_name(), layer1.metadata()),
                 ],
             }),
-            UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
+            UploadOp::UploadLayer(layer1.clone(), layer1.metadata()),
         ];
         queue.queued_operations.extend(ops.clone());
@@ -884,15 +886,15 @@ mod tests {
         let layer3 = make_layer(&tli, "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
 
         let ops = [
-            UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
+            UploadOp::UploadLayer(layer0.clone(), layer0.metadata()),
             UploadOp::Delete(Delete {
                 layers: vec![
                     (layer0.layer_desc().layer_name(), layer0.metadata()),
                     (layer1.layer_desc().layer_name(), layer1.metadata()),
                 ],
             }),
-            UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
-            UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
+            UploadOp::UploadLayer(layer1.clone(), layer1.metadata()),
+            UploadOp::UploadLayer(layer2.clone(), layer2.metadata()),
             UploadOp::Delete(Delete {
                 layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
             }),
         ];
@@ -921,9 +923,9 @@ mod tests {
         let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
 
         let ops = [
-            UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
-            UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
-            UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
+            UploadOp::UploadLayer(layer0.clone(), layer0.metadata()),
+            UploadOp::UploadLayer(layer1.clone(), layer1.metadata()),
+            UploadOp::UploadLayer(layer2.clone(), layer2.metadata()),
         ];
         queue.queued_operations.extend(ops.clone());
@@ -989,15 +991,15 @@ mod tests {
         let index2 = index_with(&index1, &layer2);
 
         let ops = [
-            UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
+            UploadOp::UploadLayer(layer0.clone(), layer0.metadata()),
             UploadOp::UploadMetadata {
                 uploaded: index0.clone(),
             },
-            UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
+            UploadOp::UploadLayer(layer1.clone(), layer1.metadata()),
             UploadOp::UploadMetadata {
                 uploaded: index1.clone(),
             },
-            UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
+            UploadOp::UploadLayer(layer2.clone(), layer2.metadata()),
             UploadOp::UploadMetadata {
                 uploaded: index2.clone(),
             },
@@ -1053,7 +1055,7 @@ mod tests {
         let ops = [
             // Initial upload, with a barrier to prevent index coalescing.
-            UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
+            UploadOp::UploadLayer(layer.clone(), layer.metadata()),
             UploadOp::UploadMetadata {
                 uploaded: index_upload.clone(),
             },
@@ -1099,7 +1101,7 @@ mod tests {
         let ops = [
             // Initial upload, with a barrier to prevent index coalescing.
-            UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
+            UploadOp::UploadLayer(layer.clone(), layer.metadata()),
             UploadOp::UploadMetadata {
                 uploaded: index_upload.clone(),
             },
@@ -1109,7 +1111,7 @@ mod tests {
                 uploaded: index_deref.clone(),
             },
             // Replace and reference the layer.
-            UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
+            UploadOp::UploadLayer(layer.clone(), layer.metadata()),
             UploadOp::UploadMetadata {
                 uploaded: index_ref.clone(),
             },
@@ -1145,7 +1147,7 @@ mod tests {
 
         // Enqueue non-conflicting upload, delete, and index before and after a shutdown.
         let ops = [
-            UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
+            UploadOp::UploadLayer(layer0.clone(), layer0.metadata()),
             UploadOp::Delete(Delete {
                 layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
             }),
             UploadOp::UploadMetadata {
                 uploaded: index.clone(),
             },
             UploadOp::Shutdown,
-            UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
+            UploadOp::UploadLayer(layer2.clone(), layer2.metadata()),
             UploadOp::Delete(Delete {
                 layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
             }),
@@ -1203,10 +1205,10 @@ mod tests {
         let layer3 = make_layer(&tli, "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
 
         let ops = [
-            UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
-            UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
-            UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
-            UploadOp::UploadLayer(layer3.clone(), layer3.metadata(), None),
+            UploadOp::UploadLayer(layer0.clone(), layer0.metadata()),
+            UploadOp::UploadLayer(layer1.clone(), layer1.metadata()),
+            UploadOp::UploadLayer(layer2.clone(), layer2.metadata()),
+            UploadOp::UploadLayer(layer3.clone(), layer3.metadata()),
         ];
         queue.queued_operations.extend(ops.clone());
@@ -1257,7 +1259,7 @@ mod tests {
             .layer_metadata
             .insert(layer.layer_desc().layer_name(), layer.metadata());
         vec![
-            UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
+            UploadOp::UploadLayer(layer.clone(), layer.metadata()),
             UploadOp::Delete(Delete {
                 layers: vec![(layer.layer_desc().layer_name(), layer.metadata())],
             }),
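
Taken together, a compact model of how the new field behaves at runtime. This is illustrative only: simplified types, and the real next_ready() additionally performs can_bypass() conflict checks before letting an operation jump the queue.

// Illustrative model of block_deletions_below gating next_ready(): a delete
// from an older generation stays parked, later ops bypass it, and clearing
// the watermark releases it.
use std::collections::VecDeque;

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct Gen(u32);

#[derive(Clone, Debug)]
enum Op {
    Upload(&'static str),
    Delete(&'static str, Gen),
}

struct Queue {
    ops: VecDeque<Op>,
    block_deletions_below: Option<Gen>,
}

impl Queue {
    /// Pop the first ready operation, skipping deletes that are blocked.
    fn next_ready(&mut self) -> Option<Op> {
        let watermark = self.block_deletions_below;
        let pos = self.ops.iter().position(|op| match (op, watermark) {
            (Op::Delete(_, gen), Some(w)) => *gen >= w,
            _ => true,
        })?;
        self.ops.remove(pos)
    }
}

fn main() {
    let mut q = Queue {
        ops: VecDeque::from([Op::Delete("old", Gen(3)), Op::Upload("new")]),
        block_deletions_below: Some(Gen(5)), // e.g. AttachedMulti at gen 5
    };
    // The gen-3 delete is parked; the upload bypasses it.
    assert!(matches!(q.next_ready(), Some(Op::Upload(_))));
    assert!(q.next_ready().is_none());

    // AttachedMulti -> AttachedSingle clears the watermark and releases it.
    q.block_deletions_below = None;
    assert!(matches!(q.next_ready(), Some(Op::Delete(_, _))));
}

This mirrors update_config(): switching to AttachedSingle clears the watermark and immediately launches any parked deletions via launch_queued_tasks().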