download_all_remote_layers API: require client to specify max_concurrent_downloads

Before this patch, we would start all layer downloads simultaneously.

There is at most one download_all_remote_layers task per timeline.
Hence, the specified limit is per timeline.

There is still no global concurrency limit for layer downloads.
We'll have to revisit that at some point and also prioritize
on-demand-initiated downloads over download_all_remote_layers downloads.
But that's for another day.
This commit is contained in:
Christian Schwarz
2023-01-10 14:56:27 +01:00
committed by Christian Schwarz
parent 4c6b507472
commit 58c8c1076c
5 changed files with 52 additions and 15 deletions

View File

@@ -1,4 +1,4 @@
use std::num::NonZeroU64;
use std::num::{NonZeroU64, NonZeroUsize};
use byteorder::{BigEndian, ReadBytesExt};
use serde::{Deserialize, Serialize};
@@ -210,6 +210,11 @@ pub struct TimelineInfo {
pub state: TimelineState,
}
/// Request body accepted by the endpoint that spawns the
/// `download_all_remote_layers` task for a timeline.
///
/// The HTTP handler deserializes this from the JSON body of the POST request
/// and passes it on to the spawned task.
#[derive(Debug, Serialize, Deserialize)]
pub struct DownloadRemoteLayersTaskSpawnRequest {
/// Upper bound on how many layer downloads the task drives concurrently.
/// The limit applies to this one task (i.e. per timeline); it is not a
/// global limit. `NonZeroUsize` makes a limit of zero unrepresentable.
pub max_concurrent_downloads: NonZeroUsize,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DownloadRemoteLayersTaskInfo {
pub task_id: String,

View File

@@ -3,6 +3,7 @@ use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -788,10 +789,11 @@ async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<
}
async fn timeline_download_remote_layers_handler_post(
request: Request<Body>,
mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let body: DownloadRemoteLayersTaskSpawnRequest = json_request(&mut request).await?;
check_permission(&request, Some(tenant_id))?;
let tenant = mgr::get_tenant(tenant_id, true)
@@ -800,7 +802,7 @@ async fn timeline_download_remote_layers_handler_post(
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(ApiError::NotFound)?;
match timeline.spawn_download_all_remote_layers().await {
match timeline.spawn_download_all_remote_layers(body).await {
Ok(st) => json_response(StatusCode::ACCEPTED, st),
Err(st) => json_response(StatusCode::CONFLICT, st),
}

View File

@@ -3,12 +3,12 @@
use anyhow::{anyhow, bail, ensure, Context};
use bytes::Bytes;
use fail::fail_point;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::models::{
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskState, TimelineState,
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
DownloadRemoteLayersTaskState, TimelineState,
};
use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError};
use tokio_util::sync::CancellationToken;
@@ -3116,6 +3116,7 @@ impl Timeline {
pub async fn spawn_download_all_remote_layers(
self: Arc<Self>,
request: DownloadRemoteLayersTaskSpawnRequest,
) -> Result<DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskInfo> {
let mut status_guard = self.download_all_remote_layers_task_info.write().unwrap();
if let Some(st) = &*status_guard {
@@ -3139,7 +3140,7 @@ impl Timeline {
"download all remote layers task",
false,
async move {
self_clone.download_all_remote_layers().await;
self_clone.download_all_remote_layers(request).await;
let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();
match &mut *status_guard {
None => {
@@ -3171,15 +3172,23 @@ impl Timeline {
Ok(initial_info)
}
async fn download_all_remote_layers(self: &Arc<Self>) {
let mut downloads: FuturesUnordered<_> = {
async fn download_all_remote_layers(
self: &Arc<Self>,
request: DownloadRemoteLayersTaskSpawnRequest,
) {
let mut downloads = Vec::new();
{
let layers = self.layers.read().unwrap();
layers
.iter_historic_layers()
.filter_map(|l| l.downcast_remote_layer())
.map(|l| self.download_remote_layer(l))
.collect()
};
.for_each(|dl| downloads.push(dl))
}
let total_layer_count = downloads.len();
// limit download concurrency as specified in request
let downloads = futures::stream::iter(downloads);
let mut downloads = downloads.buffer_unordered(request.max_concurrent_downloads.get());
macro_rules! lock_status {
($st:ident) => {
@@ -3200,7 +3209,7 @@ impl Timeline {
{
lock_status!(st);
st.total_layer_count = downloads.len().try_into().unwrap();
st.total_layer_count = total_layer_count as u64;
}
loop {
tokio::select! {

View File

@@ -1351,11 +1351,18 @@ class PageserverHttpClient(requests.Session):
assert res_json is None
def timeline_spawn_download_remote_layers(
self, tenant_id: TenantId, timeline_id: TimelineId
self,
tenant_id: TenantId,
timeline_id: TimelineId,
max_concurrent_downloads: int,
) -> dict[str, Any]:
body = {
"max_concurrent_downloads": max_concurrent_downloads,
}
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/download_remote_layers",
json=body,
)
self.verbose_error(res)
res_json = res.json()
@@ -1389,10 +1396,13 @@ class PageserverHttpClient(requests.Session):
self,
tenant_id: TenantId,
timeline_id: TimelineId,
max_concurrent_downloads: int,
errors_ok=False,
at_least_one_download=True,
):
res = self.timeline_spawn_download_remote_layers(tenant_id, timeline_id)
res = self.timeline_spawn_download_remote_layers(
tenant_id, timeline_id, max_concurrent_downloads
)
while True:
completed = self.timeline_poll_download_remote_layers_status(
tenant_id, timeline_id, res, poll_state="Completed"

View File

@@ -392,7 +392,12 @@ def test_download_remote_layers_api(
# issue downloads that we know will fail
info = client.timeline_download_remote_layers(
tenant_id, timeline_id, errors_ok=True, at_least_one_download=False
tenant_id,
timeline_id,
# allow some concurrency to unveil potential concurrency bugs
max_concurrent_downloads=10,
errors_ok=True,
at_least_one_download=False,
)
log.info(f"info={info}")
assert info["state"] == "Completed"
@@ -413,7 +418,13 @@ def test_download_remote_layers_api(
##### Retry, this time without failpoints
client.configure_failpoints(("remote-storage-download-pre-rename", "off"))
info = client.timeline_download_remote_layers(tenant_id, timeline_id, errors_ok=False)
info = client.timeline_download_remote_layers(
tenant_id,
timeline_id,
# allow some concurrency to unveil potential concurrency bugs
max_concurrent_downloads=10,
errors_ok=False,
)
log.info(f"info={info}")
assert info["state"] == "Completed"