pageserver: add secondary downloader & heatmaps

This commit is contained in:
John Spray
2023-10-12 09:19:34 +01:00
parent 049cb1fb4b
commit 65096ac992
6 changed files with 1118 additions and 0 deletions


@@ -257,6 +257,12 @@ pub enum TaskKind {
/// See [`crate::disk_usage_eviction_task`].
DiskUsageEviction,
/// See [`crate::tenant::secondary`].
SecondaryDownloads,
/// See [`crate::tenant::secondary`].
SecondaryUploads,
// Initial logical size calculation
InitialLogicalSizeCalculation,


@@ -130,6 +130,7 @@ pub mod storage_layer;
pub mod config;
pub mod delete;
pub mod mgr;
pub mod secondary;
pub mod tasks;
pub mod upload_queue;


@@ -0,0 +1,268 @@
pub mod downloader;
pub mod heatmap;
pub mod heatmap_writer;
use std::{sync::Arc, time::SystemTime};
use crate::{
config::PageServerConf,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
};
use self::{
downloader::{downloader_task, SecondaryDetail},
heatmap_writer::heatmap_writer_task,
};
use super::{
mgr::TenantManager,
storage_layer::{AsLayerDesc, Layer},
timeline::DiskUsageEvictionInfo,
};
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use utils::{
completion::Barrier,
fs_ext,
id::{TenantId, TimelineId},
};
enum DownloadCommand {
Download(TenantId),
}
enum UploadCommand {
Upload(TenantId),
}
struct CommandRequest<T> {
payload: T,
response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
}
struct CommandResponse {
result: anyhow::Result<()>,
}
// Whereas [`Tenant`] represents an attached tenant, this type represents the work
// we do for secondary tenant locations: where we are not serving clients or
// ingesting WAL, but we are maintaining a warm cache of layer files.
//
// This type is all about the _download_ path for secondary mode. The upload path
// runs while a regular attached `Tenant` exists.
//
// This structure coordinates TenantManager and SecondaryDownloader,
// so that the downloader can indicate which tenants it is currently
// operating on, and the manager can indicate when a particular
// secondary tenant should cancel any work in flight.
#[derive(Debug)]
pub(crate) struct SecondaryTenant {
/// Cancellation token indicates to SecondaryDownloader that it should stop doing
/// any work for this tenant at the next opportunity.
pub(crate) cancel: CancellationToken,
/// Lock must be held by SecondaryDownloader at any time that it might be operating
/// on the local filesystem directory for this tenant ID.
// Ordering: the TenantManager must set the cancellation token _before_
// taking the lock. The SecondaryDownloader must always check the cancellation
// token immediately _after_ taking the lock (and at appropriate intervals
// while holding it).
pub(crate) busy: Arc<tokio::sync::Mutex<()>>,
detail: std::sync::Mutex<SecondaryDetail>,
// TODO: propagate the `warm` from LocationConf into here, and respect it when doing downloads
}
impl SecondaryTenant {
pub(crate) fn new() -> Arc<Self> {
// TODO: consider whether we really need to Arc this
Arc::new(Self {
busy: Arc::new(tokio::sync::Mutex::new(())),
// TODO: shall we make this a descendant of the
// main cancellation token, or is it sufficient that
// on shutdown we walk the tenants and fire their
// individual cancellations?
cancel: CancellationToken::new(),
detail: std::sync::Mutex::default(),
})
}
pub(crate) async fn shutdown(&self) {
self.cancel.cancel();
// Wait for any secondary downloader work to complete: once we
// acquire this lock, we are guaranteed that the secondary downloader
// won't touch the local filesystem again for this instance: it is safe
// to e.g. construct a `Tenant` for the same TenantId
drop(self.busy.lock().await);
}
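A minimal sketch of the caller-side ordering that the comments above describe: a TenantManager-style caller that wants to turn a secondary location into an attached one would await shutdown() (which fires the cancellation token and then waits for `busy`) before touching the tenant's local directory. The function name and the attach step below are hypothetical, not part of this diff.
// Hypothetical caller sketch: shows the intended ordering, not real TenantManager code.
async fn promote_secondary_to_attached(secondary: Arc<SecondaryTenant>, _tenant_id: TenantId) {
    // Fires `cancel` and then blocks until the downloader has released `busy`,
    // i.e. it will not touch this tenant's files again.
    secondary.shutdown().await;

    // Only now is it safe to construct an attached `Tenant` over the same
    // local directory, e.g. attach_tenant(_tenant_id).await (hypothetical).
}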
pub(crate) fn get_layers_for_eviction(&self) -> Vec<(TimelineId, DiskUsageEvictionInfo)> {
self.detail.lock().unwrap().get_layers_for_eviction()
}
pub(crate) async fn evict_layers(
&self,
_guard: tokio::sync::OwnedMutexGuard<()>,
conf: &PageServerConf,
tenant_id: &TenantId,
layers: Vec<(TimelineId, Layer)>,
) {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
if self.cancel.is_cancelled() {
// Eviction is a no-op if shutdown() was already called.
tracing::info!(
"Dropping {} layer evictions, secondary tenant shutting down",
layers.len()
);
return;
}
let now = SystemTime::now();
for (timeline_id, layer) in layers {
let layer_name = layer.layer_desc().filename();
let path = conf
.timeline_path(tenant_id, &timeline_id)
.join(&layer_name.file_name());
// We tolerate ENOENT, because between planning eviction and executing
// it, the secondary downloader could have seen an updated heatmap that
// resulted in a layer being deleted.
tokio::fs::remove_file(path)
.await
.or_else(fs_ext::ignore_not_found)
.expect("TODO: terminate process on local I/O errors");
// TODO: batch up updates instead of acquiring lock in inner loop
let mut detail = self.detail.lock().unwrap();
// If there is no timeline detail for what we just deleted, that indicates that
// the secondary downloader did some work since this eviction was planned
// (perhaps removing the timeline's state entirely), so there is nothing to update.
if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) {
timeline_detail.on_disk_layers.remove(&layer_name);
timeline_detail.evicted_at.insert(layer_name, now);
}
}
}
}
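For context, a hedged sketch of how a disk-usage eviction caller might drive the two methods above, respecting the rule that `busy` must be held while the tenant's files are touched. The surrounding function and the victim-selection policy are assumptions; only get_layers_for_eviction, busy and evict_layers come from this diff.
// Sketch only: illustrates the intended call order for evicting from a secondary tenant.
async fn evict_from_secondary(
    conf: &'static PageServerConf,
    tenant_id: TenantId,
    secondary: Arc<SecondaryTenant>,
) {
    // Plan: snapshot the layer inventory (takes only the internal detail mutex).
    let candidates = secondary.get_layers_for_eviction();

    // Select victims; a real policy would sort by last_activity_ts and stop at a byte budget.
    let victims: Vec<(TimelineId, Layer)> = candidates
        .into_iter()
        .flat_map(|(timeline_id, info)| {
            info.resident_layers
                .into_iter()
                .map(move |l| (timeline_id, l.layer))
        })
        .collect();

    // Execute: hold `busy` so we never race with the downloader on the filesystem.
    let guard = secondary.busy.clone().lock_owned().await;
    secondary.evict_layers(guard, conf, &tenant_id, victims).await;
}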
/// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads,
/// and heatmap uploads. This is not a hot data path: it's primarily a hook for tests,
/// where we want to immediately upload/download for a particular tenant. In normal operation
/// uploads & downloads are autonomous and not driven by this interface.
pub struct SecondaryController {
upload_req_tx: tokio::sync::mpsc::Sender<CommandRequest<UploadCommand>>,
download_req_tx: tokio::sync::mpsc::Sender<CommandRequest<DownloadCommand>>,
}
impl SecondaryController {
async fn dispatch<T>(
&self,
queue: &tokio::sync::mpsc::Sender<CommandRequest<T>>,
payload: T,
) -> anyhow::Result<()> {
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
queue
.send(CommandRequest {
payload,
response_tx,
})
.await
.map_err(|_| anyhow::anyhow!("Receiver shut down"))?;
let response = response_rx
.await
.map_err(|_| anyhow::anyhow!("Request dropped"))?;
response.result
}
pub async fn download_tenant(&self, tenant_id: TenantId) -> anyhow::Result<()> {
self.dispatch(&self.download_req_tx, DownloadCommand::Download(tenant_id))
.await
}
pub async fn upload_tenant(&self, tenant_id: TenantId) -> anyhow::Result<()> {
self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_id))
.await
}
}
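As a usage sketch: an HTTP or test hook holding a SecondaryController can trigger an immediate download and surface the downloader's result to the caller (the handler itself is hypothetical and not part of this diff).
// Hypothetical admin/test hook: forwards the request and returns the downloader's result.
async fn handle_secondary_download(
    controller: &SecondaryController,
    tenant_id: TenantId,
) -> anyhow::Result<()> {
    controller.download_tenant(tenant_id).await
}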
pub fn spawn_tasks(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
background_jobs_can_start: Barrier,
cancel: CancellationToken,
) -> SecondaryController {
let mgr_clone = tenant_manager.clone();
let storage_clone = remote_storage.clone();
let cancel_clone = cancel.clone();
let bg_jobs_clone = background_jobs_can_start.clone();
let (download_req_tx, download_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
let (upload_req_tx, upload_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::SecondaryDownloads,
None,
None,
"secondary tenant downloads",
false,
async move {
downloader_task(
conf,
mgr_clone,
storage_clone,
download_req_rx,
bg_jobs_clone,
cancel_clone,
)
.await
},
);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::SecondaryUploads,
None,
None,
"heatmap uploads",
false,
async move {
heatmap_writer_task(
tenant_manager,
remote_storage,
upload_req_rx,
background_jobs_can_start,
cancel,
)
.await
},
);
SecondaryController {
download_req_tx,
upload_req_tx,
}
}
/// For running with remote storage disabled: a SecondaryController that is connected to nothing.
pub fn null_controller() -> SecondaryController {
let (download_req_tx, _download_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
let (upload_req_tx, _upload_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
SecondaryController {
upload_req_tx,
download_req_tx,
}
}
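A hedged sketch of the intended startup wiring, under the assumption that the pageserver holds an Option<GenericRemoteStorage> and a shutdown token at this point (the variable names below are assumptions, not part of this diff): with remote storage configured we spawn the real tasks, otherwise we fall back to the disconnected controller.
// Sketch of the call site in pageserver startup; names are illustrative.
let secondary_controller = match remote_storage.clone() {
    Some(storage) => secondary::spawn_tasks(
        conf,
        tenant_manager.clone(),
        storage,
        background_jobs_barrier.clone(),
        shutdown_token.clone(),
    ),
    None => secondary::null_controller(),
};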


@@ -0,0 +1,579 @@
use std::{
collections::{HashMap, HashSet},
str::FromStr,
sync::Arc,
time::{Duration, Instant, SystemTime},
};
use crate::{
config::PageServerConf,
tenant::{
remote_timeline_client::index::LayerFileMetadata,
secondary::CommandResponse,
storage_layer::{Layer, LayerFileName},
timeline::{DiskUsageEvictionInfo, LocalLayerInfoForDiskUsageEviction},
},
METADATA_FILE_NAME,
};
use super::SecondaryTenant;
use crate::tenant::{
mgr::TenantManager,
remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
};
use anyhow::Context;
use chrono::format::{DelayedFormat, StrftimeItems};
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{
completion::Barrier,
fs_ext,
id::{TenantId, TimelineId},
};
use super::{
heatmap::{HeatMapTenant, HeatMapTimeline},
CommandRequest, DownloadCommand,
};
/// Interval between checking if any Secondary tenants have download work to do:
/// note that this is _not_ the frequency with which we actually freshen the tenants,
/// just the frequency with which we wake up to decide whether anyone needs freshening.
///
/// Making this somewhat infrequent reduces the load on mutexes inside TenantManager
/// and SecondaryTenant for reads when checking for work to do.
const DOWNLOAD_CHECK_INTERVAL: Duration = Duration::from_millis(10000);
/// For each tenant, how long must have passed since the last freshen_tenant call before
/// calling it again. This is approximately the time by which local data is allowed
/// to fall behind remote data.
///
/// TODO: this should be an upper bound, and tenants that are uploading regularly
/// should adaptively freshen more often (e.g. a tenant writing 1 layer per second
/// should not wait a minute between freshens)
const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000);
#[derive(Debug, Clone)]
pub(super) struct OnDiskState {
layer: Layer,
access_time: SystemTime,
}
impl OnDiskState {
fn new(
conf: &'static PageServerConf,
tenant_id: &TenantId,
timeline_id: &TimelineId,
name: LayerFileName,
metadata: LayerFileMetadata,
access_time: SystemTime,
) -> Self {
Self {
layer: Layer::for_secondary(conf, tenant_id, timeline_id, name, metadata),
access_time,
}
}
}
#[derive(Debug, Clone, Default)]
pub(super) struct SecondaryDetailTimeline {
pub(super) on_disk_layers: HashMap<LayerFileName, OnDiskState>,
/// We remember when layers were evicted, to prevent re-downloading them.
/// TODO: persist this, so that we don't try and re-download everything on restart.
pub(super) evicted_at: HashMap<LayerFileName, SystemTime>,
}
/// This state is written by the secondary downloader; it is opaque
/// to the TenantManager.
#[derive(Default, Debug)]
pub(super) struct SecondaryDetail {
freshened_at: Option<Instant>,
pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
}
/// Helper for logging SystemTime
fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
let datetime: chrono::DateTime<chrono::Utc> = (*t).into();
datetime.format("%d/%m/%Y %T")
}
impl SecondaryDetail {
pub(super) fn get_layers_for_eviction(&self) -> Vec<(TimelineId, DiskUsageEvictionInfo)> {
let mut result = Vec::new();
for (timeline_id, timeline_detail) in &self.timelines {
let layers: Vec<_> = timeline_detail
.on_disk_layers
.values()
.map(|ods| LocalLayerInfoForDiskUsageEviction {
layer: ods.layer.clone(),
last_activity_ts: ods.access_time,
})
.collect();
let max_layer_size = layers.iter().map(|l| l.layer.metadata().file_size()).max();
result.push((
*timeline_id,
DiskUsageEvictionInfo {
resident_layers: layers,
max_layer_size,
},
))
}
result
}
}
/// Keep trying to do downloads until the cancellation token is fired. Remote storage
/// errors are handled internally: any error returned by this function is an unexpected
/// internal error of some kind.
pub(super) async fn downloader_task(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
background_jobs_can_start: Barrier,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let downloader = SecondaryDownloader {
conf,
tenant_manager,
remote_storage,
cancel: cancel.clone(),
};
tracing::info!("Waiting for background_jobs_can start...");
background_jobs_can_start.wait().await;
tracing::info!("background_jobs_can is ready, proceeding.");
while !cancel.is_cancelled() {
downloader.iteration().await?;
tokio::select! {
_ = cancel.cancelled() => {
tracing::info!("Heatmap writer terminating");
break;
},
_ = tokio::time::sleep(DOWNLOAD_CHECK_INTERVAL) => {},
cmd = command_queue.recv() => {
let cmd = match cmd {
Some(c) => c,
None => {
// SecondaryController was destroyed, and this has raced with
// our CancellationToken
tracing::info!("Heatmap writer terminating");
break;
}
};
let CommandRequest{
response_tx,
payload
} = cmd;
let result = downloader.handle_command(payload).await;
if response_tx.send(CommandResponse{result}).is_err() {
// Caller went away, e.g. because an HTTP request timed out
tracing::info!("Dropping response to administrative command")
}
}
}
}
Ok(())
}
struct SecondaryDownloader {
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
cancel: CancellationToken,
}
struct TenantJob {
tenant_id: TenantId,
secondary_state: Arc<SecondaryTenant>,
// This mutex guard conveys the right to write to the tenant's local directory: it must
// be taken before doing downloads, and TenantManager must ensure it has been released
// before it considers shutdown complete for the secondary state -- [`SecondaryDownloader`]
// will thereby never be racing with [`Tenant`] for access to local files.
_guard: tokio::sync::OwnedMutexGuard<()>,
}
impl SecondaryDownloader {
async fn iteration(&self) -> anyhow::Result<()> {
// Step 1: identify some tenants that we may work on
let mut candidates: Vec<TenantJob> = Vec::new();
self.tenant_manager
.foreach_secondary_tenants(|tenant_id, secondary_state| {
let guard = match secondary_state.busy.clone().try_lock_owned() {
Ok(guard) => guard,
// If we can't lock, someone is in the process of shutting it down, or we are
// already working on it. We may ignore it when scanning for new work to do.
Err(_) => return,
};
candidates.push(TenantJob {
tenant_id: *tenant_id,
secondary_state: secondary_state.clone(),
_guard: guard,
});
});
// Step 2: prioritized selection of next batch of tenants to freshen
let now = Instant::now();
let candidates = candidates.into_iter().filter(|c| {
let detail = c.secondary_state.detail.lock().unwrap();
match detail.freshened_at {
None => true, // Not yet freshened, therefore eligible to run
Some(t) => {
let since = now.duration_since(t);
since > DOWNLOAD_FRESHEN_INTERVAL
}
}
});
// TODO: don't just cut down the list, prioritize it to freshen the stalest tenants first
// TODO: bounded parallelism
// Step 3: spawn freshen_tenant tasks
for job in candidates {
if job.secondary_state.cancel.is_cancelled() {
continue;
}
async {
if let Err(e) = self.freshen_tenant(&job).await {
tracing::info!("Failed to freshen secondary content: {e:#}")
};
// Update freshened_at even if there was an error: we don't want errored tenants to implicitly
// take priority to run again.
let mut detail = job.secondary_state.detail.lock().unwrap();
detail.freshened_at = Some(Instant::now());
}
.instrument(tracing::info_span!(
"freshen_tenant",
tenant_id = %job.tenant_id
))
.await;
}
Ok(())
}
async fn handle_command(&self, command: DownloadCommand) -> anyhow::Result<()> {
match command {
DownloadCommand::Download(req_tenant_id) => {
let mut candidates: Vec<TenantJob> = Vec::new();
self.tenant_manager
.foreach_secondary_tenants(|tenant_id, secondary_state| {
tracing::info!("foreach_secondary: {tenant_id} ({req_tenant_id})");
if tenant_id == &req_tenant_id {
let guard = match secondary_state.busy.clone().try_lock_owned() {
Ok(guard) => guard,
// If we can't lock, someone is in the process of shutting it down, or we are
// already working on it. We may ignore it when scanning for new work to do.
Err(_) => return,
};
candidates.push(TenantJob {
tenant_id: *tenant_id,
secondary_state: secondary_state.clone(),
_guard: guard,
});
}
});
let tenant_job = if candidates.len() != 1 {
anyhow::bail!("Tenant not found in secondary mode");
} else {
candidates.pop().unwrap()
};
self.freshen_tenant(&tenant_job).await
}
}
}
async fn download_heatmap(&self, tenant_id: &TenantId) -> anyhow::Result<HeatMapTenant> {
// TODO: make download conditional on ETag having changed since last download
let heatmap_path = remote_heatmap_path(tenant_id);
// TODO: wrap this download in a select! that checks self.cancel
let mut download = self.remote_storage.download(&heatmap_path).await?;
let mut heatmap_bytes = Vec::new();
let _size = tokio::io::copy(&mut download.download_stream, &mut heatmap_bytes)
.await
.with_context(|| format!("download heatmap {heatmap_path:?}"))?;
Ok(serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?)
}
async fn init_timeline_state(
&self,
tenant_id: &TenantId,
timeline_id: &TimelineId,
heatmap: &HeatMapTimeline,
) -> anyhow::Result<SecondaryDetailTimeline> {
let timeline_path = self.conf.timeline_path(tenant_id, timeline_id);
let mut detail = SecondaryDetailTimeline::default();
let mut dir = match tokio::fs::read_dir(&timeline_path).await {
Ok(d) => d,
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
tracing::info!("Creating timeline directory {timeline_path}");
tokio::fs::create_dir(&timeline_path).await?;
// No entries to report: drop out.
return Ok(detail);
} else {
return Err(e.into());
}
}
};
let heatmap_metadata: HashMap<_, _> = heatmap.layers.iter().map(|l| (&l.name, l)).collect();
while let Some(dentry) = dir.next_entry().await? {
let dentry_file_name = dentry.file_name();
let file_name = dentry_file_name.to_string_lossy();
let local_meta = dentry.metadata().await?;
// Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
if file_name == METADATA_FILE_NAME {
continue;
}
match LayerFileName::from_str(&file_name) {
Ok(name) => {
let remote_meta = heatmap_metadata.get(&name);
match remote_meta {
Some(remote_meta) => {
// TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784)
if local_meta.len() != remote_meta.metadata.file_size {
// This should not happen, because we do crashsafe write-then-rename when downloading
// layers, and layers in remote storage are immutable. Do not register the local file:
// we cannot trust it, and leaving it out of our state means a subsequent freshen will
// re-download and overwrite it.
tracing::warn!("Ignoring local layer {name} with unexpected local size {} != {}",
local_meta.len(), remote_meta.metadata.file_size);
} else {
// We expect the access time to be initialized immediately afterwards, when
// the latest heatmap is applied to the state.
detail.on_disk_layers.insert(
name.clone(),
OnDiskState::new(
self.conf,
tenant_id,
timeline_id,
name,
LayerFileMetadata::from(&remote_meta.metadata),
remote_meta.access_time,
),
);
}
}
None => {
// FIXME: consider some optimization when transitioning from attached to secondary: maybe
// wait until we have seen a heatmap that is more recent than the most recent on-disk state? Otherwise
// we will end up deleting any layers which were created+uploaded more recently than the heatmap.
tracing::info!(
"Removing secondary local layer {} because it's absent in heatmap",
name
);
tokio::fs::remove_file(dentry.path()).await?;
}
}
}
Err(_) => {
// Ignore it.
tracing::warn!("Unexpected file in timeline directory: {file_name}");
}
}
}
Ok(detail)
}
async fn freshen_timeline(
&self,
job: &TenantJob,
timeline: HeatMapTimeline,
) -> anyhow::Result<()> {
let timeline_path = self
.conf
.timeline_path(&job.tenant_id, &timeline.timeline_id);
// Accumulate updates to the state
let mut touched = Vec::new();
// Clone a view of what layers already exist on disk
let timeline_state = job
.secondary_state
.detail
.lock()
.unwrap()
.timelines
.get(&timeline.timeline_id)
.cloned();
let timeline_state = match timeline_state {
Some(t) => t,
None => {
// We have no existing state: need to scan local disk for layers first.
self.init_timeline_state(&job.tenant_id, &timeline.timeline_id, &timeline)
.await?
}
};
let layers_in_heatmap = timeline
.layers
.iter()
.map(|l| &l.name)
.collect::<HashSet<_>>();
let layers_on_disk = timeline_state
.on_disk_layers
.iter()
.map(|l| l.0)
.collect::<HashSet<_>>();
// Remove on-disk layers that are no longer present in heatmap
for layer in layers_on_disk.difference(&layers_in_heatmap) {
let local_path = timeline_path.join(layer.to_string());
tracing::info!("Removing secondary local layer {layer} because it's absent in heatmap",);
tokio::fs::remove_file(&local_path)
.await
.or_else(fs_ext::ignore_not_found)?;
}
// Download heatmap layers that are not present on local disk, or update their
// access time if they are already present.
for layer in timeline.layers {
if self.cancel.is_cancelled() {
return Ok(());
}
// Existing on-disk layers: just update their access time.
if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
if on_disk.layer.metadata() != LayerFileMetadata::from(&layer.metadata)
|| on_disk.access_time != layer.access_time
{
// We already have this layer on disk. Update its access time.
tracing::trace!(
"Access time updated for layer {}: {} -> {}",
layer.name,
strftime(&on_disk.access_time),
strftime(&layer.access_time)
);
touched.push(layer);
}
continue;
}
// Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
// recently than it was evicted.
if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
if &layer.access_time > evicted_at {
tracing::info!(
"Re-downloading evicted layer {}, accessed at {}, evicted at {}",
layer.name,
strftime(&layer.access_time),
strftime(evicted_at)
);
} else {
tracing::trace!(
"Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
layer.name,
strftime(&layer.access_time),
strftime(evicted_at)
);
continue;
}
}
match download_layer_file(
self.conf,
&self.remote_storage,
job.tenant_id,
timeline.timeline_id,
&layer.name,
&LayerFileMetadata::from(&layer.metadata),
)
.await
{
Ok(downloaded_bytes) => {
if downloaded_bytes != layer.metadata.file_size {
let local_path = timeline_path.join(layer.name.to_string());
tracing::error!(
"Downloaded layer {} with unexpected size {} != {}",
layer.name,
downloaded_bytes,
layer.metadata.file_size
);
tokio::fs::remove_file(&local_path)
.await
.or_else(fs_ext::ignore_not_found)?;
// Do not record the bad download in our state: a later freshen will retry it.
continue;
}
touched.push(layer)
}
Err(e) => {
// No retries here: secondary downloads don't have to succeed: if they fail we just proceed and expect
// that on some future call to freshen the download will work.
// TODO: refine this behavior.
tracing::info!("Failed to download layer {}: {}", layer.name, e);
}
}
}
// Write updates to state to record layers we just downloaded or touched.
{
let mut detail = job.secondary_state.detail.lock().unwrap();
let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
for t in touched {
use std::collections::hash_map::Entry;
match timeline_detail.on_disk_layers.entry(t.name.clone()) {
Entry::Occupied(mut v) => {
v.get_mut().access_time = t.access_time;
}
Entry::Vacant(e) => {
e.insert(OnDiskState::new(
self.conf,
&job.tenant_id,
&timeline.timeline_id,
t.name,
LayerFileMetadata::from(&t.metadata),
t.access_time,
));
}
}
}
}
Ok(())
}
async fn freshen_tenant(&self, job: &TenantJob) -> anyhow::Result<()> {
// Download the tenant's heatmap
let heatmap = self.download_heatmap(&job.tenant_id).await?;
// Download the layers in the heatmap
for timeline in heatmap.timelines {
if self.cancel.is_cancelled() {
return Ok(());
}
self.freshen_timeline(job, timeline).await?;
}
Ok(())
}
}


@@ -0,0 +1,57 @@
use std::time::SystemTime;
use crate::tenant::{
remote_timeline_client::index::IndexLayerMetadata, storage_layer::LayerFileName,
};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::id::TimelineId;
#[derive(Serialize, Deserialize)]
pub(super) struct HeatMapTenant {
pub(super) timelines: Vec<HeatMapTimeline>,
}
#[derive(Serialize, Deserialize)]
pub(crate) struct HeatMapLayer {
pub(super) name: LayerFileName,
pub(super) metadata: IndexLayerMetadata,
pub(super) access_time: SystemTime,
// TODO: an actual 'heat' score that would let secondary locations prioritize downloading
// the hottest layers, rather than trying to simply mirror whatever layers are on-disk on the primary.
}
impl HeatMapLayer {
pub(crate) fn new(
name: LayerFileName,
metadata: IndexLayerMetadata,
access_time: SystemTime,
) -> Self {
Self {
name,
metadata,
access_time,
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub(crate) struct HeatMapTimeline {
#[serde_as(as = "DisplayFromStr")]
pub(super) timeline_id: TimelineId,
pub(super) layers: Vec<HeatMapLayer>,
}
impl HeatMapTimeline {
pub(crate) fn new(timeline_id: TimelineId, layers: Vec<HeatMapLayer>) -> Self {
Self {
timeline_id,
layers,
}
}
}
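For illustration, a serialized heatmap looks roughly like the JSON below. Only timeline_id is explicitly serialized as a string (via DisplayFromStr); the exact encodings of LayerFileName, IndexLayerMetadata and SystemTime are whatever their serde implementations produce (serde's default for SystemTime is seconds/nanoseconds since the Unix epoch), so treat the field contents as placeholders.
{
  "timelines": [
    {
      "timeline_id": "2b90...",
      "layers": [
        {
          "name": "<layer file name>",
          "metadata": { "file_size": 134217728 },
          "access_time": { "secs_since_epoch": 1697098774, "nanos_since_epoch": 0 }
        }
      ]
    }
  ]
}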


@@ -0,0 +1,207 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use crate::tenant::{
mgr::TenantManager, remote_timeline_client::remote_heatmap_path, secondary::CommandResponse,
Tenant,
};
use pageserver_api::models::TenantState;
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{backoff, completion::Barrier};
use super::{heatmap::HeatMapTenant, CommandRequest, UploadCommand};
const HEATMAP_UPLOAD_INTERVAL: Duration = Duration::from_millis(60000);
pub(super) async fn heatmap_writer_task(
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<UploadCommand>>,
background_jobs_can_start: Barrier,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let writer = HeatmapWriter {
tenant_manager,
remote_storage,
cancel: cancel.clone(),
};
tracing::info!("Waiting for background_jobs_can start...");
background_jobs_can_start.wait().await;
tracing::info!("background_jobs_can is ready, proceeding.");
while !cancel.is_cancelled() {
writer.iteration().await?;
tokio::select! {
_ = cancel.cancelled() => {
tracing::info!("Heatmap writer terminating");
break;
},
_ = tokio::time::sleep(HEATMAP_UPLOAD_INTERVAL) => {},
cmd = command_queue.recv() => {
let cmd = match cmd {
Some(c) => c,
None => {
// SecondaryController was destroyed, and this has raced with
// our CancellationToken
tracing::info!("Heatmap writer terminating");
break;
}
};
let CommandRequest{
response_tx,
payload
} = cmd;
let result = writer.handle_command(payload).await;
if response_tx.send(CommandResponse{result}).is_err() {
// Caller went away, e.g. because an HTTP request timed out
tracing::info!("Dropping response to administrative command")
}
}
}
}
Ok(())
}
struct HeatmapWriter {
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
cancel: CancellationToken,
}
impl HeatmapWriter {
async fn iteration(&self) -> anyhow::Result<()> {
let tenants = self.tenant_manager.get_attached_tenants();
for tenant in tenants {
if self.cancel.is_cancelled() {
return Ok(());
}
if tenant.current_state() != TenantState::Active {
continue;
}
// TODO: add a mechanism to check whether the active layer set has
// changed since our last write
// TODO: add a minimum time between uploads
match self
.write_tenant(&tenant)
.instrument(tracing::info_span!(
"write_tenant",
tenant_id = %tenant.get_tenant_id()
))
.await
{
Ok(()) => {}
Err(e) => {
tracing::warn!(
"Failed to upload heatmap for tenant {}: {e:#}",
tenant.get_tenant_id(),
)
}
}
}
Ok(())
}
async fn handle_command(&self, command: UploadCommand) -> anyhow::Result<()> {
match command {
UploadCommand::Upload(tenant_id) => {
let tenants = self.tenant_manager.get_attached_tenants();
let map = tenants
.iter()
.map(|t| (t.get_tenant_id(), t))
.collect::<HashMap<_, _>>();
match map.get(&tenant_id) {
Some(tenant) => self.write_tenant(tenant).await,
None => {
anyhow::bail!("Tenant is not attached");
}
}
}
}
}
async fn write_tenant(&self, tenant: &Arc<Tenant>) -> anyhow::Result<()> {
let mut heatmap = HeatMapTenant {
timelines: Vec::new(),
};
let timelines = tenant.timelines.lock().unwrap().clone();
let tenant_cancel = tenant.cancel.clone();
// Ensure that Tenant::shutdown waits for any upload in flight
let _guard = {
let hook = tenant.heatmap_hook.lock().unwrap();
match hook.enter() {
Some(g) => g,
None => {
// Tenant is shutting down
tracing::info!("Skipping, tenant is shutting down");
return Ok(());
}
}
};
for (timeline_id, timeline) in timelines {
let heatmap_timeline = timeline.generate_heatmap().await;
match heatmap_timeline {
None => {
tracing::debug!(
"Skipping heatmap upload because timeline {timeline_id} is not ready"
);
return Ok(());
}
Some(heatmap_timeline) => {
heatmap.timelines.push(heatmap_timeline);
}
}
}
// Serialize the heatmap
let bytes = serde_json::to_vec(&heatmap)?;
let size = bytes.len();
let path = remote_heatmap_path(&tenant.get_tenant_id());
// Write the heatmap.
tracing::debug!("Uploading {size} byte heatmap to {path}");
if let Err(e) = backoff::retry(
|| async {
let bytes = tokio::io::BufReader::new(std::io::Cursor::new(bytes.clone()));
let bytes = Box::new(bytes);
self.remote_storage
.upload_storage_object(bytes, size, &path)
.await
},
|_| false,
3,
u32::MAX,
"Uploading heatmap",
backoff::Cancel::new(tenant_cancel.clone(), || anyhow::anyhow!("Shutting down")),
)
.await
{
if tenant_cancel.is_cancelled() {
return Ok(());
} else {
return Err(e);
}
}
tracing::info!("Successfully uploading {size} byte heatmap to {path}");
Ok(())
}
}