From 172c7e5f92d03f3a78265771b465ad28b7615e43 Mon Sep 17 00:00:00 2001
From: Shany Pozin
Date: Wed, 28 Dec 2022 15:12:06 +0200
Subject: [PATCH] Split upload queue code from storage_sync.rs (#3216)

https://github.com/neondatabase/neon/issues/3208
---
 pageserver/src/lib.rs                         |   1 -
 pageserver/src/tenant.rs                      |  22 +-
 ...rage_sync.rs => remote_timeline_client.rs} | 259 ++----------------
 .../delete.rs                                 |   0
 .../download.rs                               |   0
 .../index.rs                                  |   0
 .../upload.rs                                 |   2 +-
 .../src/tenant/storage_layer/remote_layer.rs  |   2 +-
 pageserver/src/tenant/timeline.rs             |   8 +-
 pageserver/src/tenant/upload_queue.rs         | 213 ++++++++++++++
 10 files changed, 262 insertions(+), 245 deletions(-)
 rename pageserver/src/tenant/{storage_sync.rs => remote_timeline_client.rs} (85%)
 rename pageserver/src/tenant/{storage_sync => remote_timeline_client}/delete.rs (100%)
 rename pageserver/src/tenant/{storage_sync => remote_timeline_client}/download.rs (100%)
 rename pageserver/src/tenant/{storage_sync => remote_timeline_client}/index.rs (100%)
 rename pageserver/src/tenant/{storage_sync => remote_timeline_client}/upload.rs (97%)
 create mode 100644 pageserver/src/tenant/upload_queue.rs

diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs
index 80b05a76a6..29050a5bc2 100644
--- a/pageserver/src/lib.rs
+++ b/pageserver/src/lib.rs
@@ -13,7 +13,6 @@ pub mod profiling;
 pub mod repository;
 pub mod task_mgr;
 pub mod tenant;
-
 pub mod trace;
 pub mod virtual_file;
 pub mod walingest;
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index eb28e6da0a..4c93490177 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -45,9 +45,7 @@ use std::sync::{Mutex, RwLock};
 use std::time::{Duration, Instant};
 
 use self::metadata::TimelineMetadata;
-use self::storage_sync::create_remote_timeline_client;
-use self::storage_sync::index::IndexPart;
-use self::storage_sync::RemoteTimelineClient;
+use self::remote_timeline_client::RemoteTimelineClient;
 use crate::config::PageServerConf;
 use crate::import_datadir;
 use crate::is_uninit_mark;
@@ -57,6 +55,7 @@ use crate::task_mgr;
 use crate::task_mgr::TaskKind;
 use crate::tenant::config::TenantConfOpt;
 use crate::tenant::metadata::load_metadata;
+use crate::tenant::remote_timeline_client::index::IndexPart;
 use crate::tenant::storage_layer::DeltaLayer;
 use crate::tenant::storage_layer::ImageLayer;
 use crate::tenant::storage_layer::Layer;
@@ -82,12 +81,13 @@ pub mod layer_map;
 pub mod metadata;
 mod par_fsync;
+mod remote_timeline_client;
 pub mod storage_layer;
-mod storage_sync;
 
 pub mod config;
 pub mod mgr;
 pub mod tasks;
+pub mod upload_queue;
 
 mod timeline;
@@ -648,8 +648,12 @@ impl Tenant {
             .as_ref()
             .ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?;
 
-        let remote_timelines =
-            storage_sync::list_remote_timelines(remote_storage, self.conf, self.tenant_id).await?;
+        let remote_timelines = remote_timeline_client::list_remote_timelines(
+            remote_storage,
+            self.conf,
+            self.tenant_id,
+        )
+        .await?;
 
         info!("found {} timelines", remote_timelines.len());
 
@@ -733,7 +737,7 @@ impl Tenant {
             .context("Failed to create new timeline directory")?;
 
         let remote_client =
-            create_remote_timeline_client(remote_storage, self.conf, self.tenant_id, timeline_id)?;
+            RemoteTimelineClient::new(remote_storage, self.conf, self.tenant_id, timeline_id)?;
 
         let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
             let timelines = self.timelines.lock().unwrap();
@@ -995,7 +999,7 @@ impl Tenant {
             .remote_storage
             .as_ref()
             .map(|remote_storage| {
-                create_remote_timeline_client(
+                RemoteTimelineClient::new(
                     remote_storage.clone(),
                     self.conf,
                     self.tenant_id,
@@ -2192,7 +2196,7 @@ impl Tenant {
         let tenant_id = self.tenant_id;
 
         let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() {
-            let remote_client = create_remote_timeline_client(
+            let remote_client = RemoteTimelineClient::new(
                 remote_storage.clone(),
                 self.conf,
                 tenant_id,
diff --git a/pageserver/src/tenant/storage_sync.rs b/pageserver/src/tenant/remote_timeline_client.rs
similarity index 85%
rename from pageserver/src/tenant/storage_sync.rs
rename to pageserver/src/tenant/remote_timeline_client.rs
index ef57f91a02..e27b0a8133 100644
--- a/pageserver/src/tenant/storage_sync.rs
+++ b/pageserver/src/tenant/remote_timeline_client.rs
@@ -58,7 +58,7 @@
 //! To have a consistent remote structure, it's important that uploads and
 //! deletions are performed in the right order. For example, the index file
 //! contains a list of layer files, so it must not be uploaded until all the
-//! layer files that are in its list have been succesfully uploaded.
+//! layer files that are in its list have been successfully uploaded.
 //!
 //! The contract between client and its user is that the user is responsible of
 //! scheduling operations in an order that keeps the remote consistent as
@@ -140,7 +140,7 @@
 //! Note that if we crash during file deletion between the index update
 //! that removes the file from the list of files, and deleting the remote file,
 //! the file is leaked in the remote storage. Similarly, if a new file is created
-//! and uploaded, but the pageserver dies permantently before updating the
+//! and uploaded, but the pageserver dies permanently before updating the
 //! remote index file, the new file is leaked in remote storage. We accept and
 //! tolerate that for now.
 //! Note further that we cannot easily fix this by scheduling deletes for every
@@ -207,30 +207,30 @@ mod upload;
 // re-export these
 pub use download::{is_temp_download_file, list_remote_timelines};
 
-use std::collections::{HashMap, VecDeque};
-use std::fmt::Debug;
-use std::ops::DerefMut;
 use std::sync::atomic::{AtomicU32, Ordering};
 use std::sync::{Arc, Mutex};
 
 use anyhow::ensure;
 use remote_storage::{DownloadError, GenericRemoteStorage};
+use std::ops::DerefMut;
 use tokio::runtime::Runtime;
 use tracing::{info, warn};
 use tracing::{info_span, Instrument};
-
 use utils::lsn::Lsn;
 
 use crate::metrics::RemoteOpFileKind;
 use crate::metrics::RemoteOpKind;
 use crate::metrics::{MeasureRemoteOp, RemoteTimelineClientMetrics};
-use crate::tenant::storage_sync::index::LayerFileMetadata;
+use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
 use crate::{
     config::PageServerConf,
     task_mgr,
     task_mgr::TaskKind,
     task_mgr::BACKGROUND_RUNTIME,
     tenant::metadata::TimelineMetadata,
+    tenant::upload_queue::{
+        UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
+    },
     {exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS},
 };
@@ -286,206 +286,30 @@ pub struct RemoteTimelineClient {
     storage_impl: GenericRemoteStorage,
 }
 
-// clippy warns that Uninitialized is much smaller than Initialized, which wastes
-// memory for Uninitialized variants. Doesn't matter in practice, there are not
-// that many upload queues in a running pageserver, and most of them are initialized
-// anyway.
-#[allow(clippy::large_enum_variant)]
-enum UploadQueue {
-    Uninitialized,
-    Initialized(UploadQueueInitialized),
-    Stopped(UploadQueueStopped),
-}
-
-impl UploadQueue {
-    fn as_str(&self) -> &'static str {
-        match self {
-            UploadQueue::Uninitialized => "Uninitialized",
-            UploadQueue::Initialized(_) => "Initialized",
-            UploadQueue::Stopped(_) => "Stopped",
-        }
-    }
-}
-
-/// This keeps track of queued and in-progress tasks.
-struct UploadQueueInitialized {
-    /// Counter to assign task IDs
-    task_counter: u64,
-
-    /// All layer files stored in the remote storage, taking into account all
-    /// in-progress and queued operations
-    latest_files: HashMap<LayerFileName, LayerFileMetadata>,
-
-    /// How many file uploads or deletions been scheduled, since the
-    /// last (scheduling of) metadata index upload?
-    latest_files_changes_since_metadata_upload_scheduled: u64,
-
-    /// Metadata stored in the remote storage, taking into account all
-    /// in-progress and queued operations.
-    /// DANGER: do not return to outside world, e.g., safekeepers.
-    latest_metadata: TimelineMetadata,
-
-    /// `disk_consistent_lsn` from the last metadata file that was successfully
-    /// uploaded. `Lsn(0)` if nothing was uploaded yet.
-    /// Unlike `latest_files` or `latest_metadata`, this value is never ahead.
-    /// Safekeeper can rely on it to make decisions for WAL storage.
-    last_uploaded_consistent_lsn: Lsn,
-
-    // Breakdown of different kinds of tasks currently in-progress
-    num_inprogress_layer_uploads: usize,
-    num_inprogress_metadata_uploads: usize,
-    num_inprogress_deletions: usize,
-
-    /// Tasks that are currently in-progress. In-progress means that a tokio Task
-    /// has been launched for it. An in-progress task can be busy uploading, but it can
-    /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
-    /// be waiting for retry in `exponential_backoff`.
-    inprogress_tasks: HashMap<u64, Arc<UploadTask>>,
-
-    /// Queued operations that have not been launched yet. They might depend on previous
-    /// tasks to finish. For example, metadata upload cannot be performed before all
-    /// preceding layer file uploads have completed.
-    queued_operations: VecDeque<UploadOp>,
-}
-
-struct UploadQueueStopped {
-    last_uploaded_consistent_lsn: Lsn,
-}
-
-impl UploadQueue {
-    fn initialize_empty_remote(
-        &mut self,
-        metadata: &TimelineMetadata,
-    ) -> anyhow::Result<&mut UploadQueueInitialized> {
-        match self {
-            UploadQueue::Uninitialized => (),
-            UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
-                anyhow::bail!("already initialized, state {}", self.as_str())
-            }
-        }
-
-        info!("initializing upload queue for empty remote");
-
-        let state = UploadQueueInitialized {
-            // As described in the doc comment, it's ok for `latest_files` and `latest_metadata` to be ahead.
-            latest_files: HashMap::new(),
-            latest_files_changes_since_metadata_upload_scheduled: 0,
-            latest_metadata: metadata.clone(),
-            // We haven't uploaded anything yet, so, `last_uploaded_consistent_lsn` must be 0 to prevent
-            // safekeepers from garbage-collecting anything.
-            last_uploaded_consistent_lsn: Lsn(0),
-            // what follows are boring default initializations
-            task_counter: 0,
-            num_inprogress_layer_uploads: 0,
-            num_inprogress_metadata_uploads: 0,
-            num_inprogress_deletions: 0,
-            inprogress_tasks: HashMap::new(),
-            queued_operations: VecDeque::new(),
-        };
-
-        *self = UploadQueue::Initialized(state);
-        Ok(self.initialized_mut().expect("we just set it"))
-    }
-
-    fn initialize_with_current_remote_index_part(
-        &mut self,
-        index_part: &IndexPart,
-    ) -> anyhow::Result<&mut UploadQueueInitialized> {
-        match self {
-            UploadQueue::Uninitialized => (),
-            UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
-                anyhow::bail!("already initialized, state {}", self.as_str())
-            }
-        }
-
-        let mut files = HashMap::with_capacity(index_part.timeline_layers.len());
-        for layer_name in &index_part.timeline_layers {
-            let layer_metadata = index_part
-                .layer_metadata
-                .get(layer_name)
-                .map(LayerFileMetadata::from)
-                .unwrap_or(LayerFileMetadata::MISSING);
-            files.insert(layer_name.to_owned(), layer_metadata);
-        }
-
-        let index_part_metadata = index_part.parse_metadata()?;
-        info!(
-            "initializing upload queue with remote index_part.disk_consistent_lsn: {}",
-            index_part_metadata.disk_consistent_lsn()
-        );
-
-        let state = UploadQueueInitialized {
-            latest_files: files,
-            latest_files_changes_since_metadata_upload_scheduled: 0,
-            latest_metadata: index_part_metadata.clone(),
-            last_uploaded_consistent_lsn: index_part_metadata.disk_consistent_lsn(),
-            // what follows are boring default initializations
-            task_counter: 0,
-            num_inprogress_layer_uploads: 0,
-            num_inprogress_metadata_uploads: 0,
-            num_inprogress_deletions: 0,
-            inprogress_tasks: HashMap::new(),
-            queued_operations: VecDeque::new(),
-        };
-
-        *self = UploadQueue::Initialized(state);
-        Ok(self.initialized_mut().expect("we just set it"))
-    }
-
-    fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
-        match self {
-            UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
-                anyhow::bail!("queue is in state {}", self.as_str())
-            }
-            UploadQueue::Initialized(x) => Ok(x),
-        }
-    }
-}
-
-/// An in-progress upload or delete task.
-#[derive(Debug)]
-struct UploadTask {
-    /// Unique ID of this task. Used as the key in `inprogress_tasks` above.
-    task_id: u64,
-    retries: AtomicU32,
-
-    op: UploadOp,
-}
-
-#[derive(Debug)]
-enum UploadOp {
-    /// Upload a layer file
-    UploadLayer(LayerFileName, LayerFileMetadata),
-
-    /// Upload the metadata file
-    UploadMetadata(IndexPart, Lsn),
-
-    /// Delete a file.
-    Delete(RemoteOpFileKind, LayerFileName),
-
-    /// Barrier. When the barrier operation is reached,
-    Barrier(tokio::sync::watch::Sender<()>),
-}
-
-impl std::fmt::Display for UploadOp {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        match self {
-            UploadOp::UploadLayer(path, metadata) => {
-                write!(
-                    f,
-                    "UploadLayer({}, size={:?})",
-                    path.file_name(),
-                    metadata.file_size()
-                )
-            }
-            UploadOp::UploadMetadata(_, lsn) => write!(f, "UploadMetadata(lsn: {})", lsn),
-            UploadOp::Delete(_, path) => write!(f, "Delete({})", path.file_name()),
-            UploadOp::Barrier(_) => write!(f, "Barrier"),
-        }
-    }
-}
-
 impl RemoteTimelineClient {
+    ///
+    /// Create a remote storage client for given timeline
+    ///
+    /// Note: the caller must initialize the upload queue before any uploads can be scheduled,
+    /// by calling init_upload_queue.
+    ///
+    pub fn new(
+        remote_storage: GenericRemoteStorage,
+        conf: &'static PageServerConf,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+    ) -> anyhow::Result<RemoteTimelineClient> {
+        Ok(RemoteTimelineClient {
+            conf,
+            runtime: &BACKGROUND_RUNTIME,
+            tenant_id,
+            timeline_id,
+            storage_impl: remote_storage,
+            upload_queue: Mutex::new(UploadQueue::Uninitialized),
+            metrics: Arc::new(RemoteTimelineClientMetrics::new(&tenant_id, &timeline_id)),
+        })
+    }
+
     /// Initialize the upload queue for a remote storage that already received
     /// an index file upload, i.e., it's not empty.
     /// The given `index_part` must be the one on the remote.
@@ -1156,29 +980,6 @@ impl RemoteTimelineClient {
     }
 }
 
-///
-/// Create a remote storage client for given timeline
-///
-/// Note: the caller must initialize the upload queue before any uploads can be scheduled,
-/// by calling init_upload_queue.
-///
-pub fn create_remote_timeline_client(
-    remote_storage: GenericRemoteStorage,
-    conf: &'static PageServerConf,
-    tenant_id: TenantId,
-    timeline_id: TimelineId,
-) -> anyhow::Result<RemoteTimelineClient> {
-    Ok(RemoteTimelineClient {
-        conf,
-        runtime: &BACKGROUND_RUNTIME,
-        tenant_id,
-        timeline_id,
-        storage_impl: remote_storage,
-        upload_queue: Mutex::new(UploadQueue::Uninitialized),
-        metrics: Arc::new(RemoteTimelineClientMetrics::new(&tenant_id, &timeline_id)),
-    })
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/pageserver/src/tenant/storage_sync/delete.rs b/pageserver/src/tenant/remote_timeline_client/delete.rs
similarity index 100%
rename from pageserver/src/tenant/storage_sync/delete.rs
rename to pageserver/src/tenant/remote_timeline_client/delete.rs
diff --git a/pageserver/src/tenant/storage_sync/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs
similarity index 100%
rename from pageserver/src/tenant/storage_sync/download.rs
rename to pageserver/src/tenant/remote_timeline_client/download.rs
diff --git a/pageserver/src/tenant/storage_sync/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs
similarity index 100%
rename from pageserver/src/tenant/storage_sync/index.rs
rename to pageserver/src/tenant/remote_timeline_client/index.rs
diff --git a/pageserver/src/tenant/storage_sync/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs
similarity index 97%
rename from pageserver/src/tenant/storage_sync/upload.rs
rename to pageserver/src/tenant/remote_timeline_client/upload.rs
index 08cea6268b..5082fa1634 100644
--- a/pageserver/src/tenant/storage_sync/upload.rs
+++ b/pageserver/src/tenant/remote_timeline_client/upload.rs
@@ -5,7 +5,7 @@ use fail::fail_point;
 use std::path::Path;
 use tokio::fs;
 
-use crate::{config::PageServerConf, tenant::storage_sync::index::IndexPart};
+use crate::{config::PageServerConf, tenant::remote_timeline_client::index::IndexPart};
 use remote_storage::GenericRemoteStorage;
 use utils::id::{TenantId, TimelineId};
 
diff --git a/pageserver/src/tenant/storage_layer/remote_layer.rs b/pageserver/src/tenant/storage_layer/remote_layer.rs
index c2c11d7bff..33474bb4a2 100644
--- a/pageserver/src/tenant/storage_layer/remote_layer.rs
+++ b/pageserver/src/tenant/storage_layer/remote_layer.rs
@@ -3,8 +3,8 @@
 //!
 use crate::config::PageServerConf;
 use crate::repository::Key;
+use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
 use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
-use crate::tenant::storage_sync::index::LayerFileMetadata;
 use anyhow::{bail, Result};
 use std::ops::Range;
 use std::path::PathBuf;
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index bbfcad5734..93eb643d12 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -23,11 +23,11 @@ use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
 use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
 use std::time::{Duration, Instant, SystemTime};
 
+use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
 use crate::tenant::storage_layer::{
     DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer,
     LayerFileName, RemoteLayer,
 };
-use crate::tenant::storage_sync::{self, index::LayerFileMetadata};
 use crate::tenant::{
     ephemeral_file::is_ephemeral_file,
     layer_map::{LayerMap, SearchResult},
@@ -64,9 +64,9 @@ use crate::METADATA_FILE_NAME;
 use crate::ZERO_PAGE;
 use crate::{is_temporary, task_mgr};
 
+use super::remote_timeline_client::index::IndexPart;
+use super::remote_timeline_client::RemoteTimelineClient;
 use super::storage_layer::{DeltaLayer, ImageLayer, Layer};
-use super::storage_sync::index::IndexPart;
-use super::storage_sync::RemoteTimelineClient;
 
 #[derive(Debug, PartialEq, Eq, Clone, Copy)]
 enum FlushLoopState {
@@ -1122,7 +1122,7 @@ impl Timeline {
                 num_layers += 1;
             } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
                 // ignore these
-            } else if storage_sync::is_temp_download_file(&direntry_path) {
+            } else if remote_timeline_client::is_temp_download_file(&direntry_path) {
                 info!(
                     "skipping temp download file, reconcile_with_remote will resume / clean up: {}",
                     fname
diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs
new file mode 100644
index 0000000000..790b2f59aa
--- /dev/null
+++ b/pageserver/src/tenant/upload_queue.rs
@@ -0,0 +1,213 @@
+use crate::metrics::RemoteOpFileKind;
+
+use super::storage_layer::LayerFileName;
+use crate::tenant::metadata::TimelineMetadata;
+use crate::tenant::remote_timeline_client::index::IndexPart;
+use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
+use std::collections::{HashMap, VecDeque};
+use std::fmt::Debug;
+
+use std::sync::Arc;
+use tracing::info;
+
+use std::sync::atomic::AtomicU32;
+use utils::lsn::Lsn;
+
+// clippy warns that Uninitialized is much smaller than Initialized, which wastes
+// memory for Uninitialized variants. Doesn't matter in practice, there are not
+// that many upload queues in a running pageserver, and most of them are initialized
+// anyway.
+#[allow(clippy::large_enum_variant)]
+pub(crate) enum UploadQueue {
+    Uninitialized,
+    Initialized(UploadQueueInitialized),
+    Stopped(UploadQueueStopped),
+}
+
+impl UploadQueue {
+    fn as_str(&self) -> &'static str {
+        match self {
+            UploadQueue::Uninitialized => "Uninitialized",
+            UploadQueue::Initialized(_) => "Initialized",
+            UploadQueue::Stopped(_) => "Stopped",
+        }
+    }
+}
+
+/// This keeps track of queued and in-progress tasks.
+pub(crate) struct UploadQueueInitialized {
+    /// Counter to assign task IDs
+    pub(crate) task_counter: u64,
+
+    /// All layer files stored in the remote storage, taking into account all
+    /// in-progress and queued operations
+    pub(crate) latest_files: HashMap<LayerFileName, LayerFileMetadata>,
+
+    /// How many file uploads or deletions been scheduled, since the
+    /// last (scheduling of) metadata index upload?
+    pub(crate) latest_files_changes_since_metadata_upload_scheduled: u64,
+
+    /// Metadata stored in the remote storage, taking into account all
+    /// in-progress and queued operations.
+    /// DANGER: do not return to outside world, e.g., safekeepers.
+    pub(crate) latest_metadata: TimelineMetadata,
+
+    /// `disk_consistent_lsn` from the last metadata file that was successfully
+    /// uploaded. `Lsn(0)` if nothing was uploaded yet.
+    /// Unlike `latest_files` or `latest_metadata`, this value is never ahead.
+    /// Safekeeper can rely on it to make decisions for WAL storage.
+    pub(crate) last_uploaded_consistent_lsn: Lsn,
+
+    // Breakdown of different kinds of tasks currently in-progress
+    pub(crate) num_inprogress_layer_uploads: usize,
+    pub(crate) num_inprogress_metadata_uploads: usize,
+    pub(crate) num_inprogress_deletions: usize,
+
+    /// Tasks that are currently in-progress. In-progress means that a tokio Task
+    /// has been launched for it. An in-progress task can be busy uploading, but it can
+    /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
+    /// be waiting for retry in `exponential_backoff`.
+    pub(crate) inprogress_tasks: HashMap<u64, Arc<UploadTask>>,
+
+    /// Queued operations that have not been launched yet. They might depend on previous
+    /// tasks to finish. For example, metadata upload cannot be performed before all
+    /// preceding layer file uploads have completed.
+    pub(crate) queued_operations: VecDeque<UploadOp>,
+}
+
+pub(crate) struct UploadQueueStopped {
+    pub(crate) last_uploaded_consistent_lsn: Lsn,
+}
+
+impl UploadQueue {
+    pub(crate) fn initialize_empty_remote(
+        &mut self,
+        metadata: &TimelineMetadata,
+    ) -> anyhow::Result<&mut UploadQueueInitialized> {
+        match self {
+            UploadQueue::Uninitialized => (),
+            UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
+                anyhow::bail!("already initialized, state {}", self.as_str())
+            }
+        }
+
+        info!("initializing upload queue for empty remote");
+
+        let state = UploadQueueInitialized {
+            // As described in the doc comment, it's ok for `latest_files` and `latest_metadata` to be ahead.
+            latest_files: HashMap::new(),
+            latest_files_changes_since_metadata_upload_scheduled: 0,
+            latest_metadata: metadata.clone(),
+            // We haven't uploaded anything yet, so, `last_uploaded_consistent_lsn` must be 0 to prevent
+            // safekeepers from garbage-collecting anything.
+            last_uploaded_consistent_lsn: Lsn(0),
+            // what follows are boring default initializations
+            task_counter: 0,
+            num_inprogress_layer_uploads: 0,
+            num_inprogress_metadata_uploads: 0,
+            num_inprogress_deletions: 0,
+            inprogress_tasks: HashMap::new(),
+            queued_operations: VecDeque::new(),
+        };
+
+        *self = UploadQueue::Initialized(state);
+        Ok(self.initialized_mut().expect("we just set it"))
+    }
+
+    pub(crate) fn initialize_with_current_remote_index_part(
+        &mut self,
+        index_part: &IndexPart,
+    ) -> anyhow::Result<&mut UploadQueueInitialized> {
+        match self {
+            UploadQueue::Uninitialized => (),
+            UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
+                anyhow::bail!("already initialized, state {}", self.as_str())
+            }
+        }
+
+        let mut files = HashMap::with_capacity(index_part.timeline_layers.len());
+        for layer_name in &index_part.timeline_layers {
+            let layer_metadata = index_part
+                .layer_metadata
+                .get(layer_name)
+                .map(LayerFileMetadata::from)
+                .unwrap_or(LayerFileMetadata::MISSING);
+            files.insert(layer_name.to_owned(), layer_metadata);
+        }
+
+        let index_part_metadata = index_part.parse_metadata()?;
+        info!(
+            "initializing upload queue with remote index_part.disk_consistent_lsn: {}",
+            index_part_metadata.disk_consistent_lsn()
+        );
+
+        let state = UploadQueueInitialized {
+            latest_files: files,
+            latest_files_changes_since_metadata_upload_scheduled: 0,
+            latest_metadata: index_part_metadata.clone(),
+            last_uploaded_consistent_lsn: index_part_metadata.disk_consistent_lsn(),
+            // what follows are boring default initializations
+            task_counter: 0,
+            num_inprogress_layer_uploads: 0,
+            num_inprogress_metadata_uploads: 0,
+            num_inprogress_deletions: 0,
+            inprogress_tasks: HashMap::new(),
+            queued_operations: VecDeque::new(),
+        };
+
+        *self = UploadQueue::Initialized(state);
+        Ok(self.initialized_mut().expect("we just set it"))
+    }
+
+    pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
+        match self {
+            UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
+                anyhow::bail!("queue is in state {}", self.as_str())
+            }
+            UploadQueue::Initialized(x) => Ok(x),
+        }
+    }
+}
+
+/// An in-progress upload or delete task.
+#[derive(Debug)]
+pub(crate) struct UploadTask {
+    /// Unique ID of this task. Used as the key in `inprogress_tasks` above.
+    pub(crate) task_id: u64,
+    pub(crate) retries: AtomicU32,
+
+    pub(crate) op: UploadOp,
+}
+
+#[derive(Debug)]
+pub(crate) enum UploadOp {
+    /// Upload a layer file
+    UploadLayer(LayerFileName, LayerFileMetadata),
+
+    /// Upload the metadata file
+    UploadMetadata(IndexPart, Lsn),
+
+    /// Delete a file.
+    Delete(RemoteOpFileKind, LayerFileName),
+
+    /// Barrier. When the barrier operation is reached,
+    Barrier(tokio::sync::watch::Sender<()>),
+}
+
+impl std::fmt::Display for UploadOp {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        match self {
+            UploadOp::UploadLayer(path, metadata) => {
+                write!(
+                    f,
+                    "UploadLayer({}, size={:?})",
+                    path.file_name(),
+                    metadata.file_size()
+                )
+            }
+            UploadOp::UploadMetadata(_, lsn) => write!(f, "UploadMetadata(lsn: {})", lsn),
+            UploadOp::Delete(_, path) => write!(f, "Delete({})", path.file_name()),
+            UploadOp::Barrier(_) => write!(f, "Barrier"),
+        }
+    }
+}
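
Note (not part of the patch): the new upload_queue.rs is built around a small state machine. A queue starts Uninitialized, must be initialized before operations can be scheduled (either for an empty remote or from the remote IndexPart), and can also be Stopped. Below is a rough, self-contained sketch of that pattern under simplified assumptions: the names are made up, the error type is a plain String instead of anyhow::Error, and none of the real queue's bookkeeping is included.

// Std-only sketch of the Uninitialized / Initialized / Stopped pattern.
use std::collections::VecDeque;

enum Queue {
    Uninitialized,
    Initialized(Initialized),
    Stopped,
}

struct Initialized {
    queued_operations: VecDeque<String>,
}

impl Queue {
    fn as_str(&self) -> &'static str {
        match self {
            Queue::Uninitialized => "Uninitialized",
            Queue::Initialized(_) => "Initialized",
            Queue::Stopped => "Stopped",
        }
    }

    // Transition Uninitialized -> Initialized; refuse anything else, in the
    // spirit of initialize_empty_remote / initialize_with_current_remote_index_part.
    fn initialize(&mut self) -> Result<&mut Initialized, String> {
        match self {
            Queue::Uninitialized => (),
            _ => return Err(format!("already initialized, state {}", self.as_str())),
        }
        *self = Queue::Initialized(Initialized {
            queued_operations: VecDeque::new(),
        });
        self.initialized_mut()
    }

    // All scheduling goes through this accessor, so touching an Uninitialized
    // or Stopped queue is an error rather than a silent no-op.
    fn initialized_mut(&mut self) -> Result<&mut Initialized, String> {
        match self {
            Queue::Uninitialized | Queue::Stopped => {
                Err(format!("queue is in state {}", self.as_str()))
            }
            Queue::Initialized(x) => Ok(x),
        }
    }
}

fn main() -> Result<(), String> {
    let mut queue = Queue::Uninitialized;
    queue
        .initialize()?
        .queued_operations
        .push_back("upload layer".to_string());
    assert_eq!(queue.initialized_mut()?.queued_operations.len(), 1);
    Ok(())
}

The real accessors return anyhow::Result<&mut UploadQueueInitialized> and also track in-progress tasks, metrics counters, and the last uploaded consistent LSN, as shown in the hunks above.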