From e080bc053fdf43610392bce2176452d9a922541c Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 12 Oct 2023 09:19:34 +0100 Subject: [PATCH] pageserver: add secondary downloader --- pageserver/src/task_mgr.rs | 3 + pageserver/src/tenant/secondary.rs | 211 ++++- pageserver/src/tenant/secondary/downloader.rs | 720 ++++++++++++++++++ .../src/tenant/secondary/heatmap_writer.rs | 380 +++++++++ 4 files changed, 1306 insertions(+), 8 deletions(-) create mode 100644 pageserver/src/tenant/secondary/downloader.rs create mode 100644 pageserver/src/tenant/secondary/heatmap_writer.rs diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index cb1b2b8011..955adc3a81 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -258,6 +258,9 @@ pub enum TaskKind { /// See [`crate::disk_usage_eviction_task`]. DiskUsageEviction, + /// See [`crate::tenant::secondary`]. + SecondaryDownloads, + /// See [`crate::tenant::secondary`]. SecondaryUploads, diff --git a/pageserver/src/tenant/secondary.rs b/pageserver/src/tenant/secondary.rs index 758dd54ec1..03a6f44eae 100644 --- a/pageserver/src/tenant/secondary.rs +++ b/pageserver/src/tenant/secondary.rs @@ -1,21 +1,37 @@ +mod downloader; pub mod heatmap; mod heatmap_uploader; mod scheduler; -use std::sync::Arc; +use std::{sync::Arc, time::SystemTime}; -use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}; +use crate::{ + config::PageServerConf, + task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}, + tenant::span::debug_assert_current_span_has_tenant_id, +}; -use self::{heatmap_uploader::heatmap_uploader_task, scheduler::TenantScoped}; +use self::{ + downloader::{downloader_task, SecondaryDetail}, + heatmap_uploader::heatmap_uploader_task, + scheduler::TenantScoped, +}; -use super::mgr::TenantManager; +use super::{ + mgr::TenantManager, + storage_layer::{AsLayerDesc, Layer}, + timeline::DiskUsageEvictionInfo, +}; use pageserver_api::shard::TenantShardId; use remote_storage::GenericRemoteStorage; use tokio_util::sync::CancellationToken; -use utils::completion::Barrier; +use utils::{completion::Barrier, fs_ext, id::TimelineId, sync::gate::Gate}; +enum DownloadCommand { + Download(TenantShardId), +} enum UploadCommand { Upload(TenantShardId), } @@ -23,7 +39,15 @@ enum UploadCommand { impl TenantScoped for UploadCommand { fn get_tenant_shard_id(&self) -> &TenantShardId { match self { - Self::Upload(id) => &id, + Self::Upload(id) => id, + } + } +} + +impl TenantScoped for DownloadCommand { + fn get_tenant_shard_id(&self) -> &TenantShardId { + match self { + Self::Download(id) => id, } } } @@ -37,12 +61,140 @@ struct CommandResponse { result: anyhow::Result<()>, } +// Whereas [`Tenant`] represents an attached tenant, this type represents the work +// we do for secondary tenant locations: where we are not serving clients or +// ingesting WAL, but we are maintaining a warm cache of layer files. +// +// This type is all about the _download_ path for secondary mode. The upload path +// runs separately (see [`heatmap_uploader`]) while a regular attached `Tenant` exists. +// +// This structure coordinates TenantManager and SecondaryDownloader, +// so that the downloader can indicate which tenants it is currently +// operating on, and the manager can indicate when a particular +// secondary tenant should cancel any work in flight. +#[derive(Debug)] +pub(crate) struct SecondaryTenant { + /// Carrying a tenant shard ID simplifies callers such as the downloader + /// which need to organize many of these objects by ID. 
+    tenant_shard_id: TenantShardId,
+
+    /// Cancellation token indicates to SecondaryDownloader that it should stop doing
+    /// any work for this tenant at the next opportunity.
+    pub(crate) cancel: CancellationToken,
+
+    pub(crate) gate: Gate,
+
+    detail: std::sync::Mutex<SecondaryDetail>,
+    // TODO: propagate the `warm` from LocationConf into here, and respect it when doing downloads
+}
+
+impl SecondaryTenant {
+    pub(crate) fn new(tenant_shard_id: TenantShardId) -> Arc<Self> {
+        // TODO: consider whether we really need to Arc this
+        Arc::new(Self {
+            tenant_shard_id,
+            // todo: shall we make this a descendant of the
+            // main cancellation token, or is it sufficient that
+            // on shutdown we walk the tenants and fire their
+            // individual cancellations?
+            cancel: CancellationToken::new(),
+            gate: Gate::new(format!("SecondaryTenant {tenant_shard_id}")),
+
+            detail: std::sync::Mutex::default(),
+        })
+    }
+
+    pub(crate) async fn shutdown(&self) {
+        self.cancel.cancel();
+
+        // Wait for any secondary downloader work to complete
+        self.gate.close().await;
+    }
+
+    pub(crate) async fn get_layers_for_eviction(
+        &self,
+        conf: &'static PageServerConf,
+        tenant_shard_id: TenantShardId,
+    ) -> Vec<(TimelineId, DiskUsageEvictionInfo)> {
+        debug_assert_current_span_has_tenant_id();
+        {
+            let detail = self.detail.lock().unwrap();
+            if !detail.is_uninit() {
+                return detail.get_layers_for_eviction();
+            } else {
+                // In case we didn't freshen yet in this process lifetime, we will need to scan local storage
+                // to find all our layers.
+            }
+        }
+
+        tracing::debug!("Scanning local layers for secondary tenant to service eviction",);
+
+        // Fall through: we need to initialize Detail
+        let timelines = SecondaryDetail::init_detail(conf, tenant_shard_id).await;
+        let mut detail = self.detail.lock().unwrap();
+        if detail.is_uninit() {
+            detail.timelines = timelines;
+        }
+        detail.get_layers_for_eviction()
+    }
+
+    pub(crate) async fn evict_layers(
+        &self,
+        conf: &PageServerConf,
+        tenant_shard_id: &TenantShardId,
+        layers: Vec<(TimelineId, Layer)>,
+    ) {
+        debug_assert_current_span_has_tenant_id();
+        let _guard = match self.gate.enter() {
+            Ok(g) => g,
+            Err(_) => {
+                tracing::info!(
+                    "Dropping {} layer evictions, secondary tenant shutting down",
+                    layers.len()
+                );
+                return;
+            }
+        };
+
+        let now = SystemTime::now();
+
+        for (timeline_id, layer) in layers {
+            let layer_name = layer.layer_desc().filename();
+            let path = conf
+                .timeline_path(tenant_shard_id, &timeline_id)
+                .join(&layer_name.file_name());
+
+            // We tolerate ENOENT, because between planning eviction and executing
+            // it, the secondary downloader could have seen an updated heatmap that
+            // resulted in a layer being deleted.
+            tokio::fs::remove_file(path)
+                .await
+                .or_else(fs_ext::ignore_not_found)
+                .expect("TODO: terminate process on local I/O errors");
+
+            // TODO: batch up updates instead of acquiring lock in inner loop
+            let mut detail = self.detail.lock().unwrap();
+            // If there is no timeline detail for what we just deleted, that indicates that
+            // the secondary downloader did some work (perhaps removing all)
+            if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) {
+                timeline_detail.on_disk_layers.remove(&layer_name);
+                timeline_detail.evicted_at.insert(layer_name, now);
+            }
+        }
+    }
+
+    fn get_tenant_shard_id(&self) -> &TenantShardId {
+        &self.tenant_shard_id
+    }
+}
+
/// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads,
/// and heatmap uploads.
/// This is not a hot data path: it's primarily a hook for tests,
/// where we want to immediately upload/download for a particular tenant. In normal operation
/// uploads & downloads are autonomous and not driven by this interface.
pub struct SecondaryController {
    upload_req_tx: tokio::sync::mpsc::Sender<CommandRequest<UploadCommand>>,
+    download_req_tx: tokio::sync::mpsc::Sender<CommandRequest<DownloadCommand>>,
}

impl SecondaryController {
@@ -72,6 +224,13 @@ impl SecondaryController {
        self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_shard_id))
            .await
    }
+    pub async fn download_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
+        self.dispatch(
+            &self.download_req_tx,
+            DownloadCommand::Download(tenant_shard_id),
+        )
+        .await
+    }
}

pub fn spawn_tasks(
@@ -80,9 +239,37 @@ pub fn spawn_tasks(
    background_jobs_can_start: Barrier,
    cancel: CancellationToken,
) -> SecondaryController {
+    let mgr_clone = tenant_manager.clone();
+    let storage_clone = remote_storage.clone();
+    let cancel_clone = cancel.clone();
+    let bg_jobs_clone = background_jobs_can_start.clone();
+
+    let (download_req_tx, download_req_rx) =
+        tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
    let (upload_req_tx, upload_req_rx) =
        tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);

+    task_mgr::spawn(
+        BACKGROUND_RUNTIME.handle(),
+        TaskKind::SecondaryDownloads,
+        None,
+        None,
+        "secondary tenant downloads",
+        false,
+        async move {
+            downloader_task(
+                mgr_clone,
+                storage_clone,
+                download_req_rx,
+                bg_jobs_clone,
+                cancel_clone,
+            )
+            .await;
+
+            Ok(())
+        },
+    );
+
    task_mgr::spawn(
        BACKGROUND_RUNTIME.handle(),
        TaskKind::SecondaryUploads,
@@ -104,12 +291,20 @@ pub fn spawn_tasks(
        },
    );

-    SecondaryController { upload_req_tx }
+    SecondaryController {
+        download_req_tx,
+        upload_req_tx,
+    }
}

/// For running with remote storage disabled: a SecondaryController that is connected to nothing.
pub fn null_controller() -> SecondaryController {
+    let (download_req_tx, _download_req_rx) =
+        tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
    let (upload_req_tx, _upload_req_rx) =
        tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
-    SecondaryController { upload_req_tx }
+    SecondaryController {
+        upload_req_tx,
+        download_req_tx,
+    }
}
diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs
new file mode 100644
index 0000000000..27eb92d658
--- /dev/null
+++ b/pageserver/src/tenant/secondary/downloader.rs
@@ -0,0 +1,720 @@
+use std::{
+    collections::{HashMap, HashSet},
+    str::FromStr,
+    sync::Arc,
+    time::{Duration, Instant, SystemTime},
+};
+
+use crate::{
+    config::PageServerConf,
+    metrics::SECONDARY_MODE,
+    tenant::{
+        debug_assert_current_span_has_tenant_and_timeline_id,
+        remote_timeline_client::{index::LayerFileMetadata, HEATMAP_BASENAME},
+        span::debug_assert_current_span_has_tenant_id,
+        storage_layer::{Layer, LayerFileName},
+        timeline::{DiskUsageEvictionInfo, LocalLayerInfoForDiskUsageEviction},
+    },
+    virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile},
+    METADATA_FILE_NAME, TEMP_FILE_SUFFIX,
+};
+
+use super::{
+    scheduler::{HasBarrier, JobGenerator, SchedulingResult, TenantBackgroundJobs, TenantScoped},
+    SecondaryTenant,
+};
+use crate::tenant::{
+    mgr::TenantManager,
+    remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
+};
+use anyhow::Context;
+
+use chrono::format::{DelayedFormat, StrftimeItems};
+use pageserver_api::shard::TenantShardId;
+use remote_storage::GenericRemoteStorage;
+
+use tokio::task::JoinSet;
+use tokio_util::sync::CancellationToken;
+use tracing::{instrument, Instrument};
+use utils::{completion::Barrier, crashsafe::path_with_suffix_extension, fs_ext, id::TimelineId};
+
+use super::{
+    heatmap::{HeatMapTenant, HeatMapTimeline},
+    CommandRequest, DownloadCommand,
+};
+
+/// For each tenant, how long must have passed since the last download_tenant call before
+/// calling it again. This is approximately the time by which local data is allowed
+/// to fall behind remote data.
+///
+/// TODO: this should be an upper bound, and tenants that are uploading regularly
+/// should adaptively freshen more often (e.g. a tenant writing 1 layer per second
+/// should not wait a minute between freshens)
+const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000);
+
+const DEFAULT_SCHEDULING_INTERVAL: Duration = Duration::from_millis(60000);
+const _MIN_SCHEDULING_INTERVAL: Duration = Duration::from_millis(1000);
+
+#[derive(Debug, Clone)]
+pub(super) struct OnDiskState {
+    layer: Layer,
+    access_time: SystemTime,
+}
+
+impl OnDiskState {
+    fn new(
+        conf: &'static PageServerConf,
+        tenant_shard_id: &TenantShardId,
+        timeline_id: &TimelineId,
+        name: LayerFileName,
+        metadata: LayerFileMetadata,
+        access_time: SystemTime,
+    ) -> Self {
+        Self {
+            layer: Layer::for_secondary(conf, tenant_shard_id, timeline_id, name, metadata),
+            access_time,
+        }
+    }
+}
+
+#[derive(Debug, Clone, Default)]
+pub(super) struct SecondaryDetailTimeline {
+    pub(super) on_disk_layers: HashMap<LayerFileName, OnDiskState>,
+
+    /// We remember when layers were evicted, to prevent re-downloading them.
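+    /// A layer is only re-downloaded once a later heatmap shows it was accessed more
+    /// recently than the eviction time recorded here.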
+    pub(super) evicted_at: HashMap<LayerFileName, SystemTime>,
+}
+
+/// This state is written by the secondary downloader; it is opaque
+/// to TenantManager
+#[derive(Default, Debug)]
+pub(super) struct SecondaryDetail {
+    freshened_at: Option<Instant>,
+    pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
+}
+
+/// Helper for logging SystemTime
+fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'static>> {
+    let datetime: chrono::DateTime<chrono::Utc> = (*t).into();
+    datetime.format("%d/%m/%Y %T")
+}
+
+impl SecondaryDetail {
+    pub(super) fn is_uninit(&self) -> bool {
+        // FIXME: empty timelines is not synonymous with not initialized, as it is legal for
+        // a tenant to exist with no timelines.
+        self.timelines.is_empty()
+    }
+
+    pub(super) async fn init_detail(
+        conf: &'static PageServerConf,
+        tenant_shard_id: TenantShardId,
+    ) -> HashMap<TimelineId, SecondaryDetailTimeline> {
+        tracing::info!("init_detail");
+        // Load heatmap from local storage
+        let heatmap_path = conf.tenant_path(&tenant_shard_id).join(HEATMAP_BASENAME);
+        let heatmap = match tokio::fs::read(&heatmap_path).await {
+            Ok(bytes) => serde_json::from_slice::<HeatMapTenant>(&bytes).unwrap(),
+            Err(e) => {
+                if e.kind() == std::io::ErrorKind::NotFound {
+                    return HashMap::new();
+                } else {
+                    on_fatal_io_error(&e, &format!("Reading heatmap file from {heatmap_path}"))
+                }
+            }
+        };
+
+        let mut timelines = HashMap::new();
+
+        for heatmap_timeline in heatmap.timelines {
+            let detail = init_timeline_state(conf, &tenant_shard_id, &heatmap_timeline).await;
+            timelines.insert(heatmap_timeline.timeline_id, detail);
+        }
+
+        timelines
+    }
+
+    pub(super) fn get_layers_for_eviction(&self) -> Vec<(TimelineId, DiskUsageEvictionInfo)> {
+        let mut result = Vec::new();
+        for (timeline_id, timeline_detail) in &self.timelines {
+            let layers: Vec<_> = timeline_detail
+                .on_disk_layers
+                .values()
+                .map(|ods| LocalLayerInfoForDiskUsageEviction {
+                    layer: ods.layer.clone(),
+                    last_activity_ts: ods.access_time,
+                })
+                .collect();
+
+            let max_layer_size = layers.iter().map(|l| l.layer.metadata().file_size()).max();
+
+            result.push((
+                *timeline_id,
+                DiskUsageEvictionInfo {
+                    resident_layers: layers,
+                    max_layer_size,
+                },
+            ))
+        }
+
+        tracing::debug!(
+            "Found {} timelines, {} layers",
+            self.timelines.len(),
+            result.len()
+        );
+
+        result
+    }
+}
+
+struct SecondaryDownloader {
+    tenant_manager: Arc<TenantManager>,
+    remote_storage: GenericRemoteStorage,
+}
+
+struct PendingDownload {
+    secondary_state: Arc<SecondaryTenant>,
+}
+
+impl TenantScoped for PendingDownload {
+    fn get_tenant_shard_id(&self) -> &TenantShardId {
+        self.secondary_state.get_tenant_shard_id()
+    }
+}
+
+struct RunningDownload {
+    barrier: Barrier,
+}
+
+impl HasBarrier for RunningDownload {
+    fn get_barrier(&self) -> Barrier {
+        self.barrier.clone()
+    }
+}
+
+struct CompleteDownload {
+    secondary_state: Arc<SecondaryTenant>,
+    completed_at: Instant,
+}
+
+impl TenantScoped for CompleteDownload {
+    fn get_tenant_shard_id(&self) -> &TenantShardId {
+        self.secondary_state.get_tenant_shard_id()
+    }
+}
+
+impl TenantScoped for SecondaryTenant {
+    fn get_tenant_shard_id(&self) -> &TenantShardId {
+        self.get_tenant_shard_id()
+    }
+}
+
+type Scheduler = TenantBackgroundJobs<
+    SecondaryDownloader,
+    PendingDownload,
+    RunningDownload,
+    CompleteDownload,
+    DownloadCommand,
+>;
+
+#[async_trait::async_trait]
+impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCommand>
+    for SecondaryDownloader
+{
+    #[instrument(skip_all, fields(tenant_id=%completion.get_tenant_shard_id().tenant_id, shard_id=%completion.get_tenant_shard_id().shard_slug()))]
+    fn on_completion(&mut self, completion: CompleteDownload) {
+        let CompleteDownload {
+            secondary_state,
+            completed_at: _completed_at,
+        } = completion;
+
+        tracing::debug!("Secondary tenant download completed");
+
+        // Update freshened_at even if there was an error: we don't want errored tenants to implicitly
+        // take priority to run again.
+        let mut detail = secondary_state.detail.lock().unwrap();
+        detail.freshened_at = Some(Instant::now());
+    }
+
+    async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
+        let mut result = SchedulingResult {
+            jobs: Vec::new(),
+            want_interval: None,
+        };
+
+        // Step 1: identify some tenants that we may work on
+        let mut tenants: Vec<Arc<SecondaryTenant>> = Vec::new();
+        self.tenant_manager
+            .foreach_secondary_tenants(|_id, secondary_state| {
+                tenants.push(secondary_state.clone());
+            });
+
+        // Step 2: filter out tenants which are not eligible to run yet
+        let now = Instant::now();
+        let tenants = tenants.into_iter().filter(|c| {
+            let detail = c.detail.lock().unwrap();
+            match detail.freshened_at {
+                None => true, // Not yet freshened, therefore eligible to run
+                Some(t) => {
+                    let since = now.duration_since(t);
+                    since > DOWNLOAD_FRESHEN_INTERVAL
+                }
+            }
+        });
+
+        result.jobs = tenants
+            .map(|t| PendingDownload { secondary_state: t })
+            .collect();
+        result
+    }
+
+    fn on_command(&mut self, command: DownloadCommand) -> anyhow::Result<PendingDownload> {
+        let tenant_shard_id = command.get_tenant_shard_id();
+
+        let tenant = self
+            .tenant_manager
+            .get_secondary_tenant_shard(*tenant_shard_id);
+        let Some(tenant) = tenant else {
+            {
+                return Err(anyhow::anyhow!("Not found or not in Secondary mode"));
+            }
+        };
+
+        Ok(PendingDownload {
+            secondary_state: tenant,
+        })
+    }
+
+    fn spawn(
+        &mut self,
+        join_set: &mut JoinSet<()>,
+        result_tx: tokio::sync::mpsc::UnboundedSender<CompleteDownload>,
+        job: PendingDownload,
+    ) -> RunningDownload {
+        let PendingDownload { secondary_state } = job;
+
+        let (completion, barrier) = utils::completion::channel();
+        let remote_storage = self.remote_storage.clone();
+        let conf = self.tenant_manager.get_conf();
+        join_set.spawn(async move {
+            let _completion = completion;
+
+            if let Err(e) = TenantDownloader::new(conf, &remote_storage, &secondary_state)
+                .download()
+                .await
+            {
+                tracing::info!("Failed to freshen secondary content: {e:#}")
+            };
+
+            result_tx
+                .send(CompleteDownload {
+                    secondary_state,
+                    completed_at: Instant::now(),
+                })
+                .ok();
+        });
+        RunningDownload { barrier }
+    }
+}
+
+pub(super) async fn downloader_task(
+    tenant_manager: Arc<TenantManager>,
+    remote_storage: GenericRemoteStorage,
+    command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
+    background_jobs_can_start: Barrier,
+    cancel: CancellationToken,
+) {
+    // TODO: separate config for downloads
+    let concurrency = tenant_manager.get_conf().heatmap_upload_concurrency;
+
+    let generator = SecondaryDownloader {
+        tenant_manager,
+        remote_storage,
+    };
+    let mut scheduler = Scheduler::new(generator, concurrency);
+
+    scheduler
+        .run(command_queue, background_jobs_can_start, cancel)
+        .instrument(tracing::info_span!("secondary_downloads"))
+        .await
+}
+
+/// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline
+async fn init_timeline_state(
+    conf: &'static PageServerConf,
+    tenant_shard_id: &TenantShardId,
+    heatmap: &HeatMapTimeline,
+) -> SecondaryDetailTimeline {
+    let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
+    let mut detail = SecondaryDetailTimeline::default();
+
+    let mut dir = match tokio::fs::read_dir(&timeline_path).await {
+        Ok(d) => d,
+        Err(e) => {
+            if e.kind() == std::io::ErrorKind::NotFound {
+                let context = format!("Creating timeline directory {timeline_path}");
+                tracing::info!("{}", context);
+                tokio::fs::create_dir_all(&timeline_path)
+                    .await
+                    .fatal_err(&context);
+
+                // No entries to report: drop out.
+                return detail;
+            } else {
+                on_fatal_io_error(&e, &format!("Reading timeline dir {timeline_path}"));
+            }
+        }
+    };
+
+    let heatmap_metadata: HashMap<_, _> = heatmap.layers.iter().map(|l| (&l.name, l)).collect();
+
+    while let Some(dentry) = dir
+        .next_entry()
+        .await
+        .fatal_err(&format!("Listing {timeline_path}"))
+    {
+        let dentry_file_name = dentry.file_name();
+        let file_name = dentry_file_name.to_string_lossy();
+        let local_meta = dentry.metadata().await.fatal_err(&format!(
+            "Read metadata on {}",
+            dentry.path().to_string_lossy()
+        ));
+
+        // Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
+        if file_name == METADATA_FILE_NAME {
+            continue;
+        }
+
+        match LayerFileName::from_str(&file_name) {
+            Ok(name) => {
+                let remote_meta = heatmap_metadata.get(&name);
+                match remote_meta {
+                    Some(remote_meta) => {
+                        // TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784)
+                        if local_meta.len() != remote_meta.metadata.file_size {
+                            // This should not happen, because we do crashsafe write-then-rename when downloading
+                            // layers, and layers in remote storage are immutable. Remove the local file because
+                            // we cannot trust it.
+                            tracing::warn!(
+                                "Removing local layer {name} with unexpected local size {} != {}",
+                                local_meta.len(),
+                                remote_meta.metadata.file_size
+                            );
+                        } else {
+                            // We expect the access time to be initialized immediately afterwards, when
+                            // the latest heatmap is applied to the state.
+                            detail.on_disk_layers.insert(
+                                name.clone(),
+                                OnDiskState::new(
+                                    conf,
+                                    tenant_shard_id,
+                                    &heatmap.timeline_id,
+                                    name,
+                                    LayerFileMetadata::from(&remote_meta.metadata),
+                                    remote_meta.access_time,
+                                ),
+                            );
+                        }
+                    }
+                    None => {
+                        // FIXME: consider some optimization when transitioning from attached to secondary: maybe
+                        // wait until we have seen a heatmap that is more recent than the most recent on-disk state? Otherwise
+                        // we will end up deleting any layers which were created+uploaded more recently than the heatmap.
+                        tracing::info!(
+                            "Removing secondary local layer {} because it's absent in heatmap",
+                            name
+                        );
+                        tokio::fs::remove_file(&dentry.path())
+                            .await
+                            .or_else(fs_ext::ignore_not_found)
+                            .fatal_err(&format!(
+                                "Removing layer {}",
+                                dentry.path().to_string_lossy()
+                            ));
+                    }
+                }
+            }
+            Err(_) => {
+                // Ignore it.
+                tracing::warn!("Unexpected file in timeline directory: {file_name}");
+            }
+        }
+    }
+
+    detail
+}
+
+/// This type is a convenience to group together the various functions involved in
+/// freshening a secondary tenant.
+struct TenantDownloader<'a> {
+    conf: &'static PageServerConf,
+    remote_storage: &'a GenericRemoteStorage,
+    secondary_state: &'a SecondaryTenant,
+}
+
+impl<'a> TenantDownloader<'a> {
+    fn new(
+        conf: &'static PageServerConf,
+        remote_storage: &'a GenericRemoteStorage,
+        secondary_state: &'a SecondaryTenant,
+    ) -> Self {
+        Self {
+            conf,
+            remote_storage,
+            secondary_state,
+        }
+    }
+
+    #[instrument(skip_all, name="secondary_download", fields(tenant_id=%self.secondary_state.get_tenant_shard_id().tenant_id, shard_id=%self.secondary_state.get_tenant_shard_id().shard_slug()))]
+    async fn download(&self) -> anyhow::Result<()> {
+        // For the duration of a download, we must hold the SecondaryTenant::gate, to
+        // cover our access to local storage.
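+        // (SecondaryTenant::shutdown fires `cancel` and then closes this gate, so holding
+        // it here means shutdown waits for in-flight downloads and local filesystem writes.)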
+        let Ok(_guard) = self.secondary_state.gate.enter() else {
+            // Shutting down
+            return Ok(());
+        };
+
+        debug_assert_current_span_has_tenant_id();
+        let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
+        // Download the tenant's heatmap
+        let heatmap_bytes = tokio::select!(
+            bytes = self.download_heatmap() => {bytes?},
+            _ = self.secondary_state.cancel.cancelled() => return Ok(())
+        );
+
+        let heatmap = serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?;
+
+        // Save the heatmap: this will be useful on restart, allowing us to reconstruct
+        // layer metadata without having to re-download it.
+        let heatmap_path = self
+            .conf
+            .tenant_path(tenant_shard_id)
+            .join(HEATMAP_BASENAME);
+
+        let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
+        let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
+        let heatmap_path_bg = heatmap_path.clone();
+        tokio::task::spawn_blocking(move || {
+            tokio::runtime::Handle::current().block_on(async move {
+                VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, &heatmap_bytes).await
+            })
+        })
+        .await?
+        .maybe_fatal_err(&context_msg)
+        .with_context(|| context_msg)?;
+
+        tracing::debug!("Wrote local heatmap to {}", heatmap_path);
+
+        // Download the layers in the heatmap
+        for timeline in heatmap.timelines {
+            if self.secondary_state.cancel.is_cancelled() {
+                return Ok(());
+            }
+
+            let timeline_id = timeline.timeline_id;
+            self.download_timeline(timeline)
+                .instrument(tracing::info_span!(
+                    "secondary_download_timeline",
+                    tenant_id=%tenant_shard_id.tenant_id,
+                    shard_id=%tenant_shard_id.shard_slug(),
+                    %timeline_id
+                ))
+                .await?;
+        }
+
+        Ok(())
+    }
+
+    async fn download_heatmap(&self) -> anyhow::Result<Vec<u8>> {
+        debug_assert_current_span_has_tenant_id();
+        let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
+        // TODO: make download conditional on ETag having changed since last download
+        tracing::debug!("Downloading heatmap for secondary tenant",);
+
+        let heatmap_path = remote_heatmap_path(tenant_shard_id);
+        let download = self.remote_storage.download(&heatmap_path).await?;
+        let mut heatmap_bytes = Vec::new();
+        let mut body = tokio_util::io::StreamReader::new(download.download_stream);
+        let _size = tokio::io::copy(&mut body, &mut heatmap_bytes)
+            .await
+            .with_context(|| format!("download heatmap {heatmap_path:?}"))?;
+
+        SECONDARY_MODE.download_heatmap.inc();
+
+        Ok(heatmap_bytes)
+    }
+
+    async fn download_timeline(&self, timeline: HeatMapTimeline) -> anyhow::Result<()> {
+        debug_assert_current_span_has_tenant_and_timeline_id();
+        let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
+        let timeline_path = self
+            .conf
+            .timeline_path(tenant_shard_id, &timeline.timeline_id);
+
+        // Accumulate updates to the state
+        let mut touched = Vec::new();
+
+        // Clone a view of what layers already exist on disk
+        let timeline_state = self
+            .secondary_state
+            .detail
+            .lock()
+            .unwrap()
+            .timelines
+            .get(&timeline.timeline_id)
+            .cloned();
+
+        let timeline_state = match timeline_state {
+            Some(t) => t,
+            None => {
+                // We have no existing state: need to scan local disk for layers first.
+                let timeline_state =
+                    init_timeline_state(self.conf, tenant_shard_id, &timeline).await;
+
+                // Re-acquire detail lock now that we're done with async load from local FS
+                self.secondary_state
+                    .detail
+                    .lock()
+                    .unwrap()
+                    .timelines
+                    .insert(timeline.timeline_id, timeline_state.clone());
+                timeline_state
+            }
+        };
+
+        let layers_in_heatmap = timeline
+            .layers
+            .iter()
+            .map(|l| &l.name)
+            .collect::<HashSet<_>>();
+        let layers_on_disk = timeline_state
+            .on_disk_layers
+            .iter()
+            .map(|l| l.0)
+            .collect::<HashSet<_>>();
+
+        // Remove on-disk layers that are no longer present in heatmap
+        for layer in layers_on_disk.difference(&layers_in_heatmap) {
+            let local_path = timeline_path.join(layer.to_string());
+            tracing::info!("Removing secondary local layer {layer} because it's absent in heatmap",);
+            tokio::fs::remove_file(&local_path)
+                .await
+                .or_else(fs_ext::ignore_not_found)?;
+        }
+
+        // Download heatmap layers that are not present on local disk, or update their
+        // access time if they are already present.
+        for layer in timeline.layers {
+            if self.secondary_state.cancel.is_cancelled() {
+                return Ok(());
+            }
+
+            // Existing on-disk layers: just update their access time.
+            if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
+                tracing::debug!("Layer {} is already on disk", layer.name);
+                if on_disk.layer.metadata() != LayerFileMetadata::from(&layer.metadata)
+                    || on_disk.access_time != layer.access_time
+                {
+                    // We already have this layer on disk. Update its access time.
+                    tracing::debug!(
+                        "Access time updated for layer {}: {} -> {}",
+                        layer.name,
+                        strftime(&on_disk.access_time),
+                        strftime(&layer.access_time)
+                    );
+                    touched.push(layer);
+                }
+                continue;
+            } else {
+                tracing::debug!("Layer {} not present on disk yet", layer.name);
+            }
+
+            // Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
+            // recently than it was evicted.
+            if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
+                if &layer.access_time > evicted_at {
+                    tracing::info!(
+                        "Re-downloading evicted layer {}, accessed at {}, evicted at {}",
+                        layer.name,
+                        strftime(&layer.access_time),
+                        strftime(evicted_at)
+                    );
+                } else {
+                    tracing::trace!(
+                        "Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
+                        layer.name,
+                        strftime(&layer.access_time),
+                        strftime(evicted_at)
+                    );
+                    continue;
+                }
+            }
+
+            match download_layer_file(
+                self.conf,
+                self.remote_storage,
+                *tenant_shard_id,
+                timeline.timeline_id,
+                &layer.name,
+                &LayerFileMetadata::from(&layer.metadata),
+            )
+            .await
+            {
+                Ok(downloaded_bytes) => {
+                    if downloaded_bytes != layer.metadata.file_size {
+                        let local_path = timeline_path.join(layer.name.to_string());
+
+                        tracing::error!(
+                            "Downloaded layer {} with unexpected size {} != {}",
+                            layer.name,
+                            downloaded_bytes,
+                            layer.metadata.file_size
+                        );
+
+                        tokio::fs::remove_file(&local_path)
+                            .await
+                            .or_else(fs_ext::ignore_not_found)?;
+                    }
+
+                    SECONDARY_MODE.download_layer.inc();
+                    touched.push(layer)
+                }
+                Err(e) => {
+                    // No retries here: secondary downloads don't have to succeed: if they fail we just proceed and expect
+                    // that on some future call to freshen the download will work.
+                    // TODO: refine this behavior.
+                    tracing::info!("Failed to download layer {}: {}", layer.name, e);
+                }
+            }
+        }
+
+        // Write updates to state to record layers we just downloaded or touched.
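+        // Note: `detail` is a std::sync::Mutex; the guard below is held only for these
+        // synchronous map updates and is never kept across an await point.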
+        {
+            let mut detail = self.secondary_state.detail.lock().unwrap();
+            let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
+
+            tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
+
+            for t in touched {
+                use std::collections::hash_map::Entry;
+                match timeline_detail.on_disk_layers.entry(t.name.clone()) {
+                    Entry::Occupied(mut v) => {
+                        v.get_mut().access_time = t.access_time;
+                    }
+                    Entry::Vacant(e) => {
+                        e.insert(OnDiskState::new(
+                            self.conf,
+                            tenant_shard_id,
+                            &timeline.timeline_id,
+                            t.name,
+                            LayerFileMetadata::from(&t.metadata),
+                            t.access_time,
+                        ));
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
diff --git a/pageserver/src/tenant/secondary/heatmap_writer.rs b/pageserver/src/tenant/secondary/heatmap_writer.rs
new file mode 100644
index 0000000000..1b12844504
--- /dev/null
+++ b/pageserver/src/tenant/secondary/heatmap_writer.rs
@@ -0,0 +1,380 @@
+use std::{
+    collections::HashMap,
+    sync::{Arc, Weak},
+    time::{Duration, Instant},
+};
+
+use crate::{
+    metrics::SECONDARY_MODE,
+    tenant::{
+        mgr::{self, TenantManager},
+        remote_timeline_client::remote_heatmap_path,
+        secondary::CommandResponse,
+        Tenant,
+    },
+};
+
+use pageserver_api::models::TenantState;
+use remote_storage::GenericRemoteStorage;
+
+use tokio::task::JoinSet;
+use tokio_util::sync::CancellationToken;
+use tracing::Instrument;
+use utils::{backoff, completion::Barrier, id::TenantId};
+
+use super::{heatmap::HeatMapTenant, CommandRequest, UploadCommand};
+
+/// Period between heatmap writer walking Tenants to look for work to do
+const HEATMAP_WAKE_INTERVAL: Duration = Duration::from_millis(1000);
+
+/// Period between heatmap writes for each Tenant
+const HEATMAP_UPLOAD_INTERVAL: Duration = Duration::from_millis(60000);
+
+/// While we take a CancellationToken here, it is subordinate to the CancellationTokens
+/// of tenants: i.e. we expect all Tenants to have been shut down before we are shut down, otherwise
+/// we might block waiting on a Tenant.
+pub(super) async fn heatmap_writer_task(
+    tenant_manager: Arc<TenantManager>,
+    remote_storage: GenericRemoteStorage,
+    mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<UploadCommand>>,
+    background_jobs_can_start: Barrier,
+    cancel: CancellationToken,
+) -> anyhow::Result<()> {
+    let mut writer = HeatmapWriter {
+        tenant_manager,
+        remote_storage,
+        cancel: cancel.clone(),
+        tasks: JoinSet::new(),
+        tenants: HashMap::new(),
+        tenants_writing: HashMap::new(),
+        concurrent_writes: 8,
+    };
+
+    tracing::info!("Waiting for background_jobs_can start...");
+    background_jobs_can_start.wait().await;
+    tracing::info!("background_jobs_can is ready, proceeding.");
+
+    while !cancel.is_cancelled() {
+        writer.iteration().await?;
+
+        tokio::select! {
+            _ = cancel.cancelled() => {
+                tracing::info!("Heatmap writer joining tasks");
+
+                tracing::info!("Heatmap writer terminating");
+
+                break;
+            },
+            _ = tokio::time::sleep(HEATMAP_WAKE_INTERVAL) => {},
+            cmd = command_queue.recv() => {
+                let cmd = match cmd {
+                    Some(c) => c,
+                    None => {
+                        // SecondaryController was destroyed, and this has raced with
+                        // our CancellationToken
+                        tracing::info!("Heatmap writer terminating");
+                        break;
+                    }
+                };
+
+                let CommandRequest{
+                    response_tx,
+                    payload
+                } = cmd;
+                let result = writer.handle_command(payload).await;
+                if response_tx.send(CommandResponse{result}).is_err() {
+                    // Caller went away, e.g.
+                    // because an HTTP request timed out
+                    tracing::info!("Dropping response to administrative command")
+                }
+            }
+        }
+    }
+
+    Ok(())
+}
+
+struct WriteInProgress {
+    barrier: Barrier,
+}
+
+struct WriteComplete {
+    tenant_id: TenantId,
+    completed_at: Instant,
+}
+
+/// The heatmap writer keeps a little bit of per-tenant state, mainly to remember
+/// when we last did a write. We only populate this after doing at least one
+/// write for a tenant -- this avoids holding state for tenants that have
+/// uploads disabled.
+
+struct WriterTenantState {
+    // This Weak only exists to enable culling IdleTenant instances
+    // when the Tenant has been deallocated.
+    tenant: Weak<Tenant>,
+
+    last_write: Option<Instant>,
+}
+
+struct HeatmapWriter {
+    tenant_manager: Arc<TenantManager>,
+    remote_storage: GenericRemoteStorage,
+    cancel: CancellationToken,
+
+    tenants: HashMap<TenantId, WriterTenantState>,
+
+    tenants_writing: HashMap<TenantId, WriteInProgress>,
+    tasks: JoinSet<WriteComplete>,
+    concurrent_writes: usize,
+}
+
+impl HeatmapWriter {
+    /// Periodic execution phase: check for new work to do, and run it with `spawn_write`
+    async fn iteration(&mut self) -> anyhow::Result<()> {
+        self.drain().await;
+
+        // Cull any entries in self.tenants whose Arc is gone
+        self.tenants.retain(|_k, v| v.tenant.upgrade().is_some());
+
+        // Cannot spawn more work right now
+        if self.tenants_writing.len() >= self.concurrent_writes {
+            return Ok(());
+        }
+
+        // Iterate over tenants looking for work to do.
+        let tenants = self.tenant_manager.get_attached_tenants();
+        for tenant in tenants {
+            // Can't spawn any more work, drop out
+            if self.tenants_writing.len() >= self.concurrent_writes {
+                return Ok(());
+            }
+
+            // Process is shutting down, drop out
+            if self.cancel.is_cancelled() {
+                return Ok(());
+            }
+
+            // Skip tenants that don't have heatmaps enabled
+            if !tenant.get_enable_heatmap() {
+                continue;
+            }
+
+            // Skip tenants that aren't in a stable active state
+            if tenant.current_state() != TenantState::Active {
+                continue;
+            }
+
+            // Skip tenants that already have a write in flight
+            if self.tenants_writing.contains_key(&tenant.get_tenant_id()) {
+                continue;
+            }
+
+            // TODO: add a TenantConf for whether to upload at all. This is useful for
+            // a single-location mode for cheap tenants that don't require HA.
+
+            // TODO: add a mechanism to check whether the active layer set has
+            // changed since our last write
+
+            self.maybe_spawn_write(tenant);
+        }
+
+        Ok(())
+    }
+
+    async fn drain(&mut self) {
+        // Drain any complete background operations
+        loop {
+            tokio::select!(
+                biased;
+                Some(r) = self.tasks.join_next() => {
+                    match r {
+                        Ok(r) => {
+                            self.on_completion(r);
+                        },
+                        Err(e) => {
+                            // This should not happen, but needn't be fatal.
+                            tracing::error!("Join error on heatmap writer JoinSet! {e}");
+                        }
+                    }
+                }
+                else => {
+                    break;
+                }
+            )
+        }
+    }
+
+    fn maybe_spawn_write(&mut self, tenant: Arc<Tenant>) {
+        // Create an entry in self.tenants if one doesn't already exist: this will later be updated
+        // with the completion time in on_completion.
+        let state = self
+            .tenants
+            .entry(tenant.get_tenant_id())
+            .or_insert_with(|| WriterTenantState {
+                tenant: Arc::downgrade(&tenant),
+                last_write: None,
+            });
+
+        // Decline to do the upload if insufficient time has passed
+        if let Some(last_write) = state.last_write {
+            if Instant::now().duration_since(last_write) < HEATMAP_UPLOAD_INTERVAL {
+                return;
+            }
+        }
+
+        self.spawn_write(tenant)
+    }
+
+    fn spawn_write(&mut self, tenant: Arc<Tenant>) {
+        let remote_storage = self.remote_storage.clone();
+        let tenant_id = tenant.get_tenant_id();
+        let (completion, barrier) = utils::completion::channel();
+        self.tasks.spawn(async move {
+            // Guard for the barrier in [`WriteInProgress`]
+            let _completion = completion;
+
+            match write_tenant(remote_storage, &tenant)
+                .instrument(tracing::info_span!(
+                    "write_tenant",
+                    tenant_id = %tenant.get_tenant_id()
+                ))
+                .await
+            {
+                Ok(()) => {}
+                Err(e) => {
+                    tracing::warn!(
+                        "Failed to upload heatmap for tenant {}: {e:#}",
+                        tenant.get_tenant_id(),
+                    )
+                }
+            }
+
+            WriteComplete {
+                tenant_id: tenant.get_tenant_id(),
+                completed_at: Instant::now(),
+            }
+        });
+
+        self.tenants_writing
+            .insert(tenant_id, WriteInProgress { barrier });
+    }
+
+    fn on_completion(&mut self, completion: WriteComplete) {
+        tracing::debug!(tenant_id=%completion.tenant_id, "Heatmap write task complete");
+        self.tenants_writing.remove(&completion.tenant_id);
+        tracing::debug!("Task completed for tenant {}", completion.tenant_id);
+        use std::collections::hash_map::Entry;
+        match self.tenants.entry(completion.tenant_id) {
+            Entry::Vacant(_) => {
+                // Tenant state was dropped, nothing to update.
+            }
+            Entry::Occupied(mut entry) => {
+                entry.get_mut().last_write = Some(completion.completed_at)
+            }
+        }
+    }
+
+    async fn handle_command(&mut self, command: UploadCommand) -> anyhow::Result<()> {
+        match command {
+            UploadCommand::Upload(tenant_id) => {
+                // If an upload was ongoing for this tenant, let it finish first.
+                if let Some(writing_state) = self.tenants_writing.get(&tenant_id) {
+                    tracing::info!(%tenant_id, "Waiting for heatmap write to complete");
+                    writing_state.barrier.clone().wait().await;
+                }
+
+                // Spawn the upload then immediately wait for it. This will block processing of other commands and
+                // starting of other background work.
+                tracing::info!(%tenant_id, "Starting heatmap write on command");
+                let tenant = mgr::get_tenant(tenant_id, true)?;
+                self.spawn_write(tenant);
+                let writing_state = self
+                    .tenants_writing
+                    .get(&tenant_id)
+                    .expect("We just inserted this");
+                tracing::info!(%tenant_id, "Waiting for heatmap write to complete");
+                writing_state.barrier.clone().wait().await;
+                tracing::info!(%tenant_id, "Heatmap write complete");
+
+                // This drain is not necessary for correctness, but it is polite to avoid intentionally leaving
+                // our complete task in self.tenants_writing.
+                self.drain().await;
+
+                Ok(())
+            }
+        }
+    }
+}
+
+async fn write_tenant(
+    remote_storage: GenericRemoteStorage,
+    tenant: &Arc<Tenant>,
+) -> anyhow::Result<()> {
+    let mut heatmap = HeatMapTenant {
+        timelines: Vec::new(),
+    };
+    let timelines = tenant.timelines.lock().unwrap().clone();
+
+    let tenant_cancel = tenant.cancel.clone();
+
+    // Ensure that Tenant::shutdown waits for any upload in flight: this is needed because otherwise
+    // when we delete a tenant, we might race with an upload in flight and end up leaving a heatmap behind
+    // in remote storage.
+ let _guard = match tenant.gate.enter() { + Ok(g) => g, + Err(_) => { + tracing::info!("Skipping heatmap upload for tenant which is shutting down"); + return Ok(()); + } + }; + + for (timeline_id, timeline) in timelines { + let heatmap_timeline = timeline.generate_heatmap().await; + match heatmap_timeline { + None => { + tracing::debug!( + "Skipping heatmap upload because timeline {timeline_id} is not ready" + ); + return Ok(()); + } + Some(heatmap_timeline) => { + heatmap.timelines.push(heatmap_timeline); + } + } + } + + // Serialize the heatmap + let bytes = serde_json::to_vec(&heatmap)?; + let size = bytes.len(); + + let path = remote_heatmap_path(&tenant.get_tenant_id()); + + // Write the heatmap. + tracing::debug!("Uploading {size} byte heatmap to {path}"); + if let Err(e) = backoff::retry( + || async { + let bytes = tokio::io::BufReader::new(std::io::Cursor::new(bytes.clone())); + let bytes = Box::new(bytes); + remote_storage + .upload_storage_object(bytes, size, &path) + .await + }, + |_| false, + 3, + u32::MAX, + "Uploading heatmap", + backoff::Cancel::new(tenant_cancel.clone(), || anyhow::anyhow!("Shutting down")), + ) + .await + { + if tenant_cancel.is_cancelled() { + return Ok(()); + } else { + return Err(e); + } + } + + SECONDARY_MODE.upload_heatmap.inc(); + tracing::info!("Successfully uploaded {size} byte heatmap to {path}"); + + Ok(()) +}
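The `HeatMapTenant` / `HeatMapTimeline` types used throughout this patch are defined in `secondary/heatmap.rs`, which this patch does not modify. As orientation only, here is a minimal sketch of the shape the downloader assumes. The field names (`timelines`, `timeline_id`, `layers`, `name`, `metadata.file_size`, `access_time`) are taken from the usages above; the type names `HeatMapLayer`/`HeatMapLayerMetadata` and the `String`/`u64` stand-ins are hypothetical simplifications, not the real definitions.

    // Sketch only: simplified stand-ins for the real heatmap.rs types.
    use std::time::SystemTime;

    struct HeatMapLayer {
        name: String,                   // layer file name
        metadata: HeatMapLayerMetadata, // remote metadata; file_size is used for size checks
        access_time: SystemTime,        // compared against evicted_at to decide re-download
    }

    struct HeatMapLayerMetadata {
        file_size: u64,
    }

    struct HeatMapTimeline {
        timeline_id: String,
        layers: Vec<HeatMapLayer>,
    }

    struct HeatMapTenant {
        timelines: Vec<HeatMapTimeline>,
    }

The downloader serializes this with `serde_json` and keeps the last-seen copy inside the tenant directory (`HEATMAP_BASENAME`), so that `SecondaryDetail::init_detail` can rebuild layer metadata after a restart without re-fetching the heatmap from remote storage.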