From 71bc45a21b5d55dfd2541f17038dbaaf68318a1b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 16 Nov 2022 17:23:54 -0500 Subject: [PATCH] storage_sync: track upload queue initialization state using enum & fix last_uploaded_consistent_lsn initialization for empty remote storage As pointed out in https://github.com/neondatabase/neon/pull/2785/files/b8488e70a9497d41d394af375c04cfe21c7871c5#r1024319620 the following is wrong for the case where the remote storage is empty: metadata = whatever the local-ONLY metadata is ... upload_queue.latest_metadata = Some(metadata.clone()); upload_queue.last_uploaded_consistent_lsn = Some(metadata.disk_consistent_lsn()); The reason why it's wrong is that we return last_uploaded_consistent_lsn to safekeepers. So, we'd be returning an Lsn that is not yet uploaded to S3. --- pageserver/src/storage_sync.rs | 188 ++++++++++++++++++------------ pageserver/src/tenant.rs | 2 +- pageserver/src/tenant/timeline.rs | 4 +- 3 files changed, 114 insertions(+), 80 deletions(-) diff --git a/pageserver/src/storage_sync.rs b/pageserver/src/storage_sync.rs index 3e3255e82a..27808d723a 100644 --- a/pageserver/src/storage_sync.rs +++ b/pageserver/src/storage_sync.rs @@ -127,6 +127,7 @@ mod download; pub mod index; mod upload; +use anyhow::Context; // re-export this pub use download::is_temp_download_file; pub use download::list_remote_timelines; @@ -135,7 +136,7 @@ use std::collections::{HashMap, VecDeque}; use std::fmt::Debug; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU32, Ordering}; -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::{Arc, Mutex}; use anyhow::ensure; use remote_storage::{DownloadError, GenericRemoteStorage}; @@ -189,10 +190,13 @@ pub struct RemoteTimelineClient { storage_impl: GenericRemoteStorage, } - +enum UploadQueue { + Uninitialized, + Initialized(UploadQueueInitialized), +} /// This keeps track of queued and in-progress tasks. #[derive(Default)] -struct UploadQueue { +struct UploadQueueInitialized { /// Counter to assign task IDs task_counter: u64, @@ -226,6 +230,66 @@ struct UploadQueue { queued_operations: VecDeque, } +impl UploadQueue { + fn initialize_empty_remote(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> { + match self { + UploadQueue::Uninitialized => (), + UploadQueue::Initialized(_) => anyhow::bail!("already initialized"), + } + + info!("initializing upload queue for new timeline"); + + let state = UploadQueueInitialized::default(); + + *self = UploadQueue::Initialized(state); + Ok(self.initialized_mut().expect("we just set it")) + } + + fn initialize_with_current_remote_index_part( + &mut self, + index_part: &IndexPart, + ) -> anyhow::Result<&mut UploadQueueInitialized> { + match self { + UploadQueue::Uninitialized => (), + UploadQueue::Initialized(_) => anyhow::bail!("already initialized"), + } + + let mut files = HashMap::new(); + for path in &index_part.timeline_layers { + let layer_metadata = index_part + .layer_metadata + .get(path) + .map(LayerFileMetadata::from) + .unwrap_or(LayerFileMetadata::MISSING); + files.insert(path.clone(), layer_metadata); + } + + let metadata = index_part.parse_metadata()?; + + info!( + "initializing upload queue with disk_consistent_lsn: {}", + metadata.disk_consistent_lsn() + ); + + let state = UploadQueueInitialized { + latest_files: files, + latest_metadata: Some(metadata.clone()), + last_uploaded_consistent_lsn: Some(metadata.disk_consistent_lsn()), + ..Default::default() + }; + + *self = UploadQueue::Initialized(state); + Ok(self.initialized_mut().expect("we just set it")) + } + + fn initialized_mut(&mut self) -> Option<&mut UploadQueueInitialized> { + match self { + UploadQueue::Uninitialized => None, + UploadQueue::Initialized(x) => Some(x), + } + } +} + /// An in-progress upload or delete task. #[derive(Debug)] struct UploadTask { @@ -273,61 +337,26 @@ impl RemoteTimelineClient { /// /// 'index_part' is the current remote metadata file. pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> { - let mut files = HashMap::new(); - for path in &index_part.timeline_layers { - let layer_metadata = index_part - .layer_metadata - .get(path) - .map(LayerFileMetadata::from) - .unwrap_or(LayerFileMetadata::MISSING); - files.insert(path.clone(), layer_metadata); - } - - let metadata = index_part.parse_metadata()?; - - info!( - "initializing upload queue with disk_consistent_lsn: {}", - metadata.disk_consistent_lsn() - ); let mut upload_queue = self.upload_queue.lock().unwrap(); - if upload_queue.latest_metadata.is_some() - || upload_queue.last_uploaded_consistent_lsn.is_some() - { - anyhow::bail!("attempted to initialize already initialized upload queue") - } - - upload_queue.latest_files = files; - upload_queue.latest_metadata = Some(metadata.clone()); - upload_queue.last_uploaded_consistent_lsn = Some(metadata.disk_consistent_lsn()); - + upload_queue.initialize_with_current_remote_index_part(index_part)?; Ok(()) } /// Like init_upload_queue, but the list of files is initially empty, and /// metadata is passed as a TimelineMetadata. This is used when creating a /// new timeline. - pub fn init_upload_queue_empty(&self, metadata: &TimelineMetadata) -> anyhow::Result<()> { - info!( - "initializing upload queue for new timeline with disk_consistent_lsn: {}", - metadata.disk_consistent_lsn() - ); + pub fn init_upload_queue_empty(&self) -> anyhow::Result<()> { + info!("initializing upload queue for new timeline"); let mut upload_queue = self.upload_queue.lock().unwrap(); - if upload_queue.latest_metadata.is_some() - || upload_queue.last_uploaded_consistent_lsn.is_some() - { - anyhow::bail!("attempted to initialize already initialized upload queue") - } - - upload_queue.latest_metadata = Some(metadata.clone()); - upload_queue.last_uploaded_consistent_lsn = Some(metadata.disk_consistent_lsn()); + upload_queue.initialize_empty_remote()?; Ok(()) } pub fn last_uploaded_consistent_lsn(&self) -> Option { - let upload_queue = self.upload_queue.lock().unwrap(); - upload_queue.last_uploaded_consistent_lsn + let mut upload_queue = self.upload_queue.lock().unwrap(); + upload_queue.initialized_mut()?.last_uploaded_consistent_lsn } // @@ -371,7 +400,10 @@ impl RemoteTimelineClient { // to fill in the missing details. if layer_metadata.file_size().is_none() { let new_metadata = LayerFileMetadata::new(downloaded_size); - let mut upload_queue = self.upload_queue.lock().unwrap(); + let mut guard = self.upload_queue.lock().unwrap(); + let upload_queue = guard + .initialized_mut() + .context("upload queue is not initialized")?; if let Some(upgraded) = upload_queue.latest_files.get_mut(path) { upgraded.merge(&new_metadata); } else { @@ -401,12 +433,10 @@ impl RemoteTimelineClient { self: &Arc, metadata: &TimelineMetadata, ) -> anyhow::Result<()> { - let mut upload_queue = self.upload_queue.lock().unwrap(); - ensure!( - upload_queue.latest_metadata.is_some(), - "upload queue not initialized" - ); - + let mut guard = self.upload_queue.lock().unwrap(); + let upload_queue = guard + .initialized_mut() + .context("upload queue is not initialized")?; upload_queue.latest_metadata = Some(metadata.clone()); let disk_consistent_lsn = upload_queue @@ -443,11 +473,11 @@ impl RemoteTimelineClient { path: &Path, layer_metadata: &LayerFileMetadata, ) -> anyhow::Result<()> { - let mut upload_queue = self.upload_queue.lock().unwrap(); - ensure!( - upload_queue.last_uploaded_consistent_lsn.is_some(), - "upload queue not initialized" - ); + let mut guard = self.upload_queue.lock().unwrap(); + let upload_queue = guard + .initialized_mut() + .context("upload queue is not initialized")?; + // The file size can be missing for files that were created before we tracked that // in the metadata, but it should be present for any new files we create. ensure!( @@ -484,11 +514,10 @@ impl RemoteTimelineClient { /// The deletion won't actually be performed, until all preceding /// upload operations have completed succesfully. pub fn schedule_layer_file_deletion(self: &Arc, paths: &[PathBuf]) -> anyhow::Result<()> { - let mut upload_queue = self.upload_queue.lock().unwrap(); - ensure!( - upload_queue.latest_metadata.is_some(), - "upload queue not initialized" - ); + let mut guard = self.upload_queue.lock().unwrap(); + let upload_queue = guard + .initialized_mut() + .context("upload queue is not initialized")?; // Update the remote index file, removing the to-be-deleted files from the index, // before deleting the actual files. @@ -534,14 +563,11 @@ impl RemoteTimelineClient { let barrier_op = UploadOp::Barrier(sender); { - let mut upload_queue = self.upload_queue.lock().unwrap(); - ensure!( - upload_queue.latest_metadata.is_some(), - "upload queue not initialized" - ); - + let mut guard = self.upload_queue.lock().unwrap(); + let upload_queue = guard + .initialized_mut() + .context("upload queue is not initialized")?; upload_queue.queued_operations.push_back(barrier_op); - // Launch the task immediately, if possible self.launch_queued_tasks(upload_queue); } @@ -555,7 +581,7 @@ impl RemoteTimelineClient { /// the ordering constraints. /// /// The caller needs to already hold the `upload_queue` lock. - fn launch_queued_tasks(self: &Arc, mut upload_queue: MutexGuard) { + fn launch_queued_tasks(self: &Arc, upload_queue: &mut UploadQueueInitialized) { while let Some(next_op) = upload_queue.queued_operations.front() { // Can we run this task now? let can_run_now = match next_op { @@ -710,6 +736,9 @@ impl RemoteTimelineClient { // The task has completed succesfully. Remove it from the in-progress list. { let mut upload_queue = self.upload_queue.lock().unwrap(); + let mut upload_queue = upload_queue.initialized_mut().expect( + "callers are responsible for ensuring this is only called on initialized queue", + ); upload_queue.inprogress_tasks.remove(&task.task_id); match task.op { @@ -750,7 +779,7 @@ pub fn create_remote_timeline_client( tenant_id, timeline_id, storage_impl: remote_storage, - upload_queue: Mutex::new(UploadQueue::default()), + upload_queue: Mutex::new(UploadQueue::Uninitialized), }) } @@ -867,15 +896,14 @@ mod tests { tenant_id: harness.tenant_id, timeline_id: TIMELINE_ID, storage_impl, - upload_queue: Mutex::new(UploadQueue::default()), + upload_queue: Mutex::new(UploadQueue::Uninitialized), }); let remote_timeline_dir = remote_fs_dir.join(timeline_path.strip_prefix(&harness.conf.workdir)?); println!("remote_timeline_dir: {}", remote_timeline_dir.display()); - let metadata = dummy_metadata(Lsn(0x10)); - client.init_upload_queue_empty(&metadata)?; + client.init_upload_queue_empty()?; // Create a couple of dummy files, schedule upload for them let content_foo = dummy_contents("foo"); @@ -894,7 +922,8 @@ mod tests { // Check that they are started immediately, not queued { - let upload_queue = client.upload_queue.lock().unwrap(); + let mut guard = client.upload_queue.lock().unwrap(); + let upload_queue = guard.initialized_mut().unwrap(); assert!(upload_queue.queued_operations.is_empty()); assert!(upload_queue.inprogress_tasks.len() == 2); assert!(upload_queue.num_inprogress_layer_uploads == 2); @@ -904,14 +933,17 @@ mod tests { let metadata = dummy_metadata(Lsn(0x20)); client.schedule_index_upload(&metadata)?; { - let upload_queue = client.upload_queue.lock().unwrap(); + let mut guard = client.upload_queue.lock().unwrap(); + let upload_queue = guard.initialized_mut().unwrap(); assert!(upload_queue.queued_operations.len() == 1); } // Wait for the uploads to finish runtime.block_on(client.wait_completion())?; { - let upload_queue = client.upload_queue.lock().unwrap(); + let mut guard = client.upload_queue.lock().unwrap(); + let upload_queue = guard.initialized_mut().unwrap(); + assert!(upload_queue.queued_operations.is_empty()); assert!(upload_queue.inprogress_tasks.is_empty()); } @@ -931,7 +963,9 @@ mod tests { )?; client.schedule_layer_file_deletion(&[timeline_path.join("foo")])?; { - let upload_queue = client.upload_queue.lock().unwrap(); + let mut guard = client.upload_queue.lock().unwrap(); + let upload_queue = guard.initialized_mut().unwrap(); + // Deletion schedules upload of the index file, and the file deletion itself assert!(upload_queue.queued_operations.len() == 2); assert!(upload_queue.inprogress_tasks.len() == 1); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index ae048bd960..9e7abaa124 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1865,7 +1865,7 @@ impl Tenant { tenant_id, new_timeline_id, )?; - remote_client.init_upload_queue_empty(&new_metadata)?; + remote_client.init_upload_queue_empty()?; Some(remote_client) } else { None diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 04c2934a15..1511f2fe8f 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1198,6 +1198,7 @@ impl Timeline { }; let (local_only_filenames, up_to_date_metadata) = match index_part { Ok(index_part) => { + remote_client.init_upload_queue(&index_part)?; let (local_only_filenames, up_to_date_metadata) = self .download_missing( &index_part, @@ -1207,12 +1208,11 @@ impl Timeline { first_save, ) .await?; - remote_client.init_upload_queue(&index_part)?; (local_only_filenames, up_to_date_metadata) } Err(DownloadError::NotFound) => { info!("no index file was found on the remote"); - remote_client.init_upload_queue_empty(&local_metadata)?; + remote_client.init_upload_queue_empty()?; (local_filenames, local_metadata) } Err(e) => return Err(anyhow::anyhow!(e)),