diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 017322ffb2..5dee75303f 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -257,6 +257,12 @@ pub enum TaskKind { /// See [`crate::disk_usage_eviction_task`]. DiskUsageEviction, + /// See [`crate::tenant::secondary`]. + SecondaryDownloads, + + /// See [`crate::tenant::secondary`]. + SecondaryUploads, + // Initial logical size calculation InitialLogicalSizeCalculation, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 1e80a023a2..a8b0dbe3a8 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -130,6 +130,7 @@ pub mod storage_layer; pub mod config; pub mod delete; pub mod mgr; +pub mod secondary; pub mod tasks; pub mod upload_queue; diff --git a/pageserver/src/tenant/secondary.rs b/pageserver/src/tenant/secondary.rs new file mode 100644 index 0000000000..c435027e39 --- /dev/null +++ b/pageserver/src/tenant/secondary.rs @@ -0,0 +1,268 @@ +pub mod downloader; +pub mod heatmap; +pub mod heatmap_writer; + +use std::{sync::Arc, time::SystemTime}; + +use crate::{ + config::PageServerConf, + task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}, +}; + +use self::{ + downloader::{downloader_task, SecondaryDetail}, + heatmap_writer::heatmap_writer_task, +}; + +use super::{ + mgr::TenantManager, + storage_layer::{AsLayerDesc, Layer}, + timeline::DiskUsageEvictionInfo, +}; + +use remote_storage::GenericRemoteStorage; + +use tokio_util::sync::CancellationToken; +use utils::{ + completion::Barrier, + fs_ext, + id::{TenantId, TimelineId}, +}; + +enum DownloadCommand { + Download(TenantId), +} +enum UploadCommand { + Upload(TenantId), +} + +struct CommandRequest { + payload: T, + response_tx: tokio::sync::oneshot::Sender, +} + +struct CommandResponse { + result: anyhow::Result<()>, +} + +// Whereas [`Tenant`] represents an attached tenant, this type represents the work +// we do for secondary tenant locations: where we are not serving clients or +// ingesting WAL, but we are maintaining a warm cache of layer files. +// +// This type is all about the _download_ path for secondary mode. The upload path +// runs while a regular attached `Tenant` exists. +// +// This structure coordinates TenantManager and SecondaryDownloader, +// so that the downloader can indicate which tenants it is currently +// operating on, and the manager can indicate when a particular +// secondary tenant should cancel any work in flight. +#[derive(Debug)] +pub(crate) struct SecondaryTenant { + /// Cancellation token indicates to SecondaryDownloader that it should stop doing + /// any work for this tenant at the next opportunity. + pub(crate) cancel: CancellationToken, + + /// Lock must be held by SecondaryDownloader at any time that it might be operating + /// on the local filesystem directory for this tenant ID. + // Ordering: the TenantManager must set the cancellation token _before_ + // taking the lock. The SecondaryDownloader must always check the cancellation + // token immediately _after_ taking the lock (and at appropriate intervals + // while holding it). 
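+    //
+    // An illustrative sketch of that ordering contract, using the fields declared below
+    // (purely a usage sketch, not additional code in this patch):
+    //
+    //     // TenantManager side, when tearing down a secondary location:
+    //     secondary.cancel.cancel();                  // 1. set the cancellation token first
+    //     let _busy = secondary.busy.lock().await;    // 2. then wait for the downloader to stop
+    //
+    //     // SecondaryDownloader side, before touching the tenant's directory:
+    //     let Ok(_busy) = secondary.busy.clone().try_lock_owned() else { return };
+    //     if secondary.cancel.is_cancelled() { return }  // 3. re-check immediately after locking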
+    pub(crate) busy: Arc<tokio::sync::Mutex<()>>,
+
+    detail: std::sync::Mutex<SecondaryDetail>,
+    // TODO: propagate the `warm` from LocationConf into here, and respect it when doing downloads
+}
+
+impl SecondaryTenant {
+    pub(crate) fn new() -> Arc<Self> {
+        // TODO: consider whether we really need to Arc this
+        Arc::new(Self {
+            busy: Arc::new(tokio::sync::Mutex::new(())),
+            // todo: shall we make this a descendant of the
+            // main cancellation token, or is it sufficient that
+            // on shutdown we walk the tenants and fire their
+            // individual cancellations?
+            cancel: CancellationToken::new(),
+
+            detail: std::sync::Mutex::default(),
+        })
+    }
+
+    pub(crate) async fn shutdown(&self) {
+        self.cancel.cancel();
+
+        // Wait for any secondary downloader work to complete: once we
+        // acquire this lock, we are guaranteed that the secondary downloader
+        // won't touch the local filesystem again for this instance: it is safe
+        // to e.g. construct a `Tenant` for the same TenantId.
+        drop(self.busy.lock().await);
+    }
+
+    pub(crate) fn get_layers_for_eviction(&self) -> Vec<(TimelineId, DiskUsageEvictionInfo)> {
+        self.detail.lock().unwrap().get_layers_for_eviction()
+    }
+
+    pub(crate) async fn evict_layers(
+        &self,
+        _guard: tokio::sync::OwnedMutexGuard<()>,
+        conf: &PageServerConf,
+        tenant_id: &TenantId,
+        layers: Vec<(TimelineId, Layer)>,
+    ) {
+        crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
+
+        if self.cancel.is_cancelled() {
+            // Eviction is a no-op if shutdown() was already called.
+            tracing::info!(
+                "Dropping {} layer evictions, secondary tenant shutting down",
+                layers.len()
+            );
+            return;
+        }
+
+        let now = SystemTime::now();
+
+        for (timeline_id, layer) in layers {
+            let layer_name = layer.layer_desc().filename();
+            let path = conf
+                .timeline_path(tenant_id, &timeline_id)
+                .join(&layer_name.file_name());
+
+            // We tolerate ENOENT, because between planning eviction and executing
+            // it, the secondary downloader could have seen an updated heatmap that
+            // resulted in a layer being deleted.
+            tokio::fs::remove_file(path)
+                .await
+                .or_else(fs_ext::ignore_not_found)
+                .expect("TODO: terminate process on local I/O errors");
+
+            // TODO: batch up updates instead of acquiring the lock in the inner loop
+            let mut detail = self.detail.lock().unwrap();
+            // If there is no timeline detail for what we just deleted, that indicates that
+            // the secondary downloader did some work of its own (perhaps removing the whole
+            // timeline) since this eviction was planned.
+            if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) {
+                timeline_detail.on_disk_layers.remove(&layer_name);
+                timeline_detail.evicted_at.insert(layer_name, now);
+            }
+        }
+    }
+}
+
+/// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode
+/// downloads and heatmap uploads. This is not a hot data path: it's primarily a hook for tests,
+/// where we want to immediately upload/download for a particular tenant. In normal operation
+/// uploads & downloads are autonomous and not driven by this interface.
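+///
+/// As a rough usage sketch (the call site is hypothetical; the methods are defined below):
+///
+/// ```ignore
+/// // e.g. from a test hook or a debug HTTP handler:
+/// controller.upload_tenant(tenant_id).await?;   // force an immediate heatmap upload for an attached tenant
+/// controller.download_tenant(tenant_id).await?; // force an immediate download pass for a secondary tenant
+/// ```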
+pub struct SecondaryController { + upload_req_tx: tokio::sync::mpsc::Sender>, + + download_req_tx: tokio::sync::mpsc::Sender>, +} + +impl SecondaryController { + async fn dispatch( + &self, + queue: &tokio::sync::mpsc::Sender>, + payload: T, + ) -> anyhow::Result<()> { + let (response_tx, response_rx) = tokio::sync::oneshot::channel(); + + queue + .send(CommandRequest { + payload, + response_tx, + }) + .await + .map_err(|_| anyhow::anyhow!("Receiver shut down"))?; + + let response = response_rx + .await + .map_err(|_| anyhow::anyhow!("Request dropped"))?; + + response.result + } + + pub async fn download_tenant(&self, tenant_id: TenantId) -> anyhow::Result<()> { + self.dispatch(&self.download_req_tx, DownloadCommand::Download(tenant_id)) + .await + } + + pub async fn upload_tenant(&self, tenant_id: TenantId) -> anyhow::Result<()> { + self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_id)) + .await + } +} + +pub fn spawn_tasks( + conf: &'static PageServerConf, + tenant_manager: Arc, + remote_storage: GenericRemoteStorage, + background_jobs_can_start: Barrier, + cancel: CancellationToken, +) -> SecondaryController { + let mgr_clone = tenant_manager.clone(); + let storage_clone = remote_storage.clone(); + let cancel_clone = cancel.clone(); + let bg_jobs_clone = background_jobs_can_start.clone(); + + let (download_req_tx, download_req_rx) = + tokio::sync::mpsc::channel::>(16); + let (upload_req_tx, upload_req_rx) = + tokio::sync::mpsc::channel::>(16); + + task_mgr::spawn( + BACKGROUND_RUNTIME.handle(), + TaskKind::SecondaryDownloads, + None, + None, + "secondary tenant downloads", + false, + async move { + downloader_task( + conf, + mgr_clone, + storage_clone, + download_req_rx, + bg_jobs_clone, + cancel_clone, + ) + .await + }, + ); + + task_mgr::spawn( + BACKGROUND_RUNTIME.handle(), + TaskKind::SecondaryDownloads, + None, + None, + "heatmap uploads", + false, + async move { + heatmap_writer_task( + tenant_manager, + remote_storage, + upload_req_rx, + background_jobs_can_start, + cancel, + ) + .await + }, + ); + + SecondaryController { + download_req_tx, + upload_req_tx, + } +} + +/// For running with remote storage disabled: a SecondaryController that is connected to nothing. 
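+///
+/// A plausible wiring at pageserver startup (assumed shape and names, not shown in this diff):
+///
+/// ```ignore
+/// let secondary_controller = match remote_storage.clone() {
+///     Some(storage) => {
+///         secondary::spawn_tasks(conf, tenant_manager, storage, background_jobs_barrier, shutdown_token)
+///     }
+///     None => secondary::null_controller(),
+/// };
+/// ```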
+pub fn null_controller() -> SecondaryController { + let (download_req_tx, _download_req_rx) = + tokio::sync::mpsc::channel::>(16); + let (upload_req_tx, _upload_req_rx) = + tokio::sync::mpsc::channel::>(16); + SecondaryController { + upload_req_tx, + download_req_tx, + } +} diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs new file mode 100644 index 0000000000..31e49f8dd4 --- /dev/null +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -0,0 +1,579 @@ +use std::{ + collections::{HashMap, HashSet}, + str::FromStr, + sync::Arc, + time::{Duration, Instant, SystemTime}, +}; + +use crate::{ + config::PageServerConf, + tenant::{ + remote_timeline_client::index::LayerFileMetadata, + secondary::CommandResponse, + storage_layer::{Layer, LayerFileName}, + timeline::{DiskUsageEvictionInfo, LocalLayerInfoForDiskUsageEviction}, + }, + METADATA_FILE_NAME, +}; + +use super::SecondaryTenant; +use crate::tenant::{ + mgr::TenantManager, + remote_timeline_client::{download::download_layer_file, remote_heatmap_path}, +}; +use anyhow::Context; + +use chrono::format::{DelayedFormat, StrftimeItems}; +use remote_storage::GenericRemoteStorage; + +use tokio_util::sync::CancellationToken; +use tracing::Instrument; +use utils::{ + completion::Barrier, + fs_ext, + id::{TenantId, TimelineId}, +}; + +use super::{ + heatmap::{HeatMapTenant, HeatMapTimeline}, + CommandRequest, DownloadCommand, +}; + +/// Interval between checking if any Secondary tenants have download work to do: +/// note that this is _not_ the frequency with which we actually freshen the tenants, +/// just the frequency with which we wake up to decide whether anyone needs freshening. +/// +/// Making this somewhat infrequent reduces the load on mutexes inside TenantManager +/// and SecondaryTenant for reads when checking for work to do. +const DOWNLOAD_CHECK_INTERVAL: Duration = Duration::from_millis(10000); + +/// For each tenant, how long must have passed since the last freshen_tenant call before +/// calling it again. This is approximately the time by which local data is allowed +/// to fall behind remote data. +/// +/// TODO: this should be an upper bound, and tenants that are uploading regularly +/// should adaptively freshen more often (e.g. a tenant writing 1 layer per second +/// should not wait a minute between freshens) +const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000); + +#[derive(Debug, Clone)] +pub(super) struct OnDiskState { + layer: Layer, + access_time: SystemTime, +} + +impl OnDiskState { + fn new( + conf: &'static PageServerConf, + tenant_id: &TenantId, + timeline_id: &TimelineId, + name: LayerFileName, + metadata: LayerFileMetadata, + access_time: SystemTime, + ) -> Self { + Self { + layer: Layer::for_secondary(conf, tenant_id, timeline_id, name, metadata), + access_time, + } + } +} + +#[derive(Debug, Clone, Default)] +pub(super) struct SecondaryDetailTimeline { + pub(super) on_disk_layers: HashMap, + + /// We remember when layers were evicted, to prevent re-downloading them. + /// TODO: persist this, so that we don't try and re-download everything on restart. 
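+    ///
+    /// The download pass consults this map roughly as follows (a sketch of the check in
+    /// `freshen_timeline` further down):
+    ///
+    /// ```ignore
+    /// if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
+    ///     if layer.access_time <= *evicted_at {
+    ///         continue; // still cold since we evicted it: do not re-download
+    ///     }
+    ///     // accessed after eviction: fall through and download it again
+    /// }
+    /// ```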
+ pub(super) evicted_at: HashMap, +} + +/// This state is written by the secondary downloader, it is opaque +/// to TenantManager +#[derive(Default, Debug)] +pub(super) struct SecondaryDetail { + freshened_at: Option, + pub(super) timelines: HashMap, +} + +/// Helper for logging SystemTime +fn strftime(t: &'_ SystemTime) -> DelayedFormat> { + let datetime: chrono::DateTime = (*t).into(); + datetime.format("%d/%m/%Y %T") +} + +impl SecondaryDetail { + pub(super) fn get_layers_for_eviction(&self) -> Vec<(TimelineId, DiskUsageEvictionInfo)> { + let mut result = Vec::new(); + for (timeline_id, timeline_detail) in &self.timelines { + let layers: Vec<_> = timeline_detail + .on_disk_layers + .values() + .map(|ods| LocalLayerInfoForDiskUsageEviction { + layer: ods.layer.clone(), + last_activity_ts: ods.access_time, + }) + .collect(); + + let max_layer_size = layers.iter().map(|l| l.layer.metadata().file_size()).max(); + + result.push(( + *timeline_id, + DiskUsageEvictionInfo { + resident_layers: layers, + max_layer_size, + }, + )) + } + + result + } +} + +/// Keep trying to do downloads until the cancellation token is fired. Remote storage +/// errors are handled internally: any error returned by this function is an unexpected +/// internal error of some kind. +pub(super) async fn downloader_task( + conf: &'static PageServerConf, + tenant_manager: Arc, + remote_storage: GenericRemoteStorage, + mut command_queue: tokio::sync::mpsc::Receiver>, + background_jobs_can_start: Barrier, + cancel: CancellationToken, +) -> anyhow::Result<()> { + let downloader = SecondaryDownloader { + conf, + tenant_manager, + remote_storage, + cancel: cancel.clone(), + }; + + tracing::info!("Waiting for background_jobs_can start..."); + background_jobs_can_start.wait().await; + tracing::info!("background_jobs_can is ready, proceeding."); + + while !cancel.is_cancelled() { + downloader.iteration().await?; + + tokio::select! { + _ = cancel.cancelled() => { + tracing::info!("Heatmap writer terminating"); + break; + }, + _ = tokio::time::sleep(DOWNLOAD_CHECK_INTERVAL) => {}, + cmd = command_queue.recv() => { + let cmd = match cmd { + Some(c) =>c, + None => { + // SecondaryController was destroyed, and this has raced with + // our CancellationToken + tracing::info!("Heatmap writer terminating"); + break; + } + }; + + let CommandRequest{ + response_tx, + payload + } = cmd; + let result = downloader.handle_command(payload).await; + if response_tx.send(CommandResponse{result}).is_err() { + // Caller went away, e.g. because an HTTP request timed out + tracing::info!("Dropping response to administrative command") + } + } + } + } + + Ok(()) +} +struct SecondaryDownloader { + conf: &'static PageServerConf, + tenant_manager: Arc, + remote_storage: GenericRemoteStorage, + cancel: CancellationToken, +} + +struct TenantJob { + tenant_id: TenantId, + secondary_state: Arc, + + // This mutex guard conveys the right to write to the tenant's local directory: it must + // be taken before doing downloads, and TenantManager must ensure it has been released + // before it considers shutdown complete for the secondary state -- [`SecondaryDownloader`] + // will thereby never be racing with [`Tenant`] for access to local files. 
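+    //
+    // Acquisition sketch (mirrors `SecondaryDownloader::iteration` below): the guard is taken
+    // with `try_lock_owned()` while scanning for candidates, held for the whole freshen pass,
+    // and only released when the `TenantJob` is dropped:
+    //
+    //     let Ok(guard) = secondary_state.busy.clone().try_lock_owned() else { return };
+    //     candidates.push(TenantJob { tenant_id: *tenant_id, secondary_state: secondary_state.clone(), _guard: guard });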
+    _guard: tokio::sync::OwnedMutexGuard<()>,
+}
+
+impl SecondaryDownloader {
+    async fn iteration(&self) -> anyhow::Result<()> {
+        // Step 1: identify some tenants that we may work on
+        let mut candidates: Vec<TenantJob> = Vec::new();
+        self.tenant_manager
+            .foreach_secondary_tenants(|tenant_id, secondary_state| {
+                let guard = match secondary_state.busy.clone().try_lock_owned() {
+                    Ok(guard) => guard,
+                    // If we can't lock, someone is in the process of shutting it down, or we are
+                    // already working on it. We may ignore it when scanning for new work to do.
+                    Err(_) => return,
+                };
+
+                candidates.push(TenantJob {
+                    tenant_id: *tenant_id,
+                    secondary_state: secondary_state.clone(),
+                    _guard: guard,
+                });
+            });
+
+        // Step 2: prioritized selection of next batch of tenants to freshen
+        let now = Instant::now();
+        let candidates = candidates.into_iter().filter(|c| {
+            let detail = c.secondary_state.detail.lock().unwrap();
+            match detail.freshened_at {
+                None => true, // Not yet freshened, therefore eligible to run
+                Some(t) => {
+                    let since = now.duration_since(t);
+                    since > DOWNLOAD_FRESHEN_INTERVAL
+                }
+            }
+        });
+
+        // TODO: don't just cut down the list, prioritize it to freshen the stalest tenants first
+        // TODO: bounded parallelism
+
+        // Step 3: spawn freshen_tenant tasks
+        for job in candidates {
+            if job.secondary_state.cancel.is_cancelled() {
+                continue;
+            }
+
+            async {
+                if let Err(e) = self.freshen_tenant(&job).await {
+                    tracing::info!("Failed to freshen secondary content: {e:#}")
+                };
+
+                // Update freshened_at even if there was an error: we don't want errored tenants to
+                // implicitly take priority to run again.
+                let mut detail = job.secondary_state.detail.lock().unwrap();
+                detail.freshened_at = Some(Instant::now());
+            }
+            .instrument(tracing::info_span!(
+                "freshen_tenant",
+                tenant_id = %job.tenant_id
+            ))
+            .await;
+        }
+
+        Ok(())
+    }
+
+    async fn handle_command(&self, command: DownloadCommand) -> anyhow::Result<()> {
+        match command {
+            DownloadCommand::Download(req_tenant_id) => {
+                let mut candidates: Vec<TenantJob> = Vec::new();
+                self.tenant_manager
+                    .foreach_secondary_tenants(|tenant_id, secondary_state| {
+                        tracing::info!("foreach_secondary: {tenant_id} ({req_tenant_id})");
+                        if tenant_id == &req_tenant_id {
+                            let guard = match secondary_state.busy.clone().try_lock_owned() {
+                                Ok(guard) => guard,
+                                // If we can't lock, someone is in the process of shutting it down,
+                                // or we are already working on it. We may ignore it when scanning
+                                // for new work to do.
+                                Err(_) => return,
+                            };
+
+                            candidates.push(TenantJob {
+                                tenant_id: *tenant_id,
+                                secondary_state: secondary_state.clone(),
+                                _guard: guard,
+                            });
+                        }
+                    });
+
+                let tenant_job = if candidates.len() != 1 {
+                    anyhow::bail!("Tenant not found in secondary mode");
+                } else {
+                    candidates.pop().unwrap()
+                };
+
+                self.freshen_tenant(&tenant_job).await
+            }
+        }
+    }
+
+    async fn download_heatmap(&self, tenant_id: &TenantId) -> anyhow::Result<HeatMapTenant> {
+        // TODO: make download conditional on ETag having changed since last download
+
+        let heatmap_path = remote_heatmap_path(tenant_id);
+        // TODO: wrap this download in a select! that checks self.cancel
+        let mut download = self.remote_storage.download(&heatmap_path).await?;
+        let mut heatmap_bytes = Vec::new();
+        let _size = tokio::io::copy(&mut download.download_stream, &mut heatmap_bytes)
+            .await
+            .with_context(|| format!("download heatmap {heatmap_path:?}"))?;
+
+        Ok(serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?)
+ } + + async fn init_timeline_state( + &self, + tenant_id: &TenantId, + timeline_id: &TimelineId, + heatmap: &HeatMapTimeline, + ) -> anyhow::Result { + let timeline_path = self.conf.timeline_path(tenant_id, timeline_id); + let mut detail = SecondaryDetailTimeline::default(); + + let mut dir = match tokio::fs::read_dir(&timeline_path).await { + Ok(d) => d, + Err(e) => { + if e.kind() == std::io::ErrorKind::NotFound { + tracing::info!("Creating timeline directory {timeline_path}"); + tokio::fs::create_dir(&timeline_path).await?; + + // No entries to report: drop out. + return Ok(detail); + } else { + return Err(e.into()); + } + } + }; + + let heatmap_metadata: HashMap<_, _> = heatmap.layers.iter().map(|l| (&l.name, l)).collect(); + + while let Some(dentry) = dir.next_entry().await? { + let dentry_file_name = dentry.file_name(); + let file_name = dentry_file_name.to_string_lossy(); + let local_meta = dentry.metadata().await?; + + // Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant. + if file_name == METADATA_FILE_NAME { + continue; + } + + match LayerFileName::from_str(&file_name) { + Ok(name) => { + let remote_meta = heatmap_metadata.get(&name); + match remote_meta { + Some(remote_meta) => { + // TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784) + if local_meta.len() != remote_meta.metadata.file_size { + // This should not happen, because we do crashsafe write-then-rename when downloading + // layers, and layers in remote storage are immutable. Remove the local file because + // we cannot trust it. + tracing::warn!("Removing local layer {name} with unexpected local size {} != {}", + local_meta.len(), remote_meta.metadata.file_size); + } else { + // We expect the access time to be initialized immediately afterwards, when + // the latest heatmap is applied to the state. + detail.on_disk_layers.insert( + name.clone(), + OnDiskState::new( + self.conf, + tenant_id, + timeline_id, + name, + LayerFileMetadata::from(&remote_meta.metadata), + remote_meta.access_time, + ), + ); + } + } + None => { + // FIXME: consider some optimization when transitioning from attached to secondary: maybe + // wait until we have seen a heatmap that is more recent than the most recent on-disk state? Otherwise + // we will end up deleting any layers which were created+uploaded more recently than the heatmap. + tracing::info!( + "Removing secondary local layer {} because it's absent in heatmap", + name + ); + tokio::fs::remove_file(dentry.path()).await?; + } + } + } + Err(_) => { + // Ignore it. + tracing::warn!("Unexpected file in timeline directory: {file_name}"); + } + } + } + + Ok(detail) + } + + async fn freshen_timeline( + &self, + job: &TenantJob, + timeline: HeatMapTimeline, + ) -> anyhow::Result<()> { + let timeline_path = self + .conf + .timeline_path(&job.tenant_id, &timeline.timeline_id); + + // Accumulate updates to the state + let mut touched = Vec::new(); + + // Clone a view of what layers already exist on disk + let timeline_state = job + .secondary_state + .detail + .lock() + .unwrap() + .timelines + .get(&timeline.timeline_id) + .cloned(); + + let timeline_state = match timeline_state { + Some(t) => t, + None => { + // We have no existing state: need to scan local disk for layers first. + self.init_timeline_state(&job.tenant_id, &timeline.timeline_id, &timeline) + .await? 
+ } + }; + + let layers_in_heatmap = timeline + .layers + .iter() + .map(|l| &l.name) + .collect::>(); + let layers_on_disk = timeline_state + .on_disk_layers + .iter() + .map(|l| l.0) + .collect::>(); + + // Remove on-disk layers that are no longer present in heatmap + for layer in layers_on_disk.difference(&layers_in_heatmap) { + let local_path = timeline_path.join(layer.to_string()); + tracing::info!("Removing secondary local layer {layer} because it's absent in heatmap",); + tokio::fs::remove_file(&local_path) + .await + .or_else(fs_ext::ignore_not_found)?; + } + + // Download heatmap layers that are not present on local disk, or update their + // access time if they are already present. + for layer in timeline.layers { + if self.cancel.is_cancelled() { + return Ok(()); + } + + // Existing on-disk layers: just update their access time. + if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) { + if on_disk.layer.metadata() != LayerFileMetadata::from(&layer.metadata) + || on_disk.access_time != layer.access_time + { + // We already have this layer on disk. Update its access time. + tracing::trace!( + "Access time updated for layer {}: {} -> {}", + layer.name, + strftime(&on_disk.access_time), + strftime(&layer.access_time) + ); + touched.push(layer); + } + continue; + } + + // Eviction: if we evicted a layer, then do not re-download it unless it was accessed more + // recently than it was evicted. + if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) { + if &layer.access_time > evicted_at { + tracing::info!( + "Re-downloading evicted layer {}, accessed at {}, evicted at {}", + layer.name, + strftime(&layer.access_time), + strftime(evicted_at) + ); + } else { + tracing::trace!( + "Not re-downloading evicted layer {}, accessed at {}, evicted at {}", + layer.name, + strftime(&layer.access_time), + strftime(evicted_at) + ); + continue; + } + } + + match download_layer_file( + self.conf, + &self.remote_storage, + job.tenant_id, + timeline.timeline_id, + &layer.name, + &LayerFileMetadata::from(&layer.metadata), + ) + .await + { + Ok(downloaded_bytes) => { + if downloaded_bytes != layer.metadata.file_size { + let local_path = timeline_path.join(layer.name.to_string()); + + tracing::error!( + "Downloaded layer {} with unexpected size {} != {}", + layer.name, + downloaded_bytes, + layer.metadata.file_size + ); + + tokio::fs::remove_file(&local_path) + .await + .or_else(fs_ext::ignore_not_found)?; + } + + touched.push(layer) + } + Err(e) => { + // No retries here: secondary downloads don't have to succeed: if they fail we just proceed and expect + // that on some future call to freshen the download will work. + // TODO: refine this behavior. + tracing::info!("Failed to download layer {}: {}", layer.name, e); + } + } + } + + // Write updates to state to record layers we just downloaded or touched. 
+ { + let mut detail = job.secondary_state.detail.lock().unwrap(); + let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default(); + + for t in touched { + use std::collections::hash_map::Entry; + match timeline_detail.on_disk_layers.entry(t.name.clone()) { + Entry::Occupied(mut v) => { + v.get_mut().access_time = t.access_time; + } + Entry::Vacant(e) => { + e.insert(OnDiskState::new( + self.conf, + &job.tenant_id, + &timeline.timeline_id, + t.name, + LayerFileMetadata::from(&t.metadata), + t.access_time, + )); + } + } + } + } + + Ok(()) + } + + async fn freshen_tenant(&self, job: &TenantJob) -> anyhow::Result<()> { + // Download the tenant's heatmap + let heatmap = self.download_heatmap(&job.tenant_id).await?; + + // Download the layers in the heatmap + for timeline in heatmap.timelines { + if self.cancel.is_cancelled() { + return Ok(()); + } + + self.freshen_timeline(job, timeline).await?; + } + + Ok(()) + } +} diff --git a/pageserver/src/tenant/secondary/heatmap.rs b/pageserver/src/tenant/secondary/heatmap.rs new file mode 100644 index 0000000000..b785695cab --- /dev/null +++ b/pageserver/src/tenant/secondary/heatmap.rs @@ -0,0 +1,57 @@ +use std::time::SystemTime; + +use crate::tenant::{ + remote_timeline_client::index::IndexLayerMetadata, storage_layer::LayerFileName, +}; + +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DisplayFromStr}; + +use utils::id::TimelineId; + +#[derive(Serialize, Deserialize)] +pub(super) struct HeatMapTenant { + pub(super) timelines: Vec, +} + +#[derive(Serialize, Deserialize)] +pub(crate) struct HeatMapLayer { + pub(super) name: LayerFileName, + pub(super) metadata: IndexLayerMetadata, + + pub(super) access_time: SystemTime, + // TODO: an actual 'heat' score that would let secondary locations prioritize downloading + // the hottest layers, rather than trying to simply mirror whatever layers are on-disk on the primary. 
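+    //
+    // With these derives, a heatmap serializes to JSON roughly like the following (layer name and
+    // metadata fields are illustrative; `access_time` uses serde's default SystemTime representation):
+    //
+    //   {"timelines":[{"timeline_id":"<hex id>","layers":[
+    //     {"name":"<layer file name>","metadata":{"file_size":8388608},
+    //      "access_time":{"secs_since_epoch":1700000000,"nanos_since_epoch":0}}]}]}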
+} + +impl HeatMapLayer { + pub(crate) fn new( + name: LayerFileName, + metadata: IndexLayerMetadata, + access_time: SystemTime, + ) -> Self { + Self { + name, + metadata, + access_time, + } + } +} + +#[serde_as] +#[derive(Serialize, Deserialize)] +pub(crate) struct HeatMapTimeline { + #[serde_as(as = "DisplayFromStr")] + pub(super) timeline_id: TimelineId, + + pub(super) layers: Vec, +} + +impl HeatMapTimeline { + pub(crate) fn new(timeline_id: TimelineId, layers: Vec) -> Self { + Self { + timeline_id, + layers, + } + } +} diff --git a/pageserver/src/tenant/secondary/heatmap_writer.rs b/pageserver/src/tenant/secondary/heatmap_writer.rs new file mode 100644 index 0000000000..de5cab1055 --- /dev/null +++ b/pageserver/src/tenant/secondary/heatmap_writer.rs @@ -0,0 +1,207 @@ +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use crate::tenant::{ + mgr::TenantManager, remote_timeline_client::remote_heatmap_path, secondary::CommandResponse, + Tenant, +}; + +use pageserver_api::models::TenantState; +use remote_storage::GenericRemoteStorage; + +use tokio_util::sync::CancellationToken; +use tracing::Instrument; +use utils::{backoff, completion::Barrier}; + +use super::{heatmap::HeatMapTenant, CommandRequest, UploadCommand}; + +const HEATMAP_UPLOAD_INTERVAL: Duration = Duration::from_millis(60000); + +pub(super) async fn heatmap_writer_task( + tenant_manager: Arc, + remote_storage: GenericRemoteStorage, + mut command_queue: tokio::sync::mpsc::Receiver>, + background_jobs_can_start: Barrier, + cancel: CancellationToken, +) -> anyhow::Result<()> { + let writer = HeatmapWriter { + tenant_manager, + remote_storage, + cancel: cancel.clone(), + }; + + tracing::info!("Waiting for background_jobs_can start..."); + background_jobs_can_start.wait().await; + tracing::info!("background_jobs_can is ready, proceeding."); + + while !cancel.is_cancelled() { + writer.iteration().await?; + + tokio::select! { + _ = cancel.cancelled() => { + tracing::info!("Heatmap writer terminating"); + break; + }, + _ = tokio::time::sleep(HEATMAP_UPLOAD_INTERVAL) => {}, + cmd = command_queue.recv() => { + let cmd = match cmd { + Some(c) =>c, + None => { + // SecondaryController was destroyed, and this has raced with + // our CancellationToken + tracing::info!("Heatmap writer terminating"); + break; + } + }; + + let CommandRequest{ + response_tx, + payload + } = cmd; + let result = writer.handle_command(payload).await; + if response_tx.send(CommandResponse{result}).is_err() { + // Caller went away, e.g. 
because an HTTP request timed out
+                    tracing::info!("Dropping response to administrative command")
+                }
+            }
+        }
+    }
+
+    Ok(())
+}
+
+struct HeatmapWriter {
+    tenant_manager: Arc<TenantManager>,
+    remote_storage: GenericRemoteStorage,
+    cancel: CancellationToken,
+}
+
+impl HeatmapWriter {
+    async fn iteration(&self) -> anyhow::Result<()> {
+        let tenants = self.tenant_manager.get_attached_tenants();
+
+        for tenant in tenants {
+            if self.cancel.is_cancelled() {
+                return Ok(());
+            }
+
+            if tenant.current_state() != TenantState::Active {
+                continue;
+            }
+
+            // TODO: add a mechanism to check whether the active layer set has
+            // changed since our last write
+
+            // TODO: add a minimum time between uploads
+
+            match self
+                .write_tenant(&tenant)
+                .instrument(tracing::info_span!(
+                    "write_tenant",
+                    tenant_id = %tenant.get_tenant_id()
+                ))
+                .await
+            {
+                Ok(()) => {}
+                Err(e) => {
+                    tracing::warn!(
+                        "Failed to upload heatmap for tenant {}: {e:#}",
+                        tenant.get_tenant_id(),
+                    )
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn handle_command(&self, command: UploadCommand) -> anyhow::Result<()> {
+        match command {
+            UploadCommand::Upload(tenant_id) => {
+                let tenants = self.tenant_manager.get_attached_tenants();
+
+                let map = tenants
+                    .iter()
+                    .map(|t| (t.get_tenant_id(), t))
+                    .collect::<HashMap<_, _>>();
+                match map.get(&tenant_id) {
+                    Some(tenant) => self.write_tenant(tenant).await,
+                    None => {
+                        anyhow::bail!("Tenant is not attached");
+                    }
+                }
+            }
+        }
+    }
+
+    async fn write_tenant(&self, tenant: &Arc<Tenant>) -> anyhow::Result<()> {
+        let mut heatmap = HeatMapTenant {
+            timelines: Vec::new(),
+        };
+        let timelines = tenant.timelines.lock().unwrap().clone();
+
+        let tenant_cancel = tenant.cancel.clone();
+
+        // Ensure that Tenant::shutdown waits for any upload in flight
+        let _guard = {
+            let hook = tenant.heatmap_hook.lock().unwrap();
+            match hook.enter() {
+                Some(g) => g,
+                None => {
+                    // Tenant is shutting down
+                    tracing::info!("Skipping, tenant is shutting down");
+                    return Ok(());
+                }
+            }
+        };
+
+        for (timeline_id, timeline) in timelines {
+            let heatmap_timeline = timeline.generate_heatmap().await;
+            match heatmap_timeline {
+                None => {
+                    tracing::debug!(
+                        "Skipping heatmap upload because timeline {timeline_id} is not ready"
+                    );
+                    return Ok(());
+                }
+                Some(heatmap_timeline) => {
+                    heatmap.timelines.push(heatmap_timeline);
+                }
+            }
+        }
+
+        // Serialize the heatmap
+        let bytes = serde_json::to_vec(&heatmap)?;
+        let size = bytes.len();
+
+        let path = remote_heatmap_path(&tenant.get_tenant_id());
+
+        // Write the heatmap.
+        tracing::debug!("Uploading {size} byte heatmap to {path}");
+        if let Err(e) = backoff::retry(
+            || async {
+                let bytes = tokio::io::BufReader::new(std::io::Cursor::new(bytes.clone()));
+                let bytes = Box::new(bytes);
+                self.remote_storage
+                    .upload_storage_object(bytes, size, &path)
+                    .await
+            },
+            |_| false,
+            3,
+            u32::MAX,
+            "Uploading heatmap",
+            backoff::Cancel::new(tenant_cancel.clone(), || anyhow::anyhow!("Shutting down")),
+        )
+        .await
+        {
+            if tenant_cancel.is_cancelled() {
+                return Ok(());
+            } else {
+                return Err(e);
+            }
+        }
+
+        tracing::info!("Successfully uploaded {size} byte heatmap to {path}");
+
+        Ok(())
+    }
+}