pageserver: add secondary downloader

John Spray
2023-10-12 09:19:34 +01:00
parent 5e38dd2bad
commit e080bc053f
4 changed files with 1306 additions and 8 deletions

View File

@@ -258,6 +258,9 @@ pub enum TaskKind {
/// See [`crate::disk_usage_eviction_task`].
DiskUsageEviction,
/// See [`crate::tenant::secondary`].
SecondaryDownloads,
/// See [`crate::tenant::secondary`].
SecondaryUploads,

View File

@@ -1,21 +1,37 @@
mod downloader;
pub mod heatmap;
mod heatmap_uploader;
mod scheduler;
use std::sync::Arc;
use std::{sync::Arc, time::SystemTime};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::{
config::PageServerConf,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::span::debug_assert_current_span_has_tenant_id,
};
use self::{heatmap_uploader::heatmap_uploader_task, scheduler::TenantScoped};
use self::{
downloader::{downloader_task, SecondaryDetail},
heatmap_uploader::heatmap_uploader_task,
scheduler::TenantScoped,
};
use super::mgr::TenantManager;
use super::{
mgr::TenantManager,
storage_layer::{AsLayerDesc, Layer},
timeline::DiskUsageEvictionInfo,
};
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use utils::completion::Barrier;
use utils::{completion::Barrier, fs_ext, id::TimelineId, sync::gate::Gate};
enum DownloadCommand {
Download(TenantShardId),
}
enum UploadCommand {
Upload(TenantShardId),
}
@@ -23,7 +39,15 @@ enum UploadCommand {
impl TenantScoped for UploadCommand {
fn get_tenant_shard_id(&self) -> &TenantShardId {
match self {
Self::Upload(id) => &id,
Self::Upload(id) => id,
}
}
}
impl TenantScoped for DownloadCommand {
fn get_tenant_shard_id(&self) -> &TenantShardId {
match self {
Self::Download(id) => id,
}
}
}
@@ -37,12 +61,140 @@ struct CommandResponse {
result: anyhow::Result<()>,
}
// Whereas [`Tenant`] represents an attached tenant, this type represents the work
// we do for secondary tenant locations: where we are not serving clients or
// ingesting WAL, but we are maintaining a warm cache of layer files.
//
// This type is all about the _download_ path for secondary mode. The upload path
// runs separately (see [`heatmap_uploader`]) while a regular attached `Tenant` exists.
//
// This structure coordinates TenantManager and SecondaryDownloader,
// so that the downloader can indicate which tenants it is currently
// operating on, and the manager can indicate when a particular
// secondary tenant should cancel any work in flight.
#[derive(Debug)]
pub(crate) struct SecondaryTenant {
/// Carrying a tenant shard ID simplifies callers such as the downloader
/// which need to organize many of these objects by ID.
tenant_shard_id: TenantShardId,
/// Cancellation token indicates to SecondaryDownloader that it should stop doing
/// any work for this tenant at the next opportunity.
pub(crate) cancel: CancellationToken,
pub(crate) gate: Gate,
detail: std::sync::Mutex<SecondaryDetail>,
// TODO: propagate the `warm` from LocationConf into here, and respect it when doing downloads
}
impl SecondaryTenant {
pub(crate) fn new(tenant_shard_id: TenantShardId) -> Arc<Self> {
// TODO: consider whether we really need to Arc this
Arc::new(Self {
tenant_shard_id,
// TODO: shall we make this a descendant of the
// main cancellation token, or is it sufficient that
// on shutdown we walk the tenants and fire their
// individual cancellations?
cancel: CancellationToken::new(),
gate: Gate::new(format!("SecondaryTenant {tenant_shard_id}")),
detail: std::sync::Mutex::default(),
})
}
pub(crate) async fn shutdown(&self) {
self.cancel.cancel();
// Wait for any secondary downloader work to complete
self.gate.close().await;
}
pub(crate) async fn get_layers_for_eviction(
&self,
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
) -> Vec<(TimelineId, DiskUsageEvictionInfo)> {
debug_assert_current_span_has_tenant_id();
{
let detail = self.detail.lock().unwrap();
if !detail.is_uninit() {
return detail.get_layers_for_eviction();
} else {
// We haven't freshened yet in this process's lifetime, so we will need to
// scan local storage to find all our layers.
}
}
tracing::debug!("Scanning local layers for secondary tenant to service eviction",);
// Fall through: we need to initialize Detail
let timelines = SecondaryDetail::init_detail(conf, tenant_shard_id).await;
let mut detail = self.detail.lock().unwrap();
if detail.is_uninit() {
detail.timelines = timelines;
}
detail.get_layers_for_eviction()
}
pub(crate) async fn evict_layers(
&self,
conf: &PageServerConf,
tenant_shard_id: &TenantShardId,
layers: Vec<(TimelineId, Layer)>,
) {
debug_assert_current_span_has_tenant_id();
let _guard = match self.gate.enter() {
Ok(g) => g,
Err(_) => {
tracing::info!(
"Dropping {} layer evictions, secondary tenant shutting down",
layers.len()
);
return;
}
};
let now = SystemTime::now();
for (timeline_id, layer) in layers {
let layer_name = layer.layer_desc().filename();
let path = conf
.timeline_path(tenant_shard_id, &timeline_id)
.join(&layer_name.file_name());
// We tolerate ENOENT, because between planning eviction and executing
// it, the secondary downloader could have seen an updated heatmap that
// resulted in a layer being deleted.
tokio::fs::remove_file(path)
.await
.or_else(fs_ext::ignore_not_found)
.expect("TODO: terminate process on local I/O errors");
// TODO: batch up updates instead of acquiring lock in inner loop
let mut detail = self.detail.lock().unwrap();
// If there is no timeline detail for what we just deleted, the secondary downloader
// must have updated the state concurrently (perhaps removing the timeline's layers entirely).
if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) {
timeline_detail.on_disk_layers.remove(&layer_name);
timeline_detail.evicted_at.insert(layer_name, now);
}
}
}
fn get_tenant_shard_id(&self) -> &TenantShardId {
&self.tenant_shard_id
}
}
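// Illustrative sketch (not part of this diff): the cancel-then-drain shutdown
// idiom used by SecondaryTenant above, reduced to tokio primitives. The real
// `utils::sync::gate::Gate` is internal; this stand-in approximates it with a
// semaphore.
mod gate_sketch {
    use tokio::sync::{Semaphore, SemaphorePermit, TryAcquireError};
    use tokio_util::sync::CancellationToken;

    const GATE_PERMITS: u32 = 1 << 16;

    pub struct GateSketch {
        sem: Semaphore,
    }

    impl GateSketch {
        pub fn new() -> Self {
            Self {
                sem: Semaphore::new(GATE_PERMITS as usize),
            }
        }

        /// Hold the returned permit across any access to tenant-local state.
        pub fn enter(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> {
            self.sem.try_acquire()
        }

        /// Wait until every holder has dropped its permit, then keep the gate
        /// closed by forgetting the permits we acquired.
        pub async fn close(&self) {
            if let Ok(all) = self.sem.acquire_many(GATE_PERMITS).await {
                all.forget();
            }
        }
    }

    /// Mirrors SecondaryTenant::shutdown: signal cancellation first, then wait
    /// for in-flight work to drain.
    pub async fn shutdown_sketch(cancel: &CancellationToken, gate: &GateSketch) {
        cancel.cancel();
        gate.close().await;
    }
}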
/// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads,
/// and heatmap uploads. This is not a hot data path: it's primarily a hook for tests,
/// where we want to immediately upload/download for a particular tenant. In normal operation
/// uploads & downloads are autonomous and not driven by this interface.
pub struct SecondaryController {
upload_req_tx: tokio::sync::mpsc::Sender<CommandRequest<UploadCommand>>,
download_req_tx: tokio::sync::mpsc::Sender<CommandRequest<DownloadCommand>>,
}
impl SecondaryController {
@@ -72,6 +224,13 @@ impl SecondaryController {
self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_shard_id))
.await
}
pub async fn download_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
self.dispatch(
&self.download_req_tx,
DownloadCommand::Download(tenant_shard_id),
)
.await
}
}
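// Sketch of the pseudo-rpc round trip behind `dispatch` (the method itself is
// elided from this hunk): send the command together with a oneshot sender, then
// await the reply. This assumes `CommandRequest<T>` carries `payload: T` and a
// `response_tx: tokio::sync::oneshot::Sender<CommandResponse>`, which matches
// how the uploader answers commands elsewhere in this commit.
async fn dispatch_sketch<T>(
    queue: &tokio::sync::mpsc::Sender<CommandRequest<T>>,
    payload: T,
) -> anyhow::Result<()> {
    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
    queue
        .send(CommandRequest {
            payload,
            response_tx,
        })
        .await
        .map_err(|_| anyhow::anyhow!("background task shut down"))?;
    let response = response_rx
        .await
        .map_err(|_| anyhow::anyhow!("background task dropped the request"))?;
    response.result
}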
pub fn spawn_tasks(
@@ -80,9 +239,37 @@ pub fn spawn_tasks(
background_jobs_can_start: Barrier,
cancel: CancellationToken,
) -> SecondaryController {
let mgr_clone = tenant_manager.clone();
let storage_clone = remote_storage.clone();
let cancel_clone = cancel.clone();
let bg_jobs_clone = background_jobs_can_start.clone();
let (download_req_tx, download_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
let (upload_req_tx, upload_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::SecondaryDownloads,
None,
None,
"secondary tenant downloads",
false,
async move {
downloader_task(
mgr_clone,
storage_clone,
download_req_rx,
bg_jobs_clone,
cancel_clone,
)
.await;
Ok(())
},
);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::SecondaryUploads,
@@ -104,12 +291,20 @@ pub fn spawn_tasks(
},
);
SecondaryController { upload_req_tx }
SecondaryController {
download_req_tx,
upload_req_tx,
}
}
/// For running with remote storage disabled: a SecondaryController that is connected to nothing.
pub fn null_controller() -> SecondaryController {
let (download_req_tx, _download_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
let (upload_req_tx, _upload_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
SecondaryController { upload_req_tx }
SecondaryController {
upload_req_tx,
download_req_tx,
}
}
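// How a test hook might drive the controller returned by `spawn_tasks`
// (illustrative only; `force_refresh_sketch` is not part of this diff): push
// the attached location's heatmap, then have the secondary pull it, so the
// secondary ends up with the freshest layer set.
async fn force_refresh_sketch(
    controller: &SecondaryController,
    tenant_shard_id: TenantShardId,
) -> anyhow::Result<()> {
    controller.upload_tenant(tenant_shard_id).await?;
    controller.download_tenant(tenant_shard_id).await?;
    Ok(())
}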

View File

@@ -0,0 +1,720 @@
use std::{
collections::{HashMap, HashSet},
str::FromStr,
sync::Arc,
time::{Duration, Instant, SystemTime},
};
use crate::{
config::PageServerConf,
metrics::SECONDARY_MODE,
tenant::{
debug_assert_current_span_has_tenant_and_timeline_id,
remote_timeline_client::{index::LayerFileMetadata, HEATMAP_BASENAME},
span::debug_assert_current_span_has_tenant_id,
storage_layer::{Layer, LayerFileName},
timeline::{DiskUsageEvictionInfo, LocalLayerInfoForDiskUsageEviction},
},
virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile},
METADATA_FILE_NAME, TEMP_FILE_SUFFIX,
};
use super::{
scheduler::{HasBarrier, JobGenerator, SchedulingResult, TenantBackgroundJobs, TenantScoped},
SecondaryTenant,
};
use crate::tenant::{
mgr::TenantManager,
remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
};
use anyhow::Context;
use chrono::format::{DelayedFormat, StrftimeItems};
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{instrument, Instrument};
use utils::{completion::Barrier, crashsafe::path_with_suffix_extension, fs_ext, id::TimelineId};
use super::{
heatmap::{HeatMapTenant, HeatMapTimeline},
CommandRequest, DownloadCommand,
};
/// For each tenant, how long must have passed since the last download_tenant call before
/// calling it again. This is approximately the time by which local data is allowed
/// to fall behind remote data.
///
/// TODO: this should be an upper bound, and tenants that are uploading regularly
/// should adaptively freshen more often (e.g. a tenant writing 1 layer per second
/// should not wait a minute between freshens)
const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000);
const DEFAULT_SCHEDULING_INTERVAL: Duration = Duration::from_millis(60000);
const _MIN_SCHEDULING_INTERVAL: Duration = Duration::from_millis(1000);
#[derive(Debug, Clone)]
pub(super) struct OnDiskState {
layer: Layer,
access_time: SystemTime,
}
impl OnDiskState {
fn new(
conf: &'static PageServerConf,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
name: LayerFileName,
metadata: LayerFileMetadata,
access_time: SystemTime,
) -> Self {
Self {
layer: Layer::for_secondary(conf, tenant_shard_id, timeline_id, name, metadata),
access_time,
}
}
}
#[derive(Debug, Clone, Default)]
pub(super) struct SecondaryDetailTimeline {
pub(super) on_disk_layers: HashMap<LayerFileName, OnDiskState>,
/// We remember when layers were evicted, to prevent re-downloading them.
pub(super) evicted_at: HashMap<LayerFileName, SystemTime>,
}
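// The `evicted_at` map above feeds the re-download rule implemented later in
// `download_timeline`. As a standalone predicate (a sketch, not in the diff)
// it reads: re-download an evicted layer only if the heatmap records an access
// newer than our eviction.
fn should_redownload_sketch(
    heatmap_access_time: std::time::SystemTime,
    evicted_at: Option<&std::time::SystemTime>,
) -> bool {
    match evicted_at {
        None => true, // never evicted locally: plain download-if-absent
        Some(evicted) => heatmap_access_time > *evicted,
    }
}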
/// This state is written by the secondary downloader, it is opaque
/// to TenantManager
#[derive(Default, Debug)]
pub(super) struct SecondaryDetail {
freshened_at: Option<Instant>,
pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
}
/// Helper for logging SystemTime
fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
let datetime: chrono::DateTime<chrono::Utc> = (*t).into();
datetime.format("%d/%m/%Y %T")
}
impl SecondaryDetail {
pub(super) fn is_uninit(&self) -> bool {
// FIXME: empty timelines is not synonymous with not initialized, as it is legal for
// a tenant to exist with no timelines.
self.timelines.is_empty()
}
pub(super) async fn init_detail(
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
) -> HashMap<TimelineId, SecondaryDetailTimeline> {
tracing::info!("init_detail");
// Load heatmap from local storage
let heatmap_path = conf.tenant_path(&tenant_shard_id).join(HEATMAP_BASENAME);
let heatmap = match tokio::fs::read(&heatmap_path).await {
Ok(bytes) => serde_json::from_slice::<HeatMapTenant>(&bytes).unwrap(),
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
return HashMap::new();
} else {
on_fatal_io_error(&e, &format!("Reading heatmap file from {heatmap_path}"))
}
}
};
let mut timelines = HashMap::new();
for heatmap_timeline in heatmap.timelines {
let detail = init_timeline_state(conf, &tenant_shard_id, &heatmap_timeline).await;
timelines.insert(heatmap_timeline.timeline_id, detail);
}
timelines
}
pub(super) fn get_layers_for_eviction(&self) -> Vec<(TimelineId, DiskUsageEvictionInfo)> {
let mut result = Vec::new();
for (timeline_id, timeline_detail) in &self.timelines {
let layers: Vec<_> = timeline_detail
.on_disk_layers
.values()
.map(|ods| LocalLayerInfoForDiskUsageEviction {
layer: ods.layer.clone(),
last_activity_ts: ods.access_time,
})
.collect();
let max_layer_size = layers.iter().map(|l| l.layer.metadata().file_size()).max();
result.push((
*timeline_id,
DiskUsageEvictionInfo {
resident_layers: layers,
max_layer_size,
},
))
}
tracing::debug!(
"Found {} timelines, {} layers",
self.timelines.len(),
result
.iter()
.map(|(_, info)| info.resident_layers.len())
.sum::<usize>()
);
result
}
}
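// For orientation: a minimal serde model (sketch) of the on-disk heatmap shape
// implied by the call sites in this file (tenant -> timelines -> layers with a
// name, metadata including file_size, and an access time). The real types live
// in `super::heatmap` and are not part of this hunk; field names below are
// inferred, and `String` stands in for LayerFileName/TimelineId.
#[derive(serde::Serialize, serde::Deserialize)]
struct HeatMapTenantSketch {
    timelines: Vec<HeatMapTimelineSketch>,
}

#[derive(serde::Serialize, serde::Deserialize)]
struct HeatMapTimelineSketch {
    timeline_id: String,
    layers: Vec<HeatMapLayerSketch>,
}

#[derive(serde::Serialize, serde::Deserialize)]
struct HeatMapLayerSketch {
    name: String,
    metadata: LayerMetadataSketch,
    access_time: std::time::SystemTime,
}

#[derive(serde::Serialize, serde::Deserialize)]
struct LayerMetadataSketch {
    file_size: u64,
}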
struct SecondaryDownloader {
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
}
struct PendingDownload {
secondary_state: Arc<SecondaryTenant>,
}
impl TenantScoped for PendingDownload {
fn get_tenant_shard_id(&self) -> &TenantShardId {
self.secondary_state.get_tenant_shard_id()
}
}
struct RunningDownload {
barrier: Barrier,
}
impl HasBarrier for RunningDownload {
fn get_barrier(&self) -> Barrier {
self.barrier.clone()
}
}
struct CompleteDownload {
secondary_state: Arc<SecondaryTenant>,
completed_at: Instant,
}
impl TenantScoped for CompleteDownload {
fn get_tenant_shard_id(&self) -> &TenantShardId {
self.secondary_state.get_tenant_shard_id()
}
}
impl TenantScoped for SecondaryTenant {
fn get_tenant_shard_id(&self) -> &TenantShardId {
self.get_tenant_shard_id()
}
}
type Scheduler = TenantBackgroundJobs<
SecondaryDownloader,
PendingDownload,
RunningDownload,
CompleteDownload,
DownloadCommand,
>;
#[async_trait::async_trait]
impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCommand>
for SecondaryDownloader
{
#[instrument(skip_all, fields(tenant_id=%completion.get_tenant_shard_id().tenant_id, shard_id=%completion.get_tenant_shard_id().shard_slug()))]
fn on_completion(&mut self, completion: CompleteDownload) {
let CompleteDownload {
secondary_state,
completed_at: _completed_at,
} = completion;
tracing::debug!("Secondary tenant download completed");
// Update freshened_at even if there was an error: we don't want tenants that hit
// an error to implicitly take scheduling priority over others.
let mut detail = secondary_state.detail.lock().unwrap();
detail.freshened_at = Some(Instant::now());
}
async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
let mut result = SchedulingResult {
jobs: Vec::new(),
want_interval: None,
};
// Step 1: identify some tenants that we may work on
let mut tenants: Vec<Arc<SecondaryTenant>> = Vec::new();
self.tenant_manager
.foreach_secondary_tenants(|_id, secondary_state| {
tenants.push(secondary_state.clone());
});
// Step 2: filter out tenants which are not eligible to run yet
let now = Instant::now();
let tenants = tenants.into_iter().filter(|c| {
let detail = c.detail.lock().unwrap();
match detail.freshened_at {
None => true, // Not yet freshened, therefore elegible to run
Some(t) => {
let since = now.duration_since(t);
since > DOWNLOAD_FRESHEN_INTERVAL
}
}
});
result.jobs = tenants
.map(|t| PendingDownload { secondary_state: t })
.collect();
result
}
fn on_command(&mut self, command: DownloadCommand) -> anyhow::Result<PendingDownload> {
let tenant_shard_id = command.get_tenant_shard_id();
let tenant = self
.tenant_manager
.get_secondary_tenant_shard(*tenant_shard_id);
let Some(tenant) = tenant else {
return Err(anyhow::anyhow!("Not found or not in Secondary mode"));
};
Ok(PendingDownload {
secondary_state: tenant,
})
}
fn spawn(
&mut self,
join_set: &mut JoinSet<()>,
result_tx: tokio::sync::mpsc::UnboundedSender<CompleteDownload>,
job: PendingDownload,
) -> RunningDownload {
let PendingDownload { secondary_state } = job;
let (completion, barrier) = utils::completion::channel();
let remote_storage = self.remote_storage.clone();
let conf = self.tenant_manager.get_conf();
join_set.spawn(async move {
let _completion = completion;
if let Err(e) = TenantDownloader::new(conf, &remote_storage, &secondary_state)
.download()
.await
{
tracing::info!("Failed to freshen secondary content: {e:#}")
};
result_tx
.send(CompleteDownload {
secondary_state,
completed_at: Instant::now(),
})
.ok();
});
RunningDownload { barrier }
}
}
pub(super) async fn downloader_task(
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
background_jobs_can_start: Barrier,
cancel: CancellationToken,
) {
// TODO: separate config for downloads
let concurrency = tenant_manager.get_conf().heatmap_upload_concurrency;
let generator = SecondaryDownloader {
tenant_manager,
remote_storage,
};
let mut scheduler = Scheduler::new(generator, concurrency);
scheduler
.run(command_queue, background_jobs_can_start, cancel)
.instrument(tracing::info_span!("secondary_downloads"))
.await
}
/// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline
async fn init_timeline_state(
conf: &'static PageServerConf,
tenant_shard_id: &TenantShardId,
heatmap: &HeatMapTimeline,
) -> SecondaryDetailTimeline {
let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
let mut detail = SecondaryDetailTimeline::default();
let mut dir = match tokio::fs::read_dir(&timeline_path).await {
Ok(d) => d,
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
let context = format!("Creating timeline directory {timeline_path}");
tracing::info!("{}", context);
tokio::fs::create_dir_all(&timeline_path)
.await
.fatal_err(&context);
// No entries to report: drop out.
return detail;
} else {
on_fatal_io_error(&e, &format!("Reading timeline dir {timeline_path}"));
}
}
};
let heatmap_metadata: HashMap<_, _> = heatmap.layers.iter().map(|l| (&l.name, l)).collect();
while let Some(dentry) = dir
.next_entry()
.await
.fatal_err(&format!("Listing {timeline_path}"))
{
let dentry_file_name = dentry.file_name();
let file_name = dentry_file_name.to_string_lossy();
let local_meta = dentry.metadata().await.fatal_err(&format!(
"Read metadata on {}",
dentry.path().to_string_lossy()
));
// Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
if file_name == METADATA_FILE_NAME {
continue;
}
match LayerFileName::from_str(&file_name) {
Ok(name) => {
let remote_meta = heatmap_metadata.get(&name);
match remote_meta {
Some(remote_meta) => {
// TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784)
if local_meta.len() != remote_meta.metadata.file_size {
// This should not happen, because we do crashsafe write-then-rename when downloading
// layers, and layers in remote storage are immutable. Remove the local file because
// we cannot trust it.
tracing::warn!(
"Removing local layer {name} with unexpected local size {} != {}",
local_meta.len(),
remote_meta.metadata.file_size
);
tokio::fs::remove_file(&dentry.path())
.await
.or_else(fs_ext::ignore_not_found)
.fatal_err(&format!(
"Removing layer {}",
dentry.path().to_string_lossy()
));
} else {
// We expect the access time to be initialized immediately afterwards, when
// the latest heatmap is applied to the state.
detail.on_disk_layers.insert(
name.clone(),
OnDiskState::new(
conf,
tenant_shard_id,
&heatmap.timeline_id,
name,
LayerFileMetadata::from(&remote_meta.metadata),
remote_meta.access_time,
),
);
}
}
None => {
// FIXME: consider some optimization when transitioning from attached to secondary: maybe
// wait until we have seen a heatmap that is more recent than the most recent on-disk state? Otherwise
// we will end up deleting any layers which were created+uploaded more recently than the heatmap.
tracing::info!(
"Removing secondary local layer {} because it's absent in heatmap",
name
);
tokio::fs::remove_file(&dentry.path())
.await
.or_else(fs_ext::ignore_not_found)
.fatal_err(&format!(
"Removing layer {}",
dentry.path().to_string_lossy()
));
}
}
}
Err(_) => {
// Ignore it.
tracing::warn!("Unexpected file in timeline directory: {file_name}");
}
}
}
detail
}
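// Sketch of the ENOENT-tolerant delete used throughout this file: judging by
// the `.or_else(fs_ext::ignore_not_found)` call sites, the helper behaves like
// this (an assumption; the real one lives in `utils::fs_ext`).
fn ignore_not_found_sketch(e: std::io::Error) -> std::io::Result<()> {
    if e.kind() == std::io::ErrorKind::NotFound {
        Ok(())
    } else {
        Err(e)
    }
}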
/// This type is a convenience to group together the various functions involved in
/// freshening a secondary tenant.
struct TenantDownloader<'a> {
conf: &'static PageServerConf,
remote_storage: &'a GenericRemoteStorage,
secondary_state: &'a SecondaryTenant,
}
impl<'a> TenantDownloader<'a> {
fn new(
conf: &'static PageServerConf,
remote_storage: &'a GenericRemoteStorage,
secondary_state: &'a SecondaryTenant,
) -> Self {
Self {
conf,
remote_storage,
secondary_state,
}
}
#[instrument(skip_all, name="secondary_download", fields(tenant_id=%self.secondary_state.get_tenant_shard_id().tenant_id, shard_id=%self.secondary_state.get_tenant_shard_id().shard_slug()))]
async fn download(&self) -> anyhow::Result<()> {
// For the duration of a download, we must hold the SecondaryTenant::gate, so
// that tenant shutdown waits for our access to local storage to complete.
let Ok(_guard) = self.secondary_state.gate.enter() else {
// Shutting down
return Ok(());
};
debug_assert_current_span_has_tenant_id();
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
// Download the tenant's heatmap
let heatmap_bytes = tokio::select!(
bytes = self.download_heatmap() => {bytes?},
_ = self.secondary_state.cancel.cancelled() => return Ok(())
);
let heatmap = serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?;
// Save the heatmap: this will be useful on restart, allowing us to reconstruct
// layer metadata without having to re-download it.
let heatmap_path = self
.conf
.tenant_path(tenant_shard_id)
.join(HEATMAP_BASENAME);
let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
let heatmap_path_bg = heatmap_path.clone();
tokio::task::spawn_blocking(move || {
tokio::runtime::Handle::current().block_on(async move {
VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, &heatmap_bytes).await
})
})
.await?
.maybe_fatal_err(&context_msg)
.with_context(|| context_msg)?;
tracing::debug!("Wrote local heatmap to {}", heatmap_path);
// Download the layers in the heatmap
for timeline in heatmap.timelines {
if self.secondary_state.cancel.is_cancelled() {
return Ok(());
}
let timeline_id = timeline.timeline_id;
self.download_timeline(timeline)
.instrument(tracing::info_span!(
"secondary_download_timeline",
tenant_id=%tenant_shard_id.tenant_id,
shard_id=%tenant_shard_id.shard_slug(),
%timeline_id
))
.await?;
}
Ok(())
}
async fn download_heatmap(&self) -> anyhow::Result<Vec<u8>> {
debug_assert_current_span_has_tenant_id();
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
// TODO: make download conditional on ETag having changed since last download
tracing::debug!("Downloading heatmap for secondary tenant",);
let heatmap_path = remote_heatmap_path(tenant_shard_id);
let download = self.remote_storage.download(&heatmap_path).await?;
let mut heatmap_bytes = Vec::new();
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
let _size = tokio::io::copy(&mut body, &mut heatmap_bytes)
.await
.with_context(|| format!("download heatmap {heatmap_path:?}"))?;
SECONDARY_MODE.download_heatmap.inc();
Ok(heatmap_bytes)
}
async fn download_timeline(&self, timeline: HeatMapTimeline) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_and_timeline_id();
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
let timeline_path = self
.conf
.timeline_path(tenant_shard_id, &timeline.timeline_id);
// Accumulate updates to the state
let mut touched = Vec::new();
// Clone a view of what layers already exist on disk
let timeline_state = self
.secondary_state
.detail
.lock()
.unwrap()
.timelines
.get(&timeline.timeline_id)
.cloned();
let timeline_state = match timeline_state {
Some(t) => t,
None => {
// We have no existing state: need to scan local disk for layers first.
let timeline_state =
init_timeline_state(self.conf, tenant_shard_id, &timeline).await;
// Re-acquire detail lock now that we're done with async load from local FS
self.secondary_state
.detail
.lock()
.unwrap()
.timelines
.insert(timeline.timeline_id, timeline_state.clone());
timeline_state
}
};
let layers_in_heatmap = timeline
.layers
.iter()
.map(|l| &l.name)
.collect::<HashSet<_>>();
let layers_on_disk = timeline_state
.on_disk_layers
.keys()
.collect::<HashSet<_>>();
// Remove on-disk layers that are no longer present in heatmap
for layer in layers_on_disk.difference(&layers_in_heatmap) {
let local_path = timeline_path.join(layer.to_string());
tracing::info!("Removing secondary local layer {layer} because it's absent in heatmap",);
tokio::fs::remove_file(&local_path)
.await
.or_else(fs_ext::ignore_not_found)?;
}
// Download heatmap layers that are not present on local disk, or update their
// access time if they are already present.
for layer in timeline.layers {
if self.secondary_state.cancel.is_cancelled() {
return Ok(());
}
// Existing on-disk layers: just update their access time.
if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
tracing::debug!("Layer {} is already on disk", layer.name);
if on_disk.layer.metadata() != LayerFileMetadata::from(&layer.metadata)
|| on_disk.access_time != layer.access_time
{
// We already have this layer on disk. Update its access time.
tracing::debug!(
"Access time updated for layer {}: {} -> {}",
layer.name,
strftime(&on_disk.access_time),
strftime(&layer.access_time)
);
touched.push(layer);
}
continue;
} else {
tracing::debug!("Layer {} not present on disk yet", layer.name);
}
// Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
// recently than it was evicted.
if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
if &layer.access_time > evicted_at {
tracing::info!(
"Re-downloading evicted layer {}, accessed at {}, evicted at {}",
layer.name,
strftime(&layer.access_time),
strftime(evicted_at)
);
} else {
tracing::trace!(
"Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
layer.name,
strftime(&layer.access_time),
strftime(evicted_at)
);
continue;
}
}
match download_layer_file(
self.conf,
self.remote_storage,
*tenant_shard_id,
timeline.timeline_id,
&layer.name,
&LayerFileMetadata::from(&layer.metadata),
)
.await
{
Ok(downloaded_bytes) => {
if downloaded_bytes != layer.metadata.file_size {
let local_path = timeline_path.join(layer.name.to_string());
tracing::error!(
"Downloaded layer {} with unexpected size {} != {}",
layer.name,
downloaded_bytes,
layer.metadata.file_size
);
tokio::fs::remove_file(&local_path)
.await
.or_else(fs_ext::ignore_not_found)?;
}
SECONDARY_MODE.download_layer.inc();
touched.push(layer)
}
Err(e) => {
// No retries here: secondary downloads don't have to succeed. If one fails, we
// just proceed, expecting the download to succeed on some future freshen.
// TODO: refine this behavior.
tracing::info!("Failed to download layer {}: {}", layer.name, e);
}
}
}
// Write updates to state to record layers we just downloaded or touched.
{
let mut detail = self.secondary_state.detail.lock().unwrap();
let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
for t in touched {
use std::collections::hash_map::Entry;
match timeline_detail.on_disk_layers.entry(t.name.clone()) {
Entry::Occupied(mut v) => {
v.get_mut().access_time = t.access_time;
}
Entry::Vacant(e) => {
e.insert(OnDiskState::new(
self.conf,
tenant_shard_id,
&timeline.timeline_id,
t.name,
LayerFileMetadata::from(&t.metadata),
t.access_time,
));
}
}
}
}
Ok(())
}
}
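// Sketch of the write-then-rename that `VirtualFile::crashsafe_overwrite`
// performs above (an assumption based on its name and the temp-suffix path
// built for it): write the new content to a temp file, fsync it, then
// atomically rename over the destination so readers never see a torn file.
use std::path::Path;
use tokio::io::AsyncWriteExt;

async fn crashsafe_overwrite_sketch(
    final_path: &Path,
    tmp_path: &Path,
    content: &[u8],
) -> std::io::Result<()> {
    let mut f = tokio::fs::File::create(tmp_path).await?;
    f.write_all(content).await?;
    f.sync_all().await?; // durability before the rename makes it visible
    drop(f);
    tokio::fs::rename(tmp_path, final_path).await?;
    // A full implementation would also fsync the parent directory so the
    // rename itself survives a crash.
    Ok(())
}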

View File

@@ -0,0 +1,380 @@
use std::{
collections::HashMap,
sync::{Arc, Weak},
time::{Duration, Instant},
};
use crate::{
metrics::SECONDARY_MODE,
tenant::{
mgr::{self, TenantManager},
remote_timeline_client::remote_heatmap_path,
secondary::CommandResponse,
Tenant,
},
};
use pageserver_api::models::TenantState;
use remote_storage::GenericRemoteStorage;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{backoff, completion::Barrier, id::TenantId};
use super::{heatmap::HeatMapTenant, CommandRequest, UploadCommand};
/// Period between heatmap writer walking Tenants to look for work to do
const HEATMAP_WAKE_INTERVAL: Duration = Duration::from_millis(1000);
/// Period between heatmap writes for each Tenant
const HEATMAP_UPLOAD_INTERVAL: Duration = Duration::from_millis(60000);
/// While we take a CancellationToken here, it is subordinate to the CancellationTokens
/// of tenants: i.e. we expect all Tenants to have been shut down before we are shut down, otherwise
/// we might block waiting on a Tenant.
pub(super) async fn heatmap_writer_task(
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<UploadCommand>>,
background_jobs_can_start: Barrier,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let mut writer = HeatmapWriter {
tenant_manager,
remote_storage,
cancel: cancel.clone(),
tasks: JoinSet::new(),
tenants: HashMap::new(),
tenants_writing: HashMap::new(),
concurrent_writes: 8,
};
tracing::info!("Waiting for background_jobs_can start...");
background_jobs_can_start.wait().await;
tracing::info!("background_jobs_can is ready, proceeding.");
while !cancel.is_cancelled() {
writer.iteration().await?;
tokio::select! {
_ = cancel.cancelled() => {
tracing::info!("Heatmap writer joining tasks");
tracing::info!("Heatmap writer terminating");
break;
},
_ = tokio::time::sleep(HEATMAP_WAKE_INTERVAL) => {},
cmd = command_queue.recv() => {
let cmd = match cmd {
Some(c) =>c,
None => {
// SecondaryController was destroyed, and this has raced with
// our CancellationToken
tracing::info!("Heatmap writer terminating");
break;
}
};
let CommandRequest { response_tx, payload } = cmd;
let result = writer.handle_command(payload).await;
if response_tx.send(CommandResponse { result }).is_err() {
// Caller went away, e.g. because an HTTP request timed out
tracing::info!("Dropping response to administrative command")
}
}
}
}
Ok(())
}
struct WriteInProgress {
barrier: Barrier,
}
struct WriteComplete {
tenant_id: TenantId,
completed_at: Instant,
}
/// The heatmap writer keeps a little bit of per-tenant state, mainly to remember
/// when we last did a write. We only populate this after doing at least one
/// write for a tenant -- this avoids holding state for tenants that have
/// uploads disabled.
struct WriterTenantState {
// This Weak only exists to enable culling IdleTenant instances
// when the Tenant has been deallocated.
tenant: Weak<Tenant>,
last_write: Option<Instant>,
}
struct HeatmapWriter {
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
cancel: CancellationToken,
tenants: HashMap<TenantId, WriterTenantState>,
tenants_writing: HashMap<TenantId, WriteInProgress>,
tasks: JoinSet<WriteComplete>,
concurrent_writes: usize,
}
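// Sketch (not part of this diff) of the Weak-pointer culling done at the top
// of `iteration`: per-tenant state holds only a Weak, so it cannot keep a
// dropped Tenant alive, and entries whose upgrade fails are discarded on each
// pass.
fn cull_sketch<K, T>(tenants: &mut std::collections::HashMap<K, std::sync::Weak<T>>) {
    tenants.retain(|_k, v| v.upgrade().is_some());
}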
impl HeatmapWriter {
/// Periodic execution phase: check for new work to do, and run it with `spawn_write`
async fn iteration(&mut self) -> anyhow::Result<()> {
self.drain().await;
// Cull any entries in self.tenants whose Arc<Tenant> is gone
self.tenants.retain(|_k, v| v.tenant.upgrade().is_some());
// Cannot spawn more work right now
if self.tenants_writing.len() >= self.concurrent_writes {
return Ok(());
}
// Iterate over tenants looking for work to do.
let tenants = self.tenant_manager.get_attached_tenants();
for tenant in tenants {
// Can't spawn any more work, drop out
if self.tenants_writing.len() >= self.concurrent_writes {
return Ok(());
}
// Process is shutting down, drop out
if self.cancel.is_cancelled() {
return Ok(());
}
// Skip tenants that don't have heatmaps enabled
if !tenant.get_enable_heatmap() {
continue;
}
// Skip tenants that aren't in a stable active state
if tenant.current_state() != TenantState::Active {
continue;
}
// Skip tenants that already have a write in flight
if self.tenants_writing.contains_key(&tenant.get_tenant_id()) {
continue;
}
// TODO: add a TenantConf for whether to upload at all. This is useful for
// a single-location mode for cheap tenants that don't require HA.
// TODO: add a mechanism to check whether the active layer set has
// changed since our last write
self.maybe_spawn_write(tenant);
}
Ok(())
}
async fn drain(&mut self) {
// Drain any complete background operations
loop {
tokio::select!(
biased;
Some(r) = self.tasks.join_next() => {
match r {
Ok(r) => {
self.on_completion(r);
},
Err(e) => {
// This should not happen, but needn't be fatal.
tracing::error!("Join error on heatmap writer JoinSet! {e}");
}
}
}
else => {
break;
}
)
}
}
fn maybe_spawn_write(&mut self, tenant: Arc<Tenant>) {
// Create an entry in self.tenants if one doesn't already exist: this will later be updated
// with the completion time in on_completion.
let state = self
.tenants
.entry(tenant.get_tenant_id())
.or_insert_with(|| WriterTenantState {
tenant: Arc::downgrade(&tenant),
last_write: None,
});
// Decline to do the upload if insufficient time has passed
if let Some(last_write) = state.last_write {
if Instant::now().duration_since(last_write) < HEATMAP_UPLOAD_INTERVAL {
return;
}
}
self.spawn_write(tenant)
}
fn spawn_write(&mut self, tenant: Arc<Tenant>) {
let remote_storage = self.remote_storage.clone();
let tenant_id = tenant.get_tenant_id();
let (completion, barrier) = utils::completion::channel();
self.tasks.spawn(async move {
// Guard for the barrier in [`WriteInProgress`]
let _completion = completion;
match write_tenant(remote_storage, &tenant)
.instrument(tracing::info_span!(
"write_tenant",
tenant_id = %tenant.get_tenant_id()
))
.await
{
Ok(()) => {}
Err(e) => {
tracing::warn!(
"Failed to upload heatmap for tenant {}: {e:#}",
tenant.get_tenant_id(),
)
}
}
WriteComplete {
tenant_id: tenant.get_tenant_id(),
completed_at: Instant::now(),
}
});
self.tenants_writing
.insert(tenant_id, WriteInProgress { barrier });
}
fn on_completion(&mut self, completion: WriteComplete) {
tracing::debug!(tenant_id=%completion.tenant_id, "Heatmap write task complete");
self.tenants_writing.remove(&completion.tenant_id);
tracing::debug!("Task completed for tenant {}", completion.tenant_id);
use std::collections::hash_map::Entry;
match self.tenants.entry(completion.tenant_id) {
Entry::Vacant(_) => {
// Tenant state was dropped, nothing to update.
}
Entry::Occupied(mut entry) => {
entry.get_mut().last_write = Some(completion.completed_at)
}
}
}
async fn handle_command(&mut self, command: UploadCommand) -> anyhow::Result<()> {
match command {
UploadCommand::Upload(tenant_id) => {
// If an upload was ongoing for this tenant, let it finish first.
if let Some(writing_state) = self.tenants_writing.get(&tenant_id) {
tracing::info!(%tenant_id, "Waiting for heatmap write to complete");
writing_state.barrier.clone().wait().await;
}
// Spawn the upload then immediately wait for it. This will block processing of other commands and
// starting of other background work.
tracing::info!(%tenant_id, "Starting heatmap write on command");
let tenant = mgr::get_tenant(tenant_id, true)?;
self.spawn_write(tenant);
let writing_state = self
.tenants_writing
.get(&tenant_id)
.expect("We just inserted this");
tracing::info!(%tenant_id, "Waiting for heatmap write to complete");
writing_state.barrier.clone().wait().await;
tracing::info!(%tenant_id, "Heatmap write complete");
// This drain is not necessary for correctness, but it is polite to avoid leaving
// our completed task sitting in self.tenants_writing.
self.drain().await;
Ok(())
}
}
}
}
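// `utils::completion::channel()` is internal; judging from its use in
// `spawn_write` and `handle_command`, a rough tokio `watch`-based equivalent
// looks like this sketch: the spawned task holds the sender as a guard, and
// every cloned receiver wakes when that guard is dropped.
fn completion_channel_sketch() -> (
    tokio::sync::watch::Sender<()>,
    tokio::sync::watch::Receiver<()>,
) {
    tokio::sync::watch::channel(())
}

async fn wait_for_completion_sketch(mut barrier: tokio::sync::watch::Receiver<()>) {
    // changed() returns Err once the sender (the guard held by the task) is
    // dropped: that is our "task finished" signal.
    let _ = barrier.changed().await;
}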
async fn write_tenant(
remote_storage: GenericRemoteStorage,
tenant: &Arc<Tenant>,
) -> anyhow::Result<()> {
let mut heatmap = HeatMapTenant {
timelines: Vec::new(),
};
let timelines = tenant.timelines.lock().unwrap().clone();
let tenant_cancel = tenant.cancel.clone();
// Ensure that Tenant::shutdown waits for any upload in flight: this is needed because otherwise
// when we delete a tenant, we might race with an upload in flight and end up leaving a heatmap behind
// in remote storage.
let _guard = match tenant.gate.enter() {
Ok(g) => g,
Err(_) => {
tracing::info!("Skipping heatmap upload for tenant which is shutting down");
return Ok(());
}
};
for (timeline_id, timeline) in timelines {
let heatmap_timeline = timeline.generate_heatmap().await;
match heatmap_timeline {
None => {
tracing::debug!(
"Skipping heatmap upload because timeline {timeline_id} is not ready"
);
return Ok(());
}
Some(heatmap_timeline) => {
heatmap.timelines.push(heatmap_timeline);
}
}
}
// Serialize the heatmap
let bytes = serde_json::to_vec(&heatmap)?;
let size = bytes.len();
let path = remote_heatmap_path(&tenant.get_tenant_id());
// Write the heatmap.
tracing::debug!("Uploading {size} byte heatmap to {path}");
if let Err(e) = backoff::retry(
|| async {
let bytes = tokio::io::BufReader::new(std::io::Cursor::new(bytes.clone()));
let bytes = Box::new(bytes);
remote_storage
.upload_storage_object(bytes, size, &path)
.await
},
|_| false,
3,
u32::MAX,
"Uploading heatmap",
backoff::Cancel::new(tenant_cancel.clone(), || anyhow::anyhow!("Shutting down")),
)
.await
{
if tenant_cancel.is_cancelled() {
return Ok(());
} else {
return Err(e);
}
}
SECONDARY_MODE.upload_heatmap.inc();
tracing::info!("Successfully uploaded {size} byte heatmap to {path}");
Ok(())
}
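// A minimal sketch of retry-with-cancellation in plain tokio, approximating
// what `utils::backoff::retry` is used for above; the real helper is not part
// of this diff, and it backs off exponentially where this sketch is linear.
async fn retry_sketch<T, E, F, Fut>(
    mut op: F,
    attempts: usize,
    cancel: &tokio_util::sync::CancellationToken,
) -> Option<Result<T, E>>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let mut last = None;
    for n in 0..attempts {
        if cancel.is_cancelled() {
            return None; // shutting down: give up quietly, like the code above
        }
        match op().await {
            Ok(v) => return Some(Ok(v)),
            Err(e) => last = Some(Err(e)),
        }
        // Linear backoff keeps the sketch short; the real helper grows the
        // delay between attempts.
        tokio::time::sleep(std::time::Duration::from_millis(100 * (n as u64 + 1))).await;
    }
    last
}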