From 69f927a9becf587085ac6dfc131671b08c109040 Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 11 Oct 2023 15:22:22 +0100 Subject: [PATCH] pageserver: TenantManager support for SecondaryTenant --- pageserver/client/src/mgmt_api.rs | 14 +- pageserver/src/tenant/delete.rs | 2 +- pageserver/src/tenant/mgr.rs | 225 ++++++++--- .../src/tenant/secondary/heatmap_writer.rs | 380 ------------------ 4 files changed, 182 insertions(+), 439 deletions(-) delete mode 100644 pageserver/src/tenant/secondary/heatmap_writer.rs diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs index 0ad4e1551e..90a8b51966 100644 --- a/pageserver/client/src/mgmt_api.rs +++ b/pageserver/client/src/mgmt_api.rs @@ -1,4 +1,4 @@ -use pageserver_api::models::*; +use pageserver_api::{models::*, shard::TenantShardId}; use reqwest::{IntoUrl, Method}; use utils::{ http::error::HttpErrorBody, @@ -162,6 +162,18 @@ impl Client { Ok(()) } + pub async fn tenant_secondary_download(&self, tenant_id: TenantShardId) -> Result<()> { + let uri = format!( + "{}/v1/tenant/{}/secondary/download", + self.mgmt_api_endpoint, tenant_id + ); + self.request(Method::POST, &uri, ()) + .await? + .error_for_status() + .map(|_| ()) + .map_err(|e| Error::ApiError(format!("{}", e))) + } + pub async fn location_config( &self, tenant_id: TenantId, diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs index e8491f26db..c9587c34a3 100644 --- a/pageserver/src/tenant/delete.rs +++ b/pageserver/src/tenant/delete.rs @@ -585,7 +585,7 @@ impl DeleteTenantFlow { } break; } - TenantsMapRemoveResult::Occupied(TenantSlot::Secondary) => { + TenantsMapRemoveResult::Occupied(TenantSlot::Secondary(_)) => { // This is unexpected: this secondary tenants should not have been created, and we // are not in a position to shut it down from here. tracing::warn!("Tenant transitioned to secondary mode while deleting!"); diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index b2f14db9f7..db91c55526 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -44,6 +44,8 @@ use utils::generation::Generation; use utils::id::{TenantId, TimelineId}; use super::delete::DeleteTenantError; +use super::secondary::SecondaryTenant; +use super::storage_layer::Layer; use super::TenantSharedResources; /// For a tenant that appears in TenantsMap, it may either be @@ -57,7 +59,7 @@ use super::TenantSharedResources; /// having a properly acquired generation (Secondary doesn't need a generation) pub(crate) enum TenantSlot { Attached(Arc), - Secondary, + Secondary(Arc), /// In this state, other administrative operations acting on the TenantId should /// block, or return a retry indicator equivalent to HTTP 503. InProgress(utils::completion::Barrier), @@ -67,7 +69,7 @@ impl std::fmt::Debug for TenantSlot { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Attached(tenant) => write!(f, "Attached({})", tenant.current_state()), - Self::Secondary => write!(f, "Secondary"), + Self::Secondary(_) => write!(f, "Secondary"), Self::InProgress(_) => write!(f, "InProgress"), } } @@ -78,7 +80,7 @@ impl TenantSlot { fn get_attached(&self) -> Option<&Arc> { match self { Self::Attached(t) => Some(t), - Self::Secondary => None, + Self::Secondary(_) => None, Self::InProgress(_) => None, } } @@ -469,7 +471,10 @@ pub async fn init_tenant_mgr( // tenants, because they do no remote writes and hence require no // generation number info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Loaded tenant in secondary mode"); - tenants.insert(tenant_shard_id, TenantSlot::Secondary); + tenants.insert( + tenant_shard_id, + TenantSlot::Secondary(SecondaryTenant::new(tenant_shard_id)), + ); } LocationMode::Attached(_) => { // TODO: augment re-attach API to enable the control plane to @@ -664,8 +669,14 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock) { total_attached += 1; } - TenantSlot::Secondary => { - shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary); + TenantSlot::Secondary(state) => { + // We don't need to wait for this individually per-tenant: the + // downloader task will be waited on eventually, this cancel + // is just to encourage it to drop out if it is doing work + // for this tenant right now. + state.cancel.cancel(); + + shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary(state)); } TenantSlot::InProgress(notify) => { // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will @@ -848,12 +859,28 @@ impl TenantManager { Some(TenantSlot::InProgress(_)) => { Err(GetTenantError::NotActive(tenant_shard_id.tenant_id)) } - None | Some(TenantSlot::Secondary) => { + None | Some(TenantSlot::Secondary(_)) => { Err(GetTenantError::NotFound(tenant_shard_id.tenant_id)) } } } + pub(crate) fn get_secondary_tenant_shard( + &self, + tenant_shard_id: TenantShardId, + ) -> Option> { + let locked = self.tenants.read().unwrap(); + + let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read) + .ok() + .flatten(); + + match peek_slot { + Some(TenantSlot::Secondary(s)) => Some(s.clone()), + _ => None, + } + } + #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))] pub(crate) async fn upsert_location( &self, @@ -932,42 +959,57 @@ impl TenantManager { // not do significant I/O, and shutdowns should be prompt via cancellation tokens. let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?; - if let Some(TenantSlot::Attached(tenant)) = slot_guard.get_old_value() { - // The case where we keep a Tenant alive was covered above in the special case - // for Attached->Attached transitions in the same generation. By this point, - // if we see an attached tenant we know it will be discarded and should be - // shut down. - let (_guard, progress) = utils::completion::channel(); + match slot_guard.get_old_value() { + Some(TenantSlot::Attached(tenant)) => { + // The case where we keep a Tenant alive was covered above in the special case + // for Attached->Attached transitions in the same generation. By this point, + // if we see an attached tenant we know it will be discarded and should be + // shut down. + let (_guard, progress) = utils::completion::channel(); - match tenant.get_attach_mode() { - AttachmentMode::Single | AttachmentMode::Multi => { - // Before we leave our state as the presumed holder of the latest generation, - // flush any outstanding deletions to reduce the risk of leaking objects. - self.resources.deletion_queue_client.flush_advisory() - } - AttachmentMode::Stale => { - // If we're stale there's not point trying to flush deletions - } - }; + match tenant.get_attach_mode() { + AttachmentMode::Single | AttachmentMode::Multi => { + // Before we leave our state as the presumed holder of the latest generation, + // flush any outstanding deletions to reduce the risk of leaking objects. + self.resources.deletion_queue_client.flush_advisory() + } + AttachmentMode::Stale => { + // If we're stale there's not point trying to flush deletions + } + }; - info!("Shutting down attached tenant"); - match tenant.shutdown(progress, false).await { - Ok(()) => {} - Err(barrier) => { - info!("Shutdown already in progress, waiting for it to complete"); - barrier.wait().await; + info!("Shutting down attached tenant"); + match tenant.shutdown(progress, false).await { + Ok(()) => {} + Err(barrier) => { + info!("Shutdown already in progress, waiting for it to complete"); + barrier.wait().await; + } } + slot_guard.drop_old_value().expect("We just shut it down"); + } + Some(TenantSlot::Secondary(state)) => { + info!("Shutting down secondary tenant"); + state.shutdown().await; + } + Some(TenantSlot::InProgress(_)) => { + // This should never happen: acquire_slot should error out + // if the contents of a slot were InProgress. + anyhow::bail!("Acquired an InProgress slot, this is a bug.") + } + None => { + // Slot was vacant, nothing needs shutting down. } - slot_guard.drop_old_value().expect("We just shut it down"); } let tenant_path = self.conf.tenant_path(&tenant_shard_id); + let timelines_path = self.conf.timelines_path(&tenant_shard_id); let new_slot = match &new_location_config.mode { LocationMode::Secondary(_) => { // Directory doesn't need to be fsync'd because if we crash it can // safely be recreated next time this tenant location is configured. - tokio::fs::create_dir_all(&tenant_path) + tokio::fs::create_dir_all(&timelines_path) .await .with_context(|| format!("Creating {tenant_path}"))?; @@ -975,11 +1017,9 @@ impl TenantManager { .await .map_err(SetNewTenantConfigError::Persist)?; - TenantSlot::Secondary + TenantSlot::Secondary(SecondaryTenant::new(tenant_shard_id)) } LocationMode::Attached(_attach_config) => { - let timelines_path = self.conf.timelines_path(&tenant_shard_id); - // Directory doesn't need to be fsync'd because we do not depend on // it to exist after crashes: it may be recreated when tenant is // re-attached, see https://github.com/neondatabase/neon/issues/5550 @@ -1102,6 +1142,80 @@ impl TenantManager { .collect(), } } + // Do some synchronous work for all tenant slots in Secondary state. The provided + // callback should be small and fast, as it will be called inside the global + // TenantsMap lock. + pub(crate) fn foreach_secondary_tenants(&self, mut func: F) + where + // TODO: let the callback return a hint to drop out of the loop early + F: FnMut(&TenantShardId, &Arc), + { + let locked = self.tenants.read().unwrap(); + + let map = match &*locked { + TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return, + TenantsMap::Open(m) => m, + }; + + for (tenant_id, slot) in map { + if let TenantSlot::Secondary(state) = slot { + // Only expose secondary tenants that are not currently shutting down + if !state.cancel.is_cancelled() { + func(tenant_id, state) + } + } + } + } + + /// Having planned some evictions for a tenant, attempt to execute them. + /// + /// Execution will not occur if the TenantSlot for this tenant is not in + /// a state suitable to execute. + // TODO: is Layer really needed here? Maybe we should have reduced to a LayerFileName by this point. + pub(crate) async fn evict_tenant_layers( + &self, + tenant_shard_id: &TenantShardId, + timeline_layers: Vec<(TimelineId, Layer)>, + ) { + // TODO: unify with how we evict for attached tenants. They should also + // pass through here, to avoid attached tenant evictions racing with + // the lifetime of secondary locations for the same tenant ID. + + let state = { + let locked = self.tenants.read().unwrap(); + let map = match &*locked { + TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return, + TenantsMap::Open(m) => m, + }; + + match map.get(tenant_shard_id) { + Some(TenantSlot::Secondary(secondary_state)) => { + // Found a secondary as expected + secondary_state.clone() + } + _ => { + // A location configuration change raced with this eviction + tracing::info!( + "Dropping {} layer evictions, tenant not in suitable state", + timeline_layers.len() + ); + return; + } + } + }; + + // Concurrency: downloads might have been going on while we deleted layers. However, + // we are only deleting layers that the SecondaryTenant already thought were on disk, + // so we won't be deleting anything that it is _currently_ downloading. All deletions + // of SecondaryTenant layers flow through this function, so there is no risk that the + // layer we're evicting is no longer present in-memory. + state + .evict_layers(self.conf, tenant_shard_id, timeline_layers) + .instrument(tracing::info_span!("evict_layers", + tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug() + )) + .await; + } } #[derive(Debug, thiserror::Error)] @@ -1151,7 +1265,7 @@ pub(crate) fn get_tenant( Some(TenantSlot::InProgress(_)) => { Err(GetTenantError::NotActive(tenant_shard_id.tenant_id)) } - None | Some(TenantSlot::Secondary) => { + None | Some(TenantSlot::Secondary(_)) => { Err(GetTenantError::NotFound(tenant_shard_id.tenant_id)) } } @@ -1222,7 +1336,7 @@ pub(crate) async fn get_active_tenant_with_timeout( } } } - Some(TenantSlot::Secondary) => { + Some(TenantSlot::Secondary(_)) => { return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive( tenant_id, ))) @@ -1521,7 +1635,7 @@ pub(crate) async fn list_tenants() -> Result, Ok(m.iter() .filter_map(|(id, tenant)| match tenant { TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())), - TenantSlot::Secondary => None, + TenantSlot::Secondary(_) => None, TenantSlot::InProgress(_) => None, }) .collect()) @@ -1778,11 +1892,7 @@ impl SlotGuard { fn old_value_is_shutdown(&self) -> bool { match self.old_value.as_ref() { Some(TenantSlot::Attached(tenant)) => tenant.gate.close_complete(), - Some(TenantSlot::Secondary) => { - // TODO: when adding secondary mode tenants, this will check for shutdown - // in the same way that we do for `Tenant` above - true - } + Some(TenantSlot::Secondary(secondary_tenant)) => secondary_tenant.gate.close_complete(), Some(TenantSlot::InProgress(_)) => { // A SlotGuard cannot be constructed for a slot that was already InProgress unreachable!() @@ -1992,26 +2102,19 @@ where let mut slot_guard = tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?; - // The SlotGuard allows us to manipulate the Tenant object without fear of some - // concurrent API request doing something else for the same tenant ID. - let attached_tenant = match slot_guard.get_old_value() { - Some(TenantSlot::Attached(t)) => Some(t), - _ => None, - }; - // allow pageserver shutdown to await for our completion let (_guard, progress) = completion::channel(); - // If the tenant was attached, shut it down gracefully. For secondary - // locations this part is not necessary - match &attached_tenant { - Some(attached_tenant) => { + // The SlotGuard allows us to manipulate the Tenant object without fear of some + // concurrent API request doing something else for the same tenant ID. + let attached_tenant = match slot_guard.get_old_value() { + Some(TenantSlot::Attached(tenant)) => { // whenever we remove a tenant from memory, we don't want to flush and wait for upload let freeze_and_flush = false; // shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so // that we can continue safely to cleanup. - match attached_tenant.shutdown(progress, freeze_and_flush).await { + match tenant.shutdown(progress, freeze_and_flush).await { Ok(()) => {} Err(_other) => { // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to @@ -2020,11 +2123,19 @@ where return Err(TenantStateError::IsStopping(tenant_shard_id.tenant_id)); } } + Some(tenant) } - None => { - // Nothing to wait on when not attached, proceed. + Some(TenantSlot::Secondary(secondary_state)) => { + tracing::info!("Shutting down in secondary mode"); + secondary_state.shutdown().await; + None } - } + Some(TenantSlot::InProgress(_)) => { + // Acquiring a slot guarantees its old value was not InProgress + unreachable!(); + } + None => None, + }; match tenant_cleanup .await diff --git a/pageserver/src/tenant/secondary/heatmap_writer.rs b/pageserver/src/tenant/secondary/heatmap_writer.rs deleted file mode 100644 index 1b12844504..0000000000 --- a/pageserver/src/tenant/secondary/heatmap_writer.rs +++ /dev/null @@ -1,380 +0,0 @@ -use std::{ - collections::HashMap, - sync::{Arc, Weak}, - time::{Duration, Instant}, -}; - -use crate::{ - metrics::SECONDARY_MODE, - tenant::{ - mgr::{self, TenantManager}, - remote_timeline_client::remote_heatmap_path, - secondary::CommandResponse, - Tenant, - }, -}; - -use pageserver_api::models::TenantState; -use remote_storage::GenericRemoteStorage; - -use tokio::task::JoinSet; -use tokio_util::sync::CancellationToken; -use tracing::Instrument; -use utils::{backoff, completion::Barrier, id::TenantId}; - -use super::{heatmap::HeatMapTenant, CommandRequest, UploadCommand}; - -/// Period between heatmap writer walking Tenants to look for work to do -const HEATMAP_WAKE_INTERVAL: Duration = Duration::from_millis(1000); - -/// Periodic between heatmap writes for each Tenant -const HEATMAP_UPLOAD_INTERVAL: Duration = Duration::from_millis(60000); - -/// While we take a CancellationToken here, it is subordinate to the CancellationTokens -/// of tenants: i.e. we expect all Tenants to have been shut down before we are shut down, otherwise -/// we might block waiting on a Tenant. -pub(super) async fn heatmap_writer_task( - tenant_manager: Arc, - remote_storage: GenericRemoteStorage, - mut command_queue: tokio::sync::mpsc::Receiver>, - background_jobs_can_start: Barrier, - cancel: CancellationToken, -) -> anyhow::Result<()> { - let mut writer = HeatmapWriter { - tenant_manager, - remote_storage, - cancel: cancel.clone(), - tasks: JoinSet::new(), - tenants: HashMap::new(), - tenants_writing: HashMap::new(), - concurrent_writes: 8, - }; - - tracing::info!("Waiting for background_jobs_can start..."); - background_jobs_can_start.wait().await; - tracing::info!("background_jobs_can is ready, proceeding."); - - while !cancel.is_cancelled() { - writer.iteration().await?; - - tokio::select! { - _ = cancel.cancelled() => { - tracing::info!("Heatmap writer joining tasks"); - - tracing::info!("Heatmap writer terminating"); - - break; - }, - _ = tokio::time::sleep(HEATMAP_WAKE_INTERVAL) => {}, - cmd = command_queue.recv() => { - let cmd = match cmd { - Some(c) =>c, - None => { - // SecondaryController was destroyed, and this has raced with - // our CancellationToken - tracing::info!("Heatmap writer terminating"); - break; - } - }; - - let CommandRequest{ - response_tx, - payload - } = cmd; - let result = writer.handle_command(payload).await; - if response_tx.send(CommandResponse{result}).is_err() { - // Caller went away, e.g. because an HTTP request timed out - tracing::info!("Dropping response to administrative command") - } - } - } - } - - Ok(()) -} - -struct WriteInProgress { - barrier: Barrier, -} - -struct WriteComplete { - tenant_id: TenantId, - completed_at: Instant, -} - -/// The heatmap writer keeps a little bit of per-tenant state, mainly to remember -/// when we last did a write. We only populate this after doing at least one -/// write for a tenant -- this avoids holding state for tenants that have -/// uploads disabled. - -struct WriterTenantState { - // This Weak only exists to enable culling IdleTenant instances - // when the Tenant has been deallocated. - tenant: Weak, - - last_write: Option, -} - -struct HeatmapWriter { - tenant_manager: Arc, - remote_storage: GenericRemoteStorage, - cancel: CancellationToken, - - tenants: HashMap, - - tenants_writing: HashMap, - tasks: JoinSet, - concurrent_writes: usize, -} - -impl HeatmapWriter { - /// Periodic execution phase: check for new work to do, and run it with `spawn_write` - async fn iteration(&mut self) -> anyhow::Result<()> { - self.drain().await; - - // Cull any entries in self.tenants whose Arc is gone - self.tenants.retain(|_k, v| v.tenant.upgrade().is_some()); - - // Cannot spawn more work right now - if self.tenants_writing.len() >= self.concurrent_writes { - return Ok(()); - } - - // Iterate over tenants looking for work to do. - let tenants = self.tenant_manager.get_attached_tenants(); - for tenant in tenants { - // Can't spawn any more work, drop out - if self.tenants_writing.len() >= self.concurrent_writes { - return Ok(()); - } - - // Process is shutting down, drop out - if self.cancel.is_cancelled() { - return Ok(()); - } - - // Skip tenants that don't have heatmaps enabled - if !tenant.get_enable_heatmap() { - continue; - } - - // Skip tenants that aren't in a stable active state - if tenant.current_state() != TenantState::Active { - continue; - } - - // Skip tenants that already have a write in flight - if self.tenants_writing.contains_key(&tenant.get_tenant_id()) { - continue; - } - - // TODO: add a TenantConf for whether to upload at all. This is useful for - // a single-location mode for cheap tenants that don't require HA. - - // TODO: add a mechanism to check whether the active layer set has - // changed since our last write - - self.maybe_spawn_write(tenant); - } - - Ok(()) - } - - async fn drain(&mut self) { - // Drain any complete background operations - loop { - tokio::select!( - biased; - Some(r) = self.tasks.join_next() => { - match r { - Ok(r) => { - self.on_completion(r); - }, - Err(e) => { - // This should not happen, but needn't be fatal. - tracing::error!("Join error on heatmap writer JoinSet! {e}"); - } - } - } - else => { - break; - } - ) - } - } - - fn maybe_spawn_write(&mut self, tenant: Arc) { - // Create an entry in self.tenants if one doesn't already exist: this will later be updated - // with the completion time in on_completion. - let state = self - .tenants - .entry(tenant.get_tenant_id()) - .or_insert_with(|| WriterTenantState { - tenant: Arc::downgrade(&tenant), - last_write: None, - }); - - // Decline to do the upload if insufficient time has passed - if let Some(last_write) = state.last_write { - if Instant::now().duration_since(last_write) < HEATMAP_UPLOAD_INTERVAL { - return; - } - } - - self.spawn_write(tenant) - } - - fn spawn_write(&mut self, tenant: Arc) { - let remote_storage = self.remote_storage.clone(); - let tenant_id = tenant.get_tenant_id(); - let (completion, barrier) = utils::completion::channel(); - self.tasks.spawn(async move { - // Guard for the barrier in [`WriteInProgress`] - let _completion = completion; - - match write_tenant(remote_storage, &tenant) - .instrument(tracing::info_span!( - "write_tenant", - tenant_id = %tenant.get_tenant_id() - )) - .await - { - Ok(()) => {} - Err(e) => { - tracing::warn!( - "Failed to upload heatmap for tenant {}: {e:#}", - tenant.get_tenant_id(), - ) - } - } - - WriteComplete { - tenant_id: tenant.get_tenant_id(), - completed_at: Instant::now(), - } - }); - - self.tenants_writing - .insert(tenant_id, WriteInProgress { barrier }); - } - - fn on_completion(&mut self, completion: WriteComplete) { - tracing::debug!(tenant_id=%completion.tenant_id, "Heatmap write task complete"); - self.tenants_writing.remove(&completion.tenant_id); - tracing::debug!("Task completed for tenant {}", completion.tenant_id); - use std::collections::hash_map::Entry; - match self.tenants.entry(completion.tenant_id) { - Entry::Vacant(_) => { - // Tenant state was dropped, nothing to update. - } - Entry::Occupied(mut entry) => { - entry.get_mut().last_write = Some(completion.completed_at) - } - } - } - - async fn handle_command(&mut self, command: UploadCommand) -> anyhow::Result<()> { - match command { - UploadCommand::Upload(tenant_id) => { - // If an upload was ongoing for this tenant, let it finish first. - if let Some(writing_state) = self.tenants_writing.get(&tenant_id) { - tracing::info!(%tenant_id, "Waiting for heatmap write to complete"); - writing_state.barrier.clone().wait().await; - } - - // Spawn the upload then immediately wait for it. This will block processing of other commands and - // starting of other background work. - tracing::info!(%tenant_id, "Starting heatmap write on command"); - let tenant = mgr::get_tenant(tenant_id, true)?; - self.spawn_write(tenant); - let writing_state = self - .tenants_writing - .get(&tenant_id) - .expect("We just inserted this"); - tracing::info!(%tenant_id, "Waiting for heatmap write to complete"); - writing_state.barrier.clone().wait().await; - tracing::info!(%tenant_id, "Heatmap write complete"); - - // This drain is not necessary for correctness, but it is polite to avoid intentionally leaving - // our complete task in self.tenants_writing. - self.drain().await; - - Ok(()) - } - } - } -} - -async fn write_tenant( - remote_storage: GenericRemoteStorage, - tenant: &Arc, -) -> anyhow::Result<()> { - let mut heatmap = HeatMapTenant { - timelines: Vec::new(), - }; - let timelines = tenant.timelines.lock().unwrap().clone(); - - let tenant_cancel = tenant.cancel.clone(); - - // Ensure that Tenant::shutdown waits for any upload in flight: this is needed because otherwise - // when we delete a tenant, we might race with an upload in flight and end up leaving a heatmap behind - // in remote storage. - let _guard = match tenant.gate.enter() { - Ok(g) => g, - Err(_) => { - tracing::info!("Skipping heatmap upload for tenant which is shutting down"); - return Ok(()); - } - }; - - for (timeline_id, timeline) in timelines { - let heatmap_timeline = timeline.generate_heatmap().await; - match heatmap_timeline { - None => { - tracing::debug!( - "Skipping heatmap upload because timeline {timeline_id} is not ready" - ); - return Ok(()); - } - Some(heatmap_timeline) => { - heatmap.timelines.push(heatmap_timeline); - } - } - } - - // Serialize the heatmap - let bytes = serde_json::to_vec(&heatmap)?; - let size = bytes.len(); - - let path = remote_heatmap_path(&tenant.get_tenant_id()); - - // Write the heatmap. - tracing::debug!("Uploading {size} byte heatmap to {path}"); - if let Err(e) = backoff::retry( - || async { - let bytes = tokio::io::BufReader::new(std::io::Cursor::new(bytes.clone())); - let bytes = Box::new(bytes); - remote_storage - .upload_storage_object(bytes, size, &path) - .await - }, - |_| false, - 3, - u32::MAX, - "Uploading heatmap", - backoff::Cancel::new(tenant_cancel.clone(), || anyhow::anyhow!("Shutting down")), - ) - .await - { - if tenant_cancel.is_cancelled() { - return Ok(()); - } else { - return Err(e); - } - } - - SECONDARY_MODE.upload_heatmap.inc(); - tracing::info!("Successfully uploaded {size} byte heatmap to {path}"); - - Ok(()) -}