pageserver: add secondary downloader & heatmaps
@@ -5,6 +5,7 @@ use std::{sync::Arc, time::Duration};
/// Users of a resource call `enter()` to acquire a GateGuard, and the owner of
/// the resource calls `close()` when they want to ensure that all holders of guards
/// have released them, and that no future guards will be issued.
#[derive(Debug)]
pub struct Gate {
    /// Each caller of enter() takes one unit from the semaphore. In close(), we
    /// take all the units to ensure all GateGuards are destroyed.
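
A minimal usage sketch of the enter()/close() protocol described above (the wrapper function and resource are hypothetical; `Gate::new`'s label argument matches its use later in this commit):

    async fn gate_example() {
        let gate = Gate::new("example resource".to_string());

        // Users: hold a GateGuard across each access to the resource.
        if let Ok(_guard) = gate.enter() {
            // ... use the resource; the guard is released when dropped ...
        }

        // Owner: once close() returns, all guards have been dropped and
        // future enter() calls will fail, so teardown is safe.
        gate.close().await;
    }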
@@ -257,6 +257,12 @@ pub enum TaskKind {
    /// See [`crate::disk_usage_eviction_task`].
    DiskUsageEviction,

    /// See [`crate::tenant::secondary`].
    SecondaryDownloads,

    /// See [`crate::tenant::secondary`].
    SecondaryUploads,

    // Initial logical size calculation
    InitialLogicalSizeCalculation,
@@ -132,6 +132,7 @@ pub mod storage_layer;
pub mod config;
pub mod delete;
pub mod mgr;
pub mod secondary;
pub mod tasks;
pub mod upload_queue;
pageserver/src/tenant/secondary.rs (new file, 283 lines)
@@ -0,0 +1,283 @@
pub mod downloader;
pub mod heatmap;
pub mod heatmap_writer;

use std::{sync::Arc, time::SystemTime};

use crate::{
    config::PageServerConf,
    task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
};

use self::{
    downloader::{downloader_task, SecondaryDetail},
    heatmap_writer::heatmap_writer_task,
};

use super::{
    mgr::TenantManager,
    storage_layer::{AsLayerDesc, Layer},
    timeline::DiskUsageEvictionInfo,
};

use remote_storage::GenericRemoteStorage;

use tokio_util::sync::CancellationToken;
use utils::{
    completion::Barrier,
    fs_ext,
    id::{TenantId, TimelineId},
    sync::gate::Gate,
};
enum DownloadCommand {
    Download(TenantId),
}

enum UploadCommand {
    Upload(TenantId),
}

struct CommandRequest<T> {
    payload: T,
    response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
}

struct CommandResponse {
    result: anyhow::Result<()>,
}
// Whereas [`Tenant`] represents an attached tenant, this type represents the work
// we do for secondary tenant locations: where we are not serving clients or
// ingesting WAL, but we are maintaining a warm cache of layer files.
//
// This type is all about the _download_ path for secondary mode. The upload path
// runs while a regular attached `Tenant` exists.
//
// This structure coordinates TenantManager and SecondaryDownloader,
// so that the downloader can indicate which tenants it is currently
// operating on, and the manager can indicate when a particular
// secondary tenant should cancel any work in flight.
#[derive(Debug)]
pub(crate) struct SecondaryTenant {
    /// Cancellation token indicates to SecondaryDownloader that it should stop doing
    /// any work for this tenant at the next opportunity.
    pub(crate) cancel: CancellationToken,

    pub(crate) gate: Gate,

    detail: std::sync::Mutex<SecondaryDetail>,
    // TODO: propagate the `warm` from LocationConf into here, and respect it when doing downloads
}
impl SecondaryTenant {
    pub(crate) fn new(tenant_id: TenantId) -> Arc<Self> {
        // TODO: consider whether we really need to Arc this
        Arc::new(Self {
            // TODO: shall we make this a descendant of the
            // main cancellation token, or is it sufficient that
            // on shutdown we walk the tenants and fire their
            // individual cancellations?
            cancel: CancellationToken::new(),
            gate: Gate::new(format!("SecondaryTenant {tenant_id}")),

            detail: std::sync::Mutex::default(),
        })
    }

    pub(crate) async fn shutdown(&self) {
        self.cancel.cancel();

        // Wait for any secondary downloader work to complete
        self.gate.close().await;
    }
    pub(crate) async fn get_layers_for_eviction(
        &self,
        conf: &'static PageServerConf,
        tenant_id: TenantId,
    ) -> Vec<(TimelineId, DiskUsageEvictionInfo)> {
        super::debug_assert_current_span_has_tenant_id();
        {
            let detail = self.detail.lock().unwrap();
            if !detail.is_uninit() {
                return detail.get_layers_for_eviction();
            } else {
                // In case we didn't freshen yet in this process lifetime, we will need to scan
                // local storage to find all our layers.
            }
        }

        tracing::debug!("Scanning local layers for secondary tenant to service eviction");

        // Fall through: we need to initialize SecondaryDetail
        let timelines = SecondaryDetail::init_detail(conf, tenant_id).await;
        let mut detail = self.detail.lock().unwrap();
        if detail.is_uninit() {
            detail.timelines = timelines;
        }
        detail.get_layers_for_eviction()
    }
    pub(crate) async fn evict_layers(
        &self,
        conf: &PageServerConf,
        tenant_id: &TenantId,
        layers: Vec<(TimelineId, Layer)>,
    ) {
        crate::tenant::debug_assert_current_span_has_tenant_id();
        let _guard = match self.gate.enter() {
            Ok(g) => g,
            Err(_) => {
                tracing::info!(
                    "Dropping {} layer evictions, secondary tenant shutting down",
                    layers.len()
                );
                return;
            }
        };

        let now = SystemTime::now();

        for (timeline_id, layer) in layers {
            let layer_name = layer.layer_desc().filename();
            let path = conf
                .timeline_path(tenant_id, &timeline_id)
                .join(&layer_name.file_name());

            // We tolerate ENOENT, because between planning eviction and executing
            // it, the secondary downloader could have seen an updated heatmap that
            // resulted in a layer being deleted.
            tokio::fs::remove_file(path)
                .await
                .or_else(fs_ext::ignore_not_found)
                .expect("TODO: terminate process on local I/O errors");

            // TODO: batch up updates instead of acquiring lock in inner loop
            let mut detail = self.detail.lock().unwrap();
            // If there is no timeline detail for what we just deleted, that indicates that
            // the secondary downloader did some work in the meantime (perhaps removing the
            // whole timeline's state).
            if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) {
                timeline_detail.on_disk_layers.remove(&layer_name);
                timeline_detail.evicted_at.insert(layer_name, now);
            }
        }
    }
}
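
A sketch of how the disk usage eviction task might drive the two methods above (the caller and the `pick_victims` policy are hypothetical, not part of this commit):

    async fn evict_from_secondary(
        conf: &'static PageServerConf,
        tenant_id: TenantId,
        secondary: &SecondaryTenant,
    ) {
        // Ask the tenant which layers are resident and when they were last accessed.
        let candidates = secondary.get_layers_for_eviction(conf, tenant_id).await;

        // Apply some policy, e.g. coldest-first, to choose victims.
        let victims: Vec<(TimelineId, Layer)> = pick_victims(candidates);

        // Delete the victims; SecondaryTenant updates its on_disk_layers/evicted_at state.
        secondary.evict_layers(conf, &tenant_id, victims).await;
    }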
/// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode
/// downloads and heatmap uploads. This is not a hot data path: it is primarily a hook for tests,
/// where we want to immediately upload/download for a particular tenant. In normal operation
/// uploads & downloads are autonomous and not driven by this interface.
pub struct SecondaryController {
    upload_req_tx: tokio::sync::mpsc::Sender<CommandRequest<UploadCommand>>,

    download_req_tx: tokio::sync::mpsc::Sender<CommandRequest<DownloadCommand>>,
}
impl SecondaryController {
    async fn dispatch<T>(
        &self,
        queue: &tokio::sync::mpsc::Sender<CommandRequest<T>>,
        payload: T,
    ) -> anyhow::Result<()> {
        let (response_tx, response_rx) = tokio::sync::oneshot::channel();

        queue
            .send(CommandRequest {
                payload,
                response_tx,
            })
            .await
            .map_err(|_| anyhow::anyhow!("Receiver shut down"))?;

        let response = response_rx
            .await
            .map_err(|_| anyhow::anyhow!("Request dropped"))?;

        response.result
    }

    pub async fn download_tenant(&self, tenant_id: TenantId) -> anyhow::Result<()> {
        self.dispatch(&self.download_req_tx, DownloadCommand::Download(tenant_id))
            .await
    }

    pub async fn upload_tenant(&self, tenant_id: TenantId) -> anyhow::Result<()> {
        self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_id))
            .await
    }
}
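
A sketch of how a test or HTTP handler might use this pseudo-rpc interface (the wrapper function is hypothetical; `upload_tenant`/`download_tenant` are defined above):

    // Force one upload/download round trip for a tenant and surface the result,
    // e.g. so a test can synchronize on the secondary location being warm.
    async fn force_refresh(
        controller: &SecondaryController,
        tenant_id: TenantId,
    ) -> anyhow::Result<()> {
        controller.upload_tenant(tenant_id).await?; // attached side writes its heatmap
        controller.download_tenant(tenant_id).await?; // secondary side applies it
        Ok(())
    }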
pub fn spawn_tasks(
    conf: &'static PageServerConf,
    tenant_manager: Arc<TenantManager>,
    remote_storage: GenericRemoteStorage,
    background_jobs_can_start: Barrier,
    cancel: CancellationToken,
) -> SecondaryController {
    let mgr_clone = tenant_manager.clone();
    let storage_clone = remote_storage.clone();
    let cancel_clone = cancel.clone();
    let bg_jobs_clone = background_jobs_can_start.clone();

    let (download_req_tx, download_req_rx) =
        tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
    let (upload_req_tx, upload_req_rx) =
        tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);

    task_mgr::spawn(
        BACKGROUND_RUNTIME.handle(),
        TaskKind::SecondaryDownloads,
        None,
        None,
        "secondary tenant downloads",
        false,
        async move {
            downloader_task(
                conf,
                mgr_clone,
                storage_clone,
                download_req_rx,
                bg_jobs_clone,
                cancel_clone,
            )
            .await
        },
    );

    task_mgr::spawn(
        BACKGROUND_RUNTIME.handle(),
        TaskKind::SecondaryUploads,
        None,
        None,
        "heatmap uploads",
        false,
        async move {
            heatmap_writer_task(
                tenant_manager,
                remote_storage,
                upload_req_rx,
                background_jobs_can_start,
                cancel,
            )
            .await
        },
    );

    SecondaryController {
        download_req_tx,
        upload_req_tx,
    }
}
/// For running with remote storage disabled: a SecondaryController that is connected to nothing.
pub fn null_controller() -> SecondaryController {
    let (download_req_tx, _download_req_rx) =
        tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
    let (upload_req_tx, _upload_req_rx) =
        tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
    SecondaryController {
        upload_req_tx,
        download_req_tx,
    }
}
pageserver/src/tenant/secondary/downloader.rs (new file, 671 lines)
@@ -0,0 +1,671 @@
use std::{
    collections::{HashMap, HashSet},
    str::FromStr,
    sync::Arc,
    time::{Duration, Instant, SystemTime},
};

use crate::{
    config::PageServerConf,
    metrics::SECONDARY_MODE,
    tenant::{
        remote_timeline_client::{index::LayerFileMetadata, HEATMAP_BASENAME},
        secondary::CommandResponse,
        storage_layer::{Layer, LayerFileName},
        timeline::{DiskUsageEvictionInfo, LocalLayerInfoForDiskUsageEviction},
    },
    METADATA_FILE_NAME,
};

use super::SecondaryTenant;
use crate::tenant::{
    mgr::TenantManager,
    remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
};
use anyhow::Context;

use chrono::format::{DelayedFormat, StrftimeItems};
use remote_storage::GenericRemoteStorage;

use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{
    completion::Barrier,
    fs_ext,
    id::{TenantId, TimelineId},
    sync::gate::GateGuard,
};

use super::{
    heatmap::{HeatMapTenant, HeatMapTimeline},
    CommandRequest, DownloadCommand,
};
/// Interval between checking if any Secondary tenants have download work to do:
/// note that this is _not_ the frequency with which we actually freshen the tenants,
/// just the frequency with which we wake up to decide whether anyone needs freshening.
///
/// Making this somewhat infrequent reduces the load on mutexes inside TenantManager
/// and SecondaryTenant for reads when checking for work to do.
const DOWNLOAD_CHECK_INTERVAL: Duration = Duration::from_millis(10000);

/// For each tenant, how long must have passed since the last download_tenant call before
/// calling it again. This is approximately the time by which local data is allowed
/// to fall behind remote data.
///
/// TODO: this should be an upper bound, and tenants that are uploading regularly
/// should adaptively freshen more often (e.g. a tenant writing 1 layer per second
/// should not wait a minute between freshens)
const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000);
#[derive(Debug, Clone)]
pub(super) struct OnDiskState {
    layer: Layer,
    access_time: SystemTime,
}

impl OnDiskState {
    fn new(
        conf: &'static PageServerConf,
        tenant_id: &TenantId,
        timeline_id: &TimelineId,
        name: LayerFileName,
        metadata: LayerFileMetadata,
        access_time: SystemTime,
    ) -> Self {
        Self {
            layer: Layer::for_secondary(conf, tenant_id, timeline_id, name, metadata),
            access_time,
        }
    }
}
#[derive(Debug, Clone, Default)]
pub(super) struct SecondaryDetailTimeline {
    pub(super) on_disk_layers: HashMap<LayerFileName, OnDiskState>,

    /// We remember when layers were evicted, to prevent re-downloading them.
    /// TODO: persist this, so that we don't try and re-download everything on restart.
    pub(super) evicted_at: HashMap<LayerFileName, SystemTime>,
}

/// This state is written by the secondary downloader; it is opaque
/// to TenantManager.
#[derive(Default, Debug)]
pub(super) struct SecondaryDetail {
    freshened_at: Option<Instant>,
    pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
}

/// Helper for logging SystemTime
fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
    let datetime: chrono::DateTime<chrono::Utc> = (*t).into();
    datetime.format("%d/%m/%Y %T")
}
impl SecondaryDetail {
    pub(super) fn is_uninit(&self) -> bool {
        // FIXME: empty timelines is not synonymous with not initialized, as it is legal for
        // a tenant to exist with no timelines.
        self.timelines.is_empty()
    }

    pub(super) async fn init_detail(
        conf: &'static PageServerConf,
        tenant_id: TenantId,
    ) -> HashMap<TimelineId, SecondaryDetailTimeline> {
        tracing::info!("init_detail");
        // Load heatmap from local storage
        let heatmap_path = conf.tenant_path(&tenant_id).join(HEATMAP_BASENAME);
        let heatmap = match tokio::fs::read(heatmap_path).await {
            Ok(bytes) => serde_json::from_slice::<HeatMapTenant>(&bytes).unwrap(),
            Err(e) => {
                if e.kind() == std::io::ErrorKind::NotFound {
                    return HashMap::new();
                } else {
                    // TODO: use local IO fatal helpers
                    panic!("Unexpected error reading heatmap: {e}");
                }
            }
        };

        let mut timelines = HashMap::new();

        for heatmap_timeline in heatmap.timelines {
            // TODO: use fatal IO helpers
            let detail = init_timeline_state(conf, &tenant_id, &heatmap_timeline)
                .await
                .expect("Failed reading local disk");
            timelines.insert(heatmap_timeline.timeline_id, detail);
        }

        timelines
    }
    pub(super) fn get_layers_for_eviction(&self) -> Vec<(TimelineId, DiskUsageEvictionInfo)> {
        let mut result = Vec::new();
        for (timeline_id, timeline_detail) in &self.timelines {
            let layers: Vec<_> = timeline_detail
                .on_disk_layers
                .values()
                .map(|ods| LocalLayerInfoForDiskUsageEviction {
                    layer: ods.layer.clone(),
                    last_activity_ts: ods.access_time,
                })
                .collect();

            let max_layer_size = layers.iter().map(|l| l.layer.metadata().file_size()).max();

            result.push((
                *timeline_id,
                DiskUsageEvictionInfo {
                    resident_layers: layers,
                    max_layer_size,
                },
            ))
        }

        let layer_count: usize = result
            .iter()
            .map(|(_, info)| info.resident_layers.len())
            .sum();
        tracing::debug!(
            "Found {} timelines, {} layers",
            self.timelines.len(),
            layer_count
        );

        result
    }
}
/// Keep trying to do downloads until the cancellation token is fired. Remote storage
/// errors are handled internally: any error returned by this function is an unexpected
/// internal error of some kind.
pub(super) async fn downloader_task(
    conf: &'static PageServerConf,
    tenant_manager: Arc<TenantManager>,
    remote_storage: GenericRemoteStorage,
    mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
    background_jobs_can_start: Barrier,
    cancel: CancellationToken,
) -> anyhow::Result<()> {
    let downloader = SecondaryDownloader {
        conf,
        tenant_manager,
        remote_storage,
        cancel: cancel.clone(),
    };

    tracing::info!("Waiting for background_jobs_can_start...");
    background_jobs_can_start.wait().await;
    tracing::info!("background_jobs_can_start is ready, proceeding.");

    while !cancel.is_cancelled() {
        downloader.iteration().await?;

        tokio::select! {
            _ = cancel.cancelled() => {
                tracing::info!("Secondary downloader terminating");
                break;
            },
            _ = tokio::time::sleep(DOWNLOAD_CHECK_INTERVAL) => {},
            cmd = command_queue.recv() => {
                let cmd = match cmd {
                    Some(c) => c,
                    None => {
                        // SecondaryController was destroyed, and this has raced with
                        // our CancellationToken
                        tracing::info!("Secondary downloader terminating");
                        break;
                    }
                };

                let CommandRequest {
                    response_tx,
                    payload,
                } = cmd;
                let result = downloader.handle_command(payload).await;
                if response_tx.send(CommandResponse { result }).is_err() {
                    // Caller went away, e.g. because an HTTP request timed out
                    tracing::info!("Dropping response to administrative command")
                }
            }
        }
    }

    Ok(())
}

struct SecondaryDownloader {
    conf: &'static PageServerConf,
    tenant_manager: Arc<TenantManager>,
    remote_storage: GenericRemoteStorage,
    cancel: CancellationToken,
}

struct TenantJob {
    tenant_id: TenantId,
    secondary_state: Arc<SecondaryTenant>,

    // This gate guard conveys the right to write to the tenant's local directory: it must
    // be taken before doing downloads, and TenantManager must ensure it has been released
    // before it considers shutdown complete for the secondary state -- [`SecondaryDownloader`]
    // will thereby never be racing with [`Tenant`] for access to local files.
    _guard: GateGuard,
}
impl SecondaryDownloader {
    async fn iteration(&self) -> anyhow::Result<()> {
        // Step 1: identify some tenants that we may work on
        let mut candidates: Vec<TenantJob> = Vec::new();
        self.tenant_manager
            .foreach_secondary_tenants(|tenant_id, secondary_state| {
                let guard = match secondary_state.gate.enter() {
                    Ok(guard) => guard,
                    // Tenant is shutting down, do no downloads for it
                    Err(_) => return,
                };

                candidates.push(TenantJob {
                    tenant_id: *tenant_id,
                    secondary_state: secondary_state.clone(),
                    _guard: guard,
                });
            });

        // Step 2: prioritized selection of next batch of tenants to freshen
        let now = Instant::now();
        let candidates = candidates.into_iter().filter(|c| {
            let detail = c.secondary_state.detail.lock().unwrap();
            match detail.freshened_at {
                None => true, // Not yet freshened, therefore eligible to run
                Some(t) => {
                    let since = now.duration_since(t);
                    since > DOWNLOAD_FRESHEN_INTERVAL
                }
            }
        });

        // TODO: don't just cut down the list, prioritize it to freshen the stalest tenants first
        // TODO: bounded parallelism

        // Step 3: run download_tenant for each eligible tenant
        for job in candidates {
            if job.secondary_state.cancel.is_cancelled() {
                continue;
            }

            async {
                if let Err(e) = self.download_tenant(&job).await {
                    tracing::info!("Failed to freshen secondary content: {e:#}")
                };

                // Update freshened_at even if there was an error: we don't want errored tenants to implicitly
                // take priority to run again.
                let mut detail = job.secondary_state.detail.lock().unwrap();
                detail.freshened_at = Some(Instant::now());
            }
            .instrument(tracing::info_span!(
                "download_tenant",
                tenant_id = %job.tenant_id
            ))
            .await;
        }

        Ok(())
    }
    async fn handle_command(&self, command: DownloadCommand) -> anyhow::Result<()> {
        match command {
            DownloadCommand::Download(req_tenant_id) => {
                let mut candidates: Vec<TenantJob> = Vec::new();
                self.tenant_manager
                    .foreach_secondary_tenants(|tenant_id, secondary_state| {
                        tracing::debug!("foreach_secondary: {tenant_id} ({req_tenant_id})");
                        if tenant_id == &req_tenant_id {
                            let guard = match secondary_state.gate.enter() {
                                Ok(guard) => guard,
                                // Shutting down
                                Err(_) => return,
                            };

                            candidates.push(TenantJob {
                                tenant_id: *tenant_id,
                                secondary_state: secondary_state.clone(),
                                _guard: guard,
                            });
                        }
                    });

                let tenant_job = if candidates.len() != 1 {
                    anyhow::bail!("Tenant not found in secondary mode");
                } else {
                    candidates.pop().unwrap()
                };

                self.download_tenant(&tenant_job).await
            }
        }
    }
    async fn download_heatmap(&self, tenant_id: &TenantId) -> anyhow::Result<HeatMapTenant> {
        // TODO: make download conditional on ETag having changed since last download

        let heatmap_path = remote_heatmap_path(tenant_id);
        // TODO: wrap this download in a select! that checks self.cancel
        let mut download = self.remote_storage.download(&heatmap_path).await?;
        let mut heatmap_bytes = Vec::new();
        let _size = tokio::io::copy(&mut download.download_stream, &mut heatmap_bytes)
            .await
            .with_context(|| format!("download heatmap {heatmap_path:?}"))?;

        SECONDARY_MODE.download_heatmap.inc();

        Ok(serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?)
    }
    async fn download_timeline(
        &self,
        job: &TenantJob,
        timeline: HeatMapTimeline,
    ) -> anyhow::Result<()> {
        let timeline_path = self
            .conf
            .timeline_path(&job.tenant_id, &timeline.timeline_id);

        // Accumulate updates to the state
        let mut touched = Vec::new();

        // Clone a view of what layers already exist on disk
        let timeline_state = job
            .secondary_state
            .detail
            .lock()
            .unwrap()
            .timelines
            .get(&timeline.timeline_id)
            .cloned();

        let timeline_state = match timeline_state {
            Some(t) => t,
            None => {
                // We have no existing state: need to scan local disk for layers first.
                let timeline_state =
                    init_timeline_state(self.conf, &job.tenant_id, &timeline).await?;

                // Re-acquire detail lock now that we're done with async load from local FS
                job.secondary_state
                    .detail
                    .lock()
                    .unwrap()
                    .timelines
                    .insert(timeline.timeline_id, timeline_state.clone());
                timeline_state
            }
        };

        let layers_in_heatmap = timeline
            .layers
            .iter()
            .map(|l| &l.name)
            .collect::<HashSet<_>>();
        let layers_on_disk = timeline_state
            .on_disk_layers
            .iter()
            .map(|l| l.0)
            .collect::<HashSet<_>>();

        // Remove on-disk layers that are no longer present in the heatmap
        for layer in layers_on_disk.difference(&layers_in_heatmap) {
            let local_path = timeline_path.join(layer.to_string());
            tracing::info!("Removing secondary local layer {layer} because it's absent in heatmap");
            tokio::fs::remove_file(&local_path)
                .await
                .or_else(fs_ext::ignore_not_found)?;
        }

        // Download heatmap layers that are not present on local disk, or update their
        // access time if they are already present.
        for layer in timeline.layers {
            if self.cancel.is_cancelled() {
                return Ok(());
            }

            // Existing on-disk layers: just update their access time.
            if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
                tracing::debug!("Layer {} is already on disk", layer.name);
                if on_disk.layer.metadata() != LayerFileMetadata::from(&layer.metadata)
                    || on_disk.access_time != layer.access_time
                {
                    // We already have this layer on disk. Update its access time.
                    tracing::debug!(
                        "Access time updated for layer {}: {} -> {}",
                        layer.name,
                        strftime(&on_disk.access_time),
                        strftime(&layer.access_time)
                    );
                    touched.push(layer);
                }
                continue;
            } else {
                tracing::debug!("Layer {} not present on disk yet", layer.name);
            }

            // Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
            // recently than it was evicted.
            if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
                if &layer.access_time > evicted_at {
                    tracing::info!(
                        "Re-downloading evicted layer {}, accessed at {}, evicted at {}",
                        layer.name,
                        strftime(&layer.access_time),
                        strftime(evicted_at)
                    );
                } else {
                    tracing::trace!(
                        "Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
                        layer.name,
                        strftime(&layer.access_time),
                        strftime(evicted_at)
                    );
                    continue;
                }
            }

            match download_layer_file(
                self.conf,
                &self.remote_storage,
                job.tenant_id,
                timeline.timeline_id,
                &layer.name,
                &LayerFileMetadata::from(&layer.metadata),
            )
            .await
            {
                Ok(downloaded_bytes) => {
                    if downloaded_bytes != layer.metadata.file_size {
                        let local_path = timeline_path.join(layer.name.to_string());

                        tracing::error!(
                            "Downloaded layer {} with unexpected size {} != {}",
                            layer.name,
                            downloaded_bytes,
                            layer.metadata.file_size
                        );

                        tokio::fs::remove_file(&local_path)
                            .await
                            .or_else(fs_ext::ignore_not_found)?;
                    }

                    SECONDARY_MODE.download_layer.inc();
                    touched.push(layer)
                }
                Err(e) => {
                    // No retries here: secondary downloads don't have to succeed: if they fail we
                    // just proceed and expect that the download will work on some future freshen.
                    // TODO: refine this behavior.
                    tracing::info!("Failed to download layer {}: {}", layer.name, e);
                }
            }
        }

        // Write updates to state to record layers we just downloaded or touched.
        {
            let mut detail = job.secondary_state.detail.lock().unwrap();
            let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();

            tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());

            for t in touched {
                use std::collections::hash_map::Entry;
                match timeline_detail.on_disk_layers.entry(t.name.clone()) {
                    Entry::Occupied(mut v) => {
                        v.get_mut().access_time = t.access_time;
                    }
                    Entry::Vacant(e) => {
                        e.insert(OnDiskState::new(
                            self.conf,
                            &job.tenant_id,
                            &timeline.timeline_id,
                            t.name,
                            LayerFileMetadata::from(&t.metadata),
                            t.access_time,
                        ));
                    }
                }
            }
        }

        Ok(())
    }
    async fn download_tenant(&self, job: &TenantJob) -> anyhow::Result<()> {
        tracing::debug!("Downloading heatmap for secondary tenant {}", job.tenant_id);
        // Download the tenant's heatmap
        let heatmap = self.download_heatmap(&job.tenant_id).await?;

        // Save the heatmap: this will be useful on restart, allowing us to reconstruct
        // layer metadata without having to re-download it.
        let heatmap_path = self.conf.tenant_path(&job.tenant_id).join(HEATMAP_BASENAME);
        // TODO: use crashsafe overwrite
        // TODO: use die-on-io-error helpers
        tokio::fs::write(
            &heatmap_path,
            &serde_json::to_vec(&heatmap).expect("We just deserialized this"),
        )
        .await
        .unwrap();

        tracing::debug!("Wrote heatmap to {}", heatmap_path);

        // Download the layers in the heatmap
        for timeline in heatmap.timelines {
            if self.cancel.is_cancelled() {
                return Ok(());
            }

            let timeline_id = timeline.timeline_id;
            self.download_timeline(job, timeline)
                .instrument(tracing::info_span!(
                    "download_timeline",
                    tenant_id=%job.tenant_id,
                    %timeline_id
                ))
                .await?;
        }

        // TODO: remove local disk content for any timelines that don't exist in remote storage

        Ok(())
    }
}
/// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline
async fn init_timeline_state(
    conf: &'static PageServerConf,
    tenant_id: &TenantId,
    heatmap: &HeatMapTimeline,
) -> anyhow::Result<SecondaryDetailTimeline> {
    let timeline_path = conf.timeline_path(tenant_id, &heatmap.timeline_id);
    let mut detail = SecondaryDetailTimeline::default();

    let mut dir = match tokio::fs::read_dir(&timeline_path).await {
        Ok(d) => d,
        Err(e) => {
            if e.kind() == std::io::ErrorKind::NotFound {
                tracing::info!("Creating timeline directory {timeline_path}");
                tokio::fs::create_dir_all(&timeline_path).await?;

                // No entries to report: drop out.
                return Ok(detail);
            } else {
                return Err(e.into());
            }
        }
    };

    let heatmap_metadata: HashMap<_, _> = heatmap.layers.iter().map(|l| (&l.name, l)).collect();

    while let Some(dentry) = dir.next_entry().await? {
        let dentry_file_name = dentry.file_name();
        let file_name = dentry_file_name.to_string_lossy();
        let local_meta = dentry.metadata().await?;

        // Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
        if file_name == METADATA_FILE_NAME {
            continue;
        }

        match LayerFileName::from_str(&file_name) {
            Ok(name) => {
                let remote_meta = heatmap_metadata.get(&name);
                match remote_meta {
                    Some(remote_meta) => {
                        // TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784)
                        if local_meta.len() != remote_meta.metadata.file_size {
                            // This should not happen, because we do crashsafe write-then-rename when downloading
                            // layers, and layers in remote storage are immutable. Remove the local file because
                            // we cannot trust it.
                            tracing::warn!(
                                "Removing local layer {name} with unexpected local size {} != {}",
                                local_meta.len(),
                                remote_meta.metadata.file_size
                            );
                            tokio::fs::remove_file(dentry.path()).await?;
                        } else {
                            // We expect the access time to be initialized immediately afterwards, when
                            // the latest heatmap is applied to the state.
                            detail.on_disk_layers.insert(
                                name.clone(),
                                OnDiskState::new(
                                    conf,
                                    tenant_id,
                                    &heatmap.timeline_id,
                                    name,
                                    LayerFileMetadata::from(&remote_meta.metadata),
                                    remote_meta.access_time,
                                ),
                            );
                        }
                    }
                    None => {
                        // FIXME: consider some optimization when transitioning from attached to secondary: maybe
                        // wait until we have seen a heatmap that is more recent than the most recent on-disk state? Otherwise
                        // we will end up deleting any layers which were created+uploaded more recently than the heatmap.
                        tracing::info!(
                            "Removing secondary local layer {} because it's absent in heatmap",
                            name
                        );
                        tokio::fs::remove_file(dentry.path()).await?;
                    }
                }
            }
            Err(_) => {
                // Ignore it.
                tracing::warn!("Unexpected file in timeline directory: {file_name}");
            }
        }
    }

    Ok(detail)
}
pageserver/src/tenant/secondary/heatmap.rs (new file, 57 lines)
@@ -0,0 +1,57 @@
use std::time::SystemTime;

use crate::tenant::{
    remote_timeline_client::index::IndexLayerMetadata, storage_layer::LayerFileName,
};

use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};

use utils::id::TimelineId;

#[derive(Serialize, Deserialize)]
pub(super) struct HeatMapTenant {
    pub(super) timelines: Vec<HeatMapTimeline>,
}

#[derive(Serialize, Deserialize)]
pub(crate) struct HeatMapLayer {
    pub(super) name: LayerFileName,
    pub(super) metadata: IndexLayerMetadata,

    pub(super) access_time: SystemTime,
    // TODO: an actual 'heat' score that would let secondary locations prioritize downloading
    // the hottest layers, rather than trying to simply mirror whatever layers are on-disk on the primary.
}

impl HeatMapLayer {
    pub(crate) fn new(
        name: LayerFileName,
        metadata: IndexLayerMetadata,
        access_time: SystemTime,
    ) -> Self {
        Self {
            name,
            metadata,
            access_time,
        }
    }
}

#[serde_as]
#[derive(Serialize, Deserialize)]
pub(crate) struct HeatMapTimeline {
    #[serde_as(as = "DisplayFromStr")]
    pub(super) timeline_id: TimelineId,

    pub(super) layers: Vec<HeatMapLayer>,
}

impl HeatMapTimeline {
    pub(crate) fn new(timeline_id: TimelineId, layers: Vec<HeatMapLayer>) -> Self {
        Self {
            timeline_id,
            layers,
        }
    }
}
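
For orientation, a serialized HeatMapTenant has roughly this JSON shape (illustrative values only; the exact encodings of LayerFileName, IndexLayerMetadata, and SystemTime are determined by their serde implementations, which are outside this commit):

    {
      "timelines": [
        {
          "timeline_id": "de200bd42b49cc1814412c7e592dd6e9",
          "layers": [
            {
              "name": "<layer file name>",
              "metadata": { "file_size": 134217728 },
              "access_time": { "secs_since_epoch": 1700000000, "nanos_since_epoch": 0 }
            }
          ]
        }
      ]
    }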
pageserver/src/tenant/secondary/heatmap_writer.rs (new file, 380 lines)
@@ -0,0 +1,380 @@
use std::{
    collections::HashMap,
    sync::{Arc, Weak},
    time::{Duration, Instant},
};

use crate::{
    metrics::SECONDARY_MODE,
    tenant::{
        mgr::{self, TenantManager},
        remote_timeline_client::remote_heatmap_path,
        secondary::CommandResponse,
        Tenant,
    },
};

use pageserver_api::models::TenantState;
use remote_storage::GenericRemoteStorage;

use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{backoff, completion::Barrier, id::TenantId};

use super::{heatmap::HeatMapTenant, CommandRequest, UploadCommand};
/// Period between the heatmap writer walking Tenants to look for work to do
const HEATMAP_WAKE_INTERVAL: Duration = Duration::from_millis(1000);

/// Period between heatmap writes for each Tenant
const HEATMAP_UPLOAD_INTERVAL: Duration = Duration::from_millis(60000);
/// While we take a CancellationToken here, it is subordinate to the CancellationTokens
/// of tenants: i.e. we expect all Tenants to have been shut down before we are shut down, otherwise
/// we might block waiting on a Tenant.
pub(super) async fn heatmap_writer_task(
    tenant_manager: Arc<TenantManager>,
    remote_storage: GenericRemoteStorage,
    mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<UploadCommand>>,
    background_jobs_can_start: Barrier,
    cancel: CancellationToken,
) -> anyhow::Result<()> {
    let mut writer = HeatmapWriter {
        tenant_manager,
        remote_storage,
        cancel: cancel.clone(),
        tasks: JoinSet::new(),
        tenants: HashMap::new(),
        tenants_writing: HashMap::new(),
        concurrent_writes: 8,
    };

    tracing::info!("Waiting for background_jobs_can_start...");
    background_jobs_can_start.wait().await;
    tracing::info!("background_jobs_can_start is ready, proceeding.");

    while !cancel.is_cancelled() {
        writer.iteration().await?;

        tokio::select! {
            _ = cancel.cancelled() => {
                tracing::info!("Heatmap writer terminating");
                break;
            },
            _ = tokio::time::sleep(HEATMAP_WAKE_INTERVAL) => {},
            cmd = command_queue.recv() => {
                let cmd = match cmd {
                    Some(c) => c,
                    None => {
                        // SecondaryController was destroyed, and this has raced with
                        // our CancellationToken
                        tracing::info!("Heatmap writer terminating");
                        break;
                    }
                };

                let CommandRequest {
                    response_tx,
                    payload,
                } = cmd;
                let result = writer.handle_command(payload).await;
                if response_tx.send(CommandResponse { result }).is_err() {
                    // Caller went away, e.g. because an HTTP request timed out
                    tracing::info!("Dropping response to administrative command")
                }
            }
        }
    }

    Ok(())
}
struct WriteInProgress {
    barrier: Barrier,
}

struct WriteComplete {
    tenant_id: TenantId,
    completed_at: Instant,
}

/// The heatmap writer keeps a little bit of per-tenant state, mainly to remember
/// when we last did a write. We only populate this after doing at least one
/// write for a tenant -- this avoids holding state for tenants that have
/// uploads disabled.
struct WriterTenantState {
    // This Weak only exists to enable culling WriterTenantState instances
    // when the Tenant has been deallocated.
    tenant: Weak<Tenant>,

    last_write: Option<Instant>,
}
struct HeatmapWriter {
    tenant_manager: Arc<TenantManager>,
    remote_storage: GenericRemoteStorage,
    cancel: CancellationToken,

    tenants: HashMap<TenantId, WriterTenantState>,

    tenants_writing: HashMap<TenantId, WriteInProgress>,
    tasks: JoinSet<WriteComplete>,
    concurrent_writes: usize,
}
impl HeatmapWriter {
    /// Periodic execution phase: check for new work to do, and run it with `spawn_write`
    async fn iteration(&mut self) -> anyhow::Result<()> {
        self.drain().await;

        // Cull any entries in self.tenants whose Arc<Tenant> is gone
        self.tenants.retain(|_k, v| v.tenant.upgrade().is_some());

        // Cannot spawn more work right now
        if self.tenants_writing.len() >= self.concurrent_writes {
            return Ok(());
        }

        // Iterate over tenants looking for work to do.
        let tenants = self.tenant_manager.get_attached_tenants();
        for tenant in tenants {
            // Can't spawn any more work, drop out
            if self.tenants_writing.len() >= self.concurrent_writes {
                return Ok(());
            }

            // Process is shutting down, drop out
            if self.cancel.is_cancelled() {
                return Ok(());
            }

            // Skip tenants that don't have heatmaps enabled
            if !tenant.get_enable_heatmap() {
                continue;
            }

            // Skip tenants that aren't in a stable active state
            if tenant.current_state() != TenantState::Active {
                continue;
            }

            // Skip tenants that already have a write in flight
            if self.tenants_writing.contains_key(&tenant.get_tenant_id()) {
                continue;
            }

            // TODO: add a TenantConf for whether to upload at all. This is useful for
            // a single-location mode for cheap tenants that don't require HA.

            // TODO: add a mechanism to check whether the active layer set has
            // changed since our last write

            self.maybe_spawn_write(tenant);
        }

        Ok(())
    }
    async fn drain(&mut self) {
        // Drain any complete background operations
        loop {
            tokio::select!(
                biased;
                Some(r) = self.tasks.join_next() => {
                    match r {
                        Ok(r) => {
                            self.on_completion(r);
                        },
                        Err(e) => {
                            // This should not happen, but needn't be fatal.
                            tracing::error!("Join error on heatmap writer JoinSet! {e}");
                        }
                    }
                }
                else => {
                    break;
                }
            )
        }
    }
    fn maybe_spawn_write(&mut self, tenant: Arc<Tenant>) {
        // Create an entry in self.tenants if one doesn't already exist: this will later be updated
        // with the completion time in on_completion.
        let state = self
            .tenants
            .entry(tenant.get_tenant_id())
            .or_insert_with(|| WriterTenantState {
                tenant: Arc::downgrade(&tenant),
                last_write: None,
            });

        // Decline to do the upload if insufficient time has passed
        if let Some(last_write) = state.last_write {
            if Instant::now().duration_since(last_write) < HEATMAP_UPLOAD_INTERVAL {
                return;
            }
        }

        self.spawn_write(tenant)
    }
    fn spawn_write(&mut self, tenant: Arc<Tenant>) {
        let remote_storage = self.remote_storage.clone();
        let tenant_id = tenant.get_tenant_id();
        let (completion, barrier) = utils::completion::channel();
        self.tasks.spawn(async move {
            // Guard for the barrier in [`WriteInProgress`]
            let _completion = completion;

            match write_tenant(remote_storage, &tenant)
                .instrument(tracing::info_span!(
                    "write_tenant",
                    tenant_id = %tenant.get_tenant_id()
                ))
                .await
            {
                Ok(()) => {}
                Err(e) => {
                    tracing::warn!(
                        "Failed to upload heatmap for tenant {}: {e:#}",
                        tenant.get_tenant_id(),
                    )
                }
            }

            WriteComplete {
                tenant_id: tenant.get_tenant_id(),
                completed_at: Instant::now(),
            }
        });

        self.tenants_writing
            .insert(tenant_id, WriteInProgress { barrier });
    }
    fn on_completion(&mut self, completion: WriteComplete) {
        tracing::debug!(tenant_id=%completion.tenant_id, "Heatmap write task complete");
        self.tenants_writing.remove(&completion.tenant_id);
        use std::collections::hash_map::Entry;
        match self.tenants.entry(completion.tenant_id) {
            Entry::Vacant(_) => {
                // Tenant state was dropped, nothing to update.
            }
            Entry::Occupied(mut entry) => {
                entry.get_mut().last_write = Some(completion.completed_at)
            }
        }
    }
    async fn handle_command(&mut self, command: UploadCommand) -> anyhow::Result<()> {
        match command {
            UploadCommand::Upload(tenant_id) => {
                // If an upload was ongoing for this tenant, let it finish first.
                if let Some(writing_state) = self.tenants_writing.get(&tenant_id) {
                    tracing::info!(%tenant_id, "Waiting for heatmap write to complete");
                    writing_state.barrier.clone().wait().await;
                }

                // Spawn the upload then immediately wait for it. This will block processing of other commands and
                // starting of other background work.
                tracing::info!(%tenant_id, "Starting heatmap write on command");
                let tenant = mgr::get_tenant(tenant_id, true)?;
                self.spawn_write(tenant);
                let writing_state = self
                    .tenants_writing
                    .get(&tenant_id)
                    .expect("We just inserted this");
                tracing::info!(%tenant_id, "Waiting for heatmap write to complete");
                writing_state.barrier.clone().wait().await;
                tracing::info!(%tenant_id, "Heatmap write complete");

                // This drain is not necessary for correctness, but it is polite to avoid intentionally leaving
                // our completed task in self.tenants_writing.
                self.drain().await;

                Ok(())
            }
        }
    }
}
async fn write_tenant(
    remote_storage: GenericRemoteStorage,
    tenant: &Arc<Tenant>,
) -> anyhow::Result<()> {
    let mut heatmap = HeatMapTenant {
        timelines: Vec::new(),
    };
    let timelines = tenant.timelines.lock().unwrap().clone();

    let tenant_cancel = tenant.cancel.clone();

    // Ensure that Tenant::shutdown waits for any upload in flight: this is needed because otherwise
    // when we delete a tenant, we might race with an upload in flight and end up leaving a heatmap behind
    // in remote storage.
    let _guard = match tenant.gate.enter() {
        Ok(g) => g,
        Err(_) => {
            tracing::info!("Skipping heatmap upload for tenant which is shutting down");
            return Ok(());
        }
    };

    for (timeline_id, timeline) in timelines {
        let heatmap_timeline = timeline.generate_heatmap().await;
        match heatmap_timeline {
            None => {
                tracing::debug!(
                    "Skipping heatmap upload because timeline {timeline_id} is not ready"
                );
                return Ok(());
            }
            Some(heatmap_timeline) => {
                heatmap.timelines.push(heatmap_timeline);
            }
        }
    }

    // Serialize the heatmap
    let bytes = serde_json::to_vec(&heatmap)?;
    let size = bytes.len();

    let path = remote_heatmap_path(&tenant.get_tenant_id());

    // Write the heatmap.
    tracing::debug!("Uploading {size} byte heatmap to {path}");
    if let Err(e) = backoff::retry(
        || async {
            let bytes = tokio::io::BufReader::new(std::io::Cursor::new(bytes.clone()));
            let bytes = Box::new(bytes);
            remote_storage
                .upload_storage_object(bytes, size, &path)
                .await
        },
        |_| false,
        3,
        u32::MAX,
        "Uploading heatmap",
        backoff::Cancel::new(tenant_cancel.clone(), || anyhow::anyhow!("Shutting down")),
    )
    .await
    {
        if tenant_cancel.is_cancelled() {
            return Ok(());
        } else {
            return Err(e);
        }
    }

    SECONDARY_MODE.upload_heatmap.inc();
    tracing::info!("Successfully uploaded {size} byte heatmap to {path}");

    Ok(())
}
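
The spawn_write/handle_command pair above relies on the utils::completion idiom: the Barrier unblocks once its paired completion guard is dropped. A minimal sketch of that idiom in isolation (`do_work` is hypothetical):

    async fn barrier_example() {
        let (completion, barrier) = utils::completion::channel();
        tokio::spawn(async move {
            let _completion = completion; // dropped when the task exits
            do_work().await;
        });
        // Resolves once the spawned task has dropped its completion guard.
        barrier.wait().await;
    }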