diff --git a/control_plane/attachment_service/src/http.rs b/control_plane/attachment_service/src/http.rs index 560a05e908..45ee354822 100644 --- a/control_plane/attachment_service/src/http.rs +++ b/control_plane/attachment_service/src/http.rs @@ -14,7 +14,7 @@ use tokio_util::sync::CancellationToken; use utils::auth::{Scope, SwappableJwtAuth}; use utils::failpoint_support::failpoints_handler; use utils::http::endpoint::{auth_middleware, check_permission_with, request_span}; -use utils::http::request::{must_get_query_param, parse_request_param}; +use utils::http::request::{must_get_query_param, parse_query_param, parse_request_param}; use utils::id::{TenantId, TimelineId}; use utils::{ @@ -248,8 +248,10 @@ async fn handle_tenant_secondary_download( req: Request, ) -> Result, ApiError> { let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; - service.tenant_secondary_download(tenant_id).await?; - json_response(StatusCode::OK, ()) + let wait = parse_query_param(&req, "wait_ms")?.map(Duration::from_millis); + + let (status, progress) = service.tenant_secondary_download(tenant_id, wait).await?; + json_response(status, progress) } async fn handle_tenant_delete( diff --git a/control_plane/attachment_service/src/reconciler.rs b/control_plane/attachment_service/src/reconciler.rs index 7f68a65c15..3bf23275bd 100644 --- a/control_plane/attachment_service/src/reconciler.rs +++ b/control_plane/attachment_service/src/reconciler.rs @@ -8,7 +8,7 @@ use pageserver_api::shard::{ShardIdentity, TenantShardId}; use pageserver_client::mgmt_api; use std::collections::HashMap; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio_util::sync::CancellationToken; use utils::generation::Generation; use utils::id::{NodeId, TimelineId}; @@ -258,22 +258,81 @@ impl Reconciler { tenant_shard_id: TenantShardId, node: &Node, ) -> Result<(), ReconcileError> { - match node - .with_client_retries( - |client| async move { 
client.tenant_secondary_download(tenant_shard_id).await }, - &self.service_config.jwt_token, - 1, - 1, - Duration::from_secs(60), - &self.cancel, - ) - .await - { - None => Err(ReconcileError::Cancel), - Some(Ok(_)) => Ok(()), - Some(Err(e)) => { - tracing::info!(" (skipping destination download: {})", e); - Ok(()) + // This is not the timeout for a request, but the total amount of time we're willing to wait + // for a secondary location to get up to date before giving up and proceeding with the migration anyway. + const TOTAL_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(300); + + // This is the long-polling interval for the secondary download requests we send to destination pageserver + // during a migration. + const REQUEST_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(20); + + let started_at = Instant::now(); + + loop { + let (status, progress) = match node + .with_client_retries( + |client| async move { + client + .tenant_secondary_download( + tenant_shard_id, + Some(REQUEST_DOWNLOAD_TIMEOUT), + ) + .await + }, + &self.service_config.jwt_token, + 1, + 3, + REQUEST_DOWNLOAD_TIMEOUT * 2, + &self.cancel, + ) + .await + { + None => Err(ReconcileError::Cancel), + Some(Ok(v)) => Ok(v), + Some(Err(e)) => { + // Give up, but proceed: it's unfortunate if we couldn't freshen the destination before + // attaching, but we should not let an issue with a secondary location stop us proceeding + // with a live migration. + tracing::warn!("Failed to prepare by downloading layers on node {node}: {e})"); + return Ok(()); + } + }?; + + if status == StatusCode::OK { + tracing::info!( + "Downloads to {} complete: {}/{} layers, {}/{} bytes", + node, + progress.layers_downloaded, + progress.layers_total, + progress.bytes_downloaded, + progress.bytes_total + ); + return Ok(()); + } else if status == StatusCode::ACCEPTED { + let total_runtime = started_at.elapsed(); + if total_runtime > TOTAL_DOWNLOAD_TIMEOUT { + tracing::warn!("Timed out after {}ms downloading layers to {node}.
Progress so far: {}/{} layers, {}/{} bytes", + total_runtime.as_millis(), + progress.layers_downloaded, + progress.layers_total, + progress.bytes_downloaded, + progress.bytes_total + ); + // Give up, but proceed: an incompletely warmed destination doesn't prevent migration working, + // it just makes the I/O performance for users less good. + return Ok(()); + } + + // Log and proceed around the loop to retry. We don't sleep between requests, because our HTTP call + // to the pageserver is a long-poll. + tracing::info!( + "Downloads to {} not yet complete: {}/{} layers, {}/{} bytes", + node, + progress.layers_downloaded, + progress.layers_total, + progress.bytes_downloaded, + progress.bytes_total + ); } } } diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs index 8439ea5567..29f87021b2 100644 --- a/control_plane/attachment_service/src/service.rs +++ b/control_plane/attachment_service/src/service.rs @@ -16,7 +16,15 @@ use diesel::result::DatabaseErrorKind; use futures::{stream::FuturesUnordered, StreamExt}; use hyper::StatusCode; use pageserver_api::{ - controller_api::UtilizationScore, + controller_api::{ + NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy, + TenantCreateResponse, TenantCreateResponseShard, TenantLocateResponse, + TenantShardMigrateRequest, TenantShardMigrateResponse, UtilizationScore, + }, + models::{SecondaryProgress, TenantConfigRequest}, +}; + +use pageserver_api::{ models::{ self, LocationConfig, LocationConfigListResponse, LocationConfigMode, PageserverUtilization, ShardParameters, TenantConfig, TenantCreateRequest, @@ -30,14 +38,6 @@ use pageserver_api::{ ValidateResponse, ValidateResponseTenant, }, }; -use pageserver_api::{ - controller_api::{ - NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy, - TenantCreateResponse, TenantCreateResponseShard, TenantLocateResponse, - TenantShardMigrateRequest, TenantShardMigrateResponse, - }, - 
models::TenantConfigRequest, -}; use pageserver_client::mgmt_api; use tokio::sync::OwnedRwLockWriteGuard; use tokio_util::sync::CancellationToken; @@ -2084,7 +2084,8 @@ impl Service { pub(crate) async fn tenant_secondary_download( &self, tenant_id: TenantId, - ) -> Result<(), ApiError> { + wait: Option, + ) -> Result<(StatusCode, SecondaryProgress), ApiError> { let _tenant_lock = self.tenant_op_locks.shared(tenant_id).await; // Acquire lock and yield the collection of shard-node tuples which we will send requests onward to @@ -2107,32 +2108,71 @@ impl Service { targets }; - // TODO: this API, and the underlying pageserver API, should take a timeout argument so that for long running - // downloads, they can return a clean 202 response instead of the HTTP client timing out. - // Issue concurrent requests to all shards' locations let mut futs = FuturesUnordered::new(); for (tenant_shard_id, node) in targets { let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref()); futs.push(async move { - let result = client.tenant_secondary_download(tenant_shard_id).await; - (result, node) + let result = client + .tenant_secondary_download(tenant_shard_id, wait) + .await; + (result, node, tenant_shard_id) }) } // Handle any errors returned by pageservers. This includes cases like this request racing with // a scheduling operation, such that the tenant shard we're calling doesn't exist on that pageserver any more, as // well as more general cases like 503s, 500s, or timeouts. - while let Some((result, node)) = futs.next().await { - let Err(e) = result else { continue }; - - // Secondary downloads are always advisory: if something fails, we nevertheless report success, so that whoever - // is calling us will proceed with whatever migration they're doing, albeit with a slightly less warm cache - // than they had hoped for. 
- tracing::warn!("Ignoring tenant secondary download error from pageserver {node}: {e}",); + let mut aggregate_progress = SecondaryProgress::default(); + let mut aggregate_status: Option = None; + let mut error: Option = None; + while let Some((result, node, tenant_shard_id)) = futs.next().await { + match result { + Err(e) => { + // Secondary downloads are always advisory: if something fails, we nevertheless report success, so that whoever + // is calling us will proceed with whatever migration they're doing, albeit with a slightly less warm cache + // than they had hoped for. + tracing::warn!("Secondary download error from pageserver {node}: {e}",); + error = Some(e) + } + Ok((status_code, progress)) => { + tracing::info!(%tenant_shard_id, "Shard status={status_code} progress: {progress:?}"); + aggregate_progress.layers_downloaded += progress.layers_downloaded; + aggregate_progress.layers_total += progress.layers_total; + aggregate_progress.bytes_downloaded += progress.bytes_downloaded; + aggregate_progress.bytes_total += progress.bytes_total; + aggregate_progress.heatmap_mtime = + std::cmp::max(aggregate_progress.heatmap_mtime, progress.heatmap_mtime); + aggregate_status = match aggregate_status { + None => Some(status_code), + Some(StatusCode::OK) => Some(status_code), + Some(cur) => { + // Other status codes (e.g. 202) -- do not overwrite. + Some(cur) + } + }; + } + } } - Ok(()) + // If any of the shards return 202, indicate our result as 202. 
+ match aggregate_status { + None => { + match error { + Some(e) => { + // No successes, and an error: surface it + Err(ApiError::Conflict(format!("Error from pageserver: {e}"))) + } + None => { + // No shards found + Err(ApiError::NotFound( + anyhow::anyhow!("Tenant {} not found", tenant_id).into(), + )) + } + } + } + Some(aggregate_status) => Ok((aggregate_status, aggregate_progress)), + } } pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result { diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index ab2f80fb0c..2603515681 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -568,13 +568,6 @@ impl PageServerNode { Ok(self.http_client.list_timelines(*tenant_shard_id).await?) } - pub async fn tenant_secondary_download(&self, tenant_id: &TenantShardId) -> anyhow::Result<()> { - Ok(self - .http_client - .tenant_secondary_download(*tenant_id) - .await?) - } - pub async fn timeline_create( &self, tenant_shard_id: TenantShardId, diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 0d0702e38e..aad4cc97fc 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -751,6 +751,52 @@ pub struct WalRedoManagerStatus { pub pid: Option, } +/// The progress of a secondary tenant is mostly useful when doing a long running download: e.g. initiating +/// a download job, timing out while waiting for it to run, and then inspecting this status to understand +/// what's happening. +#[derive(Default, Debug, Serialize, Deserialize, Clone)] +pub struct SecondaryProgress { + /// The remote storage LastModified time of the heatmap object we last downloaded. 
+ #[serde( + serialize_with = "opt_ser_rfc3339_millis", + deserialize_with = "opt_deser_rfc3339_millis" + )] + pub heatmap_mtime: Option, + + /// The number of layers currently on-disk + pub layers_downloaded: usize, + /// The number of layers in the most recently seen heatmap + pub layers_total: usize, + + /// The number of layer bytes currently on-disk + pub bytes_downloaded: u64, + /// The number of layer bytes in the most recently seen heatmap + pub bytes_total: u64, +} + +fn opt_ser_rfc3339_millis( + ts: &Option, + serializer: S, +) -> Result { + match ts { + Some(ts) => serializer.collect_str(&humantime::format_rfc3339_millis(*ts)), + None => serializer.serialize_none(), + } +} + +fn opt_deser_rfc3339_millis<'de, D>(deserializer: D) -> Result, D::Error> +where + D: serde::de::Deserializer<'de>, +{ + let s: Option = serde::de::Deserialize::deserialize(deserializer)?; + match s { + None => Ok(None), + Some(s) => humantime::parse_rfc3339(&s) + .map_err(serde::de::Error::custom) + .map(Some), + } +} + pub mod virtual_file { #[derive( Copy, diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index 1e337bc1e8..5fff3e25c9 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -157,9 +157,8 @@ impl AzureBlobStorage { let mut bufs = Vec::new(); while let Some(part) = response.next().await { let part = part?; - let etag_str: &str = part.blob.properties.etag.as_ref(); if etag.is_none() { - etag = Some(etag.unwrap_or_else(|| etag_str.to_owned())); + etag = Some(part.blob.properties.etag); } if last_modified.is_none() { last_modified = Some(part.blob.properties.last_modified.into()); @@ -180,6 +179,7 @@ impl AzureBlobStorage { "Azure GET response contained no buffers" ))); } + // unwrap safety: if these were None, bufs would be empty and we would have returned an error already let etag = etag.unwrap(); let last_modified = last_modified.unwrap(); diff --git a/libs/remote_storage/src/lib.rs 
b/libs/remote_storage/src/lib.rs index fd832eb94f..ab2035f19a 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -42,6 +42,9 @@ pub use self::{ }; use s3_bucket::RequestKind; +/// Azure SDK's ETag type is a simple String wrapper: we use this internally instead of repeating it here. +pub use azure_core::Etag; + pub use error::{DownloadError, TimeTravelError, TimeoutOrCancel}; /// Currently, sync happens with AWS S3, that has two limits on requests per second: @@ -293,7 +296,7 @@ pub struct Download { /// The last time the file was modified (`last-modified` HTTP header) pub last_modified: SystemTime, /// A way to identify this specific version of the resource (`etag` HTTP header) - pub etag: String, + pub etag: Etag, /// Extra key-value data, associated with the current remote file. pub metadata: Option, } diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index ea0756541b..313d8226b1 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -30,6 +30,7 @@ use crate::{ }; use super::{RemoteStorage, StorageMetadata}; +use crate::Etag; const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp"; @@ -626,9 +627,9 @@ async fn file_metadata(file_path: &Utf8Path) -> Result String { +fn mock_etag(meta: &std::fs::Metadata) -> Etag { let mtime = meta.modified().expect("Filesystem mtime missing"); - format!("{}", mtime.duration_since(UNIX_EPOCH).unwrap().as_millis()) + format!("{}", mtime.duration_since(UNIX_EPOCH).unwrap().as_millis()).into() } #[cfg(test)] diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 56bc32ebdd..1cb85cfb1b 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -289,7 +289,8 @@ impl S3Bucket { let metadata = object_output.metadata().cloned().map(StorageMetadata); let etag = object_output .e_tag - .ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))?; + 
.ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))? + .into(); let last_modified = object_output .last_modified .ok_or(DownloadError::Other(anyhow::anyhow!( diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs index 1a8f7e0524..ab55d2b0a3 100644 --- a/pageserver/client/src/mgmt_api.rs +++ b/pageserver/client/src/mgmt_api.rs @@ -249,13 +249,26 @@ impl Client { Ok(()) } - pub async fn tenant_secondary_download(&self, tenant_id: TenantShardId) -> Result<()> { - let uri = format!( + pub async fn tenant_secondary_download( + &self, + tenant_id: TenantShardId, + wait: Option, + ) -> Result<(StatusCode, SecondaryProgress)> { + let mut path = reqwest::Url::parse(&format!( "{}/v1/tenant/{}/secondary/download", self.mgmt_api_endpoint, tenant_id - ); - self.request(Method::POST, &uri, ()).await?; - Ok(()) + )) + .expect("Cannot build URL"); + + if let Some(wait) = wait { + path.query_pairs_mut() + .append_pair("wait_ms", &format!("{}", wait.as_millis())); + } + + let response = self.request(Method::POST, path, ()).await?; + let status = response.status(); + let progress: SecondaryProgress = response.json().await.map_err(Error::ReceiveBody)?; + Ok((status, progress)) } pub async fn location_config( diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 4823710fb5..0771229845 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -965,12 +965,28 @@ paths: required: true schema: type: string + - name: wait_ms + description: If set, we will wait this long for download to complete, and if it isn't complete then return 202 + in: query + required: false + schema: + type: integer post: description: | If the location is in secondary mode, download latest heatmap and layers responses: "200": description: Success + content: + application/json: + schema: + $ref: "#/components/schemas/SecondaryProgress" + "202": + description: Download has started but not yet 
finished + content: + application/json: + schema: + $ref: "#/components/schemas/SecondaryProgress" "500": description: Generic operation error content: @@ -1623,6 +1639,37 @@ components: Lower is better score for how good this pageserver would be for the next tenant. The default or maximum value can be returned in situations when a proper score cannot (yet) be calculated. + SecondaryProgress: + type: object + required: + - heatmap_mtime + - layers_downloaded + - layers_total + - bytes_downloaded + - bytes_total + properties: + heatmap_mtime: + type: string + format: date-time + description: Modification time of the most recently downloaded layer heatmap (RFC 3339 format) + layers_downloaded: + type: integer + format: int64 + description: How many layers from the latest layer heatmap are present on disk + bytes_downloaded: + type: integer + format: int64 + description: How many bytes of layer content from the latest layer heatmap are present on disk + layers_total: + type: integer + format: int64 + description: How many layers were in the latest layer heatmap + bytes_total: + type: integer + format: int64 + description: How many bytes of layer content were in the latest layer heatmap + + Error: type: object required: diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 7d3ede21ce..6d98d3f746 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1987,13 +1987,42 @@ async fn secondary_download_handler( ) -> Result, ApiError> { let state = get_state(&request); let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; - state - .secondary_controller - .download_tenant(tenant_shard_id) - .await - .map_err(ApiError::InternalServerError)?; + let wait = parse_query_param(&request, "wait_ms")?.map(Duration::from_millis); - json_response(StatusCode::OK, ()) + // We don't need this to issue the download request, but: + // - it enables us to cleanly return 404 if we get a request for an 
absent shard + // - we will use this to provide status feedback in the response + let Some(secondary_tenant) = state + .tenant_manager + .get_secondary_tenant_shard(tenant_shard_id) + else { + return Err(ApiError::NotFound( + anyhow::anyhow!("Shard {} not found", tenant_shard_id).into(), + )); + }; + + let timeout = wait.unwrap_or(Duration::MAX); + + let status = match tokio::time::timeout( + timeout, + state.secondary_controller.download_tenant(tenant_shard_id), + ) + .await + { + // Download job ran to completion. + Ok(Ok(())) => StatusCode::OK, + // Edge case: downloads aren't usually fallible: things like a missing heatmap are considered + // okay. We could get an error here in the unlikely edge case that the tenant + // was detached between our check above and executing the download job. + Ok(Err(e)) => return Err(ApiError::InternalServerError(e)), + // A timeout is not an error: we have started the download, we're just not done + // yet. The caller will get a response body indicating status. + Err(_) => StatusCode::ACCEPTED, + }; + + let progress = secondary_tenant.progress.lock().unwrap().clone(); + + json_response(status, progress) } async fn handler_404(_: Request) -> Result, ApiError> { diff --git a/pageserver/src/tenant/secondary.rs b/pageserver/src/tenant/secondary.rs index 14e88b836e..19f36c722e 100644 --- a/pageserver/src/tenant/secondary.rs +++ b/pageserver/src/tenant/secondary.rs @@ -95,7 +95,11 @@ pub(crate) struct SecondaryTenant { shard_identity: ShardIdentity, tenant_conf: std::sync::Mutex, + // Internal state used by the Downloader. 
detail: std::sync::Mutex, + + // Public state indicating overall progress of downloads relative to the last heatmap seen + pub(crate) progress: std::sync::Mutex, } impl SecondaryTenant { @@ -118,6 +122,8 @@ impl SecondaryTenant { tenant_conf: std::sync::Mutex::new(tenant_conf), detail: std::sync::Mutex::new(SecondaryDetail::new(config.clone())), + + progress: std::sync::Mutex::default(), }) } @@ -247,9 +253,12 @@ impl SecondaryTenant { } /// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads, -/// and heatmap uploads. This is not a hot data path: it's primarily a hook for tests, -/// where we want to immediately upload/download for a particular tenant. In normal operation -/// uploads & downloads are autonomous and not driven by this interface. +/// and heatmap uploads. This is not a hot data path: it's used for: +/// - Live migrations, where we want to ensure a migration destination has the freshest possible +/// content before trying to cut over. +/// - Tests, where we want to immediately upload/download for a particular tenant. +/// +/// In normal operations, outside of migrations, uploads & downloads are autonomous and not driven by this interface. 
pub struct SecondaryController { upload_req_tx: tokio::sync::mpsc::Sender>, download_req_tx: tokio::sync::mpsc::Sender>, diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index b679077358..a595096133 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -41,14 +41,16 @@ use crate::tenant::{ use camino::Utf8PathBuf; use chrono::format::{DelayedFormat, StrftimeItems}; use futures::Future; +use pageserver_api::models::SecondaryProgress; use pageserver_api::shard::TenantShardId; use rand::Rng; -use remote_storage::{DownloadError, GenericRemoteStorage}; +use remote_storage::{DownloadError, Etag, GenericRemoteStorage}; use tokio_util::sync::CancellationToken; use tracing::{info_span, instrument, warn, Instrument}; use utils::{ - backoff, completion::Barrier, crashsafe::path_with_suffix_extension, fs_ext, id::TimelineId, + backoff, completion::Barrier, crashsafe::path_with_suffix_extension, failpoint_support, fs_ext, + id::TimelineId, }; use super::{ @@ -128,6 +130,7 @@ pub(super) struct SecondaryDetail { pub(super) config: SecondaryLocationConfig, last_download: Option, + last_etag: Option, next_download: Option, pub(super) timelines: HashMap, } @@ -138,11 +141,26 @@ fn strftime(t: &'_ SystemTime) -> DelayedFormat> { datetime.format("%d/%m/%Y %T") } +/// Information returned from download function when it detects the heatmap has changed +struct HeatMapModified { + etag: Etag, + last_modified: SystemTime, + bytes: Vec, +} + +enum HeatMapDownload { + // The heatmap's etag has changed: return the new etag, mtime and the body bytes + Modified(HeatMapModified), + // The heatmap's etag is unchanged + Unmodified, +} + impl SecondaryDetail { pub(super) fn new(config: SecondaryLocationConfig) -> Self { Self { config, last_download: None, + last_etag: None, next_download: None, timelines: HashMap::new(), } @@ -477,11 +495,31 @@ impl<'a> TenantDownloader<'a> { }; let 
tenant_shard_id = self.secondary_state.get_tenant_shard_id(); + + // We will use the etag from last successful download to make the download conditional on changes + let last_etag = self + .secondary_state + .detail + .lock() + .unwrap() + .last_etag + .clone(); + // Download the tenant's heatmap - let heatmap_bytes = tokio::select!( - bytes = self.download_heatmap() => {bytes?}, + let HeatMapModified { + last_modified: heatmap_mtime, + etag: heatmap_etag, + bytes: heatmap_bytes, + } = match tokio::select!( + bytes = self.download_heatmap(last_etag.as_ref()) => {bytes?}, _ = self.secondary_state.cancel.cancelled() => return Ok(()) - ); + ) { + HeatMapDownload::Unmodified => { + tracing::info!("Heatmap unchanged since last successful download"); + return Ok(()); + } + HeatMapDownload::Modified(m) => m, + }; let heatmap = serde_json::from_slice::(&heatmap_bytes)?; @@ -498,6 +536,14 @@ impl<'a> TenantDownloader<'a> { tracing::debug!("Wrote local heatmap to {}", heatmap_path); + // Clean up any local layers that aren't in the heatmap. We do this first for all timelines, on the general + // principle that deletions should be done before writes wherever possible, and so that we can use this + // phase to initialize our SecondaryProgress. + { + *self.secondary_state.progress.lock().unwrap() = + self.prepare_timelines(&heatmap, heatmap_mtime).await?; + } + // Download the layers in the heatmap for timeline in heatmap.timelines { if self.secondary_state.cancel.is_cancelled() { @@ -515,30 +561,159 @@ impl<'a> TenantDownloader<'a> { .await?; } + // Only update last_etag after a full successful download: this way we will not skip + // the next download, even if the heatmap's actual etag is unchanged. + self.secondary_state.detail.lock().unwrap().last_etag = Some(heatmap_etag); + Ok(()) } - async fn download_heatmap(&self) -> Result, UpdateError> { + /// Do any fast local cleanup that comes before the much slower process of downloading + /// layers from remote storage.
In the process, initialize the SecondaryProgress object + /// that will later be updated incrementally as we download layers. + async fn prepare_timelines( + &self, + heatmap: &HeatMapTenant, + heatmap_mtime: SystemTime, + ) -> Result { + let heatmap_stats = heatmap.get_stats(); + // We will construct a progress object, and then populate its initial "downloaded" numbers + // while iterating through local layer state in [`Self::prepare_timelines`] + let mut progress = SecondaryProgress { + layers_total: heatmap_stats.layers, + bytes_total: heatmap_stats.bytes, + heatmap_mtime: Some(heatmap_mtime), + layers_downloaded: 0, + bytes_downloaded: 0, + }; + // Accumulate list of things to delete while holding the detail lock, for execution after dropping the lock + let mut delete_layers = Vec::new(); + let mut delete_timelines = Vec::new(); + { + let mut detail = self.secondary_state.detail.lock().unwrap(); + for (timeline_id, timeline_state) in &mut detail.timelines { + let Some(heatmap_timeline_index) = heatmap + .timelines + .iter() + .position(|t| t.timeline_id == *timeline_id) + else { + // This timeline is no longer referenced in the heatmap: delete it locally + delete_timelines.push(*timeline_id); + continue; + }; + + let heatmap_timeline = heatmap.timelines.get(heatmap_timeline_index).unwrap(); + + let layers_in_heatmap = heatmap_timeline + .layers + .iter() + .map(|l| &l.name) + .collect::>(); + let layers_on_disk = timeline_state + .on_disk_layers + .iter() + .map(|l| l.0) + .collect::>(); + + let mut layer_count = layers_on_disk.len(); + let mut layer_byte_count: u64 = timeline_state + .on_disk_layers + .values() + .map(|l| l.metadata.file_size()) + .sum(); + + // Remove on-disk layers that are no longer present in heatmap + for layer in layers_on_disk.difference(&layers_in_heatmap) { + layer_count -= 1; + layer_byte_count -= timeline_state + .on_disk_layers + .get(layer) + .unwrap() + .metadata + .file_size(); + + delete_layers.push((*timeline_id, 
(*layer).clone())); + } + + progress.bytes_downloaded += layer_byte_count; + progress.layers_downloaded += layer_count; + } + } + + // Execute accumulated deletions + for (timeline_id, layer_name) in delete_layers { + let timeline_path = self + .conf + .timeline_path(self.secondary_state.get_tenant_shard_id(), &timeline_id); + let local_path = timeline_path.join(layer_name.to_string()); + tracing::info!(timeline_id=%timeline_id, "Removing secondary local layer {layer_name} because it's absent in heatmap",); + + tokio::fs::remove_file(&local_path) + .await + .or_else(fs_ext::ignore_not_found) + .maybe_fatal_err("Removing secondary layer")?; + + // Update in-memory housekeeping to reflect the absence of the deleted layer + let mut detail = self.secondary_state.detail.lock().unwrap(); + let Some(timeline_state) = detail.timelines.get_mut(&timeline_id) else { + continue; + }; + timeline_state.on_disk_layers.remove(&layer_name); + } + + for timeline_id in delete_timelines { + let timeline_path = self + .conf + .timeline_path(self.secondary_state.get_tenant_shard_id(), &timeline_id); + tracing::info!(timeline_id=%timeline_id, + "Timeline no longer in heatmap, removing from secondary location" + ); + tokio::fs::remove_dir_all(&timeline_path) + .await + .or_else(fs_ext::ignore_not_found) + .maybe_fatal_err("Removing secondary timeline")?; + } + + Ok(progress) + } + + /// Returns [`HeatMapDownload::Modified`] with the downloaded bytes if the etag differs from `prev_etag`, + /// or [`HeatMapDownload::Unmodified`] if the object still matches `prev_etag`.
+ async fn download_heatmap( + &self, + prev_etag: Option<&Etag>, + ) -> Result { debug_assert_current_span_has_tenant_id(); let tenant_shard_id = self.secondary_state.get_tenant_shard_id(); - // TODO: make download conditional on ETag having changed since last download + // TODO: pull up etag check into the request, to do a conditional GET rather than + // issuing a GET and then maybe ignoring the response body // (https://github.com/neondatabase/neon/issues/6199) tracing::debug!("Downloading heatmap for secondary tenant",); let heatmap_path = remote_heatmap_path(tenant_shard_id); let cancel = &self.secondary_state.cancel; - let heatmap_bytes = backoff::retry( + backoff::retry( || async { let download = self .remote_storage .download(&heatmap_path, cancel) .await .map_err(UpdateError::from)?; - let mut heatmap_bytes = Vec::new(); - let mut body = tokio_util::io::StreamReader::new(download.download_stream); - let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?; - Ok(heatmap_bytes) + + if Some(&download.etag) == prev_etag { + Ok(HeatMapDownload::Unmodified) + } else { + let mut heatmap_bytes = Vec::new(); + let mut body = tokio_util::io::StreamReader::new(download.download_stream); + let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?; + SECONDARY_MODE.download_heatmap.inc(); + Ok(HeatMapDownload::Modified(HeatMapModified { + etag: download.etag, + last_modified: download.last_modified, + bytes: heatmap_bytes, + })) + } }, |e| matches!(e, UpdateError::NoData | UpdateError::Cancelled), FAILED_DOWNLOAD_WARN_THRESHOLD, @@ -548,11 +723,7 @@ impl<'a> TenantDownloader<'a> { ) .await .ok_or_else(|| UpdateError::Cancelled) - .and_then(|x| x)?; - - SECONDARY_MODE.download_heatmap.inc(); - - Ok(heatmap_bytes) + .and_then(|x| x) } async fn download_timeline(&self, timeline: HeatMapTimeline) -> Result<(), UpdateError> { @@ -593,27 +764,6 @@ impl<'a> TenantDownloader<'a> { } }; - let layers_in_heatmap = timeline - .layers - .iter() - .map(|l| 
&l.name) - .collect::>(); - let layers_on_disk = timeline_state - .on_disk_layers - .iter() - .map(|l| l.0) - .collect::>(); - - // Remove on-disk layers that are no longer present in heatmap - for layer in layers_on_disk.difference(&layers_in_heatmap) { - let local_path = timeline_path.join(layer.to_string()); - tracing::info!("Removing secondary local layer {layer} because it's absent in heatmap",); - tokio::fs::remove_file(&local_path) - .await - .or_else(fs_ext::ignore_not_found) - .maybe_fatal_err("Removing secondary layer")?; - } - // Download heatmap layers that are not present on local disk, or update their // access time if they are already present. for layer in timeline.layers { @@ -662,6 +812,12 @@ impl<'a> TenantDownloader<'a> { } } + // Failpoint for simulating slow remote storage + failpoint_support::sleep_millis_async!( + "secondary-layer-download-sleep", + &self.secondary_state.cancel + ); + // Note: no backoff::retry wrapper here because download_layer_file does its own retries internally let downloaded_bytes = match download_layer_file( self.conf, @@ -701,6 +857,11 @@ impl<'a> TenantDownloader<'a> { tokio::fs::remove_file(&local_path) .await .or_else(fs_ext::ignore_not_found)?; + } else { + tracing::info!("Downloaded layer {}, size {}", layer.name, downloaded_bytes); + let mut progress = self.secondary_state.progress.lock().unwrap(); + progress.bytes_downloaded += downloaded_bytes; + progress.layers_downloaded += 1; } SECONDARY_MODE.download_layer.inc(); diff --git a/pageserver/src/tenant/secondary/heatmap.rs b/pageserver/src/tenant/secondary/heatmap.rs index 99aaaeb8c8..73cdf6c6d4 100644 --- a/pageserver/src/tenant/secondary/heatmap.rs +++ b/pageserver/src/tenant/secondary/heatmap.rs @@ -62,3 +62,25 @@ impl HeatMapTimeline { } } } + +pub(crate) struct HeatMapStats { + pub(crate) bytes: u64, + pub(crate) layers: usize, +} + +impl HeatMapTenant { + pub(crate) fn get_stats(&self) -> HeatMapStats { + let mut stats = HeatMapStats { + bytes: 0, + 
layers: 0, + }; + for timeline in &self.timelines { + for layer in &timeline.layers { + stats.layers += 1; + stats.bytes += layer.metadata.file_size; + } + } + + stats + } +} diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 70d3076371..56b23cef59 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1525,6 +1525,7 @@ class NeonCli(AbstractNeonCli): conf: Optional[Dict[str, Any]] = None, shard_count: Optional[int] = None, shard_stripe_size: Optional[int] = None, + placement_policy: Optional[str] = None, set_default: bool = False, ) -> Tuple[TenantId, TimelineId]: """ @@ -1558,6 +1559,9 @@ class NeonCli(AbstractNeonCli): if shard_stripe_size is not None: args.extend(["--shard-stripe-size", str(shard_stripe_size)]) + if placement_policy is not None: + args.extend(["--placement-policy", str(placement_policy)]) + res = self.raw_cli(args) res.check_returncode() return tenant_id, timeline_id diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 4e355b73a9..99ec894106 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -357,9 +357,15 @@ class PageserverHttpClient(requests.Session, MetricsGetter): res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/heatmap_upload") self.verbose_error(res) - def tenant_secondary_download(self, tenant_id: Union[TenantId, TenantShardId]): - res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/secondary/download") + def tenant_secondary_download( + self, tenant_id: Union[TenantId, TenantShardId], wait_ms: Optional[int] = None + ) -> tuple[int, dict[Any, Any]]: + url = f"http://localhost:{self.port}/v1/tenant/{tenant_id}/secondary/download" + if wait_ms is not None: + url = url + f"?wait_ms={wait_ms}" + res = self.post(url) self.verbose_error(res) + return (res.status_code, res.json()) def set_tenant_config(self, tenant_id: 
Union[TenantId, TenantShardId], config: dict[str, Any]): assert "tenant_id" not in config.keys() diff --git a/test_runner/regress/test_pageserver_secondary.py b/test_runner/regress/test_pageserver_secondary.py index 79145f61b3..8ef75414a3 100644 --- a/test_runner/regress/test_pageserver_secondary.py +++ b/test_runner/regress/test_pageserver_secondary.py @@ -1,4 +1,5 @@ import json +import os import random from pathlib import Path from typing import Any, Dict, Optional @@ -553,3 +554,103 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder): ) ), ) + + +@pytest.mark.skipif(os.environ.get("BUILD_TYPE") == "debug", reason="only run with release build") +@pytest.mark.parametrize("via_controller", [True, False]) +def test_slow_secondary_downloads(neon_env_builder: NeonEnvBuilder, via_controller: bool): + """ + Test use of secondary download API for slow downloads, where slow means either a healthy + system with a large capacity shard, or some unhealthy remote storage. + + The download API is meant to respect a client-supplied time limit, and return 200 or 202 + selectively based on whether the download completed. 
+ """ + neon_env_builder.num_pageservers = 2 + neon_env_builder.enable_pageserver_remote_storage( + remote_storage_kind=RemoteStorageKind.MOCK_S3, + ) + env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF) + + tenant_id = TenantId.generate() + timeline_id = TimelineId.generate() + + env.neon_cli.create_tenant( + tenant_id, timeline_id, conf=TENANT_CONF, placement_policy='{"Double":1}' + ) + + attached_to_id = env.storage_controller.locate(tenant_id)[0]["node_id"] + ps_attached = env.get_pageserver(attached_to_id) + ps_secondary = next(p for p in env.pageservers if p != ps_attached) + + # Generate a bunch of small layers (we will apply a slowdown failpoint that works on a per-layer basis) + workload = Workload(env, tenant_id, timeline_id) + workload.init() + workload.write_rows(128) + ps_attached.http_client().timeline_checkpoint(tenant_id, timeline_id) + workload.write_rows(128) + ps_attached.http_client().timeline_checkpoint(tenant_id, timeline_id) + workload.write_rows(128) + ps_attached.http_client().timeline_checkpoint(tenant_id, timeline_id) + workload.write_rows(128) + ps_attached.http_client().timeline_checkpoint(tenant_id, timeline_id) + + # Expect lots of layers + assert len(list_layers(ps_attached, tenant_id, timeline_id)) > 10 + + # Simulate large data by making layer downloads artificially slow + for ps in env.pageservers: + ps.http_client().configure_failpoints([("secondary-layer-download-sleep", "return(1000)")]) + + # Upload a heatmap, so that secondaries have something to download + ps_attached.http_client().tenant_heatmap_upload(tenant_id) + + if via_controller: + http_client = env.storage_controller.pageserver_api() + http_client.tenant_location_conf( + tenant_id, + { + "mode": "Secondary", + "secondary_conf": {"warm": True}, + "tenant_conf": {}, + "generation": None, + }, + ) + else: + http_client = ps_secondary.http_client() + + # This has no chance to succeed: we have lots of layers and each one takes at least 1000ms + (status, 
progress_1) = http_client.tenant_secondary_download(tenant_id, wait_ms=4000) + assert status == 202 + assert progress_1["heatmap_mtime"] is not None + assert progress_1["layers_downloaded"] > 0 + assert progress_1["bytes_downloaded"] > 0 + assert progress_1["layers_total"] > progress_1["layers_downloaded"] + assert progress_1["bytes_total"] > progress_1["bytes_downloaded"] + + # Multiple polls should work: use a shorter wait period this time + (status, progress_2) = http_client.tenant_secondary_download(tenant_id, wait_ms=1000) + assert status == 202 + assert progress_2["heatmap_mtime"] is not None + assert progress_2["layers_downloaded"] > 0 + assert progress_2["bytes_downloaded"] > 0 + assert progress_2["layers_total"] > progress_2["layers_downloaded"] + assert progress_2["bytes_total"] > progress_2["bytes_downloaded"] + + # Progress should be >= the first poll: this can only go backward if we see a new heatmap, + # and the heatmap period on the attached node is much longer than the runtime of this test, so no + # new heatmap should have been uploaded. + assert progress_2["layers_downloaded"] >= progress_1["layers_downloaded"] + assert progress_2["bytes_downloaded"] >= progress_1["bytes_downloaded"] + assert progress_2["layers_total"] == progress_1["layers_total"] + assert progress_2["bytes_total"] == progress_1["bytes_total"] + + # Make downloads fast again: when the download completes within this last request, we + # get a 200 instead of a 202 + for ps in env.pageservers: + ps.http_client().configure_failpoints([("secondary-layer-download-sleep", "off")]) + (status, progress_3) = http_client.tenant_secondary_download(tenant_id, wait_ms=20000) + assert status == 200 + assert progress_3["heatmap_mtime"] is not None + assert progress_3["layers_total"] == progress_3["layers_downloaded"] + assert progress_3["bytes_total"] == progress_3["bytes_downloaded"]