diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index d954e5d21f..b5027cb331 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -1,4 +1,4 @@
-use std::num::NonZeroU64;
+use std::num::{NonZeroU64, NonZeroUsize};
use byteorder::{BigEndian, ReadBytesExt};
use serde::{Deserialize, Serialize};
@@ -210,6 +210,11 @@ pub struct TimelineInfo {
pub state: TimelineState,
}
+#[derive(Debug, Serialize, Deserialize)]
+pub struct DownloadRemoteLayersTaskSpawnRequest {
+ pub max_concurrent_downloads: NonZeroUsize,
+}
+
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DownloadRemoteLayersTaskInfo {
pub task_id: String,
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 6d0db1b79f..1eb24c1507 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
+use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -788,10 +789,11 @@ async fn timeline_checkpoint_handler(request: Request
) -> Result,
+ mut request: Request,
) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
+ let body: DownloadRemoteLayersTaskSpawnRequest = json_request(&mut request).await?;
check_permission(&request, Some(tenant_id))?;
let tenant = mgr::get_tenant(tenant_id, true)
@@ -800,7 +802,7 @@ async fn timeline_download_remote_layers_handler_post(
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(ApiError::NotFound)?;
- match timeline.spawn_download_all_remote_layers().await {
+ match timeline.spawn_download_all_remote_layers(body).await {
Ok(st) => json_response(StatusCode::ACCEPTED, st),
Err(st) => json_response(StatusCode::CONFLICT, st),
}
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 4c76ac614d..d59858f582 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -3,12 +3,12 @@
use anyhow::{anyhow, bail, ensure, Context};
use bytes::Bytes;
use fail::fail_point;
-use futures::stream::FuturesUnordered;
use futures::StreamExt;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::models::{
- DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskState, TimelineState,
+ DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
+ DownloadRemoteLayersTaskState, TimelineState,
};
use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError};
use tokio_util::sync::CancellationToken;
@@ -3116,6 +3116,7 @@ impl Timeline {
pub async fn spawn_download_all_remote_layers(
self: Arc,
+ request: DownloadRemoteLayersTaskSpawnRequest,
) -> Result {
let mut status_guard = self.download_all_remote_layers_task_info.write().unwrap();
if let Some(st) = &*status_guard {
@@ -3139,7 +3140,7 @@ impl Timeline {
"download all remote layers task",
false,
async move {
- self_clone.download_all_remote_layers().await;
+ self_clone.download_all_remote_layers(request).await;
let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();
match &mut *status_guard {
None => {
@@ -3171,15 +3172,23 @@ impl Timeline {
Ok(initial_info)
}
- async fn download_all_remote_layers(self: &Arc) {
- let mut downloads: FuturesUnordered<_> = {
+ async fn download_all_remote_layers(
+ self: &Arc,
+ request: DownloadRemoteLayersTaskSpawnRequest,
+ ) {
+ let mut downloads = Vec::new();
+ {
let layers = self.layers.read().unwrap();
layers
.iter_historic_layers()
.filter_map(|l| l.downcast_remote_layer())
.map(|l| self.download_remote_layer(l))
- .collect()
- };
+ .for_each(|dl| downloads.push(dl))
+ }
+ let total_layer_count = downloads.len();
+ // limit download concurrency as specified in request
+ let downloads = futures::stream::iter(downloads);
+ let mut downloads = downloads.buffer_unordered(request.max_concurrent_downloads.get());
macro_rules! lock_status {
($st:ident) => {
@@ -3200,7 +3209,7 @@ impl Timeline {
{
lock_status!(st);
- st.total_layer_count = downloads.len().try_into().unwrap();
+ st.total_layer_count = total_layer_count as u64;
}
loop {
tokio::select! {
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 5831222fda..d6c4c32b0b 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -1351,11 +1351,18 @@ class PageserverHttpClient(requests.Session):
assert res_json is None
def timeline_spawn_download_remote_layers(
- self, tenant_id: TenantId, timeline_id: TimelineId
+ self,
+ tenant_id: TenantId,
+ timeline_id: TimelineId,
+ max_concurrent_downloads: int,
) -> dict[str, Any]:
+ body = {
+ "max_concurrent_downloads": max_concurrent_downloads,
+ }
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/download_remote_layers",
+ json=body,
)
self.verbose_error(res)
res_json = res.json()
@@ -1389,10 +1396,13 @@ class PageserverHttpClient(requests.Session):
self,
tenant_id: TenantId,
timeline_id: TimelineId,
+ max_concurrent_downloads: int,
errors_ok=False,
at_least_one_download=True,
):
- res = self.timeline_spawn_download_remote_layers(tenant_id, timeline_id)
+ res = self.timeline_spawn_download_remote_layers(
+ tenant_id, timeline_id, max_concurrent_downloads
+ )
while True:
completed = self.timeline_poll_download_remote_layers_status(
tenant_id, timeline_id, res, poll_state="Completed"
diff --git a/test_runner/regress/test_ondemand_download.py b/test_runner/regress/test_ondemand_download.py
index 184dc13888..3551f27cad 100644
--- a/test_runner/regress/test_ondemand_download.py
+++ b/test_runner/regress/test_ondemand_download.py
@@ -392,7 +392,12 @@ def test_download_remote_layers_api(
# issue downloads that we know will fail
info = client.timeline_download_remote_layers(
- tenant_id, timeline_id, errors_ok=True, at_least_one_download=False
+ tenant_id,
+ timeline_id,
+ # allow some concurrency to unveil potential concurrency bugs
+ max_concurrent_downloads=10,
+ errors_ok=True,
+ at_least_one_download=False,
)
log.info(f"info={info}")
assert info["state"] == "Completed"
@@ -413,7 +418,13 @@ def test_download_remote_layers_api(
##### Retry, this time without failpoints
client.configure_failpoints(("remote-storage-download-pre-rename", "off"))
- info = client.timeline_download_remote_layers(tenant_id, timeline_id, errors_ok=False)
+ info = client.timeline_download_remote_layers(
+ tenant_id,
+ timeline_id,
+ # allow some concurrency to unveil potential concurrency bugs
+ max_concurrent_downloads=10,
+ errors_ok=False,
+ )
log.info(f"info={info}")
assert info["state"] == "Completed"