From 58c8c1076ca10855f72d582bfe531e8adc67a636 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 10 Jan 2023 14:56:27 +0100 Subject: [PATCH] download_all_remote_layers API: require client to specify max_concurrent_downloads Before this patch, we would start all layer downloads simultaneously. There is at most one download_all_remote_layers task per timeline. Hence, the specified limit is per timeline. There is still no global concurrency limit for layer downloads. We'll have to revisit that at some point and also prioritize on-demand initiated downloads over download_all_remote_layers downloads. But that's for another day. --- libs/pageserver_api/src/models.rs | 7 +++++- pageserver/src/http/routes.rs | 6 +++-- pageserver/src/tenant/timeline.rs | 25 +++++++++++++------ test_runner/fixtures/neon_fixtures.py | 14 +++++++++-- test_runner/regress/test_ondemand_download.py | 15 +++++++++-- 5 files changed, 52 insertions(+), 15 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index d954e5d21f..b5027cb331 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1,4 +1,4 @@ -use std::num::NonZeroU64; +use std::num::{NonZeroU64, NonZeroUsize}; use byteorder::{BigEndian, ReadBytesExt}; use serde::{Deserialize, Serialize}; @@ -210,6 +210,11 @@ pub struct TimelineInfo { pub state: TimelineState, } +#[derive(Debug, Serialize, Deserialize)] +pub struct DownloadRemoteLayersTaskSpawnRequest { + pub max_concurrent_downloads: NonZeroUsize, +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct DownloadRemoteLayersTaskInfo { pub task_id: String, diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 6d0db1b79f..1eb24c1507 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use anyhow::{anyhow, Context, Result}; use hyper::StatusCode; use hyper::{Body, Request, Response, Uri}; +use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest; use remote_storage::GenericRemoteStorage; use tokio_util::sync::CancellationToken; use tracing::*; @@ -788,10 +789,11 @@ async fn timeline_checkpoint_handler(request: Request) -> Result, + mut request: Request, ) -> Result, ApiError> { let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; + let body: DownloadRemoteLayersTaskSpawnRequest = json_request(&mut request).await?; check_permission(&request, Some(tenant_id))?; let tenant = mgr::get_tenant(tenant_id, true) @@ -800,7 +802,7 @@ async fn timeline_download_remote_layers_handler_post( let timeline = tenant .get_timeline(timeline_id, true) .map_err(ApiError::NotFound)?; - match timeline.spawn_download_all_remote_layers().await { + match timeline.spawn_download_all_remote_layers(body).await { Ok(st) => json_response(StatusCode::ACCEPTED, st), Err(st) => json_response(StatusCode::CONFLICT, st), } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 4c76ac614d..d59858f582 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3,12 +3,12 @@ use anyhow::{anyhow, bail, ensure, Context}; use bytes::Bytes; use fail::fail_point; -use futures::stream::FuturesUnordered; use futures::StreamExt; use itertools::Itertools; use once_cell::sync::OnceCell; use pageserver_api::models::{ - DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskState, TimelineState, + DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, + DownloadRemoteLayersTaskState, TimelineState, }; use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError}; use tokio_util::sync::CancellationToken; @@ -3116,6 +3116,7 @@ impl Timeline { pub async fn spawn_download_all_remote_layers( self: Arc, + request: DownloadRemoteLayersTaskSpawnRequest, ) -> Result { let mut status_guard = self.download_all_remote_layers_task_info.write().unwrap(); if let Some(st) = &*status_guard { @@ -3139,7 +3140,7 @@ impl Timeline { "download all remote layers task", false, async move { - self_clone.download_all_remote_layers().await; + self_clone.download_all_remote_layers(request).await; let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap(); match &mut *status_guard { None => { @@ -3171,15 +3172,23 @@ impl Timeline { Ok(initial_info) } - async fn download_all_remote_layers(self: &Arc) { - let mut downloads: FuturesUnordered<_> = { + async fn download_all_remote_layers( + self: &Arc, + request: DownloadRemoteLayersTaskSpawnRequest, + ) { + let mut downloads = Vec::new(); + { let layers = self.layers.read().unwrap(); layers .iter_historic_layers() .filter_map(|l| l.downcast_remote_layer()) .map(|l| self.download_remote_layer(l)) - .collect() - }; + .for_each(|dl| downloads.push(dl)) + } + let total_layer_count = downloads.len(); + // limit download concurrency as specified in request + let downloads = futures::stream::iter(downloads); + let mut downloads = downloads.buffer_unordered(request.max_concurrent_downloads.get()); macro_rules! lock_status { ($st:ident) => { @@ -3200,7 +3209,7 @@ impl Timeline { { lock_status!(st); - st.total_layer_count = downloads.len().try_into().unwrap(); + st.total_layer_count = total_layer_count as u64; } loop { tokio::select! { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 5831222fda..d6c4c32b0b 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1351,11 +1351,18 @@ class PageserverHttpClient(requests.Session): assert res_json is None def timeline_spawn_download_remote_layers( - self, tenant_id: TenantId, timeline_id: TimelineId + self, + tenant_id: TenantId, + timeline_id: TimelineId, + max_concurrent_downloads: int, ) -> dict[str, Any]: + body = { + "max_concurrent_downloads": max_concurrent_downloads, + } res = self.post( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/download_remote_layers", + json=body, ) self.verbose_error(res) res_json = res.json() @@ -1389,10 +1396,13 @@ class PageserverHttpClient(requests.Session): self, tenant_id: TenantId, timeline_id: TimelineId, + max_concurrent_downloads: int, errors_ok=False, at_least_one_download=True, ): - res = self.timeline_spawn_download_remote_layers(tenant_id, timeline_id) + res = self.timeline_spawn_download_remote_layers( + tenant_id, timeline_id, max_concurrent_downloads + ) while True: completed = self.timeline_poll_download_remote_layers_status( tenant_id, timeline_id, res, poll_state="Completed" diff --git a/test_runner/regress/test_ondemand_download.py b/test_runner/regress/test_ondemand_download.py index 184dc13888..3551f27cad 100644 --- a/test_runner/regress/test_ondemand_download.py +++ b/test_runner/regress/test_ondemand_download.py @@ -392,7 +392,12 @@ def test_download_remote_layers_api( # issue downloads that we know will fail info = client.timeline_download_remote_layers( - tenant_id, timeline_id, errors_ok=True, at_least_one_download=False + tenant_id, + timeline_id, + # allow some concurrency to unveil potential concurrency bugs + max_concurrent_downloads=10, + errors_ok=True, + at_least_one_download=False, ) log.info(f"info={info}") assert info["state"] == "Completed" @@ -413,7 +418,13 @@ def test_download_remote_layers_api( ##### Retry, this time without failpoints client.configure_failpoints(("remote-storage-download-pre-rename", "off")) - info = client.timeline_download_remote_layers(tenant_id, timeline_id, errors_ok=False) + info = client.timeline_download_remote_layers( + tenant_id, + timeline_id, + # allow some concurrency to unveil potential concurrency bugs + max_concurrent_downloads=10, + errors_ok=False, + ) log.info(f"info={info}") assert info["state"] == "Completed"