pageserver, controller: improve secondary download APIs for large shards (#7131)

## Problem

The existing secondary download API relied on the caller to wait as long
as it took to complete -- for large shards that could be a long time, so
typical clients that might have a baked-in ~30s timeout would have a
problem.

## Summary of changes

- Take a `wait_ms` query parameter to instruct the pageserver how long
to wait: if the download isn't complete in this duration, then 201 is
returned instead of 200.
- For both 200 and 201 responses, include response body describing
download progress, in terms of layers and bytes. This is sufficient for
the caller to track how much data is being transferred and log/present
that status.
- In storage controller live migrations, use this API to apply a much
longer outer timeout, with smaller individual per-request timeouts, and
log the progress of the downloads.
- Add a test that injects layer download delays to exercise the new
behavior
This commit is contained in:
John Spray
2024-03-15 19:45:58 +00:00
committed by GitHub
parent ad6f538aef
commit 9752ad8489
18 changed files with 647 additions and 110 deletions

View File

@@ -14,7 +14,7 @@ use tokio_util::sync::CancellationToken;
use utils::auth::{Scope, SwappableJwtAuth};
use utils::failpoint_support::failpoints_handler;
use utils::http::endpoint::{auth_middleware, check_permission_with, request_span};
use utils::http::request::{must_get_query_param, parse_request_param};
use utils::http::request::{must_get_query_param, parse_query_param, parse_request_param};
use utils::id::{TenantId, TimelineId};
use utils::{
@@ -248,8 +248,10 @@ async fn handle_tenant_secondary_download(
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
service.tenant_secondary_download(tenant_id).await?;
json_response(StatusCode::OK, ())
let wait = parse_query_param(&req, "wait_ms")?.map(Duration::from_millis);
let (status, progress) = service.tenant_secondary_download(tenant_id, wait).await?;
json_response(status, progress)
}
async fn handle_tenant_delete(

View File

@@ -8,7 +8,7 @@ use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_client::mgmt_api;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
use utils::generation::Generation;
use utils::id::{NodeId, TimelineId};
@@ -258,22 +258,81 @@ impl Reconciler {
tenant_shard_id: TenantShardId,
node: &Node,
) -> Result<(), ReconcileError> {
match node
.with_client_retries(
|client| async move { client.tenant_secondary_download(tenant_shard_id).await },
&self.service_config.jwt_token,
1,
1,
Duration::from_secs(60),
&self.cancel,
)
.await
{
None => Err(ReconcileError::Cancel),
Some(Ok(_)) => Ok(()),
Some(Err(e)) => {
tracing::info!(" (skipping destination download: {})", e);
Ok(())
// This is not the timeout for a request, but the total amount of time we're willing to wait
// for a secondary location to get up to date before
const TOTAL_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(300);
// This the long-polling interval for the secondary download requests we send to destination pageserver
// during a migration.
const REQUEST_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(20);
let started_at = Instant::now();
loop {
let (status, progress) = match node
.with_client_retries(
|client| async move {
client
.tenant_secondary_download(
tenant_shard_id,
Some(REQUEST_DOWNLOAD_TIMEOUT),
)
.await
},
&self.service_config.jwt_token,
1,
3,
REQUEST_DOWNLOAD_TIMEOUT * 2,
&self.cancel,
)
.await
{
None => Err(ReconcileError::Cancel),
Some(Ok(v)) => Ok(v),
Some(Err(e)) => {
// Give up, but proceed: it's unfortunate if we couldn't freshen the destination before
// attaching, but we should not let an issue with a secondary location stop us proceeding
// with a live migration.
tracing::warn!("Failed to prepare by downloading layers on node {node}: {e})");
return Ok(());
}
}?;
if status == StatusCode::OK {
tracing::info!(
"Downloads to {} complete: {}/{} layers, {}/{} bytes",
node,
progress.layers_downloaded,
progress.layers_total,
progress.bytes_downloaded,
progress.bytes_total
);
return Ok(());
} else if status == StatusCode::ACCEPTED {
let total_runtime = started_at.elapsed();
if total_runtime > TOTAL_DOWNLOAD_TIMEOUT {
tracing::warn!("Timed out after {}ms downloading layers to {node}. Progress so far: {}/{} layers, {}/{} bytes",
total_runtime.as_millis(),
progress.layers_downloaded,
progress.layers_total,
progress.bytes_downloaded,
progress.bytes_total
);
// Give up, but proceed: an incompletely warmed destination doesn't prevent migration working,
// it just makes the I/O performance for users less good.
return Ok(());
}
// Log and proceed around the loop to retry. We don't sleep between requests, because our HTTP call
// to the pageserver is a long-poll.
tracing::info!(
"Downloads to {} not yet complete: {}/{} layers, {}/{} bytes",
node,
progress.layers_downloaded,
progress.layers_total,
progress.bytes_downloaded,
progress.bytes_total
);
}
}
}

View File

@@ -16,7 +16,15 @@ use diesel::result::DatabaseErrorKind;
use futures::{stream::FuturesUnordered, StreamExt};
use hyper::StatusCode;
use pageserver_api::{
controller_api::UtilizationScore,
controller_api::{
NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
TenantCreateResponse, TenantCreateResponseShard, TenantLocateResponse,
TenantShardMigrateRequest, TenantShardMigrateResponse, UtilizationScore,
},
models::{SecondaryProgress, TenantConfigRequest},
};
use pageserver_api::{
models::{
self, LocationConfig, LocationConfigListResponse, LocationConfigMode,
PageserverUtilization, ShardParameters, TenantConfig, TenantCreateRequest,
@@ -30,14 +38,6 @@ use pageserver_api::{
ValidateResponse, ValidateResponseTenant,
},
};
use pageserver_api::{
controller_api::{
NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
TenantCreateResponse, TenantCreateResponseShard, TenantLocateResponse,
TenantShardMigrateRequest, TenantShardMigrateResponse,
},
models::TenantConfigRequest,
};
use pageserver_client::mgmt_api;
use tokio::sync::OwnedRwLockWriteGuard;
use tokio_util::sync::CancellationToken;
@@ -2084,7 +2084,8 @@ impl Service {
pub(crate) async fn tenant_secondary_download(
&self,
tenant_id: TenantId,
) -> Result<(), ApiError> {
wait: Option<Duration>,
) -> Result<(StatusCode, SecondaryProgress), ApiError> {
let _tenant_lock = self.tenant_op_locks.shared(tenant_id).await;
// Acquire lock and yield the collection of shard-node tuples which we will send requests onward to
@@ -2107,32 +2108,71 @@ impl Service {
targets
};
// TODO: this API, and the underlying pageserver API, should take a timeout argument so that for long running
// downloads, they can return a clean 202 response instead of the HTTP client timing out.
// Issue concurrent requests to all shards' locations
let mut futs = FuturesUnordered::new();
for (tenant_shard_id, node) in targets {
let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
futs.push(async move {
let result = client.tenant_secondary_download(tenant_shard_id).await;
(result, node)
let result = client
.tenant_secondary_download(tenant_shard_id, wait)
.await;
(result, node, tenant_shard_id)
})
}
// Handle any errors returned by pageservers. This includes cases like this request racing with
// a scheduling operation, such that the tenant shard we're calling doesn't exist on that pageserver any more, as
// well as more general cases like 503s, 500s, or timeouts.
while let Some((result, node)) = futs.next().await {
let Err(e) = result else { continue };
// Secondary downloads are always advisory: if something fails, we nevertheless report success, so that whoever
// is calling us will proceed with whatever migration they're doing, albeit with a slightly less warm cache
// than they had hoped for.
tracing::warn!("Ignoring tenant secondary download error from pageserver {node}: {e}",);
let mut aggregate_progress = SecondaryProgress::default();
let mut aggregate_status: Option<StatusCode> = None;
let mut error: Option<mgmt_api::Error> = None;
while let Some((result, node, tenant_shard_id)) = futs.next().await {
match result {
Err(e) => {
// Secondary downloads are always advisory: if something fails, we nevertheless report success, so that whoever
// is calling us will proceed with whatever migration they're doing, albeit with a slightly less warm cache
// than they had hoped for.
tracing::warn!("Secondary download error from pageserver {node}: {e}",);
error = Some(e)
}
Ok((status_code, progress)) => {
tracing::info!(%tenant_shard_id, "Shard status={status_code} progress: {progress:?}");
aggregate_progress.layers_downloaded += progress.layers_downloaded;
aggregate_progress.layers_total += progress.layers_total;
aggregate_progress.bytes_downloaded += progress.bytes_downloaded;
aggregate_progress.bytes_total += progress.bytes_total;
aggregate_progress.heatmap_mtime =
std::cmp::max(aggregate_progress.heatmap_mtime, progress.heatmap_mtime);
aggregate_status = match aggregate_status {
None => Some(status_code),
Some(StatusCode::OK) => Some(status_code),
Some(cur) => {
// Other status codes (e.g. 202) -- do not overwrite.
Some(cur)
}
};
}
}
}
Ok(())
// If any of the shards return 202, indicate our result as 202.
match aggregate_status {
None => {
match error {
Some(e) => {
// No successes, and an error: surface it
Err(ApiError::Conflict(format!("Error from pageserver: {e}")))
}
None => {
// No shards found
Err(ApiError::NotFound(
anyhow::anyhow!("Tenant {} not found", tenant_id).into(),
))
}
}
}
Some(aggregate_status) => Ok((aggregate_status, aggregate_progress)),
}
}
pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result<StatusCode, ApiError> {

View File

@@ -568,13 +568,6 @@ impl PageServerNode {
Ok(self.http_client.list_timelines(*tenant_shard_id).await?)
}
pub async fn tenant_secondary_download(&self, tenant_id: &TenantShardId) -> anyhow::Result<()> {
Ok(self
.http_client
.tenant_secondary_download(*tenant_id)
.await?)
}
pub async fn timeline_create(
&self,
tenant_shard_id: TenantShardId,

View File

@@ -751,6 +751,52 @@ pub struct WalRedoManagerStatus {
pub pid: Option<u32>,
}
/// The progress of a secondary tenant is mostly useful when doing a long running download: e.g. initiating
/// a download job, timing out while waiting for it to run, and then inspecting this status to understand
/// what's happening.
#[derive(Default, Debug, Serialize, Deserialize, Clone)]
pub struct SecondaryProgress {
/// The remote storage LastModified time of the heatmap object we last downloaded.
#[serde(
serialize_with = "opt_ser_rfc3339_millis",
deserialize_with = "opt_deser_rfc3339_millis"
)]
pub heatmap_mtime: Option<SystemTime>,
/// The number of layers currently on-disk
pub layers_downloaded: usize,
/// The number of layers in the most recently seen heatmap
pub layers_total: usize,
/// The number of layer bytes currently on-disk
pub bytes_downloaded: u64,
/// The number of layer bytes in the most recently seen heatmap
pub bytes_total: u64,
}
fn opt_ser_rfc3339_millis<S: serde::Serializer>(
ts: &Option<SystemTime>,
serializer: S,
) -> Result<S::Ok, S::Error> {
match ts {
Some(ts) => serializer.collect_str(&humantime::format_rfc3339_millis(*ts)),
None => serializer.serialize_none(),
}
}
fn opt_deser_rfc3339_millis<'de, D>(deserializer: D) -> Result<Option<SystemTime>, D::Error>
where
D: serde::de::Deserializer<'de>,
{
let s: Option<String> = serde::de::Deserialize::deserialize(deserializer)?;
match s {
None => Ok(None),
Some(s) => humantime::parse_rfc3339(&s)
.map_err(serde::de::Error::custom)
.map(Some),
}
}
pub mod virtual_file {
#[derive(
Copy,

View File

@@ -157,9 +157,8 @@ impl AzureBlobStorage {
let mut bufs = Vec::new();
while let Some(part) = response.next().await {
let part = part?;
let etag_str: &str = part.blob.properties.etag.as_ref();
if etag.is_none() {
etag = Some(etag.unwrap_or_else(|| etag_str.to_owned()));
etag = Some(part.blob.properties.etag);
}
if last_modified.is_none() {
last_modified = Some(part.blob.properties.last_modified.into());
@@ -180,6 +179,7 @@ impl AzureBlobStorage {
"Azure GET response contained no buffers"
)));
}
// unwrap safety: if these were None, bufs would be empty and we would have returned an error already
let etag = etag.unwrap();
let last_modified = last_modified.unwrap();

View File

@@ -42,6 +42,9 @@ pub use self::{
};
use s3_bucket::RequestKind;
/// Azure SDK's ETag type is a simple String wrapper: we use this internally instead of repeating it here.
pub use azure_core::Etag;
pub use error::{DownloadError, TimeTravelError, TimeoutOrCancel};
/// Currently, sync happens with AWS S3, that has two limits on requests per second:
@@ -293,7 +296,7 @@ pub struct Download {
/// The last time the file was modified (`last-modified` HTTP header)
pub last_modified: SystemTime,
/// A way to identify this specific version of the resource (`etag` HTTP header)
pub etag: String,
pub etag: Etag,
/// Extra key-value data, associated with the current remote file.
pub metadata: Option<StorageMetadata>,
}

View File

@@ -30,6 +30,7 @@ use crate::{
};
use super::{RemoteStorage, StorageMetadata};
use crate::Etag;
const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";
@@ -626,9 +627,9 @@ async fn file_metadata(file_path: &Utf8Path) -> Result<std::fs::Metadata, Downlo
// Use mtime as stand-in for ETag. We could calculate a meaningful one by md5'ing the contents of files we
// read, but that's expensive and the local_fs test helper's whole reason for existence is to run small tests
// quickly, with less overhead than using a mock S3 server.
fn mock_etag(meta: &std::fs::Metadata) -> String {
fn mock_etag(meta: &std::fs::Metadata) -> Etag {
let mtime = meta.modified().expect("Filesystem mtime missing");
format!("{}", mtime.duration_since(UNIX_EPOCH).unwrap().as_millis())
format!("{}", mtime.duration_since(UNIX_EPOCH).unwrap().as_millis()).into()
}
#[cfg(test)]

View File

@@ -289,7 +289,8 @@ impl S3Bucket {
let metadata = object_output.metadata().cloned().map(StorageMetadata);
let etag = object_output
.e_tag
.ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))?;
.ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))?
.into();
let last_modified = object_output
.last_modified
.ok_or(DownloadError::Other(anyhow::anyhow!(

View File

@@ -249,13 +249,26 @@ impl Client {
Ok(())
}
pub async fn tenant_secondary_download(&self, tenant_id: TenantShardId) -> Result<()> {
let uri = format!(
pub async fn tenant_secondary_download(
&self,
tenant_id: TenantShardId,
wait: Option<std::time::Duration>,
) -> Result<(StatusCode, SecondaryProgress)> {
let mut path = reqwest::Url::parse(&format!(
"{}/v1/tenant/{}/secondary/download",
self.mgmt_api_endpoint, tenant_id
);
self.request(Method::POST, &uri, ()).await?;
Ok(())
))
.expect("Cannot build URL");
if let Some(wait) = wait {
path.query_pairs_mut()
.append_pair("wait_ms", &format!("{}", wait.as_millis()));
}
let response = self.request(Method::POST, path, ()).await?;
let status = response.status();
let progress: SecondaryProgress = response.json().await.map_err(Error::ReceiveBody)?;
Ok((status, progress))
}
pub async fn location_config(

View File

@@ -965,12 +965,28 @@ paths:
required: true
schema:
type: string
- name: wait_ms
description: If set, we will wait this long for download to complete, and if it isn't complete then return 202
in: query
required: false
schema:
type: integer
post:
description: |
If the location is in secondary mode, download latest heatmap and layers
responses:
"200":
description: Success
content:
application/json:
schema:
$ref: "#/components/schemas/SecondaryProgress"
"202":
description: Download has started but not yet finished
content:
application/json:
schema:
$ref: "#/components/schemas/SecondaryProgress"
"500":
description: Generic operation error
content:
@@ -1623,6 +1639,37 @@ components:
Lower is better score for how good this pageserver would be for the next tenant.
The default or maximum value can be returned in situations when a proper score cannot (yet) be calculated.
SecondaryProgress:
type: object
required:
- heatmap_mtime
- layers_downloaded
- layers_total
- bytes_downloaded
- bytes_total
properties:
heatmap_mtime:
type: string
format: date-time
description: Modification time of the most recently downloaded layer heatmap (RFC 3339 format)
layers_downloaded:
type: integer
format: int64
description: How many layers from the latest layer heatmap are present on disk
bytes_downloaded:
type: integer
format: int64
description: How many bytes of layer content from the latest layer heatmap are present on disk
layers_total:
type: integer
format: int64
description: How many layers were in the latest layer heatmap
bytes_total:
type: integer
format: int64
description: How many bytes of layer content were in the latest layer heatmap
Error:
type: object
required:

View File

@@ -1987,13 +1987,42 @@ async fn secondary_download_handler(
) -> Result<Response<Body>, ApiError> {
let state = get_state(&request);
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
state
.secondary_controller
.download_tenant(tenant_shard_id)
.await
.map_err(ApiError::InternalServerError)?;
let wait = parse_query_param(&request, "wait_ms")?.map(Duration::from_millis);
json_response(StatusCode::OK, ())
// We don't need this to issue the download request, but:
// - it enables us to cleanly return 404 if we get a request for an absent shard
// - we will use this to provide status feedback in the response
let Some(secondary_tenant) = state
.tenant_manager
.get_secondary_tenant_shard(tenant_shard_id)
else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Shard {} not found", tenant_shard_id).into(),
));
};
let timeout = wait.unwrap_or(Duration::MAX);
let status = match tokio::time::timeout(
timeout,
state.secondary_controller.download_tenant(tenant_shard_id),
)
.await
{
// Download job ran to completion.
Ok(Ok(())) => StatusCode::OK,
// Edge case: downloads aren't usually fallible: things like a missing heatmap are considered
// okay. We could get an error here in the unlikely edge case that the tenant
// was detached between our check above and executing the download job.
Ok(Err(e)) => return Err(ApiError::InternalServerError(e)),
// A timeout is not an error: we have started the download, we're just not done
// yet. The caller will get a response body indicating status.
Err(_) => StatusCode::ACCEPTED,
};
let progress = secondary_tenant.progress.lock().unwrap().clone();
json_response(status, progress)
}
async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {

View File

@@ -95,7 +95,11 @@ pub(crate) struct SecondaryTenant {
shard_identity: ShardIdentity,
tenant_conf: std::sync::Mutex<TenantConfOpt>,
// Internal state used by the Downloader.
detail: std::sync::Mutex<SecondaryDetail>,
// Public state indicating overall progress of downloads relative to the last heatmap seen
pub(crate) progress: std::sync::Mutex<models::SecondaryProgress>,
}
impl SecondaryTenant {
@@ -118,6 +122,8 @@ impl SecondaryTenant {
tenant_conf: std::sync::Mutex::new(tenant_conf),
detail: std::sync::Mutex::new(SecondaryDetail::new(config.clone())),
progress: std::sync::Mutex::default(),
})
}
@@ -247,9 +253,12 @@ impl SecondaryTenant {
}
/// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads,
/// and heatmap uploads. This is not a hot data path: it's primarily a hook for tests,
/// where we want to immediately upload/download for a particular tenant. In normal operation
/// uploads & downloads are autonomous and not driven by this interface.
/// and heatmap uploads. This is not a hot data path: it's used for:
/// - Live migrations, where we want to ensure a migration destination has the freshest possible
/// content before trying to cut over.
/// - Tests, where we want to immediately upload/download for a particular tenant.
///
/// In normal operations, outside of migrations, uploads & downloads are autonomous and not driven by this interface.
pub struct SecondaryController {
upload_req_tx: tokio::sync::mpsc::Sender<CommandRequest<UploadCommand>>,
download_req_tx: tokio::sync::mpsc::Sender<CommandRequest<DownloadCommand>>,

View File

@@ -41,14 +41,16 @@ use crate::tenant::{
use camino::Utf8PathBuf;
use chrono::format::{DelayedFormat, StrftimeItems};
use futures::Future;
use pageserver_api::models::SecondaryProgress;
use pageserver_api::shard::TenantShardId;
use rand::Rng;
use remote_storage::{DownloadError, GenericRemoteStorage};
use remote_storage::{DownloadError, Etag, GenericRemoteStorage};
use tokio_util::sync::CancellationToken;
use tracing::{info_span, instrument, warn, Instrument};
use utils::{
backoff, completion::Barrier, crashsafe::path_with_suffix_extension, fs_ext, id::TimelineId,
backoff, completion::Barrier, crashsafe::path_with_suffix_extension, failpoint_support, fs_ext,
id::TimelineId,
};
use super::{
@@ -128,6 +130,7 @@ pub(super) struct SecondaryDetail {
pub(super) config: SecondaryLocationConfig,
last_download: Option<Instant>,
last_etag: Option<Etag>,
next_download: Option<Instant>,
pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
}
@@ -138,11 +141,26 @@ fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
datetime.format("%d/%m/%Y %T")
}
/// Information returned from download function when it detects the heatmap has changed
struct HeatMapModified {
etag: Etag,
last_modified: SystemTime,
bytes: Vec<u8>,
}
enum HeatMapDownload {
// The heatmap's etag has changed: return the new etag, mtime and the body bytes
Modified(HeatMapModified),
// The heatmap's etag is unchanged
Unmodified,
}
impl SecondaryDetail {
pub(super) fn new(config: SecondaryLocationConfig) -> Self {
Self {
config,
last_download: None,
last_etag: None,
next_download: None,
timelines: HashMap::new(),
}
@@ -477,11 +495,31 @@ impl<'a> TenantDownloader<'a> {
};
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
// We will use the etag from last successful download to make the download conditional on changes
let last_etag = self
.secondary_state
.detail
.lock()
.unwrap()
.last_etag
.clone();
// Download the tenant's heatmap
let heatmap_bytes = tokio::select!(
bytes = self.download_heatmap() => {bytes?},
let HeatMapModified {
last_modified: heatmap_mtime,
etag: heatmap_etag,
bytes: heatmap_bytes,
} = match tokio::select!(
bytes = self.download_heatmap(last_etag.as_ref()) => {bytes?},
_ = self.secondary_state.cancel.cancelled() => return Ok(())
);
) {
HeatMapDownload::Unmodified => {
tracing::info!("Heatmap unchanged since last successful download");
return Ok(());
}
HeatMapDownload::Modified(m) => m,
};
let heatmap = serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?;
@@ -498,6 +536,14 @@ impl<'a> TenantDownloader<'a> {
tracing::debug!("Wrote local heatmap to {}", heatmap_path);
// Clean up any local layers that aren't in the heatmap. We do this first for all timelines, on the general
// principle that deletions should be done before writes wherever possible, and so that we can use this
// phase to initialize our SecondaryProgress.
{
*self.secondary_state.progress.lock().unwrap() =
self.prepare_timelines(&heatmap, heatmap_mtime).await?;
}
// Download the layers in the heatmap
for timeline in heatmap.timelines {
if self.secondary_state.cancel.is_cancelled() {
@@ -515,30 +561,159 @@ impl<'a> TenantDownloader<'a> {
.await?;
}
// Only update last_etag after a full successful download: this way will not skip
// the next download, even if the heatmap's actual etag is unchanged.
self.secondary_state.detail.lock().unwrap().last_etag = Some(heatmap_etag);
Ok(())
}
async fn download_heatmap(&self) -> Result<Vec<u8>, UpdateError> {
/// Do any fast local cleanup that comes before the much slower process of downloading
/// layers from remote storage. In the process, initialize the SecondaryProgress object
/// that will later be updated incrementally as we download layers.
async fn prepare_timelines(
&self,
heatmap: &HeatMapTenant,
heatmap_mtime: SystemTime,
) -> Result<SecondaryProgress, UpdateError> {
let heatmap_stats = heatmap.get_stats();
// We will construct a progress object, and then populate its initial "downloaded" numbers
// while iterating through local layer state in [`Self::prepare_timelines`]
let mut progress = SecondaryProgress {
layers_total: heatmap_stats.layers,
bytes_total: heatmap_stats.bytes,
heatmap_mtime: Some(heatmap_mtime),
layers_downloaded: 0,
bytes_downloaded: 0,
};
// Accumulate list of things to delete while holding the detail lock, for execution after dropping the lock
let mut delete_layers = Vec::new();
let mut delete_timelines = Vec::new();
{
let mut detail = self.secondary_state.detail.lock().unwrap();
for (timeline_id, timeline_state) in &mut detail.timelines {
let Some(heatmap_timeline_index) = heatmap
.timelines
.iter()
.position(|t| t.timeline_id == *timeline_id)
else {
// This timeline is no longer referenced in the heatmap: delete it locally
delete_timelines.push(*timeline_id);
continue;
};
let heatmap_timeline = heatmap.timelines.get(heatmap_timeline_index).unwrap();
let layers_in_heatmap = heatmap_timeline
.layers
.iter()
.map(|l| &l.name)
.collect::<HashSet<_>>();
let layers_on_disk = timeline_state
.on_disk_layers
.iter()
.map(|l| l.0)
.collect::<HashSet<_>>();
let mut layer_count = layers_on_disk.len();
let mut layer_byte_count: u64 = timeline_state
.on_disk_layers
.values()
.map(|l| l.metadata.file_size())
.sum();
// Remove on-disk layers that are no longer present in heatmap
for layer in layers_on_disk.difference(&layers_in_heatmap) {
layer_count -= 1;
layer_byte_count -= timeline_state
.on_disk_layers
.get(layer)
.unwrap()
.metadata
.file_size();
delete_layers.push((*timeline_id, (*layer).clone()));
}
progress.bytes_downloaded += layer_byte_count;
progress.layers_downloaded += layer_count;
}
}
// Execute accumulated deletions
for (timeline_id, layer_name) in delete_layers {
let timeline_path = self
.conf
.timeline_path(self.secondary_state.get_tenant_shard_id(), &timeline_id);
let local_path = timeline_path.join(layer_name.to_string());
tracing::info!(timeline_id=%timeline_id, "Removing secondary local layer {layer_name} because it's absent in heatmap",);
tokio::fs::remove_file(&local_path)
.await
.or_else(fs_ext::ignore_not_found)
.maybe_fatal_err("Removing secondary layer")?;
// Update in-memory housekeeping to reflect the absence of the deleted layer
let mut detail = self.secondary_state.detail.lock().unwrap();
let Some(timeline_state) = detail.timelines.get_mut(&timeline_id) else {
continue;
};
timeline_state.on_disk_layers.remove(&layer_name);
}
for timeline_id in delete_timelines {
let timeline_path = self
.conf
.timeline_path(self.secondary_state.get_tenant_shard_id(), &timeline_id);
tracing::info!(timeline_id=%timeline_id,
"Timeline no longer in heatmap, removing from secondary location"
);
tokio::fs::remove_dir_all(&timeline_path)
.await
.or_else(fs_ext::ignore_not_found)
.maybe_fatal_err("Removing secondary timeline")?;
}
Ok(progress)
}
/// Returns downloaded bytes if the etag differs from `prev_etag`, or None if the object
/// still matches `prev_etag`.
async fn download_heatmap(
&self,
prev_etag: Option<&Etag>,
) -> Result<HeatMapDownload, UpdateError> {
debug_assert_current_span_has_tenant_id();
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
// TODO: make download conditional on ETag having changed since last download
// TODO: pull up etag check into the request, to do a conditional GET rather than
// issuing a GET and then maybe ignoring the response body
// (https://github.com/neondatabase/neon/issues/6199)
tracing::debug!("Downloading heatmap for secondary tenant",);
let heatmap_path = remote_heatmap_path(tenant_shard_id);
let cancel = &self.secondary_state.cancel;
let heatmap_bytes = backoff::retry(
backoff::retry(
|| async {
let download = self
.remote_storage
.download(&heatmap_path, cancel)
.await
.map_err(UpdateError::from)?;
let mut heatmap_bytes = Vec::new();
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?;
Ok(heatmap_bytes)
if Some(&download.etag) == prev_etag {
Ok(HeatMapDownload::Unmodified)
} else {
let mut heatmap_bytes = Vec::new();
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?;
SECONDARY_MODE.download_heatmap.inc();
Ok(HeatMapDownload::Modified(HeatMapModified {
etag: download.etag,
last_modified: download.last_modified,
bytes: heatmap_bytes,
}))
}
},
|e| matches!(e, UpdateError::NoData | UpdateError::Cancelled),
FAILED_DOWNLOAD_WARN_THRESHOLD,
@@ -548,11 +723,7 @@ impl<'a> TenantDownloader<'a> {
)
.await
.ok_or_else(|| UpdateError::Cancelled)
.and_then(|x| x)?;
SECONDARY_MODE.download_heatmap.inc();
Ok(heatmap_bytes)
.and_then(|x| x)
}
async fn download_timeline(&self, timeline: HeatMapTimeline) -> Result<(), UpdateError> {
@@ -593,27 +764,6 @@ impl<'a> TenantDownloader<'a> {
}
};
let layers_in_heatmap = timeline
.layers
.iter()
.map(|l| &l.name)
.collect::<HashSet<_>>();
let layers_on_disk = timeline_state
.on_disk_layers
.iter()
.map(|l| l.0)
.collect::<HashSet<_>>();
// Remove on-disk layers that are no longer present in heatmap
for layer in layers_on_disk.difference(&layers_in_heatmap) {
let local_path = timeline_path.join(layer.to_string());
tracing::info!("Removing secondary local layer {layer} because it's absent in heatmap",);
tokio::fs::remove_file(&local_path)
.await
.or_else(fs_ext::ignore_not_found)
.maybe_fatal_err("Removing secondary layer")?;
}
// Download heatmap layers that are not present on local disk, or update their
// access time if they are already present.
for layer in timeline.layers {
@@ -662,6 +812,12 @@ impl<'a> TenantDownloader<'a> {
}
}
// Failpoint for simulating slow remote storage
failpoint_support::sleep_millis_async!(
"secondary-layer-download-sleep",
&self.secondary_state.cancel
);
// Note: no backoff::retry wrapper here because download_layer_file does its own retries internally
let downloaded_bytes = match download_layer_file(
self.conf,
@@ -701,6 +857,11 @@ impl<'a> TenantDownloader<'a> {
tokio::fs::remove_file(&local_path)
.await
.or_else(fs_ext::ignore_not_found)?;
} else {
tracing::info!("Downloaded layer {}, size {}", layer.name, downloaded_bytes);
let mut progress = self.secondary_state.progress.lock().unwrap();
progress.bytes_downloaded += downloaded_bytes;
progress.layers_downloaded += 1;
}
SECONDARY_MODE.download_layer.inc();

View File

@@ -62,3 +62,25 @@ impl HeatMapTimeline {
}
}
}
pub(crate) struct HeatMapStats {
pub(crate) bytes: u64,
pub(crate) layers: usize,
}
impl HeatMapTenant {
pub(crate) fn get_stats(&self) -> HeatMapStats {
let mut stats = HeatMapStats {
bytes: 0,
layers: 0,
};
for timeline in &self.timelines {
for layer in &timeline.layers {
stats.layers += 1;
stats.bytes += layer.metadata.file_size;
}
}
stats
}
}

View File

@@ -1525,6 +1525,7 @@ class NeonCli(AbstractNeonCli):
conf: Optional[Dict[str, Any]] = None,
shard_count: Optional[int] = None,
shard_stripe_size: Optional[int] = None,
placement_policy: Optional[str] = None,
set_default: bool = False,
) -> Tuple[TenantId, TimelineId]:
"""
@@ -1558,6 +1559,9 @@ class NeonCli(AbstractNeonCli):
if shard_stripe_size is not None:
args.extend(["--shard-stripe-size", str(shard_stripe_size)])
if placement_policy is not None:
args.extend(["--placement-policy", str(placement_policy)])
res = self.raw_cli(args)
res.check_returncode()
return tenant_id, timeline_id

View File

@@ -357,9 +357,15 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/heatmap_upload")
self.verbose_error(res)
def tenant_secondary_download(self, tenant_id: Union[TenantId, TenantShardId]):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/secondary/download")
def tenant_secondary_download(
self, tenant_id: Union[TenantId, TenantShardId], wait_ms: Optional[int] = None
) -> tuple[int, dict[Any, Any]]:
url = f"http://localhost:{self.port}/v1/tenant/{tenant_id}/secondary/download"
if wait_ms is not None:
url = url + f"?wait_ms={wait_ms}"
res = self.post(url)
self.verbose_error(res)
return (res.status_code, res.json())
def set_tenant_config(self, tenant_id: Union[TenantId, TenantShardId], config: dict[str, Any]):
assert "tenant_id" not in config.keys()

View File

@@ -1,4 +1,5 @@
import json
import os
import random
from pathlib import Path
from typing import Any, Dict, Optional
@@ -553,3 +554,103 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
)
),
)
@pytest.mark.skipif(os.environ.get("BUILD_TYPE") == "debug", reason="only run with release build")
@pytest.mark.parametrize("via_controller", [True, False])
def test_slow_secondary_downloads(neon_env_builder: NeonEnvBuilder, via_controller: bool):
"""
Test use of secondary download API for slow downloads, where slow means either a healthy
system with a large capacity shard, or some unhealthy remote storage.
The download API is meant to respect a client-supplied time limit, and return 200 or 202
selectively based on whether the download completed.
"""
neon_env_builder.num_pageservers = 2
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
)
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.neon_cli.create_tenant(
tenant_id, timeline_id, conf=TENANT_CONF, placement_policy='{"Double":1}'
)
attached_to_id = env.storage_controller.locate(tenant_id)[0]["node_id"]
ps_attached = env.get_pageserver(attached_to_id)
ps_secondary = next(p for p in env.pageservers if p != ps_attached)
# Generate a bunch of small layers (we will apply a slowdown failpoint that works on a per-layer basis)
workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(128)
ps_attached.http_client().timeline_checkpoint(tenant_id, timeline_id)
workload.write_rows(128)
ps_attached.http_client().timeline_checkpoint(tenant_id, timeline_id)
workload.write_rows(128)
ps_attached.http_client().timeline_checkpoint(tenant_id, timeline_id)
workload.write_rows(128)
ps_attached.http_client().timeline_checkpoint(tenant_id, timeline_id)
# Expect lots of layers
assert len(list_layers(ps_attached, tenant_id, timeline_id)) > 10
# Simulate large data by making layer downloads artifically slow
for ps in env.pageservers:
ps.http_client().configure_failpoints([("secondary-layer-download-sleep", "return(1000)")])
# Upload a heatmap, so that secondaries have something to download
ps_attached.http_client().tenant_heatmap_upload(tenant_id)
if via_controller:
http_client = env.storage_controller.pageserver_api()
http_client.tenant_location_conf(
tenant_id,
{
"mode": "Secondary",
"secondary_conf": {"warm": True},
"tenant_conf": {},
"generation": None,
},
)
else:
http_client = ps_secondary.http_client()
# This has no chance to succeed: we have lots of layers and each one takes at least 1000ms
(status, progress_1) = http_client.tenant_secondary_download(tenant_id, wait_ms=4000)
assert status == 202
assert progress_1["heatmap_mtime"] is not None
assert progress_1["layers_downloaded"] > 0
assert progress_1["bytes_downloaded"] > 0
assert progress_1["layers_total"] > progress_1["layers_downloaded"]
assert progress_1["bytes_total"] > progress_1["bytes_downloaded"]
# Multiple polls should work: use a shorter wait period this time
(status, progress_2) = http_client.tenant_secondary_download(tenant_id, wait_ms=1000)
assert status == 202
assert progress_2["heatmap_mtime"] is not None
assert progress_2["layers_downloaded"] > 0
assert progress_2["bytes_downloaded"] > 0
assert progress_2["layers_total"] > progress_2["layers_downloaded"]
assert progress_2["bytes_total"] > progress_2["bytes_downloaded"]
# Progress should be >= the first poll: this can only go backward if we see a new heatmap,
# and the heatmap period on the attached node is much longer than the runtime of this test, so no
# new heatmap should have been uploaded.
assert progress_2["layers_downloaded"] >= progress_1["layers_downloaded"]
assert progress_2["bytes_downloaded"] >= progress_1["bytes_downloaded"]
assert progress_2["layers_total"] == progress_1["layers_total"]
assert progress_2["bytes_total"] == progress_1["bytes_total"]
# Make downloads fast again: when the download completes within this last request, we
# get a 200 instead of a 202
for ps in env.pageservers:
ps.http_client().configure_failpoints([("secondary-layer-download-sleep", "off")])
(status, progress_3) = http_client.tenant_secondary_download(tenant_id, wait_ms=20000)
assert status == 200
assert progress_3["heatmap_mtime"] is not None
assert progress_3["layers_total"] == progress_3["layers_downloaded"]
assert progress_3["bytes_total"] == progress_3["bytes_downloaded"]