diff --git a/libs/remote_storage/src/config.rs b/libs/remote_storage/src/config.rs index 49b1d9dc87..dae141bf77 100644 --- a/libs/remote_storage/src/config.rs +++ b/libs/remote_storage/src/config.rs @@ -43,6 +43,17 @@ impl RemoteStorageKind { } } +impl RemoteStorageConfig { + /// Helper to fetch the configured concurrency limit. + pub fn concurrency_limit(&self) -> Option { + match &self.storage { + RemoteStorageKind::LocalFs { .. } => None, + RemoteStorageKind::AwsS3(c) => Some(c.concurrency_limit.into()), + RemoteStorageKind::AzureContainer(c) => Some(c.concurrency_limit.into()), + } + } +} + fn default_timeout() -> Duration { RemoteStorageConfig::DEFAULT_TIMEOUT } diff --git a/pageserver/benches/upload_queue.rs b/pageserver/benches/upload_queue.rs index 528b3d5490..ed644b0e3c 100644 --- a/pageserver/benches/upload_queue.rs +++ b/pageserver/benches/upload_queue.rs @@ -53,7 +53,7 @@ fn bench_upload_queue_next_ready(c: &mut Criterion) { // Construct the queue. let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_with_current_remote_index_part(&index)?; + let queue = queue.initialize_with_current_remote_index_part(&index, 0)?; // Populate inprogress_tasks with a bunch of layer1 deletions. let delete = UploadOp::Delete(Delete { diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 75e8da496d..1602765585 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -425,8 +425,16 @@ impl RemoteTimelineClient { /// an index file upload, i.e., it's not empty. /// The given `index_part` must be the one on the remote. pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> { + // Set the maximum number of inprogress tasks to the remote storage concurrency. There's + // certainly no point in starting more upload tasks than this. + let inprogress_limit = self + .conf + .remote_storage_config + .as_ref() + .and_then(|r| r.concurrency_limit()) + .unwrap_or(0); let mut upload_queue = self.upload_queue.lock().unwrap(); - upload_queue.initialize_with_current_remote_index_part(index_part)?; + upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?; self.update_remote_physical_size_gauge(Some(index_part)); info!( "initialized upload queue from remote index with {} layer files", @@ -441,8 +449,16 @@ impl RemoteTimelineClient { &self, local_metadata: &TimelineMetadata, ) -> anyhow::Result<()> { + // Set the maximum number of inprogress tasks to the remote storage concurrency. There's + // certainly no point in starting more upload tasks than this. + let inprogress_limit = self + .conf + .remote_storage_config + .as_ref() + .and_then(|r| r.concurrency_limit()) + .unwrap_or(0); let mut upload_queue = self.upload_queue.lock().unwrap(); - upload_queue.initialize_empty_remote(local_metadata)?; + upload_queue.initialize_empty_remote(local_metadata, inprogress_limit)?; self.update_remote_physical_size_gauge(None); info!("initialized upload queue as empty"); Ok(()) @@ -458,9 +474,15 @@ impl RemoteTimelineClient { let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!( "bug: it is responsibility of the caller to provide index part from MaybeDeletedIndexPart::Deleted" ))?; + let inprogress_limit = self + .conf + .remote_storage_config + .as_ref() + .and_then(|r| r.concurrency_limit()) + .unwrap_or(0); let mut upload_queue = self.upload_queue.lock().unwrap(); - upload_queue.initialize_with_current_remote_index_part(index_part)?; + upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?; self.update_remote_physical_size_gauge(Some(index_part)); self.stop_impl(&mut upload_queue); @@ -2355,6 +2377,7 @@ impl RemoteTimelineClient { // but for this use case it doesnt really makes sense to bring unsafe code only for this usage point. // Deletion is not really perf sensitive so there shouldnt be any problems with cloning a fraction of it. let upload_queue_for_deletion = UploadQueueInitialized { + inprogress_limit: initialized.inprogress_limit, task_counter: 0, dirty: initialized.dirty.clone(), clean: initialized.clean.clone(), diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index bd524e8153..09c8f6ad8c 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -51,6 +51,9 @@ pub enum OpType { /// This keeps track of queued and in-progress tasks. pub struct UploadQueueInitialized { + /// Maximum number of inprogress tasks to schedule. 0 is no limit. + pub(crate) inprogress_limit: usize, + /// Counter to assign task IDs pub(crate) task_counter: u64, @@ -128,8 +131,14 @@ impl UploadQueueInitialized { /// the queue if it doesn't conflict with operations ahead of it. /// /// None may be returned even if the queue isn't empty, if no operations are ready yet. + /// + /// NB: this is quadratic, but queues are expected to be small, and bounded by inprogress_limit. pub fn next_ready(&mut self) -> Option { - // NB: this is quadratic, but queues are expected to be small. + // If inprogress_tasks is already at limit, don't schedule anything more. + if self.inprogress_limit > 0 && self.inprogress_tasks.len() >= self.inprogress_limit { + return None; + } + for (i, candidate) in self.queued_operations.iter().enumerate() { // If this candidate is ready, go for it. Otherwise, try the next one. if self.is_ready(i) { @@ -289,6 +298,7 @@ impl UploadQueue { pub fn initialize_empty_remote( &mut self, metadata: &TimelineMetadata, + inprogress_limit: usize, ) -> anyhow::Result<&mut UploadQueueInitialized> { match self { UploadQueue::Uninitialized => (), @@ -302,6 +312,7 @@ impl UploadQueue { let index_part = IndexPart::empty(metadata.clone()); let state = UploadQueueInitialized { + inprogress_limit, dirty: index_part.clone(), clean: (index_part, None), latest_files_changes_since_metadata_upload_scheduled: 0, @@ -325,6 +336,7 @@ impl UploadQueue { pub fn initialize_with_current_remote_index_part( &mut self, index_part: &IndexPart, + inprogress_limit: usize, ) -> anyhow::Result<&mut UploadQueueInitialized> { match self { UploadQueue::Uninitialized => (), @@ -339,6 +351,7 @@ impl UploadQueue { ); let state = UploadQueueInitialized { + inprogress_limit, dirty: index_part.clone(), clean: (index_part.clone(), None), latest_files_changes_since_metadata_upload_scheduled: 0, @@ -633,7 +646,7 @@ mod tests { #[test] fn schedule_barrier() -> anyhow::Result<()> { let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_empty_remote(&TimelineMetadata::example())?; + let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?; let tli = make_timeline(); let index = Box::new(queue.clean.0.clone()); // empty, doesn't matter @@ -700,7 +713,7 @@ mod tests { #[test] fn schedule_delete_parallel() -> anyhow::Result<()> { let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_empty_remote(&TimelineMetadata::example())?; + let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?; let tli = make_timeline(); // Enqueue a bunch of deletes, some with conflicting names. @@ -745,7 +758,7 @@ mod tests { #[test] fn schedule_upload_conflicts() -> anyhow::Result<()> { let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?; let tli = make_timeline(); // Enqueue three versions of the same layer, with different file sizes. @@ -778,7 +791,7 @@ mod tests { #[test] fn schedule_upload_delete_conflicts() -> anyhow::Result<()> { let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?; let tli = make_timeline(); // Enqueue two layer uploads, with a delete of both layers in between them. These should be @@ -817,7 +830,7 @@ mod tests { #[test] fn schedule_upload_delete_conflicts_bypass() -> anyhow::Result<()> { let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?; let tli = make_timeline(); // Enqueue two layer uploads, with a delete of both layers in between them. These should be @@ -859,7 +872,7 @@ mod tests { #[test] fn schedule_upload_parallel() -> anyhow::Result<()> { let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?; let tli = make_timeline(); // Enqueue three different layer uploads. @@ -888,7 +901,7 @@ mod tests { #[test] fn schedule_index_serial() -> anyhow::Result<()> { let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?; // Enqueue three uploads of the current empty index. let index = Box::new(queue.clean.0.clone()); @@ -925,7 +938,7 @@ mod tests { #[test] fn schedule_index_upload_chain() -> anyhow::Result<()> { let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?; let tli = make_timeline(); // Enqueue three uploads of the current empty index. @@ -994,7 +1007,7 @@ mod tests { #[test] fn schedule_index_delete_dereferenced() -> anyhow::Result<()> { let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?; let tli = make_timeline(); // Create a layer to upload. @@ -1038,7 +1051,7 @@ mod tests { #[test] fn schedule_index_upload_dereferenced() -> anyhow::Result<()> { let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?; let tli = make_timeline(); // Create a layer to upload. @@ -1085,7 +1098,7 @@ mod tests { #[test] fn schedule_shutdown() -> anyhow::Result<()> { let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_empty_remote(&TimelineMetadata::example())?; + let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?; let tli = make_timeline(); let index = Box::new(queue.clean.0.clone()); // empty, doesn't matter @@ -1139,6 +1152,42 @@ mod tests { Ok(()) } + /// Scheduling respects inprogress_limit. + #[test] + fn schedule_inprogress_limit() -> anyhow::Result<()> { + // Create a queue with inprogress_limit=2. + let mut queue = UploadQueue::Uninitialized; + let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 2)?; + let tli = make_timeline(); + + // Enqueue a bunch of uploads. + let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let layer3 = make_layer(&tli, "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + + let ops = [ + UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None), + UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None), + UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None), + UploadOp::UploadLayer(layer3.clone(), layer3.metadata(), None), + ]; + + queue.queued_operations.extend(ops.clone()); + + // Schedule all ready operations. Only 2 are scheduled. + let tasks = queue.schedule_ready(); + assert_same_ops(tasks.iter().map(|t| &t.op), &ops[0..2]); + assert!(queue.next_ready().is_none()); + + // When one completes, another is scheduled. + queue.complete(tasks[0].task_id); + let tasks = queue.schedule_ready(); + assert_same_ops(tasks.iter().map(|t| &t.op), &ops[2..3]); + + Ok(()) + } + /// Tests that can_bypass takes name, generation and shard index into account for all operations. #[test] fn can_bypass_path() -> anyhow::Result<()> {