From 0112097e1321d16c4ff51dc4e69818aed395ec32 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Tue, 4 Jun 2024 17:27:08 +0300 Subject: [PATCH] feat(rtc): maintain dirty and uploaded IndexPart (#7833) RemoteTimelineClient maintains a copy of "next IndexPart" as a number of fields which are like an IndexPart but this is not immediately obvious. Instead of multiple fields, maintain a `dirty` ("next IndexPart") and `clean` ("uploaded IndexPart") fields. Additional cleanup: - rename `IndexPart::disk_consistent_lsn` accessor `duplicated_disk_consistent_lsn` - no one except scrubber should be looking at it, even scrubber is a stretch - remove usage elsewhere (pagectl used by tests, metadata scan endpoint) - serialize index part *before* the index upload operation - avoid upload operation being retried because of serialization error - serialization error is fatal anyway for timeline -- it can only make transient local progress after that, at least the error is bubbled up now - gather exploded IndexPart fields into single actual `UploadQueueInitialized::dirty` of which the uploaded snapshot is serialized - implement the long wished monotonicity check with the `clean` IndexPart with an assertion which is not expected to fire Continued work from #7860 towards next step of #6994. --- pageserver/ctl/src/index_part.rs | 2 +- pageserver/src/http/routes.rs | 2 +- .../src/tenant/remote_timeline_client.rs | 150 ++++++++++-------- .../tenant/remote_timeline_client/index.rs | 55 ++----- .../tenant/remote_timeline_client/upload.rs | 19 +-- pageserver/src/tenant/upload_queue.rs | 77 ++++----- s3_scrubber/src/checks.rs | 4 +- 7 files changed, 144 insertions(+), 165 deletions(-) diff --git a/pageserver/ctl/src/index_part.rs b/pageserver/ctl/src/index_part.rs index 2998b5c732..a33cae6769 100644 --- a/pageserver/ctl/src/index_part.rs +++ b/pageserver/ctl/src/index_part.rs @@ -26,7 +26,7 @@ pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> { let output = Output { layer_metadata: &des.layer_metadata, - disk_consistent_lsn: des.get_disk_consistent_lsn(), + disk_consistent_lsn: des.metadata.disk_consistent_lsn(), timeline_metadata: &des.metadata, }; diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index bd6fa028ac..6b6a131c88 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -2182,7 +2182,7 @@ async fn tenant_scan_remote_handler( { Ok((index_part, index_generation)) => { tracing::info!("Found timeline {tenant_shard_id}/{timeline_id} metadata (gen {index_generation:?}, {} layers, {} consistent LSN)", - index_part.layer_metadata.len(), index_part.get_disk_consistent_lsn()); + index_part.layer_metadata.len(), index_part.metadata.disk_consistent_lsn()); generation = std::cmp::max(generation, index_generation); } Err(DownloadError::NotFound) => { diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 73438a790f..e33e4b84aa 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -91,8 +91,7 @@ //! //! The *actual* remote state lags behind the *desired* remote state while //! there are in-flight operations. -//! We keep track of the desired remote state in -//! [`UploadQueueInitialized::latest_files`] and [`UploadQueueInitialized::latest_metadata`]. +//! We keep track of the desired remote state in [`UploadQueueInitialized::dirty`]. //! It is initialized based on the [`IndexPart`] that was passed during init //! 
and updated with every `schedule_*` function call. //! All this is necessary necessary to compute the future [`IndexPart`]s @@ -115,8 +114,7 @@ //! //! # Completion //! -//! Once an operation has completed, we update -//! [`UploadQueueInitialized::projected_remote_consistent_lsn`] immediately, +//! Once an operation has completed, we update [`UploadQueueInitialized::clean`] immediately, //! and submit a request through the DeletionQueue to update //! [`UploadQueueInitialized::visible_remote_consistent_lsn`] after it has //! validated that our generation is not stale. It is this visible value @@ -416,6 +414,7 @@ impl RemoteTimelineClient { Ok(()) } + /// Returns `None` if nothing is yet uplodaded, `Some(disk_consistent_lsn)` otherwise. pub fn remote_consistent_lsn_projected(&self) -> Option { match &mut *self.upload_queue.lock().unwrap() { UploadQueue::Uninitialized => None, @@ -442,13 +441,11 @@ impl RemoteTimelineClient { /// Returns true if this timeline was previously detached at this Lsn and the remote timeline /// client is currently initialized. pub(crate) fn is_previous_ancestor_lsn(&self, lsn: Lsn) -> bool { - // technically this is a dirty read, but given how timeline detach ancestor is implemented - // via tenant restart, the lineage has always been uploaded. self.upload_queue .lock() .unwrap() .initialized_mut() - .map(|uq| uq.latest_lineage.is_previous_ancestor_lsn(lsn)) + .map(|uq| uq.clean.0.lineage.is_previous_ancestor_lsn(lsn)) .unwrap_or(false) } @@ -457,7 +454,6 @@ impl RemoteTimelineClient { current_remote_index_part .layer_metadata .values() - // If we don't have the file size for the layer, don't account for it in the metric. .map(|ilmd| ilmd.file_size) .sum() } else { @@ -585,9 +581,9 @@ impl RemoteTimelineClient { // As documented in the struct definition, it's ok for latest_metadata to be // ahead of what's _actually_ on the remote during index upload. 
- upload_queue.latest_metadata = metadata.clone(); + upload_queue.dirty.metadata = metadata.clone(); - self.schedule_index_upload(upload_queue); + self.schedule_index_upload(upload_queue)?; Ok(()) } @@ -606,9 +602,9 @@ impl RemoteTimelineClient { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; - upload_queue.latest_metadata.apply(update); + upload_queue.dirty.metadata.apply(update); - self.schedule_index_upload(upload_queue); + self.schedule_index_upload(upload_queue)?; Ok(()) } @@ -620,8 +616,8 @@ impl RemoteTimelineClient { ) -> anyhow::Result<()> { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; - upload_queue.last_aux_file_policy = last_aux_file_policy; - self.schedule_index_upload(upload_queue); + upload_queue.dirty.last_aux_file_policy = last_aux_file_policy; + self.schedule_index_upload(upload_queue)?; Ok(()) } /// @@ -639,30 +635,44 @@ impl RemoteTimelineClient { let upload_queue = guard.initialized_mut()?; if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 { - self.schedule_index_upload(upload_queue); + self.schedule_index_upload(upload_queue)?; } Ok(()) } /// Launch an index-file upload operation in the background (internal function) - fn schedule_index_upload(self: &Arc, upload_queue: &mut UploadQueueInitialized) { - let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn(); + fn schedule_index_upload( + self: &Arc, + upload_queue: &mut UploadQueueInitialized, + ) -> anyhow::Result<()> { + let disk_consistent_lsn = upload_queue.dirty.metadata.disk_consistent_lsn(); + // fix up the duplicated field + upload_queue.dirty.disk_consistent_lsn = disk_consistent_lsn; + + // make sure it serializes before doing it in perform_upload_task so that it doesn't + // look like a retryable error + let void = std::io::sink(); + serde_json::to_writer(void, &upload_queue.dirty).context("serialize index_part.json")?; + + let index_part = &upload_queue.dirty; info!( "scheduling metadata upload up to consistent LSN {disk_consistent_lsn} with {} files ({} changed)", - upload_queue.latest_files.len(), + index_part.layer_metadata.len(), upload_queue.latest_files_changes_since_metadata_upload_scheduled, ); - let index_part = IndexPart::from(&*upload_queue); - let op = UploadOp::UploadMetadata(Box::new(index_part), disk_consistent_lsn); + let op = UploadOp::UploadMetadata { + uploaded: Box::new(index_part.clone()), + }; self.metric_begin(&op); upload_queue.queued_operations.push_back(op); upload_queue.latest_files_changes_since_metadata_upload_scheduled = 0; // Launch the task immediately, if possible self.launch_queued_tasks(upload_queue); + Ok(()) } pub(crate) async fn schedule_reparenting_and_wait( @@ -675,16 +685,16 @@ impl RemoteTimelineClient { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; - let Some(prev) = upload_queue.latest_metadata.ancestor_timeline() else { + let Some(prev) = upload_queue.dirty.metadata.ancestor_timeline() else { return Err(anyhow::anyhow!( "cannot reparent without a current ancestor" )); }; - upload_queue.latest_metadata.reparent(new_parent); - upload_queue.latest_lineage.record_previous_ancestor(&prev); + upload_queue.dirty.metadata.reparent(new_parent); + upload_queue.dirty.lineage.record_previous_ancestor(&prev); - self.schedule_index_upload(upload_queue); + self.schedule_index_upload(upload_queue)?; self.schedule_barrier0(upload_queue) }; @@ -705,16 +715,17 @@ impl RemoteTimelineClient { 
let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; - upload_queue.latest_metadata.detach_from_ancestor(&adopted); - upload_queue.latest_lineage.record_detaching(&adopted); + upload_queue.dirty.metadata.detach_from_ancestor(&adopted); + upload_queue.dirty.lineage.record_detaching(&adopted); for layer in layers { upload_queue - .latest_files + .dirty + .layer_metadata .insert(layer.layer_desc().layer_name(), layer.metadata()); } - self.schedule_index_upload(upload_queue); + self.schedule_index_upload(upload_queue)?; let barrier = self.schedule_barrier0(upload_queue); self.launch_queued_tasks(upload_queue); @@ -746,7 +757,8 @@ impl RemoteTimelineClient { let metadata = layer.metadata(); upload_queue - .latest_files + .dirty + .layer_metadata .insert(layer.layer_desc().layer_name(), metadata.clone()); upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1; @@ -776,8 +788,8 @@ impl RemoteTimelineClient { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; - let with_metadata = - self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names.iter().cloned()); + let with_metadata = self + .schedule_unlinking_of_layers_from_index_part0(upload_queue, names.iter().cloned())?; self.schedule_deletion_of_unlinked0(upload_queue, with_metadata); @@ -801,7 +813,7 @@ impl RemoteTimelineClient { let names = gc_layers.iter().map(|x| x.layer_desc().layer_name()); - self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names); + self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names)?; self.launch_queued_tasks(upload_queue); @@ -814,7 +826,7 @@ impl RemoteTimelineClient { self: &Arc, upload_queue: &mut UploadQueueInitialized, names: I, - ) -> Vec<(LayerName, LayerFileMetadata)> + ) -> anyhow::Result> where I: IntoIterator, { @@ -824,7 +836,7 @@ impl RemoteTimelineClient { let with_metadata: Vec<_> = names .into_iter() .filter_map(|name| { - let meta = upload_queue.latest_files.remove(&name); + let meta = upload_queue.dirty.layer_metadata.remove(&name); if let Some(meta) = meta { upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1; @@ -856,10 +868,10 @@ impl RemoteTimelineClient { // index_part update, because that needs to be uploaded before we can actually delete the // files. 
if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 { - self.schedule_index_upload(upload_queue); + self.schedule_index_upload(upload_queue)?; } - with_metadata + Ok(with_metadata) } /// Schedules deletion for layer files which have previously been unlinked from the @@ -950,7 +962,7 @@ impl RemoteTimelineClient { let names = compacted_from.iter().map(|x| x.layer_desc().layer_name()); - self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names); + self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names)?; self.launch_queued_tasks(upload_queue); Ok(()) @@ -1085,7 +1097,7 @@ impl RemoteTimelineClient { let deleted_at = Utc::now().naive_utc(); stopped.deleted_at = SetDeletedFlagProgress::InProgress(deleted_at); - let mut index_part = IndexPart::from(&stopped.upload_queue_for_deletion); + let mut index_part = stopped.upload_queue_for_deletion.dirty.clone(); index_part.deleted_at = Some(deleted_at); index_part }; @@ -1296,7 +1308,8 @@ impl RemoteTimelineClient { stopped .upload_queue_for_deletion - .latest_files + .dirty + .layer_metadata .drain() .map(|(file_name, meta)| { remote_layer_path( @@ -1433,7 +1446,7 @@ impl RemoteTimelineClient { // Can always be scheduled. true } - UploadOp::UploadMetadata(_, _) => { + UploadOp::UploadMetadata { .. } => { // These can only be performed after all the preceding operations // have finished. upload_queue.inprogress_tasks.is_empty() @@ -1475,7 +1488,7 @@ impl RemoteTimelineClient { UploadOp::UploadLayer(_, _) => { upload_queue.num_inprogress_layer_uploads += 1; } - UploadOp::UploadMetadata(_, _) => { + UploadOp::UploadMetadata { .. } => { upload_queue.num_inprogress_metadata_uploads += 1; } UploadOp::Delete(_) => { @@ -1584,22 +1597,13 @@ impl RemoteTimelineClient { ) .await } - UploadOp::UploadMetadata(ref index_part, _lsn) => { - let mention_having_future_layers = if cfg!(feature = "testing") { - index_part - .layer_metadata - .keys() - .any(|x| x.is_in_future(*_lsn)) - } else { - false - }; - + UploadOp::UploadMetadata { ref uploaded } => { let res = upload::upload_index_part( &self.storage_impl, &self.tenant_shard_id, &self.timeline_id, self.generation, - index_part, + uploaded, &self.cancel, ) .measure_remote_op( @@ -1609,10 +1613,21 @@ impl RemoteTimelineClient { ) .await; if res.is_ok() { - self.update_remote_physical_size_gauge(Some(index_part)); + self.update_remote_physical_size_gauge(Some(uploaded)); + let mention_having_future_layers = if cfg!(feature = "testing") { + uploaded + .layer_metadata + .keys() + .any(|x| x.is_in_future(uploaded.metadata.disk_consistent_lsn())) + } else { + false + }; if mention_having_future_layers { // find rationale near crate::tenant::timeline::init::cleanup_future_layer - tracing::info!(disk_consistent_lsn=%_lsn, "uploaded an index_part.json with future layers -- this is ok! if shutdown now, expect future layer cleanup"); + tracing::info!( + disk_consistent_lsn = %uploaded.metadata.disk_consistent_lsn(), + "uploaded an index_part.json with future layers -- this is ok! if shutdown now, expect future layer cleanup" + ); } } res @@ -1713,11 +1728,23 @@ impl RemoteTimelineClient { upload_queue.num_inprogress_layer_uploads -= 1; None } - UploadOp::UploadMetadata(_, lsn) => { + UploadOp::UploadMetadata { ref uploaded } => { upload_queue.num_inprogress_metadata_uploads -= 1; - // XXX monotonicity check? - upload_queue.projected_remote_consistent_lsn = Some(lsn); + // the task id is reused as a monotonicity check for storing the "clean" + // IndexPart. 
+ let last_updater = upload_queue.clean.1; + let is_later = last_updater.is_some_and(|task_id| task_id < task.task_id); + let monotone = is_later || last_updater.is_none(); + + assert!(monotone, "no two index uploads should be completing at the same time, prev={last_updater:?}, task.task_id={}", task.task_id); + + // not taking ownership is wasteful + upload_queue.clean.0.clone_from(uploaded); + upload_queue.clean.1 = Some(task.task_id); + + let lsn = upload_queue.clean.0.metadata.disk_consistent_lsn(); + if self.generation.is_none() { // Legacy mode: skip validating generation upload_queue.visible_remote_consistent_lsn.store(lsn); @@ -1771,7 +1798,7 @@ impl RemoteTimelineClient { RemoteOpKind::Upload, RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size), ), - UploadOp::UploadMetadata(_, _) => ( + UploadOp::UploadMetadata { .. } => ( RemoteOpFileKind::Index, RemoteOpKind::Upload, DontTrackSize { @@ -1847,11 +1874,9 @@ impl RemoteTimelineClient { // Deletion is not really perf sensitive so there shouldnt be any problems with cloning a fraction of it. let upload_queue_for_deletion = UploadQueueInitialized { task_counter: 0, - latest_files: initialized.latest_files.clone(), + dirty: initialized.dirty.clone(), + clean: initialized.clean.clone(), latest_files_changes_since_metadata_upload_scheduled: 0, - latest_metadata: initialized.latest_metadata.clone(), - latest_lineage: initialized.latest_lineage.clone(), - projected_remote_consistent_lsn: None, visible_remote_consistent_lsn: initialized .visible_remote_consistent_lsn .clone(), @@ -1864,7 +1889,6 @@ impl RemoteTimelineClient { dangling_files: HashMap::default(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), - last_aux_file_policy: initialized.last_aux_file_policy, }; let upload_queue = std::mem::replace( diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs index f5d939c747..6494261312 100644 --- a/pageserver/src/tenant/remote_timeline_client/index.rs +++ b/pageserver/src/tenant/remote_timeline_client/index.rs @@ -11,7 +11,6 @@ use utils::id::TimelineId; use crate::tenant::metadata::TimelineMetadata; use crate::tenant::storage_layer::LayerName; -use crate::tenant::upload_queue::UploadQueueInitialized; use crate::tenant::Generation; use pageserver_api::shard::ShardIndex; @@ -42,7 +41,7 @@ pub struct IndexPart { // 'disk_consistent_lsn' is a copy of the 'disk_consistent_lsn' in the metadata. // It's duplicated for convenience when reading the serialized structure, but is // private because internally we would read from metadata instead. 
- disk_consistent_lsn: Lsn, + pub(super) disk_consistent_lsn: Lsn, #[serde(rename = "metadata_bytes")] pub metadata: TimelineMetadata, @@ -80,23 +79,15 @@ impl IndexPart { pub const FILE_NAME: &'static str = "index_part.json"; - fn new( - layers_and_metadata: &HashMap, - disk_consistent_lsn: Lsn, - metadata: TimelineMetadata, - lineage: Lineage, - last_aux_file_policy: Option, - ) -> Self { - let layer_metadata = layers_and_metadata.clone(); - - Self { + pub(crate) fn empty(metadata: TimelineMetadata) -> Self { + IndexPart { version: Self::LATEST_VERSION, - layer_metadata, - disk_consistent_lsn, + layer_metadata: Default::default(), + disk_consistent_lsn: metadata.disk_consistent_lsn(), metadata, deleted_at: None, - lineage, - last_aux_file_policy, + lineage: Default::default(), + last_aux_file_policy: None, } } @@ -106,7 +97,7 @@ impl IndexPart { /// If you want this under normal operations, read it from self.metadata: /// this method is just for the scrubber to use when validating an index. - pub fn get_disk_consistent_lsn(&self) -> Lsn { + pub fn duplicated_disk_consistent_lsn(&self) -> Lsn { self.disk_consistent_lsn } @@ -120,14 +111,7 @@ impl IndexPart { #[cfg(test)] pub(crate) fn example() -> Self { - let example_metadata = TimelineMetadata::example(); - Self::new( - &HashMap::new(), - example_metadata.disk_consistent_lsn(), - example_metadata, - Default::default(), - Some(AuxFilePolicy::V1), - ) + Self::empty(TimelineMetadata::example()) } pub(crate) fn last_aux_file_policy(&self) -> Option { @@ -135,22 +119,6 @@ impl IndexPart { } } -impl From<&UploadQueueInitialized> for IndexPart { - fn from(uq: &UploadQueueInitialized) -> Self { - let disk_consistent_lsn = uq.latest_metadata.disk_consistent_lsn(); - let metadata = uq.latest_metadata.clone(); - let lineage = uq.latest_lineage.clone(); - - Self::new( - &uq.latest_files, - disk_consistent_lsn, - metadata, - lineage, - uq.last_aux_file_policy, - ) - } -} - /// Metadata gathered for each of the layer files. /// /// Fields have to be `Option`s because remote [`IndexPart`]'s can be from different version, which @@ -236,11 +204,10 @@ impl Lineage { /// The queried lsn is most likely the basebackup lsn, and this answers question "is it allowed /// to start a read/write primary at this lsn". /// - /// Returns true if the Lsn was previously a branch point. + /// Returns true if the Lsn was previously our branch point. pub(crate) fn is_previous_ancestor_lsn(&self, lsn: Lsn) -> bool { self.original_ancestor - .as_ref() - .is_some_and(|(_, ancestor_lsn, _)| lsn == *ancestor_lsn) + .is_some_and(|(_, ancestor_lsn, _)| ancestor_lsn == lsn) } } diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs index e8e824f415..c4dd184610 100644 --- a/pageserver/src/tenant/remote_timeline_client/upload.rs +++ b/pageserver/src/tenant/remote_timeline_client/upload.rs @@ -1,6 +1,7 @@ //! 
Helper functions to upload files to remote storage with a RemoteStorage use anyhow::{bail, Context}; +use bytes::Bytes; use camino::Utf8Path; use fail::fail_point; use pageserver_api::shard::TenantShardId; @@ -11,10 +12,10 @@ use tokio::io::AsyncSeekExt; use tokio_util::sync::CancellationToken; use utils::{backoff, pausable_failpoint}; +use super::index::IndexPart; use super::Generation; use crate::tenant::remote_timeline_client::{ - index::IndexPart, remote_index_path, remote_initdb_archive_path, - remote_initdb_preserved_archive_path, + remote_index_path, remote_initdb_archive_path, remote_initdb_preserved_archive_path, }; use remote_storage::{GenericRemoteStorage, RemotePath, TimeTravelError}; use utils::id::{TenantId, TimelineId}; @@ -27,7 +28,7 @@ pub(crate) async fn upload_index_part<'a>( tenant_shard_id: &TenantShardId, timeline_id: &TimelineId, generation: Generation, - index_part: &'a IndexPart, + index_part: &IndexPart, cancel: &CancellationToken, ) -> anyhow::Result<()> { tracing::trace!("uploading new index part"); @@ -37,16 +38,16 @@ pub(crate) async fn upload_index_part<'a>( }); pausable_failpoint!("before-upload-index-pausable"); - let index_part_bytes = index_part - .to_s3_bytes() - .context("serialize index part file into bytes")?; - let index_part_size = index_part_bytes.len(); - let index_part_bytes = bytes::Bytes::from(index_part_bytes); + // FIXME: this error comes too late + let serialized = index_part.to_s3_bytes()?; + let serialized = Bytes::from(serialized); + + let index_part_size = serialized.len(); let remote_path = remote_index_path(tenant_shard_id, timeline_id, generation); storage .upload_storage_object( - futures::stream::once(futures::future::ready(Ok(index_part_bytes))), + futures::stream::once(futures::future::ready(Ok(serialized))), index_part_size, &remote_path, cancel, diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index 02f87303d1..50c977a950 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -3,12 +3,10 @@ use super::storage_layer::ResidentLayer; use crate::tenant::metadata::TimelineMetadata; use crate::tenant::remote_timeline_client::index::IndexPart; use crate::tenant::remote_timeline_client::index::LayerFileMetadata; -use crate::tenant::remote_timeline_client::index::Lineage; use std::collections::{HashMap, VecDeque}; use std::fmt::Debug; use chrono::NaiveDateTime; -use pageserver_api::models::AuxFilePolicy; use std::sync::Arc; use tracing::info; use utils::lsn::AtomicLsn; @@ -45,34 +43,25 @@ pub(crate) struct UploadQueueInitialized { /// Counter to assign task IDs pub(crate) task_counter: u64, - /// All layer files stored in the remote storage, taking into account all - /// in-progress and queued operations - pub(crate) latest_files: HashMap, + /// The next uploaded index_part.json; assumed to be dirty. + /// + /// Should not be read, directly except for layer file updates. Instead you should add a + /// projected field. + pub(crate) dirty: IndexPart, + + /// The latest remote persisted IndexPart. + /// + /// Each completed metadata upload will update this. The second item is the task_id which last + /// updated the value, used to ensure we never store an older value over a newer one. + pub(crate) clean: (IndexPart, Option), /// How many file uploads or deletions been scheduled, since the /// last (scheduling of) metadata index upload? 
pub(crate) latest_files_changes_since_metadata_upload_scheduled: u64, - /// Metadata stored in the remote storage, taking into account all - /// in-progress and queued operations. - /// DANGER: do not return to outside world, e.g., safekeepers. - pub(crate) latest_metadata: TimelineMetadata, - - /// Part of the flattened "next" `index_part.json`. - pub(crate) latest_lineage: Lineage, - - /// The last aux file policy used on this timeline. - pub(crate) last_aux_file_policy: Option, - - /// `disk_consistent_lsn` from the last metadata file that was successfully - /// uploaded. `Lsn(0)` if nothing was uploaded yet. - /// Unlike `latest_files` or `latest_metadata`, this value is never ahead. - /// Safekeeper can rely on it to make decisions for WAL storage. - /// - /// visible_remote_consistent_lsn is only updated after our generation has been validated with + /// The Lsn is only updated after our generation has been validated with /// the control plane (unlesss a timeline's generation is None, in which case /// we skip validation) - pub(crate) projected_remote_consistent_lsn: Option, pub(crate) visible_remote_consistent_lsn: Arc, // Breakdown of different kinds of tasks currently in-progress @@ -118,7 +107,8 @@ impl UploadQueueInitialized { } pub(super) fn get_last_remote_consistent_lsn_projected(&self) -> Option { - self.projected_remote_consistent_lsn + let lsn = self.clean.0.metadata.disk_consistent_lsn(); + self.clean.1.map(|_| lsn) } } @@ -174,13 +164,12 @@ impl UploadQueue { info!("initializing upload queue for empty remote"); + let index_part = IndexPart::empty(metadata.clone()); + let state = UploadQueueInitialized { - // As described in the doc comment, it's ok for `latest_files` and `latest_metadata` to be ahead. - latest_files: HashMap::new(), + dirty: index_part.clone(), + clean: (index_part, None), latest_files_changes_since_metadata_upload_scheduled: 0, - latest_metadata: metadata.clone(), - latest_lineage: Lineage::default(), - projected_remote_consistent_lsn: None, visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)), // what follows are boring default initializations task_counter: 0, @@ -193,7 +182,6 @@ impl UploadQueue { dangling_files: HashMap::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), - last_aux_file_policy: Default::default(), }; *self = UploadQueue::Initialized(state); @@ -211,22 +199,15 @@ impl UploadQueue { } } - let mut files = HashMap::with_capacity(index_part.layer_metadata.len()); - for (layer_name, layer_metadata) in &index_part.layer_metadata { - files.insert(layer_name.to_owned(), layer_metadata.clone()); - } - info!( "initializing upload queue with remote index_part.disk_consistent_lsn: {}", index_part.metadata.disk_consistent_lsn() ); let state = UploadQueueInitialized { - latest_files: files, + dirty: index_part.clone(), + clean: (index_part.clone(), None), latest_files_changes_since_metadata_upload_scheduled: 0, - latest_metadata: index_part.metadata.clone(), - latest_lineage: index_part.lineage.clone(), - projected_remote_consistent_lsn: Some(index_part.metadata.disk_consistent_lsn()), visible_remote_consistent_lsn: Arc::new( index_part.metadata.disk_consistent_lsn().into(), ), @@ -241,7 +222,6 @@ impl UploadQueue { dangling_files: HashMap::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), - last_aux_file_policy: index_part.last_aux_file_policy(), }; *self = UploadQueue::Initialized(state); @@ -298,13 +278,16 @@ pub(crate) enum UploadOp { /// Upload a layer file 
UploadLayer(ResidentLayer, LayerFileMetadata), - /// Upload the metadata file - UploadMetadata(Box, Lsn), + /// Upload a index_part.json file + UploadMetadata { + /// The next [`UploadQueueInitialized::clean`] after this upload succeeds. + uploaded: Box, + }, /// Delete layer files Delete(Delete), - /// Barrier. When the barrier operation is reached, + /// Barrier. When the barrier operation is reached, the channel is closed. Barrier(tokio::sync::watch::Sender<()>), /// Shutdown; upon encountering this operation no new operations will be spawned, otherwise @@ -322,8 +305,12 @@ impl std::fmt::Display for UploadOp { layer, metadata.file_size, metadata.generation ) } - UploadOp::UploadMetadata(_, lsn) => { - write!(f, "UploadMetadata(lsn: {})", lsn) + UploadOp::UploadMetadata { uploaded, .. } => { + write!( + f, + "UploadMetadata(lsn: {})", + uploaded.metadata.disk_consistent_lsn() + ) } UploadOp::Delete(delete) => { write!(f, "Delete({} layers)", delete.layers.len()) diff --git a/s3_scrubber/src/checks.rs b/s3_scrubber/src/checks.rs index 2c14fef0af..44fb53696c 100644 --- a/s3_scrubber/src/checks.rs +++ b/s3_scrubber/src/checks.rs @@ -93,12 +93,12 @@ pub(crate) fn branch_cleanup_and_check_errors( } if index_part.metadata.disk_consistent_lsn() - != index_part.get_disk_consistent_lsn() + != index_part.duplicated_disk_consistent_lsn() { result.errors.push(format!( "Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})", index_part.metadata.disk_consistent_lsn(), - index_part.get_disk_consistent_lsn(), + index_part.duplicated_disk_consistent_lsn(), )) }
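
The invariant this patch is after is easiest to see outside the diff. Below is a minimal, self-contained sketch (illustrative stand-in types, not the actual pageserver structs): `dirty` is mutated by every `schedule_*` call, `clean` is only overwritten when an index upload completes, and the completing task id backs the monotonicity assertion.

    use std::collections::BTreeMap;

    #[derive(Clone, Debug)]
    struct IndexPartModel {
        disk_consistent_lsn: u64,
        layers: BTreeMap<String, u64>, // layer name -> file size
    }

    struct UploadQueueModel {
        /// "next index_part.json": updated by every schedule_* call.
        dirty: IndexPartModel,
        /// Last uploaded snapshot plus the task id that stored it.
        clean: (IndexPartModel, Option<u64>),
        task_counter: u64,
    }

    impl UploadQueueModel {
        fn new(initial: IndexPartModel) -> Self {
            Self {
                dirty: initial.clone(),
                clean: (initial, None),
                task_counter: 0,
            }
        }

        /// Analogue of schedule_index_upload: snapshot `dirty` for the upload task.
        fn schedule_index_upload(&mut self) -> (u64, IndexPartModel) {
            self.task_counter += 1;
            (self.task_counter, self.dirty.clone())
        }

        /// Analogue of the completion path: store the uploaded snapshot as `clean`,
        /// asserting that completions never move the snapshot backwards.
        fn on_index_upload_completed(&mut self, task_id: u64, uploaded: IndexPartModel) {
            let monotone = self.clean.1.map_or(true, |prev| prev < task_id);
            assert!(monotone, "no two index uploads may complete out of order");
            self.clean = (uploaded, Some(task_id));
        }
    }

    fn main() {
        let mut q = UploadQueueModel::new(IndexPartModel {
            disk_consistent_lsn: 0,
            layers: BTreeMap::new(),
        });

        // A layer upload analogue mutates only `dirty` ...
        q.dirty.layers.insert("layer-A".to_string(), 4096);
        q.dirty.disk_consistent_lsn = 0x10;

        // ... and `clean` trails behind until the metadata upload finishes.
        let (task_id, snapshot) = q.schedule_index_upload();
        assert_eq!(q.clean.1, None);
        q.on_index_upload_completed(task_id, snapshot);
        assert_eq!(q.clean.0.disk_consistent_lsn, 0x10);
    }

Holding the uploaded snapshot as `clean` is also what lets `remote_consistent_lsn_projected` and `is_previous_ancestor_lsn` answer from an actually-uploaded view instead of from the previously exploded fields.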
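
The other behavioural change, serializing the index before enqueueing the upload, can be sketched the same way, assuming `anyhow`, `serde` and `serde_json` as already used in this patch: writing the candidate index to `std::io::sink()` surfaces a serialization error at schedule time, where it is fatal and bubbled up, instead of looking like a retryable upload failure inside `perform_upload_task`.

    use anyhow::Context;
    use serde::Serialize;

    #[derive(Serialize)]
    struct IndexPartModel {
        disk_consistent_lsn: u64,
    }

    fn schedule_index_upload(dirty: &IndexPartModel) -> anyhow::Result<()> {
        // Fail fast at schedule time: a value that cannot be serialized will never
        // upload successfully, so don't let the error show up later where it would
        // be treated as a retryable upload failure.
        serde_json::to_writer(std::io::sink(), dirty).context("serialize index_part.json")?;
        // ... only after this succeeds would the UploadMetadata op be queued.
        Ok(())
    }

    fn main() -> anyhow::Result<()> {
        schedule_index_upload(&IndexPartModel {
            disk_consistent_lsn: 0x10,
        })
    }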