diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 7d490016bf..fb0d251722 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -485,6 +485,13 @@ impl PageServerNode { Ok(self.http_client.list_timelines(*tenant_id).await?) } + pub async fn tenant_secondary_download(&self, tenant_id: &TenantShardId) -> anyhow::Result<()> { + Ok(self + .http_client + .tenant_secondary_download(*tenant_id) + .await?) + } + pub async fn timeline_create( &self, tenant_id: TenantId, diff --git a/control_plane/src/tenant_migration.rs b/control_plane/src/tenant_migration.rs index 79df108896..23ea8f4060 100644 --- a/control_plane/src/tenant_migration.rs +++ b/control_plane/src/tenant_migration.rs @@ -11,6 +11,7 @@ use crate::{ use pageserver_api::models::{ LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, }; +use pageserver_api::shard::TenantShardId; use std::collections::HashMap; use std::time::Duration; use utils::{ @@ -40,9 +41,9 @@ async fn await_lsn( loop { let latest = match get_lsns(tenant_id, pageserver).await { Ok(l) => l, - Err(e) => { + Err(_e) => { println!( - "🕑 Can't get LSNs on pageserver {} yet, waiting ({e})", + "🕑 Waiting for pageserver {} to activate...", pageserver.conf.id ); std::thread::sleep(Duration::from_millis(500)); @@ -89,7 +90,7 @@ pub async fn migrate_tenant( tenant_id: TenantId, dest_ps: PageServerNode, ) -> anyhow::Result<()> { - // Get a new generation + println!("🤔 Checking existing status..."); let attachment_service = AttachmentService::from_env(env); fn build_location_config( @@ -135,6 +136,20 @@ pub async fn migrate_tenant( baseline_lsns = Some(get_lsns(tenant_id, &origin_ps).await?); } + println!( + "🔁 Downloading latest layers to destination pageserver {}", + dest_ps.conf.id + ); + match dest_ps + .tenant_secondary_download(&TenantShardId::unsharded(tenant_id)) + .await + { + Ok(()) => {} + Err(_) => { + println!(" (skipping, destination wasn't in secondary mode)") + } + } + let gen = attachment_service .attach_hook(tenant_id, dest_ps.conf.id) .await?; diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 9e9b0adfe5..890061dc59 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -85,6 +85,8 @@ pub mod sync; pub mod failpoint_support; +pub mod yielding_loop; + /// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages /// /// we have several cases: diff --git a/libs/utils/src/sync/gate.rs b/libs/utils/src/sync/gate.rs index 31c76d2f74..abc3842da8 100644 --- a/libs/utils/src/sync/gate.rs +++ b/libs/utils/src/sync/gate.rs @@ -15,6 +15,12 @@ pub struct Gate { name: String, } +impl std::fmt::Debug for Gate { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Gate<{}>", self.name) + } +} + /// RAII guard for a [`Gate`]: as long as this exists, calls to [`Gate::close`] will /// not complete. #[derive(Debug)] diff --git a/libs/utils/src/yielding_loop.rs b/libs/utils/src/yielding_loop.rs new file mode 100644 index 0000000000..963279eb4c --- /dev/null +++ b/libs/utils/src/yielding_loop.rs @@ -0,0 +1,35 @@ +use tokio_util::sync::CancellationToken; + +#[derive(thiserror::Error, Debug)] +pub enum YieldingLoopError { + #[error("Cancelled")] + Cancelled, +} + +/// Helper for long synchronous loops, e.g. over all tenants in the system. Periodically +/// yields to avoid blocking the executor, and after resuming checks the provided +/// cancellation token to drop out promptly on shutdown. 
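A minimal usage sketch of this helper, with a hypothetical caller (the function name and the data are made up for illustration; only `yielding_loop` itself comes from this diff):

```rust
use tokio_util::sync::CancellationToken;
use utils::yielding_loop::{yielding_loop, YieldingLoopError};

// Hypothetical caller: walk a large in-memory collection without monopolizing
// the executor thread, dropping out promptly if shutdown is requested.
async fn sum_layer_sizes(
    sizes: Vec<u64>,
    cancel: &CancellationToken,
) -> Result<u64, YieldingLoopError> {
    let mut total = 0;
    // Yield back to tokio every 1000 items; between yields the visitor runs synchronously.
    yielding_loop(1000, cancel, sizes.into_iter(), |size| {
        total += size;
    })
    .await?;
    Ok(total)
}
```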
+#[inline(always)] +pub async fn yielding_loop( + interval: usize, + cancel: &CancellationToken, + iter: I, + mut visitor: F, +) -> Result<(), YieldingLoopError> +where + I: Iterator, + F: FnMut(T), +{ + for (i, item) in iter.enumerate() { + visitor(item); + + if i + 1 % interval == 0 { + tokio::task::yield_now().await; + if cancel.is_cancelled() { + return Err(YieldingLoopError::Cancelled); + } + } + } + + Ok(()) +} diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs index 87e4ed8efd..4c285293f7 100644 --- a/pageserver/client/src/mgmt_api.rs +++ b/pageserver/client/src/mgmt_api.rs @@ -1,4 +1,4 @@ -use pageserver_api::models::*; +use pageserver_api::{models::*, shard::TenantShardId}; use reqwest::{IntoUrl, Method}; use utils::{ http::error::HttpErrorBody, @@ -164,6 +164,18 @@ impl Client { Ok(()) } + pub async fn tenant_secondary_download(&self, tenant_id: TenantShardId) -> Result<()> { + let uri = format!( + "{}/v1/tenant/{}/secondary/download", + self.mgmt_api_endpoint, tenant_id + ); + self.request(Method::POST, &uri, ()) + .await? + .error_for_status() + .map(|_| ()) + .map_err(|e| Error::ApiError(format!("{}", e))) + } + pub async fn location_config( &self, tenant_id: TenantId, diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 4560f5eca0..7c03dc1bdd 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -37,8 +37,8 @@ use crate::tenant::{ TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME, }; use crate::{ - IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_LOCATION_CONFIG_NAME, - TIMELINE_DELETE_MARK_SUFFIX, TIMELINE_UNINIT_MARK_SUFFIX, + IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_HEATMAP_BASENAME, + TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX, TIMELINE_UNINIT_MARK_SUFFIX, }; use self::defaults::DEFAULT_CONCURRENT_TENANT_WARMUP; @@ -75,6 +75,7 @@ pub mod defaults { pub const DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY: &str = "10s"; pub const DEFAULT_HEATMAP_UPLOAD_CONCURRENCY: usize = 8; + pub const DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY: usize = 1; pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100; @@ -130,6 +131,7 @@ pub mod defaults { #gc_feedback = false #heatmap_upload_concurrency = {DEFAULT_HEATMAP_UPLOAD_CONCURRENCY} +#secondary_download_concurrency = {DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY} [remote_storage] @@ -239,6 +241,10 @@ pub struct PageServerConf { /// heatmap uploads vs. other remote storage operations. pub heatmap_upload_concurrency: usize, + /// How many remote storage downloads may be done for secondary tenants concurrently. Implicitly + /// deprioritises secondary downloads vs. remote storage operations for attached tenants. 
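This knob is plumbed into the downloader task below via `tenant_manager.get_conf().secondary_download_concurrency`. As a rough illustration of what a limit of N means, here is the generic semaphore pattern; this is only a sketch of the concept, not how the scheduler in this diff actually enforces the limit:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Sketch of the general pattern behind a `*_concurrency` setting: at most
// `concurrency` downloads run at once, the rest wait for a permit.
async fn run_with_limit<Fut>(concurrency: usize, downloads: Vec<Fut>)
where
    Fut: std::future::Future<Output = ()> + Send + 'static,
{
    let semaphore = Arc::new(Semaphore::new(concurrency));
    let mut handles = Vec::new();
    for download in downloads {
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        handles.push(tokio::spawn(async move {
            let _permit = permit; // released when this download completes
            download.await;
        }));
    }
    for handle in handles {
        let _ = handle.await;
    }
}
```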
+ pub secondary_download_concurrency: usize, + /// Maximum number of WAL records to be ingested and committed at the same time pub ingest_batch_size: u64, } @@ -322,6 +328,7 @@ struct PageServerConfigBuilder { control_plane_emergency_mode: BuilderValue, heatmap_upload_concurrency: BuilderValue, + secondary_download_concurrency: BuilderValue, ingest_batch_size: BuilderValue, } @@ -396,6 +403,7 @@ impl Default for PageServerConfigBuilder { control_plane_emergency_mode: Set(false), heatmap_upload_concurrency: Set(DEFAULT_HEATMAP_UPLOAD_CONCURRENCY), + secondary_download_concurrency: Set(DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY), ingest_batch_size: Set(DEFAULT_INGEST_BATCH_SIZE), } @@ -546,6 +554,10 @@ impl PageServerConfigBuilder { self.heatmap_upload_concurrency = BuilderValue::Set(value) } + pub fn secondary_download_concurrency(&mut self, value: usize) { + self.secondary_download_concurrency = BuilderValue::Set(value) + } + pub fn ingest_batch_size(&mut self, ingest_batch_size: u64) { self.ingest_batch_size = BuilderValue::Set(ingest_batch_size) } @@ -651,6 +663,9 @@ impl PageServerConfigBuilder { heatmap_upload_concurrency: self .heatmap_upload_concurrency .ok_or(anyhow!("missing heatmap_upload_concurrency"))?, + secondary_download_concurrency: self + .secondary_download_concurrency + .ok_or(anyhow!("missing secondary_download_concurrency"))?, ingest_batch_size: self .ingest_batch_size .ok_or(anyhow!("missing ingest_batch_size"))?, @@ -711,6 +726,11 @@ impl PageServerConf { .join(TENANT_LOCATION_CONFIG_NAME) } + pub(crate) fn tenant_heatmap_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf { + self.tenant_path(tenant_shard_id) + .join(TENANT_HEATMAP_BASENAME) + } + pub fn timelines_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf { self.tenant_path(tenant_shard_id) .join(TIMELINES_SEGMENT_NAME) @@ -896,6 +916,9 @@ impl PageServerConf { "heatmap_upload_concurrency" => { builder.heatmap_upload_concurrency(parse_toml_u64(key, item)? as usize) }, + "secondary_download_concurrency" => { + builder.secondary_download_concurrency(parse_toml_u64(key, item)? 
as usize) + }, "ingest_batch_size" => builder.ingest_batch_size(parse_toml_u64(key, item)?), _ => bail!("unrecognized pageserver option '{key}'"), } @@ -968,6 +991,7 @@ impl PageServerConf { control_plane_api_token: None, control_plane_emergency_mode: false, heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY, + secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY, ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE, } } @@ -1198,6 +1222,7 @@ background_task_maximum_delay = '334 s' control_plane_api_token: None, control_plane_emergency_mode: false, heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY, + secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY, ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE, }, "Correct defaults should be used when no config values are provided" @@ -1260,6 +1285,7 @@ background_task_maximum_delay = '334 s' control_plane_api_token: None, control_plane_emergency_mode: false, heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY, + secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY, ingest_batch_size: 100, }, "Should be able to parse all basic config values correctly" diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 8265627cb5..5c7747d353 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1274,6 +1274,23 @@ async fn put_tenant_location_config_handler( // which is not a 400 but a 409. .map_err(ApiError::BadRequest)?; + if let Some(_flush_ms) = flush { + match state + .secondary_controller + .upload_tenant(tenant_shard_id) + .await + { + Ok(()) => { + tracing::info!("Uploaded heatmap during flush"); + } + Err(e) => { + tracing::warn!("Failed to flush heatmap: {e}"); + } + } + } else { + tracing::info!("No flush requested when configuring"); + } + json_response(StatusCode::OK, ()) } @@ -1611,6 +1628,21 @@ async fn secondary_upload_handler( json_response(StatusCode::OK, ()) } +async fn secondary_download_handler( + request: Request, + _cancel: CancellationToken, +) -> Result, ApiError> { + let state = get_state(&request); + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + state + .secondary_controller + .download_tenant(tenant_shard_id) + .await + .map_err(ApiError::InternalServerError)?; + + json_response(StatusCode::OK, ()) +} + async fn handler_404(_: Request) -> Result, ApiError> { json_response( StatusCode::NOT_FOUND, @@ -1879,6 +1911,9 @@ pub fn make_router( .put("/v1/deletion_queue/flush", |r| { api_handler(r, deletion_queue_flush) }) + .post("/v1/tenant/:tenant_shard_id/secondary/download", |r| { + api_handler(r, secondary_download_handler) + }) .put("/v1/tenant/:tenant_shard_id/break", |r| { testing_api_handler("set tenant state to broken", r, handle_tenant_break) }) diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index c1ce0af47b..26070e0cc1 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -117,6 +117,10 @@ pub const TENANT_CONFIG_NAME: &str = "config"; /// Full path: `tenants//config`. pub const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1"; +/// Per-tenant copy of their remote heatmap, downloaded into the local +/// tenant path while in secondary mode. +pub const TENANT_HEATMAP_BASENAME: &str = "heatmap-v1.json"; + /// A suffix used for various temporary files. Any temporary files found in the /// data directory at pageserver startup can be automatically removed. 
pub const TEMP_FILE_SUFFIX: &str = "___temp"; diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 4725903783..c86adcfa3d 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1369,6 +1369,8 @@ pub(crate) struct SecondaryModeMetrics { pub(crate) upload_heatmap: IntCounter, pub(crate) upload_heatmap_errors: IntCounter, pub(crate) upload_heatmap_duration: Histogram, + pub(crate) download_heatmap: IntCounter, + pub(crate) download_layer: IntCounter, } pub(crate) static SECONDARY_MODE: Lazy = Lazy::new(|| SecondaryModeMetrics { upload_heatmap: register_int_counter!( @@ -1386,6 +1388,16 @@ pub(crate) static SECONDARY_MODE: Lazy = Lazy::new(|| Seco "Time to build and upload a heatmap, including any waiting inside the S3 client" ) .expect("failed to define a metric"), + download_heatmap: register_int_counter!( + "pageserver_secondary_download_heatmap", + "Number of downloads of heatmaps by secondary mode locations" + ) + .expect("failed to define a metric"), + download_layer: register_int_counter!( + "pageserver_secondary_download_layer", + "Number of downloads of layers by secondary mode locations" + ) + .expect("failed to define a metric"), }); #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index eabb1d0022..5a06a97525 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -258,6 +258,9 @@ pub enum TaskKind { /// See [`crate::disk_usage_eviction_task`]. DiskUsageEviction, + /// See [`crate::tenant::secondary`]. + SecondaryDownloads, + /// See [`crate::tenant::secondary`]. SecondaryUploads, diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs index b21bad51ba..2f606ed822 100644 --- a/pageserver/src/tenant/delete.rs +++ b/pageserver/src/tenant/delete.rs @@ -588,7 +588,7 @@ impl DeleteTenantFlow { } break; } - TenantsMapRemoveResult::Occupied(TenantSlot::Secondary) => { + TenantsMapRemoveResult::Occupied(TenantSlot::Secondary(_)) => { // This is unexpected: this secondary tenants should not have been created, and we // are not in a position to shut it down from here. tracing::warn!("Tenant transitioned to secondary mode while deleting!"); diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 250de7247d..70b41b7b1f 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -44,6 +44,7 @@ use utils::generation::Generation; use utils::id::{TenantId, TimelineId}; use super::delete::DeleteTenantError; +use super::secondary::SecondaryTenant; use super::TenantSharedResources; /// For a tenant that appears in TenantsMap, it may either be @@ -57,7 +58,7 @@ use super::TenantSharedResources; /// having a properly acquired generation (Secondary doesn't need a generation) pub(crate) enum TenantSlot { Attached(Arc), - Secondary, + Secondary(Arc), /// In this state, other administrative operations acting on the TenantId should /// block, or return a retry indicator equivalent to HTTP 503. 
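Reduced to a sketch, this is the decision each administrative caller is expected to make per slot state (illustrative only; `SlotDecision` is a made-up stand-in, the real mappings live in `get_tenant` and the HTTP handlers further down, where the retry case corresponds to an HTTP 503):

```rust
// Illustrative only: how the three slot states are treated by callers.
enum SlotDecision {
    UseAttached, // operate on the attached Tenant
    NotFound,    // Secondary or absent: not served as attached from this pageserver
    RetryLater,  // InProgress: return a retry indicator (HTTP 503 equivalent)
}

fn classify(slot: Option<&TenantSlot>) -> SlotDecision {
    match slot {
        Some(TenantSlot::Attached(_)) => SlotDecision::UseAttached,
        Some(TenantSlot::Secondary(_)) | None => SlotDecision::NotFound,
        Some(TenantSlot::InProgress(_)) => SlotDecision::RetryLater,
    }
}
```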
InProgress(utils::completion::Barrier), @@ -67,7 +68,7 @@ impl std::fmt::Debug for TenantSlot { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Attached(tenant) => write!(f, "Attached({})", tenant.current_state()), - Self::Secondary => write!(f, "Secondary"), + Self::Secondary(_) => write!(f, "Secondary"), Self::InProgress(_) => write!(f, "InProgress"), } } @@ -78,7 +79,7 @@ impl TenantSlot { fn get_attached(&self) -> Option<&Arc> { match self { Self::Attached(t) => Some(t), - Self::Secondary => None, + Self::Secondary(_) => None, Self::InProgress(_) => None, } } @@ -466,12 +467,18 @@ pub async fn init_tenant_mgr( *gen } else { match &location_conf.mode { - LocationMode::Secondary(_) => { + LocationMode::Secondary(secondary_config) => { // We do not require the control plane's permission for secondary mode // tenants, because they do no remote writes and hence require no // generation number info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Loaded tenant in secondary mode"); - tenants.insert(tenant_shard_id, TenantSlot::Secondary); + tenants.insert( + tenant_shard_id, + TenantSlot::Secondary(SecondaryTenant::new( + tenant_shard_id, + secondary_config, + )), + ); } LocationMode::Attached(_) => { // TODO: augment re-attach API to enable the control plane to @@ -663,8 +670,14 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock) { total_attached += 1; } - TenantSlot::Secondary => { - shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary); + TenantSlot::Secondary(state) => { + // We don't need to wait for this individually per-tenant: the + // downloader task will be waited on eventually, this cancel + // is just to encourage it to drop out if it is doing work + // for this tenant right now. + state.cancel.cancel(); + + shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary(state)); } TenantSlot::InProgress(notify) => { // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will @@ -847,12 +860,28 @@ impl TenantManager { Some(TenantSlot::InProgress(_)) => { Err(GetTenantError::NotActive(tenant_shard_id.tenant_id)) } - None | Some(TenantSlot::Secondary) => { + None | Some(TenantSlot::Secondary(_)) => { Err(GetTenantError::NotFound(tenant_shard_id.tenant_id)) } } } + pub(crate) fn get_secondary_tenant_shard( + &self, + tenant_shard_id: TenantShardId, + ) -> Option> { + let locked = self.tenants.read().unwrap(); + + let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read) + .ok() + .flatten(); + + match peek_slot { + Some(TenantSlot::Secondary(s)) => Some(s.clone()), + _ => None, + } + } + #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))] pub(crate) async fn upsert_location( &self, @@ -864,10 +893,15 @@ impl TenantManager { debug_assert_current_span_has_tenant_id(); info!("configuring tenant location to state {new_location_config:?}"); - // Special case fast-path for updates to Tenant: if our upsert is only updating configuration, + enum FastPathModified { + Attached(Arc), + Secondary(Arc), + } + + // Special case fast-path for updates to existing slots: if our upsert is only updating configuration, // then we do not need to set the slot to InProgress, we can just call into the // existng tenant. 
- let modify_tenant = { + let fast_path_taken = { let locked = self.tenants.read().unwrap(); let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?; @@ -881,12 +915,19 @@ impl TenantManager { new_location_config.clone(), )?); - Some(tenant.clone()) + Some(FastPathModified::Attached(tenant.clone())) } else { // Different generations, fall through to general case None } } + ( + LocationMode::Secondary(secondary_conf), + Some(TenantSlot::Secondary(secondary_tenant)), + ) => { + secondary_tenant.set_config(secondary_conf); + Some(FastPathModified::Secondary(secondary_tenant.clone())) + } _ => { // Not an Attached->Attached transition, fall through to general case None @@ -895,34 +936,51 @@ impl TenantManager { }; // Fast-path continued: having dropped out of the self.tenants lock, do the async - // phase of waiting for flush, before returning. - if let Some(tenant) = modify_tenant { - // Transition to AttachedStale means we may well hold a valid generation - // still, and have been requested to go stale as part of a migration. If - // the caller set `flush`, then flush to remote storage. - if let LocationMode::Attached(AttachedLocationConfig { - generation: _, - attach_mode: AttachmentMode::Stale, - }) = &new_location_config.mode - { - if let Some(flush_timeout) = flush { - match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await { - Ok(Err(e)) => { - return Err(e); - } - Ok(Ok(_)) => return Ok(()), - Err(_) => { - tracing::warn!( + // phase of writing config and/or waiting for flush, before returning. + match fast_path_taken { + Some(FastPathModified::Attached(tenant)) => { + Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config) + .await + .map_err(SetNewTenantConfigError::Persist)?; + + // Transition to AttachedStale means we may well hold a valid generation + // still, and have been requested to go stale as part of a migration. If + // the caller set `flush`, then flush to remote storage. + if let LocationMode::Attached(AttachedLocationConfig { + generation: _, + attach_mode: AttachmentMode::Stale, + }) = &new_location_config.mode + { + if let Some(flush_timeout) = flush { + match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await { + Ok(Err(e)) => { + return Err(e); + } + Ok(Ok(_)) => return Ok(()), + Err(_) => { + tracing::warn!( timeout_ms = flush_timeout.as_millis(), "Timed out waiting for flush to remote storage, proceeding anyway." ) + } } } } - } - return Ok(()); - } + return Ok(()); + } + Some(FastPathModified::Secondary(_secondary_tenant)) => { + Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config) + .await + .map_err(SetNewTenantConfigError::Persist)?; + + return Ok(()); + } + None => { + // Proceed with the general case procedure, where we will shutdown & remove any existing + // slot contents and replace with a fresh one + } + }; // General case for upserts to TenantsMap, excluding the case above: we will substitute an // InProgress value to the slot while we make whatever changes are required. The state for @@ -931,33 +989,47 @@ impl TenantManager { // not do significant I/O, and shutdowns should be prompt via cancellation tokens. let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?; - if let Some(TenantSlot::Attached(tenant)) = slot_guard.get_old_value() { - // The case where we keep a Tenant alive was covered above in the special case - // for Attached->Attached transitions in the same generation. 
By this point, - // if we see an attached tenant we know it will be discarded and should be - // shut down. - let (_guard, progress) = utils::completion::channel(); + match slot_guard.get_old_value() { + Some(TenantSlot::Attached(tenant)) => { + // The case where we keep a Tenant alive was covered above in the special case + // for Attached->Attached transitions in the same generation. By this point, + // if we see an attached tenant we know it will be discarded and should be + // shut down. + let (_guard, progress) = utils::completion::channel(); - match tenant.get_attach_mode() { - AttachmentMode::Single | AttachmentMode::Multi => { - // Before we leave our state as the presumed holder of the latest generation, - // flush any outstanding deletions to reduce the risk of leaking objects. - self.resources.deletion_queue_client.flush_advisory() - } - AttachmentMode::Stale => { - // If we're stale there's not point trying to flush deletions - } - }; + match tenant.get_attach_mode() { + AttachmentMode::Single | AttachmentMode::Multi => { + // Before we leave our state as the presumed holder of the latest generation, + // flush any outstanding deletions to reduce the risk of leaking objects. + self.resources.deletion_queue_client.flush_advisory() + } + AttachmentMode::Stale => { + // If we're stale there's not point trying to flush deletions + } + }; - info!("Shutting down attached tenant"); - match tenant.shutdown(progress, false).await { - Ok(()) => {} - Err(barrier) => { - info!("Shutdown already in progress, waiting for it to complete"); - barrier.wait().await; + info!("Shutting down attached tenant"); + match tenant.shutdown(progress, false).await { + Ok(()) => {} + Err(barrier) => { + info!("Shutdown already in progress, waiting for it to complete"); + barrier.wait().await; + } } + slot_guard.drop_old_value().expect("We just shut it down"); + } + Some(TenantSlot::Secondary(state)) => { + info!("Shutting down secondary tenant"); + state.shutdown().await; + } + Some(TenantSlot::InProgress(_)) => { + // This should never happen: acquire_slot should error out + // if the contents of a slot were InProgress. + anyhow::bail!("Acquired an InProgress slot, this is a bug.") + } + None => { + // Slot was vacant, nothing needs shutting down. } - slot_guard.drop_old_value().expect("We just shut it down"); } let tenant_path = self.conf.tenant_path(&tenant_shard_id); @@ -980,7 +1052,9 @@ impl TenantManager { .map_err(SetNewTenantConfigError::Persist)?; let new_slot = match &new_location_config.mode { - LocationMode::Secondary(_) => TenantSlot::Secondary, + LocationMode::Secondary(secondary_config) => { + TenantSlot::Secondary(SecondaryTenant::new(tenant_shard_id, secondary_config)) + } LocationMode::Attached(_attach_config) => { let shard_identity = new_location_config.shard; let tenant = tenant_spawn( @@ -1093,6 +1167,30 @@ impl TenantManager { .collect(), } } + // Do some synchronous work for all tenant slots in Secondary state. The provided + // callback should be small and fast, as it will be called inside the global + // TenantsMap lock. 
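A minimal sketch of a conforming callback (hypothetical caller, not part of this diff): collect what you need while inside the lock and do the real work afterwards, which is how the downloader's `schedule()` below uses it.

```rust
// Hypothetical caller: snapshot the secondary shard IDs while holding the
// TenantsMap lock, then do any slow work only after this function returns.
fn list_secondary_shards(tenant_manager: &TenantManager) -> Vec<TenantShardId> {
    let mut shard_ids = Vec::new();
    tenant_manager.foreach_secondary_tenants(|shard_id, _state| {
        shard_ids.push(*shard_id);
    });
    shard_ids
}
```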
+ pub(crate) fn foreach_secondary_tenants(&self, mut func: F) + where + // TODO: let the callback return a hint to drop out of the loop early + F: FnMut(&TenantShardId, &Arc), + { + let locked = self.tenants.read().unwrap(); + + let map = match &*locked { + TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return, + TenantsMap::Open(m) => m, + }; + + for (tenant_id, slot) in map { + if let TenantSlot::Secondary(state) = slot { + // Only expose secondary tenants that are not currently shutting down + if !state.cancel.is_cancelled() { + func(tenant_id, state) + } + } + } + } pub(crate) async fn delete_tenant( &self, @@ -1207,7 +1305,7 @@ pub(crate) fn get_tenant( Some(TenantSlot::InProgress(_)) => { Err(GetTenantError::NotActive(tenant_shard_id.tenant_id)) } - None | Some(TenantSlot::Secondary) => { + None | Some(TenantSlot::Secondary(_)) => { Err(GetTenantError::NotFound(tenant_shard_id.tenant_id)) } } @@ -1280,7 +1378,7 @@ pub(crate) async fn get_active_tenant_with_timeout( } } } - Some(TenantSlot::Secondary) => { + Some(TenantSlot::Secondary(_)) => { return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive( tenant_id, ))) @@ -1544,7 +1642,7 @@ pub(crate) async fn list_tenants() -> Result, Ok(m.iter() .filter_map(|(id, tenant)| match tenant { TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())), - TenantSlot::Secondary => None, + TenantSlot::Secondary(_) => None, TenantSlot::InProgress(_) => None, }) .collect()) @@ -1801,11 +1899,7 @@ impl SlotGuard { fn old_value_is_shutdown(&self) -> bool { match self.old_value.as_ref() { Some(TenantSlot::Attached(tenant)) => tenant.gate.close_complete(), - Some(TenantSlot::Secondary) => { - // TODO: when adding secondary mode tenants, this will check for shutdown - // in the same way that we do for `Tenant` above - true - } + Some(TenantSlot::Secondary(secondary_tenant)) => secondary_tenant.gate.close_complete(), Some(TenantSlot::InProgress(_)) => { // A SlotGuard cannot be constructed for a slot that was already InProgress unreachable!() @@ -2015,26 +2109,19 @@ where let mut slot_guard = tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?; - // The SlotGuard allows us to manipulate the Tenant object without fear of some - // concurrent API request doing something else for the same tenant ID. - let attached_tenant = match slot_guard.get_old_value() { - Some(TenantSlot::Attached(t)) => Some(t), - _ => None, - }; - // allow pageserver shutdown to await for our completion let (_guard, progress) = completion::channel(); - // If the tenant was attached, shut it down gracefully. For secondary - // locations this part is not necessary - match &attached_tenant { - Some(attached_tenant) => { + // The SlotGuard allows us to manipulate the Tenant object without fear of some + // concurrent API request doing something else for the same tenant ID. + let attached_tenant = match slot_guard.get_old_value() { + Some(TenantSlot::Attached(tenant)) => { // whenever we remove a tenant from memory, we don't want to flush and wait for upload let freeze_and_flush = false; // shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so // that we can continue safely to cleanup. 
- match attached_tenant.shutdown(progress, freeze_and_flush).await { + match tenant.shutdown(progress, freeze_and_flush).await { Ok(()) => {} Err(_other) => { // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to @@ -2043,11 +2130,19 @@ where return Err(TenantStateError::IsStopping(tenant_shard_id.tenant_id)); } } + Some(tenant) } - None => { - // Nothing to wait on when not attached, proceed. + Some(TenantSlot::Secondary(secondary_state)) => { + tracing::info!("Shutting down in secondary mode"); + secondary_state.shutdown().await; + None } - } + Some(TenantSlot::InProgress(_)) => { + // Acquiring a slot guarantees its old value was not InProgress + unreachable!(); + } + None => None, + }; match tenant_cleanup .await diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 60b40d70a7..2ea3ced008 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -229,6 +229,7 @@ use crate::{ tenant::upload_queue::{ UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask, }, + TENANT_HEATMAP_BASENAME, }; use utils::id::{TenantId, TimelineId}; @@ -1741,11 +1742,11 @@ pub fn remote_index_path( .expect("Failed to construct path") } -pub const HEATMAP_BASENAME: &str = "heatmap-v1.json"; - pub(crate) fn remote_heatmap_path(tenant_shard_id: &TenantShardId) -> RemotePath { - RemotePath::from_string(&format!("tenants/{tenant_shard_id}/{HEATMAP_BASENAME}")) - .expect("Failed to construct path") + RemotePath::from_string(&format!( + "tenants/{tenant_shard_id}/{TENANT_HEATMAP_BASENAME}" + )) + .expect("Failed to construct path") } /// Given the key of an index, parse out the generation part of the name diff --git a/pageserver/src/tenant/secondary.rs b/pageserver/src/tenant/secondary.rs index d25fe56b92..2331447266 100644 --- a/pageserver/src/tenant/secondary.rs +++ b/pageserver/src/tenant/secondary.rs @@ -1,24 +1,48 @@ +mod downloader; pub mod heatmap; mod heatmap_uploader; +mod scheduler; use std::sync::Arc; use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}; -use self::heatmap_uploader::heatmap_uploader_task; +use self::{ + downloader::{downloader_task, SecondaryDetail}, + heatmap_uploader::heatmap_uploader_task, +}; -use super::mgr::TenantManager; +use super::{config::SecondaryLocationConfig, mgr::TenantManager}; use pageserver_api::shard::TenantShardId; use remote_storage::GenericRemoteStorage; use tokio_util::sync::CancellationToken; -use utils::completion::Barrier; +use utils::{completion::Barrier, sync::gate::Gate}; +enum DownloadCommand { + Download(TenantShardId), +} enum UploadCommand { Upload(TenantShardId), } +impl UploadCommand { + fn get_tenant_shard_id(&self) -> &TenantShardId { + match self { + Self::Upload(id) => id, + } + } +} + +impl DownloadCommand { + fn get_tenant_shard_id(&self) -> &TenantShardId { + match self { + Self::Download(id) => id, + } + } +} + struct CommandRequest { payload: T, response_tx: tokio::sync::oneshot::Sender, @@ -28,12 +52,73 @@ struct CommandResponse { result: anyhow::Result<()>, } +// Whereas [`Tenant`] represents an attached tenant, this type represents the work +// we do for secondary tenant locations: where we are not serving clients or +// ingesting WAL, but we are maintaining a warm cache of layer files. +// +// This type is all about the _download_ path for secondary mode. The upload path +// runs separately (see [`heatmap_uploader`]) while a regular attached `Tenant` exists. 
+// +// This structure coordinates TenantManager and SecondaryDownloader, +// so that the downloader can indicate which tenants it is currently +// operating on, and the manager can indicate when a particular +// secondary tenant should cancel any work in flight. +#[derive(Debug)] +pub(crate) struct SecondaryTenant { + /// Carrying a tenant shard ID simplifies callers such as the downloader + /// which need to organize many of these objects by ID. + tenant_shard_id: TenantShardId, + + /// Cancellation token indicates to SecondaryDownloader that it should stop doing + /// any work for this tenant at the next opportunity. + pub(crate) cancel: CancellationToken, + + pub(crate) gate: Gate, + + detail: std::sync::Mutex, +} + +impl SecondaryTenant { + pub(crate) fn new( + tenant_shard_id: TenantShardId, + config: &SecondaryLocationConfig, + ) -> Arc { + Arc::new(Self { + tenant_shard_id, + // todo: shall we make this a descendent of the + // main cancellation token, or is it sufficient that + // on shutdown we walk the tenants and fire their + // individual cancellations? + cancel: CancellationToken::new(), + gate: Gate::new(format!("SecondaryTenant {tenant_shard_id}")), + + detail: std::sync::Mutex::new(SecondaryDetail::new(config.clone())), + }) + } + + pub(crate) async fn shutdown(&self) { + self.cancel.cancel(); + + // Wait for any secondary downloader work to complete + self.gate.close().await; + } + + pub(crate) fn set_config(&self, config: &SecondaryLocationConfig) { + self.detail.lock().unwrap().config = config.clone(); + } + + fn get_tenant_shard_id(&self) -> &TenantShardId { + &self.tenant_shard_id + } +} + /// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads, /// and heatmap uploads. This is not a hot data path: it's primarily a hook for tests, /// where we want to immediately upload/download for a particular tenant. In normal operation /// uploads & downloads are autonomous and not driven by this interface. 
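For instance, a test hook might drive it like this (hypothetical helper; the real HTTP entry points are `secondary_upload_handler` and the new `secondary_download_handler` in routes.rs):

```rust
// Hypothetical test helper: warm up one secondary tenant immediately instead of
// waiting for the downloader's background schedule to reach it.
async fn force_secondary_warmup(
    controller: &SecondaryController,
    tenant_shard_id: TenantShardId,
) -> anyhow::Result<()> {
    controller.download_tenant(tenant_shard_id).await
}
```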
pub struct SecondaryController { upload_req_tx: tokio::sync::mpsc::Sender>, + download_req_tx: tokio::sync::mpsc::Sender>, } impl SecondaryController { @@ -63,6 +148,13 @@ impl SecondaryController { self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_shard_id)) .await } + pub async fn download_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> { + self.dispatch( + &self.download_req_tx, + DownloadCommand::Download(tenant_shard_id), + ) + .await + } } pub fn spawn_tasks( @@ -71,9 +163,37 @@ pub fn spawn_tasks( background_jobs_can_start: Barrier, cancel: CancellationToken, ) -> SecondaryController { + let mgr_clone = tenant_manager.clone(); + let storage_clone = remote_storage.clone(); + let cancel_clone = cancel.clone(); + let bg_jobs_clone = background_jobs_can_start.clone(); + + let (download_req_tx, download_req_rx) = + tokio::sync::mpsc::channel::>(16); let (upload_req_tx, upload_req_rx) = tokio::sync::mpsc::channel::>(16); + task_mgr::spawn( + BACKGROUND_RUNTIME.handle(), + TaskKind::SecondaryDownloads, + None, + None, + "secondary tenant downloads", + false, + async move { + downloader_task( + mgr_clone, + storage_clone, + download_req_rx, + bg_jobs_clone, + cancel_clone, + ) + .await; + + Ok(()) + }, + ); + task_mgr::spawn( BACKGROUND_RUNTIME.handle(), TaskKind::SecondaryUploads, @@ -89,16 +209,26 @@ pub fn spawn_tasks( background_jobs_can_start, cancel, ) - .await + .await; + + Ok(()) }, ); - SecondaryController { upload_req_tx } + SecondaryController { + download_req_tx, + upload_req_tx, + } } /// For running with remote storage disabled: a SecondaryController that is connected to nothing. pub fn null_controller() -> SecondaryController { + let (download_req_tx, _download_req_rx) = + tokio::sync::mpsc::channel::>(16); let (upload_req_tx, _upload_req_rx) = tokio::sync::mpsc::channel::>(16); - SecondaryController { upload_req_tx } + SecondaryController { + upload_req_tx, + download_req_tx, + } } diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs new file mode 100644 index 0000000000..6fdee08a4e --- /dev/null +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -0,0 +1,801 @@ +use std::{ + collections::{HashMap, HashSet}, + pin::Pin, + str::FromStr, + sync::Arc, + time::{Duration, Instant, SystemTime}, +}; + +use crate::{ + config::PageServerConf, + metrics::SECONDARY_MODE, + tenant::{ + config::SecondaryLocationConfig, + debug_assert_current_span_has_tenant_and_timeline_id, + remote_timeline_client::{ + index::LayerFileMetadata, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, + }, + span::debug_assert_current_span_has_tenant_id, + storage_layer::LayerFileName, + tasks::{warn_when_period_overrun, BackgroundLoopKind}, + }, + virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile}, + METADATA_FILE_NAME, TEMP_FILE_SUFFIX, +}; + +use super::{ + heatmap::HeatMapLayer, + scheduler::{self, Completion, JobGenerator, SchedulingResult, TenantBackgroundJobs}, + SecondaryTenant, +}; + +use crate::tenant::{ + mgr::TenantManager, + remote_timeline_client::{download::download_layer_file, remote_heatmap_path}, +}; + +use chrono::format::{DelayedFormat, StrftimeItems}; +use futures::Future; +use pageserver_api::shard::TenantShardId; +use rand::Rng; +use remote_storage::{DownloadError, GenericRemoteStorage}; + +use tokio_util::sync::CancellationToken; +use tracing::{info_span, instrument, Instrument}; +use utils::{ + backoff, completion::Barrier, crashsafe::path_with_suffix_extension, fs_ext, 
id::TimelineId, +}; + +use super::{ + heatmap::{HeatMapTenant, HeatMapTimeline}, + CommandRequest, DownloadCommand, +}; + +/// For each tenant, how long must have passed since the last download_tenant call before +/// calling it again. This is approximately the time by which local data is allowed +/// to fall behind remote data. +/// +/// TODO: this should just be a default, and the actual period should be controlled +/// via the heatmap itself +/// `` +const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000); + +pub(super) async fn downloader_task( + tenant_manager: Arc, + remote_storage: GenericRemoteStorage, + command_queue: tokio::sync::mpsc::Receiver>, + background_jobs_can_start: Barrier, + cancel: CancellationToken, +) { + let concurrency = tenant_manager.get_conf().secondary_download_concurrency; + + let generator = SecondaryDownloader { + tenant_manager, + remote_storage, + }; + let mut scheduler = Scheduler::new(generator, concurrency); + + scheduler + .run(command_queue, background_jobs_can_start, cancel) + .instrument(info_span!("secondary_downloads")) + .await +} + +struct SecondaryDownloader { + tenant_manager: Arc, + remote_storage: GenericRemoteStorage, +} + +#[derive(Debug, Clone)] +pub(super) struct OnDiskState { + metadata: LayerFileMetadata, + access_time: SystemTime, +} + +impl OnDiskState { + fn new( + _conf: &'static PageServerConf, + _tenant_shard_id: &TenantShardId, + _imeline_id: &TimelineId, + _ame: LayerFileName, + metadata: LayerFileMetadata, + access_time: SystemTime, + ) -> Self { + Self { + metadata, + access_time, + } + } +} + +#[derive(Debug, Clone, Default)] +pub(super) struct SecondaryDetailTimeline { + pub(super) on_disk_layers: HashMap, + + /// We remember when layers were evicted, to prevent re-downloading them. 
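The rule this field enables is applied in `download_timeline` further down; stripped to its essence it is the following comparison (sketch only, function name made up):

```rust
use std::time::SystemTime;

// A deliberately evicted layer is only fetched again if the heatmap shows it
// was accessed *after* the eviction happened.
fn should_redownload_evicted(heatmap_access_time: SystemTime, evicted_at: SystemTime) -> bool {
    heatmap_access_time > evicted_at
}
```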
+ pub(super) evicted_at: HashMap, +} + +/// This state is written by the secondary downloader, it is opaque +/// to TenantManager +#[derive(Debug)] +pub(super) struct SecondaryDetail { + pub(super) config: SecondaryLocationConfig, + + last_download: Option, + next_download: Option, + pub(super) timelines: HashMap, +} + +/// Helper for logging SystemTime +fn strftime(t: &'_ SystemTime) -> DelayedFormat> { + let datetime: chrono::DateTime = (*t).into(); + datetime.format("%d/%m/%Y %T") +} + +impl SecondaryDetail { + pub(super) fn new(config: SecondaryLocationConfig) -> Self { + Self { + config, + last_download: None, + next_download: None, + timelines: HashMap::new(), + } + } +} + +struct PendingDownload { + secondary_state: Arc, + last_download: Option, + target_time: Option, + period: Option, +} + +impl scheduler::PendingJob for PendingDownload { + fn get_tenant_shard_id(&self) -> &TenantShardId { + self.secondary_state.get_tenant_shard_id() + } +} + +struct RunningDownload { + barrier: Barrier, +} + +impl scheduler::RunningJob for RunningDownload { + fn get_barrier(&self) -> Barrier { + self.barrier.clone() + } +} + +struct CompleteDownload { + secondary_state: Arc, + completed_at: Instant, +} + +impl scheduler::Completion for CompleteDownload { + fn get_tenant_shard_id(&self) -> &TenantShardId { + self.secondary_state.get_tenant_shard_id() + } +} + +type Scheduler = TenantBackgroundJobs< + SecondaryDownloader, + PendingDownload, + RunningDownload, + CompleteDownload, + DownloadCommand, +>; + +#[async_trait::async_trait] +impl JobGenerator + for SecondaryDownloader +{ + #[instrument(skip_all, fields(tenant_id=%completion.get_tenant_shard_id().tenant_id, shard_id=%completion.get_tenant_shard_id().shard_slug()))] + fn on_completion(&mut self, completion: CompleteDownload) { + let CompleteDownload { + secondary_state, + completed_at: _completed_at, + } = completion; + + tracing::debug!("Secondary tenant download completed"); + + // Update freshened_at even if there was an error: we don't want errored tenants to implicitly + // take priority to run again. + let mut detail = secondary_state.detail.lock().unwrap(); + detail.next_download = Some(Instant::now() + DOWNLOAD_FRESHEN_INTERVAL); + } + + async fn schedule(&mut self) -> SchedulingResult { + let mut result = SchedulingResult { + jobs: Vec::new(), + want_interval: None, + }; + + // Step 1: identify some tenants that we may work on + let mut tenants: Vec> = Vec::new(); + self.tenant_manager + .foreach_secondary_tenants(|_id, secondary_state| { + tenants.push(secondary_state.clone()); + }); + + // Step 2: filter out tenants which are not yet elegible to run + let now = Instant::now(); + result.jobs = tenants + .into_iter() + .filter_map(|secondary_tenant| { + let (last_download, next_download) = { + let mut detail = secondary_tenant.detail.lock().unwrap(); + + if !detail.config.warm { + // Downloads are disabled for this tenant + detail.next_download = None; + return None; + } + + if detail.next_download.is_none() { + // Initialize with a jitter: this spreads initial downloads on startup + // or mass-attach across our freshen interval. 
+ let jittered_period = + rand::thread_rng().gen_range(Duration::ZERO..DOWNLOAD_FRESHEN_INTERVAL); + detail.next_download = Some(now.checked_add(jittered_period).expect( + "Using our constant, which is known to be small compared with clock range", + )); + } + (detail.last_download, detail.next_download.unwrap()) + }; + + if now < next_download { + Some(PendingDownload { + secondary_state: secondary_tenant, + last_download, + target_time: Some(next_download), + period: Some(DOWNLOAD_FRESHEN_INTERVAL), + }) + } else { + None + } + }) + .collect(); + + // Step 3: sort by target execution time to run most urgent first. + result.jobs.sort_by_key(|j| j.target_time); + + result + } + + fn on_command(&mut self, command: DownloadCommand) -> anyhow::Result { + let tenant_shard_id = command.get_tenant_shard_id(); + + let tenant = self + .tenant_manager + .get_secondary_tenant_shard(*tenant_shard_id); + let Some(tenant) = tenant else { + { + return Err(anyhow::anyhow!("Not found or not in Secondary mode")); + } + }; + + Ok(PendingDownload { + target_time: None, + period: None, + last_download: None, + secondary_state: tenant, + }) + } + + fn spawn( + &mut self, + job: PendingDownload, + ) -> ( + RunningDownload, + Pin + Send>>, + ) { + let PendingDownload { + secondary_state, + last_download, + target_time, + period, + } = job; + + let (completion, barrier) = utils::completion::channel(); + let remote_storage = self.remote_storage.clone(); + let conf = self.tenant_manager.get_conf(); + let tenant_shard_id = *secondary_state.get_tenant_shard_id(); + (RunningDownload { barrier }, Box::pin(async move { + let _completion = completion; + + match TenantDownloader::new(conf, &remote_storage, &secondary_state) + .download() + .await + { + Err(UpdateError::NoData) => { + tracing::info!("No heatmap found for tenant. This is fine if it is new."); + }, + Err(UpdateError::NoSpace) => { + tracing::warn!("Insufficient space while downloading. Will retry later."); + } + Err(UpdateError::Cancelled) => { + tracing::debug!("Shut down while downloading"); + }, + Err(UpdateError::Deserialize(e)) => { + tracing::error!("Corrupt content while downloading tenant: {e}"); + }, + Err(e @ (UpdateError::DownloadError(_) | UpdateError::Other(_))) => { + tracing::error!("Error while downloading tenant: {e}"); + }, + Ok(()) => {} + }; + + // Irrespective of the result, we will reschedule ourselves to run after our usual period. + + // If the job had a target execution time, we may check our final execution + // time against that for observability purposes. + if let (Some(target_time), Some(period)) = (target_time, period) { + // Only track execution lag if this isn't our first download: otherwise, it is expected + // that execution will have taken longer than our configured interval, for example + // when starting up a pageserver and + if last_download.is_some() { + // Elapsed time includes any scheduling lag as well as the execution of the job + let elapsed = Instant::now().duration_since(target_time); + + warn_when_period_overrun( + elapsed, + period, + BackgroundLoopKind::SecondaryDownload, + ); + } + } + + CompleteDownload { + secondary_state, + completed_at: Instant::now(), + } + }.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())))) + } +} + +/// This type is a convenience to group together the various functions involved in +/// freshening a secondary tenant. 
+struct TenantDownloader<'a> { + conf: &'static PageServerConf, + remote_storage: &'a GenericRemoteStorage, + secondary_state: &'a SecondaryTenant, +} + +/// Errors that may be encountered while updating a tenant +#[derive(thiserror::Error, Debug)] +enum UpdateError { + #[error("No remote data found")] + NoData, + #[error("Insufficient local storage space")] + NoSpace, + #[error("Failed to download")] + DownloadError(DownloadError), + #[error(transparent)] + Deserialize(#[from] serde_json::Error), + #[error("Cancelled")] + Cancelled, + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +impl From for UpdateError { + fn from(value: DownloadError) -> Self { + match &value { + DownloadError::Cancelled => Self::Cancelled, + DownloadError::NotFound => Self::NoData, + _ => Self::DownloadError(value), + } + } +} + +impl From for UpdateError { + fn from(value: std::io::Error) -> Self { + if let Some(nix::errno::Errno::ENOSPC) = value.raw_os_error().map(nix::errno::from_i32) { + UpdateError::NoSpace + } else { + // An I/O error from e.g. tokio::io::copy is most likely a remote storage issue + UpdateError::Other(anyhow::anyhow!(value)) + } + } +} + +impl<'a> TenantDownloader<'a> { + fn new( + conf: &'static PageServerConf, + remote_storage: &'a GenericRemoteStorage, + secondary_state: &'a SecondaryTenant, + ) -> Self { + Self { + conf, + remote_storage, + secondary_state, + } + } + + async fn download(&self) -> Result<(), UpdateError> { + debug_assert_current_span_has_tenant_id(); + + // For the duration of a download, we must hold the SecondaryTenant::gate, to ensure + // cover our access to local storage. + let Ok(_guard) = self.secondary_state.gate.enter() else { + // Shutting down + return Ok(()); + }; + + let tenant_shard_id = self.secondary_state.get_tenant_shard_id(); + // Download the tenant's heatmap + let heatmap_bytes = tokio::select!( + bytes = self.download_heatmap() => {bytes?}, + _ = self.secondary_state.cancel.cancelled() => return Ok(()) + ); + + let heatmap = serde_json::from_slice::(&heatmap_bytes)?; + + // Save the heatmap: this will be useful on restart, allowing us to reconstruct + // layer metadata without having to re-download it. 
+ let heatmap_path = self.conf.tenant_heatmap_path(tenant_shard_id); + + let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX); + let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}"); + let heatmap_path_bg = heatmap_path.clone(); + tokio::task::spawn_blocking(move || { + tokio::runtime::Handle::current().block_on(async move { + VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, &heatmap_bytes).await + }) + }) + .await + .expect("Blocking task is never aborted") + .maybe_fatal_err(&context_msg)?; + + tracing::debug!("Wrote local heatmap to {}", heatmap_path); + + // Download the layers in the heatmap + for timeline in heatmap.timelines { + if self.secondary_state.cancel.is_cancelled() { + return Ok(()); + } + + let timeline_id = timeline.timeline_id; + self.download_timeline(timeline) + .instrument(tracing::info_span!( + "secondary_download_timeline", + tenant_id=%tenant_shard_id.tenant_id, + shard_id=%tenant_shard_id.shard_slug(), + %timeline_id + )) + .await?; + } + + Ok(()) + } + + async fn download_heatmap(&self) -> Result, UpdateError> { + debug_assert_current_span_has_tenant_id(); + let tenant_shard_id = self.secondary_state.get_tenant_shard_id(); + // TODO: make download conditional on ETag having changed since last download + // (https://github.com/neondatabase/neon/issues/6199) + tracing::debug!("Downloading heatmap for secondary tenant",); + + let heatmap_path = remote_heatmap_path(tenant_shard_id); + + let heatmap_bytes = backoff::retry( + || async { + let download = self + .remote_storage + .download(&heatmap_path) + .await + .map_err(UpdateError::from)?; + let mut heatmap_bytes = Vec::new(); + let mut body = tokio_util::io::StreamReader::new(download.download_stream); + let _size = tokio::io::copy(&mut body, &mut heatmap_bytes).await?; + Ok(heatmap_bytes) + }, + |e| matches!(e, UpdateError::NoData | UpdateError::Cancelled), + FAILED_DOWNLOAD_WARN_THRESHOLD, + FAILED_REMOTE_OP_RETRIES, + "download heatmap", + backoff::Cancel::new(self.secondary_state.cancel.clone(), || { + UpdateError::Cancelled + }), + ) + .await?; + + SECONDARY_MODE.download_heatmap.inc(); + + Ok(heatmap_bytes) + } + + async fn download_timeline(&self, timeline: HeatMapTimeline) -> Result<(), UpdateError> { + debug_assert_current_span_has_tenant_and_timeline_id(); + let tenant_shard_id = self.secondary_state.get_tenant_shard_id(); + let timeline_path = self + .conf + .timeline_path(tenant_shard_id, &timeline.timeline_id); + + // Accumulate updates to the state + let mut touched = Vec::new(); + + // Clone a view of what layers already exist on disk + let timeline_state = self + .secondary_state + .detail + .lock() + .unwrap() + .timelines + .get(&timeline.timeline_id) + .cloned(); + + let timeline_state = match timeline_state { + Some(t) => t, + None => { + // We have no existing state: need to scan local disk for layers first. 
+ let timeline_state = + init_timeline_state(self.conf, tenant_shard_id, &timeline).await; + + // Re-acquire detail lock now that we're done with async load from local FS + self.secondary_state + .detail + .lock() + .unwrap() + .timelines + .insert(timeline.timeline_id, timeline_state.clone()); + timeline_state + } + }; + + let layers_in_heatmap = timeline + .layers + .iter() + .map(|l| &l.name) + .collect::>(); + let layers_on_disk = timeline_state + .on_disk_layers + .iter() + .map(|l| l.0) + .collect::>(); + + // Remove on-disk layers that are no longer present in heatmap + for layer in layers_on_disk.difference(&layers_in_heatmap) { + let local_path = timeline_path.join(layer.to_string()); + tracing::info!("Removing secondary local layer {layer} because it's absent in heatmap",); + tokio::fs::remove_file(&local_path) + .await + .or_else(fs_ext::ignore_not_found) + .maybe_fatal_err("Removing secondary layer")?; + } + + // Download heatmap layers that are not present on local disk, or update their + // access time if they are already present. + for layer in timeline.layers { + if self.secondary_state.cancel.is_cancelled() { + return Ok(()); + } + + // Existing on-disk layers: just update their access time. + if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) { + tracing::debug!("Layer {} is already on disk", layer.name); + if on_disk.metadata != LayerFileMetadata::from(&layer.metadata) + || on_disk.access_time != layer.access_time + { + // We already have this layer on disk. Update its access time. + tracing::debug!( + "Access time updated for layer {}: {} -> {}", + layer.name, + strftime(&on_disk.access_time), + strftime(&layer.access_time) + ); + touched.push(layer); + } + continue; + } else { + tracing::debug!("Layer {} not present on disk yet", layer.name); + } + + // Eviction: if we evicted a layer, then do not re-download it unless it was accessed more + // recently than it was evicted. + if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) { + if &layer.access_time > evicted_at { + tracing::info!( + "Re-downloading evicted layer {}, accessed at {}, evicted at {}", + layer.name, + strftime(&layer.access_time), + strftime(evicted_at) + ); + } else { + tracing::trace!( + "Not re-downloading evicted layer {}, accessed at {}, evicted at {}", + layer.name, + strftime(&layer.access_time), + strftime(evicted_at) + ); + continue; + } + } + + // Note: no backoff::retry wrapper here because download_layer_file does its own retries internally + let downloaded_bytes = match download_layer_file( + self.conf, + self.remote_storage, + *tenant_shard_id, + timeline.timeline_id, + &layer.name, + &LayerFileMetadata::from(&layer.metadata), + &self.secondary_state.cancel, + ) + .await + { + Ok(bytes) => bytes, + Err(e) => { + if let DownloadError::NotFound = e { + // A heatmap might be out of date and refer to a layer that doesn't exist any more. + // This is harmless: continue to download the next layer. It is expected during compaction + // GC. + tracing::debug!( + "Skipped downloading missing layer {}, raced with compaction/gc?", + layer.name + ); + continue; + } else { + return Err(e.into()); + } + } + }; + + if downloaded_bytes != layer.metadata.file_size { + let local_path = timeline_path.join(layer.name.to_string()); + + tracing::warn!( + "Downloaded layer {} with unexpected size {} != {}. 
Removing download.", + layer.name, + downloaded_bytes, + layer.metadata.file_size + ); + + tokio::fs::remove_file(&local_path) + .await + .or_else(fs_ext::ignore_not_found)?; + } + + SECONDARY_MODE.download_layer.inc(); + touched.push(layer) + } + + // Write updates to state to record layers we just downloaded or touched. + { + let mut detail = self.secondary_state.detail.lock().unwrap(); + let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default(); + + tracing::info!("Wrote timeline_detail for {} touched layers", touched.len()); + + for t in touched { + use std::collections::hash_map::Entry; + match timeline_detail.on_disk_layers.entry(t.name.clone()) { + Entry::Occupied(mut v) => { + v.get_mut().access_time = t.access_time; + } + Entry::Vacant(e) => { + e.insert(OnDiskState::new( + self.conf, + tenant_shard_id, + &timeline.timeline_id, + t.name, + LayerFileMetadata::from(&t.metadata), + t.access_time, + )); + } + } + } + } + + Ok(()) + } +} + +/// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline +async fn init_timeline_state( + conf: &'static PageServerConf, + tenant_shard_id: &TenantShardId, + heatmap: &HeatMapTimeline, +) -> SecondaryDetailTimeline { + let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id); + let mut detail = SecondaryDetailTimeline::default(); + + let mut dir = match tokio::fs::read_dir(&timeline_path).await { + Ok(d) => d, + Err(e) => { + if e.kind() == std::io::ErrorKind::NotFound { + let context = format!("Creating timeline directory {timeline_path}"); + tracing::info!("{}", context); + tokio::fs::create_dir_all(&timeline_path) + .await + .fatal_err(&context); + + // No entries to report: drop out. + return detail; + } else { + on_fatal_io_error(&e, &format!("Reading timeline dir {timeline_path}")); + } + } + }; + + // As we iterate through layers found on disk, we will look up their metadata from this map. + // Layers not present in metadata will be discarded. + let heatmap_metadata: HashMap<&LayerFileName, &HeatMapLayer> = + heatmap.layers.iter().map(|l| (&l.name, l)).collect(); + + while let Some(dentry) = dir + .next_entry() + .await + .fatal_err(&format!("Listing {timeline_path}")) + { + let dentry_file_name = dentry.file_name(); + let file_name = dentry_file_name.to_string_lossy(); + let local_meta = dentry.metadata().await.fatal_err(&format!( + "Read metadata on {}", + dentry.path().to_string_lossy() + )); + + // Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant. + if file_name == METADATA_FILE_NAME { + continue; + } + + match LayerFileName::from_str(&file_name) { + Ok(name) => { + let remote_meta = heatmap_metadata.get(&name); + match remote_meta { + Some(remote_meta) => { + // TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784) + if local_meta.len() != remote_meta.metadata.file_size { + // This should not happen, because we do crashsafe write-then-rename when downloading + // layers, and layers in remote storage are immutable. Remove the local file because + // we cannot trust it. + tracing::warn!( + "Removing local layer {name} with unexpected local size {} != {}", + local_meta.len(), + remote_meta.metadata.file_size + ); + } else { + // We expect the access time to be initialized immediately afterwards, when + // the latest heatmap is applied to the state. 
+ detail.on_disk_layers.insert( + name.clone(), + OnDiskState::new( + conf, + tenant_shard_id, + &heatmap.timeline_id, + name, + LayerFileMetadata::from(&remote_meta.metadata), + remote_meta.access_time, + ), + ); + } + } + None => { + // FIXME: consider some optimization when transitioning from attached to secondary: maybe + // wait until we have seen a heatmap that is more recent than the most recent on-disk state? Otherwise + // we will end up deleting any layers which were created+uploaded more recently than the heatmap. + tracing::info!( + "Removing secondary local layer {} because it's absent in heatmap", + name + ); + tokio::fs::remove_file(&dentry.path()) + .await + .or_else(fs_ext::ignore_not_found) + .fatal_err(&format!( + "Removing layer {}", + dentry.path().to_string_lossy() + )); + } + } + } + Err(_) => { + // Ignore it. + tracing::warn!("Unexpected file in timeline directory: {file_name}"); + } + } + } + + detail +} diff --git a/pageserver/src/tenant/secondary/heatmap_uploader.rs b/pageserver/src/tenant/secondary/heatmap_uploader.rs index ece2b93ce1..ef01c33e8e 100644 --- a/pageserver/src/tenant/secondary/heatmap_uploader.rs +++ b/pageserver/src/tenant/secondary/heatmap_uploader.rs @@ -1,5 +1,6 @@ use std::{ collections::HashMap, + pin::Pin, sync::{Arc, Weak}, time::{Duration, Instant}, }; @@ -7,35 +8,86 @@ use std::{ use crate::{ metrics::SECONDARY_MODE, tenant::{ - config::AttachmentMode, mgr::TenantManager, remote_timeline_client::remote_heatmap_path, - secondary::CommandResponse, span::debug_assert_current_span_has_tenant_id, Tenant, + config::AttachmentMode, + mgr::TenantManager, + remote_timeline_client::remote_heatmap_path, + span::debug_assert_current_span_has_tenant_id, + tasks::{warn_when_period_overrun, BackgroundLoopKind}, + Tenant, }, }; +use futures::Future; use md5; use pageserver_api::shard::TenantShardId; +use rand::Rng; use remote_storage::GenericRemoteStorage; -use tokio::task::JoinSet; +use super::{ + scheduler::{self, JobGenerator, RunningJob, SchedulingResult, TenantBackgroundJobs}, + CommandRequest, +}; use tokio_util::sync::CancellationToken; -use tracing::instrument; -use utils::{backoff, completion::Barrier}; +use tracing::{info_span, instrument, Instrument}; +use utils::{backoff, completion::Barrier, yielding_loop::yielding_loop}; -use super::{heatmap::HeatMapTenant, CommandRequest, UploadCommand}; +use super::{heatmap::HeatMapTenant, UploadCommand}; -/// Period between heatmap uploader walking Tenants to look for work to do. -/// If any tenants have a heatmap upload period lower than this, it will be adjusted -/// downward to match. 
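// The reconciliation the secondary downloader performs above boils down to a set
// difference between the heatmap's layer names and the layer files found on local disk.
// A minimal, self-contained sketch of that shape (the layer names here are arbitrary
// placeholders, not real LayerFileName values):
fn reconcile_example() {
    use std::collections::HashSet;

    let in_heatmap: HashSet<&str> = ["layer-a", "layer-b"].into_iter().collect();
    let on_disk: HashSet<&str> = ["layer-b", "layer-c"].into_iter().collect();

    // Local layers absent from the heatmap are deleted...
    let to_delete: Vec<&&str> = on_disk.difference(&in_heatmap).collect();
    // ...and heatmap layers missing locally are downloaded.
    let to_download: Vec<&&str> = in_heatmap.difference(&on_disk).collect();

    assert_eq!(to_delete, vec![&"layer-c"]);
    assert_eq!(to_download, vec![&"layer-a"]);
}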
-const DEFAULT_SCHEDULING_INTERVAL: Duration = Duration::from_millis(60000); -const MIN_SCHEDULING_INTERVAL: Duration = Duration::from_millis(1000); +pub(super) async fn heatmap_uploader_task( + tenant_manager: Arc, + remote_storage: GenericRemoteStorage, + command_queue: tokio::sync::mpsc::Receiver>, + background_jobs_can_start: Barrier, + cancel: CancellationToken, +) { + let concurrency = tenant_manager.get_conf().heatmap_upload_concurrency; + + let generator = HeatmapUploader { + tenant_manager, + remote_storage, + cancel: cancel.clone(), + tenants: HashMap::new(), + }; + let mut scheduler = Scheduler::new(generator, concurrency); + + scheduler + .run(command_queue, background_jobs_can_start, cancel) + .instrument(info_span!("heatmap_uploader")) + .await +} + +/// This type is owned by a single task ([`heatmap_uploader_task`]) which runs an event +/// handling loop and mutates it as needed: there are no locks here, because that event loop +/// can hold &mut references to this type throughout. +struct HeatmapUploader { + tenant_manager: Arc, + remote_storage: GenericRemoteStorage, + cancel: CancellationToken, + + tenants: HashMap, +} struct WriteInProgress { barrier: Barrier, } +impl RunningJob for WriteInProgress { + fn get_barrier(&self) -> Barrier { + self.barrier.clone() + } +} + struct UploadPending { tenant: Arc, last_digest: Option, + target_time: Option, + period: Option, +} + +impl scheduler::PendingJob for UploadPending { + fn get_tenant_shard_id(&self) -> &TenantShardId { + self.tenant.get_tenant_shard_id() + } } struct WriteComplete { @@ -45,6 +97,12 @@ struct WriteComplete { next_upload: Option, } +impl scheduler::Completion for WriteComplete { + fn get_tenant_shard_id(&self) -> &TenantShardId { + &self.tenant_shard_id + } +} + /// The heatmap uploader keeps a little bit of per-tenant state, mainly to remember /// when we last did a write. We only populate this after doing at least one /// write for a tenant -- this avoids holding state for tenants that have @@ -68,267 +126,111 @@ struct UploaderTenantState { next_upload: Option, } -/// This type is owned by a single task ([`heatmap_uploader_task`]) which runs an event -/// handling loop and mutates it as needed: there are no locks here, because that event loop -/// can hold &mut references to this type throughout. -struct HeatmapUploader { - tenant_manager: Arc, - remote_storage: GenericRemoteStorage, - cancel: CancellationToken, +type Scheduler = TenantBackgroundJobs< + HeatmapUploader, + UploadPending, + WriteInProgress, + WriteComplete, + UploadCommand, +>; - tenants: HashMap, - - /// Tenants with work to do, for which tasks should be spawned as soon as concurrency - /// limits permit it. - tenants_pending: std::collections::VecDeque, - - /// Tenants for which a task in `tasks` has been spawned. - tenants_uploading: HashMap, - - tasks: JoinSet<()>, - - /// Channel for our child tasks to send results to: we use a channel for results rather than - /// just getting task results via JoinSet because we need the channel's recv() "sleep until something - /// is available" semantic, rather than JoinSet::join_next()'s "sleep until next thing is available _or_ I'm empty" - /// behavior. 
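// How a caller inside the secondary module might wire up the new uploader task at
// startup: build the command channel, then spawn heatmap_uploader_task with the
// receiving half. Sketch only; the argument values and the channel depth are assumed,
// and the types are the ones already imported at the top of this file.
fn spawn_uploader_example(
    tenant_manager: std::sync::Arc<TenantManager>,
    remote_storage: GenericRemoteStorage,
    background_jobs_barrier: Barrier,
    cancel: CancellationToken,
) -> tokio::sync::mpsc::Sender<CommandRequest<UploadCommand>> {
    let (cmd_tx, cmd_rx) = tokio::sync::mpsc::channel(16);
    tokio::task::spawn(heatmap_uploader_task(
        tenant_manager,
        remote_storage,
        cmd_rx,
        background_jobs_barrier,
        cancel,
    ));
    cmd_tx
}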
- task_result_tx: tokio::sync::mpsc::UnboundedSender, - task_result_rx: tokio::sync::mpsc::UnboundedReceiver, - - concurrent_uploads: usize, - - scheduling_interval: Duration, -} - -/// The uploader task runs a loop that periodically wakes up and schedules tasks for -/// tenants that require an upload, or handles any commands that have been sent into -/// `command_queue`. No I/O is done in this loop: that all happens in the tasks we -/// spawn. -/// -/// Scheduling iterations are somewhat infrequent. However, each one will enqueue -/// all tenants that require an upload, and in between scheduling iterations we will -/// continue to spawn new tasks for pending tenants, as our concurrency limit permits. -/// -/// While we take a CancellationToken here, it is subordinate to the CancellationTokens -/// of tenants: i.e. we expect all Tenants to have been shut down before we are shut down, otherwise -/// we might block waiting on a Tenant. -pub(super) async fn heatmap_uploader_task( - tenant_manager: Arc, - remote_storage: GenericRemoteStorage, - mut command_queue: tokio::sync::mpsc::Receiver>, - background_jobs_can_start: Barrier, - cancel: CancellationToken, -) -> anyhow::Result<()> { - let concurrent_uploads = tenant_manager.get_conf().heatmap_upload_concurrency; - - let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel(); - - let mut uploader = HeatmapUploader { - tenant_manager, - remote_storage, - cancel: cancel.clone(), - tasks: JoinSet::new(), - tenants: HashMap::new(), - tenants_pending: std::collections::VecDeque::new(), - tenants_uploading: HashMap::new(), - task_result_tx: result_tx, - task_result_rx: result_rx, - concurrent_uploads, - scheduling_interval: DEFAULT_SCHEDULING_INTERVAL, - }; - - tracing::info!("Waiting for background_jobs_can start..."); - background_jobs_can_start.wait().await; - tracing::info!("background_jobs_can is ready, proceeding."); - - while !cancel.is_cancelled() { - // Look for new work: this is relatively expensive because we have to go acquire the lock on - // the tenant manager to retrieve tenants, and then iterate over them to figure out which ones - // require an upload. - uploader.schedule_iteration().await?; - - // Between scheduling iterations, we will: - // - Drain any complete tasks and spawn pending tasks - // - Handle incoming administrative commands - // - Check our cancellation token - let next_scheduling_iteration = Instant::now() - .checked_add(uploader.scheduling_interval) - .unwrap_or_else(|| { - tracing::warn!( - "Scheduling interval invalid ({}s), running immediately!", - uploader.scheduling_interval.as_secs_f64() - ); - Instant::now() - }); - loop { - tokio::select! { - _ = cancel.cancelled() => { - // We do not simply drop the JoinSet, in order to have an orderly shutdown without cancellation. 
- tracing::info!("Heatmap uploader joining tasks"); - while let Some(_r) = uploader.tasks.join_next().await {}; - tracing::info!("Heatmap uploader terminating"); - - break; - }, - _ = tokio::time::sleep(next_scheduling_iteration.duration_since(Instant::now())) => { - tracing::debug!("heatmap_uploader_task: woke for scheduling interval"); - break;}, - cmd = command_queue.recv() => { - tracing::debug!("heatmap_uploader_task: woke for command queue"); - let cmd = match cmd { - Some(c) =>c, - None => { - // SecondaryController was destroyed, and this has raced with - // our CancellationToken - tracing::info!("Heatmap uploader terminating"); - cancel.cancel(); - break; - } - }; - - let CommandRequest{ - response_tx, - payload - } = cmd; - uploader.handle_command(payload, response_tx); - }, - _ = uploader.process_next_completion() => { - if !cancel.is_cancelled() { - uploader.spawn_pending(); - } - } - } - } - } - - Ok(()) -} - -impl HeatmapUploader { - /// Periodic execution phase: inspect all attached tenants and schedule any work they require. - async fn schedule_iteration(&mut self) -> anyhow::Result<()> { +#[async_trait::async_trait] +impl JobGenerator + for HeatmapUploader +{ + async fn schedule(&mut self) -> SchedulingResult { // Cull any entries in self.tenants whose Arc is gone self.tenants .retain(|_k, v| v.tenant.upgrade().is_some() && v.next_upload.is_some()); - // The priority order of previously scheduled work may be invalidated by current state: drop - // all pending work (it will be re-scheduled if still needed) - self.tenants_pending.clear(); - - // Used a fixed 'now' through the following loop, for efficiency and fairness. let now = Instant::now(); - // While iterating over the potentially-long list of tenants, we will periodically yield - // to avoid blocking executor. - const YIELD_ITERATIONS: usize = 1000; + let mut result = SchedulingResult { + jobs: Vec::new(), + want_interval: None, + }; - // Iterate over tenants looking for work to do. let tenants = self.tenant_manager.get_attached_active_tenant_shards(); - for (i, tenant) in tenants.into_iter().enumerate() { - // Process is shutting down, drop out - if self.cancel.is_cancelled() { - return Ok(()); - } - // Skip tenants that already have a write in flight - if self - .tenants_uploading - .contains_key(tenant.get_tenant_shard_id()) - { - continue; - } + yielding_loop(1000, &self.cancel, tenants.into_iter(), |tenant| { + let period = match tenant.get_heatmap_period() { + None => { + // Heatmaps are disabled for this tenant + return; + } + Some(period) => { + // If any tenant has asked for uploads more frequent than our scheduling interval, + // reduce it to match so that we can keep up. This is mainly useful in testing, where + // we may set rather short intervals. + result.want_interval = match result.want_interval { + None => Some(period), + Some(existing) => Some(std::cmp::min(period, existing)), + }; - self.maybe_schedule_upload(&now, tenant); + period + } + }; - if i + 1 % YIELD_ITERATIONS == 0 { - tokio::task::yield_now().await; - } - } - - // Spawn tasks for as many of our pending tenants as we can. - self.spawn_pending(); - - Ok(()) - } - - /// - /// Cancellation: this method is cancel-safe. 
- async fn process_next_completion(&mut self) { - match self.task_result_rx.recv().await { - Some(r) => { - self.on_completion(r); - } - None => { - unreachable!("Result sender is stored on Self"); - } - } - } - - /// The 'maybe' refers to the tenant's state: whether it is configured - /// for heatmap uploads at all, and whether sufficient time has passed - /// since the last upload. - fn maybe_schedule_upload(&mut self, now: &Instant, tenant: Arc) { - match tenant.get_heatmap_period() { - None => { - // Heatmaps are disabled for this tenant + // Stale attachments do not upload anything: if we are in this state, there is probably some + // other attachment in mode Single or Multi running on another pageserver, and we don't + // want to thrash and overwrite their heatmap uploads. + if tenant.get_attach_mode() == AttachmentMode::Stale { return; } - Some(period) => { - // If any tenant has asked for uploads more frequent than our scheduling interval, - // reduce it to match so that we can keep up. This is mainly useful in testing, where - // we may set rather short intervals. - if period < self.scheduling_interval { - self.scheduling_interval = std::cmp::max(period, MIN_SCHEDULING_INTERVAL); - } + + // Create an entry in self.tenants if one doesn't already exist: this will later be updated + // with the completion time in on_completion. + let state = self + .tenants + .entry(*tenant.get_tenant_shard_id()) + .or_insert_with(|| { + let jittered_period = rand::thread_rng().gen_range(Duration::ZERO..period); + + UploaderTenantState { + tenant: Arc::downgrade(&tenant), + last_upload: None, + next_upload: Some(now.checked_add(jittered_period).unwrap_or(now)), + last_digest: None, + } + }); + + // Decline to do the upload if insufficient time has passed + if state.next_upload.map(|nu| nu > now).unwrap_or(false) { + return; } - } - // Stale attachments do not upload anything: if we are in this state, there is probably some - // other attachment in mode Single or Multi running on another pageserver, and we don't - // want to thrash and overwrite their heatmap uploads. - if tenant.get_attach_mode() == AttachmentMode::Stale { - return; - } - - // Create an entry in self.tenants if one doesn't already exist: this will later be updated - // with the completion time in on_completion. 
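// Folding each tenant's heatmap period into a single want_interval, as the schedule()
// implementation above does: the scheduler should wake at least as often as the most
// demanding tenant. Sketch with arbitrary periods:
fn fold_want_interval_example() {
    use std::time::Duration;

    let periods = [
        Duration::from_secs(60),
        Duration::from_secs(10),
        Duration::from_secs(300),
    ];
    let mut want_interval: Option<Duration> = None;
    for period in periods {
        want_interval = match want_interval {
            None => Some(period),
            Some(existing) => Some(std::cmp::min(period, existing)),
        };
    }
    assert_eq!(want_interval, Some(Duration::from_secs(10)));
}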
- let state = self - .tenants - .entry(*tenant.get_tenant_shard_id()) - .or_insert_with(|| UploaderTenantState { - tenant: Arc::downgrade(&tenant), - last_upload: None, - next_upload: Some(Instant::now()), - last_digest: None, + let last_digest = state.last_digest; + result.jobs.push(UploadPending { + tenant, + last_digest, + target_time: state.next_upload, + period: Some(period), }); + }) + .await + .ok(); - // Decline to do the upload if insufficient time has passed - if state.next_upload.map(|nu| &nu > now).unwrap_or(false) { - return; - } + result + } - let last_digest = state.last_digest; - self.tenants_pending.push_back(UploadPending { + fn spawn( + &mut self, + job: UploadPending, + ) -> ( + WriteInProgress, + Pin + Send>>, + ) { + let UploadPending { tenant, last_digest, - }) - } + target_time, + period, + } = job; - fn spawn_pending(&mut self) { - while !self.tenants_pending.is_empty() - && self.tenants_uploading.len() < self.concurrent_uploads - { - // unwrap: loop condition includes !is_empty() - let pending = self.tenants_pending.pop_front().unwrap(); - self.spawn_upload(pending.tenant, pending.last_digest); - } - } - - fn spawn_upload(&mut self, tenant: Arc, last_digest: Option) { let remote_storage = self.remote_storage.clone(); - let tenant_shard_id = *tenant.get_tenant_shard_id(); let (completion, barrier) = utils::completion::channel(); - let result_tx = self.task_result_tx.clone(); - self.tasks.spawn(async move { + let tenant_shard_id = *tenant.get_tenant_shard_id(); + (WriteInProgress { barrier }, Box::pin(async move { // Guard for the barrier in [`WriteInProgress`] let _completion = completion; @@ -362,22 +264,47 @@ impl HeatmapUploader { }; let now = Instant::now(); + + // If the job had a target execution time, we may check our final execution + // time against that for observability purposes. 
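// The jittered first upload above spreads newly-seen tenants' uploads across one heatmap
// period rather than bunching them together at startup. Minimal sketch (the period value
// is arbitrary):
fn jittered_first_upload_example() {
    use rand::Rng;
    use std::time::{Duration, Instant};

    let period = Duration::from_secs(60);
    let jitter = rand::thread_rng().gen_range(Duration::ZERO..period);
    let first_upload = Instant::now().checked_add(jitter);
    assert!(jitter < period);
    assert!(first_upload.is_some());
}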
+ if let (Some(target_time), Some(period)) = (target_time, period) { + // Elapsed time includes any scheduling lag as well as the execution of the job + let elapsed = now.duration_since(target_time); + + warn_when_period_overrun(elapsed, period, BackgroundLoopKind::HeatmapUpload); + } + let next_upload = tenant .get_heatmap_period() .and_then(|period| now.checked_add(period)); - result_tx - .send(WriteComplete { + WriteComplete { tenant_shard_id: *tenant.get_tenant_shard_id(), completed_at: now, digest, next_upload, - }) - .ok(); - }); + } + }.instrument(info_span!(parent: None, "heatmap_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())))) + } - self.tenants_uploading - .insert(tenant_shard_id, WriteInProgress { barrier }); + fn on_command(&mut self, command: UploadCommand) -> anyhow::Result { + let tenant_shard_id = command.get_tenant_shard_id(); + + tracing::info!( + tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), + "Starting heatmap write on command"); + let tenant = self + .tenant_manager + .get_attached_tenant_shard(*tenant_shard_id, true) + .map_err(|e| anyhow::anyhow!(e))?; + + Ok(UploadPending { + // Ignore our state for last digest: this forces an upload even if nothing has changed + last_digest: None, + tenant, + target_time: None, + period: None, + }) } #[instrument(skip_all, fields(tenant_id=%completion.tenant_shard_id.tenant_id, shard_id=%completion.tenant_shard_id.shard_slug()))] @@ -389,7 +316,6 @@ impl HeatmapUploader { digest, next_upload, } = completion; - self.tenants_uploading.remove(&tenant_shard_id); use std::collections::hash_map::Entry; match self.tenants.entry(tenant_shard_id) { Entry::Vacant(_) => { @@ -402,69 +328,6 @@ impl HeatmapUploader { } } } - - fn handle_command( - &mut self, - command: UploadCommand, - response_tx: tokio::sync::oneshot::Sender, - ) { - match command { - UploadCommand::Upload(tenant_shard_id) => { - // If an upload was ongoing for this tenant, let it finish first. - let barrier = if let Some(writing_state) = - self.tenants_uploading.get(&tenant_shard_id) - { - tracing::info!( - tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), - "Waiting for heatmap write to complete"); - writing_state.barrier.clone() - } else { - // Spawn the upload then immediately wait for it. This will block processing of other commands and - // starting of other background work. - tracing::info!( - tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), - "Starting heatmap write on command"); - let tenant = match self - .tenant_manager - .get_attached_tenant_shard(tenant_shard_id, true) - { - Ok(t) => t, - Err(e) => { - // Drop result of send: we don't care if caller dropped their receiver - drop(response_tx.send(CommandResponse { - result: Err(e.into()), - })); - return; - } - }; - self.spawn_upload(tenant, None); - let writing_state = self - .tenants_uploading - .get(&tenant_shard_id) - .expect("We just inserted this"); - tracing::info!( - tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), - "Waiting for heatmap upload to complete"); - - writing_state.barrier.clone() - }; - - // This task does no I/O: it only listens for a barrier's completion and then - // sends to the command response channel. It is therefore safe to spawn this without - // any gates/task_mgr hooks. 
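// The admin-triggered path: on_command above converts an UploadCommand into a pending job
// that runs immediately. A sketch of the sending side that pairs with it, assuming a
// `cmd_tx` sender like the one the secondary controller holds and that this runs inside
// the secondary module where these types are visible:
async fn request_upload_example(
    cmd_tx: &tokio::sync::mpsc::Sender<CommandRequest<UploadCommand>>,
    tenant_shard_id: TenantShardId,
) -> anyhow::Result<()> {
    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
    cmd_tx
        .send(CommandRequest {
            payload: UploadCommand::Upload(tenant_shard_id),
            response_tx,
        })
        .await
        .map_err(|_| anyhow::anyhow!("uploader task shut down"))?;
    // The scheduler answers once the spawned upload's barrier is released.
    response_rx.await?.result
}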
- tokio::task::spawn(async move { - barrier.wait().await; - - tracing::info!( - tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), - "Heatmap upload complete"); - - // Drop result of send: we don't care if caller dropped their receiver - drop(response_tx.send(CommandResponse { result: Ok(()) })) - }); - } - } - } } enum UploadHeatmapOutcome { @@ -487,7 +350,6 @@ enum UploadHeatmapError { /// The inner upload operation. This will skip if `last_digest` is Some and matches the digest /// of the object we would have uploaded. -#[instrument(skip_all, fields(tenant_id = %tenant.get_tenant_shard_id().tenant_id, shard_id = %tenant.get_tenant_shard_id().shard_slug()))] async fn upload_tenant_heatmap( remote_storage: GenericRemoteStorage, tenant: &Arc, diff --git a/pageserver/src/tenant/secondary/scheduler.rs b/pageserver/src/tenant/secondary/scheduler.rs new file mode 100644 index 0000000000..cf01a100d9 --- /dev/null +++ b/pageserver/src/tenant/secondary/scheduler.rs @@ -0,0 +1,361 @@ +use async_trait; +use futures::Future; +use std::{ + collections::HashMap, + marker::PhantomData, + pin::Pin, + time::{Duration, Instant}, +}; + +use pageserver_api::shard::TenantShardId; +use tokio::task::JoinSet; +use tokio_util::sync::CancellationToken; +use utils::{completion::Barrier, yielding_loop::yielding_loop}; + +use super::{CommandRequest, CommandResponse}; + +/// Scheduling interval is the time between calls to JobGenerator::schedule. +/// When we schedule jobs, the job generator may provide a hint of its preferred +/// interval, which we will respect within these intervals. +const MAX_SCHEDULING_INTERVAL: Duration = Duration::from_secs(10); +const MIN_SCHEDULING_INTERVAL: Duration = Duration::from_secs(1); + +/// Scheduling helper for background work across many tenants. +/// +/// Systems that need to run background work across many tenants may use this type +/// to schedule jobs within a concurrency limit, along with their own [`JobGenerator`] +/// implementation to provide the work to execute. This is a simple scheduler that just +/// polls the generator for outstanding work, replacing its queue of pending work with +/// what the generator yields on each call: the job generator can change its mind about +/// the order of jobs between calls. The job generator is notified when jobs complete, +/// and additionally may expose a command hook to generate jobs on-demand (e.g. to implement +/// admin APIs). +/// +/// For an example see [`crate::tenant::secondary::heatmap_uploader`] +/// +/// G: A JobGenerator that this scheduler will poll to find pending jobs +/// PJ: 'Pending Job': type for job descriptors that are ready to run +/// RJ: 'Running Job' type' for jobs that have been spawned +/// C : 'Completion' type that spawned jobs will send when they finish +/// CMD: 'Command' type that the job generator will accept to create jobs on-demand +pub(super) struct TenantBackgroundJobs +where + G: JobGenerator, + C: Completion, + PJ: PendingJob, + RJ: RunningJob, +{ + generator: G, + + /// Ready to run. Will progress to `running` once concurrent limit is satisfied, or + /// be removed on next scheduling pass. + pending: std::collections::VecDeque, + + /// Tasks currently running in Self::tasks for these tenants. Check this map + /// before pushing more work into pending for the same tenant. + running: HashMap, + + tasks: JoinSet, + + concurrency: usize, + + /// How often we would like schedule_interval to be called. 
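// How a generator's want_interval hint is honoured: clamped into [MIN, MAX] at second
// granularity, as schedule_iteration does further below. Sketch with an arbitrary hint:
fn clamp_interval_example() {
    use std::time::Duration;

    const MIN: Duration = Duration::from_secs(1); // MIN_SCHEDULING_INTERVAL
    const MAX: Duration = Duration::from_secs(10); // MAX_SCHEDULING_INTERVAL

    let want = Duration::from_millis(200); // a tenant asking for very frequent runs
    let clamped = Duration::from_secs(std::cmp::min(
        std::cmp::max(MIN.as_secs(), want.as_secs()),
        MAX.as_secs(),
    ));
    assert_eq!(clamped, MIN); // second granularity floors sub-second hints to 1s
}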
+ pub(super) scheduling_interval: Duration, + + _phantom: PhantomData<(PJ, RJ, C, CMD)>, +} + +#[async_trait::async_trait] +pub(crate) trait JobGenerator +where + C: Completion, + PJ: PendingJob, + RJ: RunningJob, +{ + /// Called at each scheduling interval. Return a list of jobs to run, most urgent first. + /// + /// This function may be expensive (e.g. walk all tenants), but should not do any I/O. + /// Implementations should take care to yield the executor periodically if running + /// very long loops. + /// + /// Yielding a job here does _not_ guarantee that it will run: if the queue of pending + /// jobs is not drained by the next scheduling interval, pending jobs will be cleared + /// and re-generated. + async fn schedule(&mut self) -> SchedulingResult; + + /// Called when a pending job is ready to be run. + /// + /// The job generation provides a future, and a RJ (Running Job) descriptor that tracks it. + fn spawn(&mut self, pending_job: PJ) -> (RJ, Pin + Send>>); + + /// Called when a job previously spawned with spawn() transmits its completion + fn on_completion(&mut self, completion: C); + + /// Called when a command is received. A job will be spawned immediately if the return + /// value is Some, ignoring concurrency limits and the pending queue. + fn on_command(&mut self, cmd: CMD) -> anyhow::Result; +} + +/// [`JobGenerator`] returns this to provide pending jobs, and hints about scheduling +pub(super) struct SchedulingResult { + pub(super) jobs: Vec, + /// The job generator would like to be called again this soon + pub(super) want_interval: Option, +} + +/// See [`TenantBackgroundJobs`]. +pub(super) trait PendingJob { + fn get_tenant_shard_id(&self) -> &TenantShardId; +} + +/// See [`TenantBackgroundJobs`]. +pub(super) trait Completion: Send + 'static { + fn get_tenant_shard_id(&self) -> &TenantShardId; +} + +/// See [`TenantBackgroundJobs`]. +pub(super) trait RunningJob { + fn get_barrier(&self) -> Barrier; +} + +impl TenantBackgroundJobs +where + C: Completion, + PJ: PendingJob, + RJ: RunningJob, + G: JobGenerator, +{ + pub(super) fn new(generator: G, concurrency: usize) -> Self { + Self { + generator, + pending: std::collections::VecDeque::new(), + running: HashMap::new(), + tasks: JoinSet::new(), + concurrency, + scheduling_interval: MAX_SCHEDULING_INTERVAL, + _phantom: PhantomData, + } + } + + pub(super) async fn run( + &mut self, + mut command_queue: tokio::sync::mpsc::Receiver>, + background_jobs_can_start: Barrier, + cancel: CancellationToken, + ) { + tracing::info!("Waiting for background_jobs_can start..."); + background_jobs_can_start.wait().await; + tracing::info!("background_jobs_can is ready, proceeding."); + + while !cancel.is_cancelled() { + // Look for new work: this is relatively expensive because we have to go acquire the lock on + // the tenant manager to retrieve tenants, and then iterate over them to figure out which ones + // require an upload. 
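// To make the trait contract concrete, a deliberately tiny generator wired to these
// traits. This is an illustrative sketch: every Noop* name is hypothetical, and the
// generic parameter lists (<PJ, RJ, C, CMD> and friends) are written out as assumed,
// mirroring how the heatmap uploader implements the same traits.
mod noop_example {
    use std::pin::Pin;

    use futures::Future;
    use pageserver_api::shard::TenantShardId;
    use utils::completion::Barrier;

    use super::{Completion, JobGenerator, PendingJob, RunningJob, SchedulingResult};

    struct NoopJob(TenantShardId);
    impl PendingJob for NoopJob {
        fn get_tenant_shard_id(&self) -> &TenantShardId {
            &self.0
        }
    }

    struct NoopRunning(Barrier);
    impl RunningJob for NoopRunning {
        fn get_barrier(&self) -> Barrier {
            self.0.clone()
        }
    }

    struct NoopDone(TenantShardId);
    impl Completion for NoopDone {
        fn get_tenant_shard_id(&self) -> &TenantShardId {
            &self.0
        }
    }

    struct NoopGenerator;

    #[async_trait::async_trait]
    impl JobGenerator<NoopJob, NoopRunning, NoopDone, TenantShardId> for NoopGenerator {
        async fn schedule(&mut self) -> SchedulingResult<NoopJob> {
            // No periodic work: jobs only arrive via on_command.
            SchedulingResult {
                jobs: Vec::new(),
                want_interval: None,
            }
        }

        fn spawn(
            &mut self,
            job: NoopJob,
        ) -> (NoopRunning, Pin<Box<dyn Future<Output = NoopDone> + Send>>) {
            let (completion, barrier) = utils::completion::channel();
            let tenant_shard_id = *job.get_tenant_shard_id();
            (
                NoopRunning(barrier),
                Box::pin(async move {
                    // Holding `completion` until the end releases the barrier on exit.
                    let _completion = completion;
                    NoopDone(tenant_shard_id)
                }),
            )
        }

        fn on_completion(&mut self, _completion: NoopDone) {}

        fn on_command(&mut self, cmd: TenantShardId) -> anyhow::Result<NoopJob> {
            Ok(NoopJob(cmd))
        }
    }

    // A scheduler for this generator would then be constructed the same way as the
    // heatmap uploader's: TenantBackgroundJobs::new(NoopGenerator, concurrency).
}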
+ self.schedule_iteration(&cancel).await; + + if cancel.is_cancelled() { + return; + } + + // Schedule some work, if concurrency limit permits it + self.spawn_pending(); + + // Between scheduling iterations, we will: + // - Drain any complete tasks and spawn pending tasks + // - Handle incoming administrative commands + // - Check our cancellation token + let next_scheduling_iteration = Instant::now() + .checked_add(self.scheduling_interval) + .unwrap_or_else(|| { + tracing::warn!( + "Scheduling interval invalid ({}s)", + self.scheduling_interval.as_secs_f64() + ); + // unwrap(): this constant is small, cannot fail to add to time unless + // we are close to the end of the universe. + Instant::now().checked_add(MIN_SCHEDULING_INTERVAL).unwrap() + }); + loop { + tokio::select! { + _ = cancel.cancelled() => { + tracing::info!("joining tasks"); + // We do not simply drop the JoinSet, in order to have an orderly shutdown without cancellation. + // It is the callers responsibility to make sure that the tasks they scheduled + // respect an appropriate cancellation token, to shut down promptly. It is only + // safe to wait on joining these tasks because we can see the cancellation token + // has been set. + while let Some(_r) = self.tasks.join_next().await {} + tracing::info!("terminating on cancellation token."); + + break; + }, + _ = tokio::time::sleep(next_scheduling_iteration.duration_since(Instant::now())) => { + tracing::debug!("woke for scheduling interval"); + break;}, + cmd = command_queue.recv() => { + tracing::debug!("woke for command queue"); + let cmd = match cmd { + Some(c) =>c, + None => { + // SecondaryController was destroyed, and this has raced with + // our CancellationToken + tracing::info!("terminating on command queue destruction"); + cancel.cancel(); + break; + } + }; + + let CommandRequest{ + response_tx, + payload + } = cmd; + self.handle_command(payload, response_tx); + }, + _ = async { + let completion = self.process_next_completion().await; + match completion { + Some(c) => { + self.generator.on_completion(c); + if !cancel.is_cancelled() { + self.spawn_pending(); + } + }, + None => { + // Nothing is running, so just wait: expect that this future + // will be dropped when something in the outer select! fires. + cancel.cancelled().await; + } + } + + } => {} + } + } + } + } + + fn do_spawn(&mut self, job: PJ) { + let tenant_shard_id = *job.get_tenant_shard_id(); + let (in_progress, fut) = self.generator.spawn(job); + + self.tasks.spawn(fut); + + self.running.insert(tenant_shard_id, in_progress); + } + + /// For all pending tenants that are elegible for execution, spawn their task. + /// + /// Caller provides the spawn operation, we track the resulting execution. + fn spawn_pending(&mut self) { + while !self.pending.is_empty() && self.running.len() < self.concurrency { + // unwrap: loop condition includes !is_empty() + let pending = self.pending.pop_front().unwrap(); + self.do_spawn(pending); + } + } + + /// For administrative commands: skip the pending queue, ignore concurrency limits + fn spawn_now(&mut self, job: PJ) -> &RJ { + let tenant_shard_id = *job.get_tenant_shard_id(); + self.do_spawn(job); + self.running + .get(&tenant_shard_id) + .expect("We just inserted this") + } + + /// Wait until the next task completes, and handle its completion + /// + /// Cancellation: this method is cancel-safe. 
+ async fn process_next_completion(&mut self) -> Option { + match self.tasks.join_next().await { + Some(r) => { + // We use a channel to drive completions, but also + // need to drain the JoinSet to avoid completed tasks + // accumulating. These calls are 1:1 because every task + // we spawn into this joinset submits is result to the channel. + let completion = r.expect("Panic in background task"); + + self.running.remove(completion.get_tenant_shard_id()); + Some(completion) + } + None => { + // Nothing is running, so we have nothing to wait for. We may drop out: the + // main even loop will call us again after the next time it has run something. + None + } + } + } + + /// Convert the command into a pending job, spawn it, and when the spawned + /// job completes, send the result down `response_tx`. + fn handle_command( + &mut self, + cmd: CMD, + response_tx: tokio::sync::oneshot::Sender, + ) { + let job = match self.generator.on_command(cmd) { + Ok(j) => j, + Err(e) => { + response_tx.send(CommandResponse { result: Err(e) }).ok(); + return; + } + }; + + let tenant_shard_id = job.get_tenant_shard_id(); + let barrier = if let Some(barrier) = self.get_running(tenant_shard_id) { + barrier + } else { + let running = self.spawn_now(job); + running.get_barrier().clone() + }; + + // This task does no I/O: it only listens for a barrier's completion and then + // sends to the command response channel. It is therefore safe to spawn this without + // any gates/task_mgr hooks. + tokio::task::spawn(async move { + barrier.wait().await; + + response_tx.send(CommandResponse { result: Ok(()) }).ok(); + }); + } + + fn get_running(&self, tenant_shard_id: &TenantShardId) -> Option { + self.running.get(tenant_shard_id).map(|r| r.get_barrier()) + } + + /// Periodic execution phase: inspect all attached tenants and schedule any work they require. + /// + /// The type in `tenants` should be a tenant-like structure, e.g. [`crate::tenant::Tenant`] or [`crate::tenant::secondary::SecondaryTenant`] + /// + /// This function resets the pending list: it is assumed that the caller may change their mind about + /// which tenants need work between calls to schedule_iteration. + async fn schedule_iteration(&mut self, cancel: &CancellationToken) { + let SchedulingResult { + jobs, + want_interval, + } = self.generator.schedule().await; + + // Adjust interval based on feedback from the job generator + if let Some(want_interval) = want_interval { + // Calculation uses second granularity: this scheduler is not intended for high frequency tasks + self.scheduling_interval = Duration::from_secs(std::cmp::min( + std::cmp::max(MIN_SCHEDULING_INTERVAL.as_secs(), want_interval.as_secs()), + MAX_SCHEDULING_INTERVAL.as_secs(), + )); + } + + // The priority order of previously scheduled work may be invalidated by current state: drop + // all pending work (it will be re-scheduled if still needed) + self.pending.clear(); + + // While iterating over the potentially-long list of tenants, we will periodically yield + // to avoid blocking executor. 
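// The yielding_loop helper used here (and in the heatmap uploader's schedule()) can be
// exercised on its own; a minimal sketch over arbitrary data:
use tokio_util::sync::CancellationToken;
use utils::yielding_loop::{yielding_loop, YieldingLoopError};

async fn sum_with_yields_example(
    cancel: &CancellationToken,
) -> Result<u64, YieldingLoopError> {
    let mut total = 0u64;
    // Visit a million items, yielding to the executor every 1000 iterations and
    // dropping out with Err(Cancelled) if `cancel` fires while we are parked.
    yielding_loop(1000, cancel, 0..1_000_000u64, |i| total += i).await?;
    Ok(total)
}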
+ yielding_loop(1000, cancel, jobs.into_iter(), |job| { + // Skip tenants that already have a write in flight + if !self.running.contains_key(job.get_tenant_shard_id()) { + self.pending.push_back(job); + } + }) + .await + .ok(); + } +} diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 7ff1873eda..aa5894cc37 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -45,6 +45,8 @@ pub(crate) enum BackgroundLoopKind { ConsumptionMetricsCollectMetrics, ConsumptionMetricsSyntheticSizeWorker, InitialLogicalSizeCalculation, + HeatmapUpload, + SecondaryDownload, } impl BackgroundLoopKind { diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 6dea0d923d..ccacc0a987 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -326,6 +326,10 @@ class PageserverHttpClient(requests.Session): res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/heatmap_upload") self.verbose_error(res) + def tenant_secondary_download(self, tenant_id: TenantId): + res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/secondary/download") + self.verbose_error(res) + def set_tenant_config(self, tenant_id: TenantId, config: dict[str, Any]): assert "tenant_id" not in config.keys() res = self.put( diff --git a/test_runner/regress/test_pageserver_secondary.py b/test_runner/regress/test_pageserver_secondary.py index 8ae4297983..a9eff99a0c 100644 --- a/test_runner/regress/test_pageserver_secondary.py +++ b/test_runner/regress/test_pageserver_secondary.py @@ -1,9 +1,11 @@ import random +from pathlib import Path from typing import Any, Dict, Optional import pytest from fixtures.log_helper import log -from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver +from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver, S3Scrubber +from fixtures.pageserver.utils import assert_prefix_empty, tenant_delete_wait_completed from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind from fixtures.types import TenantId, TimelineId from fixtures.utils import wait_until @@ -251,6 +253,9 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder): flush_ms=5000, ) + # Encourage the new location to download while still in secondary mode + pageserver_b.http_client().tenant_secondary_download(tenant_id) + migrated_generation = env.attachment_service.attach_hook_issue(tenant_id, pageserver_b.id) log.info(f"Acquired generation {migrated_generation} for destination pageserver") assert migrated_generation == initial_generation + 1 @@ -258,8 +263,6 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder): # Writes and reads still work in AttachedStale. workload.validate(pageserver_a.id) - # TODO: call into secondary mode API hooks to do an upload/download sync - # Generate some more dirty writes: we expect the origin to ingest WAL in # in AttachedStale workload.churn_rows(64, pageserver_a.id, upload=False) @@ -369,3 +372,143 @@ def test_heatmap_uploads(neon_env_builder: NeonEnvBuilder): log.info(f"Read back heatmap: {heatmap_second}") assert heatmap_second != heatmap_first validate_heatmap(heatmap_second) + + +def list_layers(pageserver, tenant_id: TenantId, timeline_id: TimelineId) -> list[Path]: + """ + Inspect local storage on a pageserver to discover which layer files are present. + + :return: list of relative paths to layers, from the timeline root. 
+ """ + timeline_path = pageserver.timeline_dir(tenant_id, timeline_id) + + def relative(p: Path) -> Path: + return p.relative_to(timeline_path) + + return sorted( + list( + map( + relative, + filter( + lambda path: path.name != "metadata" + and "ephemeral" not in path.name + and "temp" not in path.name, + timeline_path.glob("*"), + ), + ) + ) + ) + + +def test_secondary_downloads(neon_env_builder: NeonEnvBuilder): + """ + Test the overall data flow in secondary mode: + - Heatmap uploads from the attached location + - Heatmap & layer downloads from the secondary location + - Eviction of layers on the attached location results in deletion + on the secondary location as well. + """ + neon_env_builder.num_pageservers = 2 + neon_env_builder.enable_pageserver_remote_storage( + remote_storage_kind=RemoteStorageKind.MOCK_S3, + ) + env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF) + assert env.attachment_service is not None + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + ps_attached = env.pageservers[0] + ps_secondary = env.pageservers[1] + + workload = Workload(env, tenant_id, timeline_id) + workload.init(env.pageservers[0].id) + workload.write_rows(256, ps_attached.id) + + # Configure a secondary location + log.info("Setting up secondary location...") + ps_secondary.tenant_location_configure( + tenant_id, + { + "mode": "Secondary", + "secondary_conf": {"warm": True}, + "tenant_conf": {}, + }, + ) + readback_conf = ps_secondary.read_tenant_location_conf(tenant_id) + log.info(f"Read back conf: {readback_conf}") + + # Explicit upload/download cycle + # ============================== + log.info("Synchronizing after initial write...") + ps_attached.http_client().tenant_heatmap_upload(tenant_id) + + ps_secondary.http_client().tenant_secondary_download(tenant_id) + + assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers( + ps_secondary, tenant_id, timeline_id + ) + + # Make changes on attached pageserver, check secondary downloads them + # =================================================================== + log.info("Synchronizing after subsequent write...") + workload.churn_rows(128, ps_attached.id) + + ps_attached.http_client().tenant_heatmap_upload(tenant_id) + ps_secondary.http_client().tenant_secondary_download(tenant_id) + + assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers( + ps_secondary, tenant_id, timeline_id + ) + + # FIXME: this sleep is needed to avoid on-demand promotion of the layers we evict, while + # walreceiver is still doing something. 
+ import time + + time.sleep(5) + + # Do evictions on attached pageserver, check secondary follows along + # ================================================================== + log.info("Evicting a layer...") + layer_to_evict = list_layers(ps_attached, tenant_id, timeline_id)[0] + ps_attached.http_client().evict_layer(tenant_id, timeline_id, layer_name=layer_to_evict.name) + + log.info("Synchronizing after eviction...") + ps_attached.http_client().tenant_heatmap_upload(tenant_id) + ps_secondary.http_client().tenant_secondary_download(tenant_id) + + assert layer_to_evict not in list_layers(ps_attached, tenant_id, timeline_id) + assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers( + ps_secondary, tenant_id, timeline_id + ) + + # Scrub the remote storage + # ======================== + # This confirms that the scrubber isn't upset by the presence of the heatmap + S3Scrubber(neon_env_builder.test_output_dir, neon_env_builder).scan_metadata() + + # Detach secondary and delete tenant + # =================================== + # This confirms that the heatmap gets cleaned up as well as other normal content. + log.info("Detaching secondary location...") + ps_secondary.tenant_location_configure( + tenant_id, + { + "mode": "Detached", + "secondary_conf": None, + "tenant_conf": {}, + }, + ) + + log.info("Deleting tenant...") + tenant_delete_wait_completed(ps_attached.http_client(), tenant_id, 10) + + assert_prefix_empty( + neon_env_builder, + prefix="/".join( + ( + "tenants", + str(tenant_id), + ) + ), + )
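# A possible refinement of the explicit synchronize steps above: poll until the secondary's
# layer listing matches the attached pageserver's, rather than relying on fixed sleeps.
# Sketch only, assuming the wait_until(iterations, interval, func) helper already imported
# in this module; the iteration count and interval are arbitrary.
def wait_for_layer_match(ps_attached, ps_secondary, tenant_id: TenantId, timeline_id: TimelineId):
    def layers_match():
        assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers(
            ps_secondary, tenant_id, timeline_id
        )

    wait_until(20, 0.5, layers_match)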