mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 02:20:42 +00:00
pageserver: TenantManager support for SecondaryTenant
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
use pageserver_api::models::*;
|
||||
use pageserver_api::{models::*, shard::TenantShardId};
|
||||
use reqwest::{IntoUrl, Method};
|
||||
use utils::{
|
||||
http::error::HttpErrorBody,
|
||||
@@ -162,6 +162,18 @@ impl Client {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn tenant_secondary_download(&self, tenant_id: TenantShardId) -> Result<()> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{}/secondary/download",
|
||||
self.mgmt_api_endpoint, tenant_id
|
||||
);
|
||||
self.request(Method::POST, &uri, ())
|
||||
.await?
|
||||
.error_for_status()
|
||||
.map(|_| ())
|
||||
.map_err(|e| Error::ApiError(format!("{}", e)))
|
||||
}
|
||||
|
||||
pub async fn location_config(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
|
||||
@@ -585,7 +585,7 @@ impl DeleteTenantFlow {
|
||||
}
|
||||
break;
|
||||
}
|
||||
TenantsMapRemoveResult::Occupied(TenantSlot::Secondary) => {
|
||||
TenantsMapRemoveResult::Occupied(TenantSlot::Secondary(_)) => {
|
||||
// This is unexpected: this secondary tenants should not have been created, and we
|
||||
// are not in a position to shut it down from here.
|
||||
tracing::warn!("Tenant transitioned to secondary mode while deleting!");
|
||||
|
||||
@@ -44,6 +44,8 @@ use utils::generation::Generation;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use super::delete::DeleteTenantError;
|
||||
use super::secondary::SecondaryTenant;
|
||||
use super::storage_layer::Layer;
|
||||
use super::TenantSharedResources;
|
||||
|
||||
/// For a tenant that appears in TenantsMap, it may either be
|
||||
@@ -57,7 +59,7 @@ use super::TenantSharedResources;
|
||||
/// having a properly acquired generation (Secondary doesn't need a generation)
|
||||
pub(crate) enum TenantSlot {
|
||||
Attached(Arc<Tenant>),
|
||||
Secondary,
|
||||
Secondary(Arc<SecondaryTenant>),
|
||||
/// In this state, other administrative operations acting on the TenantId should
|
||||
/// block, or return a retry indicator equivalent to HTTP 503.
|
||||
InProgress(utils::completion::Barrier),
|
||||
@@ -67,7 +69,7 @@ impl std::fmt::Debug for TenantSlot {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Attached(tenant) => write!(f, "Attached({})", tenant.current_state()),
|
||||
Self::Secondary => write!(f, "Secondary"),
|
||||
Self::Secondary(_) => write!(f, "Secondary"),
|
||||
Self::InProgress(_) => write!(f, "InProgress"),
|
||||
}
|
||||
}
|
||||
@@ -78,7 +80,7 @@ impl TenantSlot {
|
||||
fn get_attached(&self) -> Option<&Arc<Tenant>> {
|
||||
match self {
|
||||
Self::Attached(t) => Some(t),
|
||||
Self::Secondary => None,
|
||||
Self::Secondary(_) => None,
|
||||
Self::InProgress(_) => None,
|
||||
}
|
||||
}
|
||||
@@ -469,7 +471,10 @@ pub async fn init_tenant_mgr(
|
||||
// tenants, because they do no remote writes and hence require no
|
||||
// generation number
|
||||
info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Loaded tenant in secondary mode");
|
||||
tenants.insert(tenant_shard_id, TenantSlot::Secondary);
|
||||
tenants.insert(
|
||||
tenant_shard_id,
|
||||
TenantSlot::Secondary(SecondaryTenant::new(tenant_shard_id)),
|
||||
);
|
||||
}
|
||||
LocationMode::Attached(_) => {
|
||||
// TODO: augment re-attach API to enable the control plane to
|
||||
@@ -664,8 +669,14 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
|
||||
|
||||
total_attached += 1;
|
||||
}
|
||||
TenantSlot::Secondary => {
|
||||
shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary);
|
||||
TenantSlot::Secondary(state) => {
|
||||
// We don't need to wait for this individually per-tenant: the
|
||||
// downloader task will be waited on eventually, this cancel
|
||||
// is just to encourage it to drop out if it is doing work
|
||||
// for this tenant right now.
|
||||
state.cancel.cancel();
|
||||
|
||||
shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary(state));
|
||||
}
|
||||
TenantSlot::InProgress(notify) => {
|
||||
// InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
|
||||
@@ -848,12 +859,28 @@ impl TenantManager {
|
||||
Some(TenantSlot::InProgress(_)) => {
|
||||
Err(GetTenantError::NotActive(tenant_shard_id.tenant_id))
|
||||
}
|
||||
None | Some(TenantSlot::Secondary) => {
|
||||
None | Some(TenantSlot::Secondary(_)) => {
|
||||
Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_secondary_tenant_shard(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
) -> Option<Arc<SecondaryTenant>> {
|
||||
let locked = self.tenants.read().unwrap();
|
||||
|
||||
let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
|
||||
.ok()
|
||||
.flatten();
|
||||
|
||||
match peek_slot {
|
||||
Some(TenantSlot::Secondary(s)) => Some(s.clone()),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
|
||||
pub(crate) async fn upsert_location(
|
||||
&self,
|
||||
@@ -932,42 +959,57 @@ impl TenantManager {
|
||||
// not do significant I/O, and shutdowns should be prompt via cancellation tokens.
|
||||
let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
|
||||
|
||||
if let Some(TenantSlot::Attached(tenant)) = slot_guard.get_old_value() {
|
||||
// The case where we keep a Tenant alive was covered above in the special case
|
||||
// for Attached->Attached transitions in the same generation. By this point,
|
||||
// if we see an attached tenant we know it will be discarded and should be
|
||||
// shut down.
|
||||
let (_guard, progress) = utils::completion::channel();
|
||||
match slot_guard.get_old_value() {
|
||||
Some(TenantSlot::Attached(tenant)) => {
|
||||
// The case where we keep a Tenant alive was covered above in the special case
|
||||
// for Attached->Attached transitions in the same generation. By this point,
|
||||
// if we see an attached tenant we know it will be discarded and should be
|
||||
// shut down.
|
||||
let (_guard, progress) = utils::completion::channel();
|
||||
|
||||
match tenant.get_attach_mode() {
|
||||
AttachmentMode::Single | AttachmentMode::Multi => {
|
||||
// Before we leave our state as the presumed holder of the latest generation,
|
||||
// flush any outstanding deletions to reduce the risk of leaking objects.
|
||||
self.resources.deletion_queue_client.flush_advisory()
|
||||
}
|
||||
AttachmentMode::Stale => {
|
||||
// If we're stale there's not point trying to flush deletions
|
||||
}
|
||||
};
|
||||
match tenant.get_attach_mode() {
|
||||
AttachmentMode::Single | AttachmentMode::Multi => {
|
||||
// Before we leave our state as the presumed holder of the latest generation,
|
||||
// flush any outstanding deletions to reduce the risk of leaking objects.
|
||||
self.resources.deletion_queue_client.flush_advisory()
|
||||
}
|
||||
AttachmentMode::Stale => {
|
||||
// If we're stale there's not point trying to flush deletions
|
||||
}
|
||||
};
|
||||
|
||||
info!("Shutting down attached tenant");
|
||||
match tenant.shutdown(progress, false).await {
|
||||
Ok(()) => {}
|
||||
Err(barrier) => {
|
||||
info!("Shutdown already in progress, waiting for it to complete");
|
||||
barrier.wait().await;
|
||||
info!("Shutting down attached tenant");
|
||||
match tenant.shutdown(progress, false).await {
|
||||
Ok(()) => {}
|
||||
Err(barrier) => {
|
||||
info!("Shutdown already in progress, waiting for it to complete");
|
||||
barrier.wait().await;
|
||||
}
|
||||
}
|
||||
slot_guard.drop_old_value().expect("We just shut it down");
|
||||
}
|
||||
Some(TenantSlot::Secondary(state)) => {
|
||||
info!("Shutting down secondary tenant");
|
||||
state.shutdown().await;
|
||||
}
|
||||
Some(TenantSlot::InProgress(_)) => {
|
||||
// This should never happen: acquire_slot should error out
|
||||
// if the contents of a slot were InProgress.
|
||||
anyhow::bail!("Acquired an InProgress slot, this is a bug.")
|
||||
}
|
||||
None => {
|
||||
// Slot was vacant, nothing needs shutting down.
|
||||
}
|
||||
slot_guard.drop_old_value().expect("We just shut it down");
|
||||
}
|
||||
|
||||
let tenant_path = self.conf.tenant_path(&tenant_shard_id);
|
||||
let timelines_path = self.conf.timelines_path(&tenant_shard_id);
|
||||
|
||||
let new_slot = match &new_location_config.mode {
|
||||
LocationMode::Secondary(_) => {
|
||||
// Directory doesn't need to be fsync'd because if we crash it can
|
||||
// safely be recreated next time this tenant location is configured.
|
||||
tokio::fs::create_dir_all(&tenant_path)
|
||||
tokio::fs::create_dir_all(&timelines_path)
|
||||
.await
|
||||
.with_context(|| format!("Creating {tenant_path}"))?;
|
||||
|
||||
@@ -975,11 +1017,9 @@ impl TenantManager {
|
||||
.await
|
||||
.map_err(SetNewTenantConfigError::Persist)?;
|
||||
|
||||
TenantSlot::Secondary
|
||||
TenantSlot::Secondary(SecondaryTenant::new(tenant_shard_id))
|
||||
}
|
||||
LocationMode::Attached(_attach_config) => {
|
||||
let timelines_path = self.conf.timelines_path(&tenant_shard_id);
|
||||
|
||||
// Directory doesn't need to be fsync'd because we do not depend on
|
||||
// it to exist after crashes: it may be recreated when tenant is
|
||||
// re-attached, see https://github.com/neondatabase/neon/issues/5550
|
||||
@@ -1102,6 +1142,80 @@ impl TenantManager {
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
// Do some synchronous work for all tenant slots in Secondary state. The provided
|
||||
// callback should be small and fast, as it will be called inside the global
|
||||
// TenantsMap lock.
|
||||
pub(crate) fn foreach_secondary_tenants<F>(&self, mut func: F)
|
||||
where
|
||||
// TODO: let the callback return a hint to drop out of the loop early
|
||||
F: FnMut(&TenantShardId, &Arc<SecondaryTenant>),
|
||||
{
|
||||
let locked = self.tenants.read().unwrap();
|
||||
|
||||
let map = match &*locked {
|
||||
TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return,
|
||||
TenantsMap::Open(m) => m,
|
||||
};
|
||||
|
||||
for (tenant_id, slot) in map {
|
||||
if let TenantSlot::Secondary(state) = slot {
|
||||
// Only expose secondary tenants that are not currently shutting down
|
||||
if !state.cancel.is_cancelled() {
|
||||
func(tenant_id, state)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Having planned some evictions for a tenant, attempt to execute them.
|
||||
///
|
||||
/// Execution will not occur if the TenantSlot for this tenant is not in
|
||||
/// a state suitable to execute.
|
||||
// TODO: is Layer really needed here? Maybe we should have reduced to a LayerFileName by this point.
|
||||
pub(crate) async fn evict_tenant_layers(
|
||||
&self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_layers: Vec<(TimelineId, Layer)>,
|
||||
) {
|
||||
// TODO: unify with how we evict for attached tenants. They should also
|
||||
// pass through here, to avoid attached tenant evictions racing with
|
||||
// the lifetime of secondary locations for the same tenant ID.
|
||||
|
||||
let state = {
|
||||
let locked = self.tenants.read().unwrap();
|
||||
let map = match &*locked {
|
||||
TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return,
|
||||
TenantsMap::Open(m) => m,
|
||||
};
|
||||
|
||||
match map.get(tenant_shard_id) {
|
||||
Some(TenantSlot::Secondary(secondary_state)) => {
|
||||
// Found a secondary as expected
|
||||
secondary_state.clone()
|
||||
}
|
||||
_ => {
|
||||
// A location configuration change raced with this eviction
|
||||
tracing::info!(
|
||||
"Dropping {} layer evictions, tenant not in suitable state",
|
||||
timeline_layers.len()
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Concurrency: downloads might have been going on while we deleted layers. However,
|
||||
// we are only deleting layers that the SecondaryTenant already thought were on disk,
|
||||
// so we won't be deleting anything that it is _currently_ downloading. All deletions
|
||||
// of SecondaryTenant layers flow through this function, so there is no risk that the
|
||||
// layer we're evicting is no longer present in-memory.
|
||||
state
|
||||
.evict_layers(self.conf, tenant_shard_id, timeline_layers)
|
||||
.instrument(tracing::info_span!("evict_layers",
|
||||
tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()
|
||||
))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -1151,7 +1265,7 @@ pub(crate) fn get_tenant(
|
||||
Some(TenantSlot::InProgress(_)) => {
|
||||
Err(GetTenantError::NotActive(tenant_shard_id.tenant_id))
|
||||
}
|
||||
None | Some(TenantSlot::Secondary) => {
|
||||
None | Some(TenantSlot::Secondary(_)) => {
|
||||
Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
|
||||
}
|
||||
}
|
||||
@@ -1222,7 +1336,7 @@ pub(crate) async fn get_active_tenant_with_timeout(
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(TenantSlot::Secondary) => {
|
||||
Some(TenantSlot::Secondary(_)) => {
|
||||
return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
|
||||
tenant_id,
|
||||
)))
|
||||
@@ -1521,7 +1635,7 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantShardId, TenantState)>,
|
||||
Ok(m.iter()
|
||||
.filter_map(|(id, tenant)| match tenant {
|
||||
TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())),
|
||||
TenantSlot::Secondary => None,
|
||||
TenantSlot::Secondary(_) => None,
|
||||
TenantSlot::InProgress(_) => None,
|
||||
})
|
||||
.collect())
|
||||
@@ -1778,11 +1892,7 @@ impl SlotGuard {
|
||||
fn old_value_is_shutdown(&self) -> bool {
|
||||
match self.old_value.as_ref() {
|
||||
Some(TenantSlot::Attached(tenant)) => tenant.gate.close_complete(),
|
||||
Some(TenantSlot::Secondary) => {
|
||||
// TODO: when adding secondary mode tenants, this will check for shutdown
|
||||
// in the same way that we do for `Tenant` above
|
||||
true
|
||||
}
|
||||
Some(TenantSlot::Secondary(secondary_tenant)) => secondary_tenant.gate.close_complete(),
|
||||
Some(TenantSlot::InProgress(_)) => {
|
||||
// A SlotGuard cannot be constructed for a slot that was already InProgress
|
||||
unreachable!()
|
||||
@@ -1992,26 +2102,19 @@ where
|
||||
let mut slot_guard =
|
||||
tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
|
||||
|
||||
// The SlotGuard allows us to manipulate the Tenant object without fear of some
|
||||
// concurrent API request doing something else for the same tenant ID.
|
||||
let attached_tenant = match slot_guard.get_old_value() {
|
||||
Some(TenantSlot::Attached(t)) => Some(t),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
// allow pageserver shutdown to await for our completion
|
||||
let (_guard, progress) = completion::channel();
|
||||
|
||||
// If the tenant was attached, shut it down gracefully. For secondary
|
||||
// locations this part is not necessary
|
||||
match &attached_tenant {
|
||||
Some(attached_tenant) => {
|
||||
// The SlotGuard allows us to manipulate the Tenant object without fear of some
|
||||
// concurrent API request doing something else for the same tenant ID.
|
||||
let attached_tenant = match slot_guard.get_old_value() {
|
||||
Some(TenantSlot::Attached(tenant)) => {
|
||||
// whenever we remove a tenant from memory, we don't want to flush and wait for upload
|
||||
let freeze_and_flush = false;
|
||||
|
||||
// shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
|
||||
// that we can continue safely to cleanup.
|
||||
match attached_tenant.shutdown(progress, freeze_and_flush).await {
|
||||
match tenant.shutdown(progress, freeze_and_flush).await {
|
||||
Ok(()) => {}
|
||||
Err(_other) => {
|
||||
// if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
|
||||
@@ -2020,11 +2123,19 @@ where
|
||||
return Err(TenantStateError::IsStopping(tenant_shard_id.tenant_id));
|
||||
}
|
||||
}
|
||||
Some(tenant)
|
||||
}
|
||||
None => {
|
||||
// Nothing to wait on when not attached, proceed.
|
||||
Some(TenantSlot::Secondary(secondary_state)) => {
|
||||
tracing::info!("Shutting down in secondary mode");
|
||||
secondary_state.shutdown().await;
|
||||
None
|
||||
}
|
||||
}
|
||||
Some(TenantSlot::InProgress(_)) => {
|
||||
// Acquiring a slot guarantees its old value was not InProgress
|
||||
unreachable!();
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
match tenant_cleanup
|
||||
.await
|
||||
|
||||
@@ -1,380 +0,0 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, Weak},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
metrics::SECONDARY_MODE,
|
||||
tenant::{
|
||||
mgr::{self, TenantManager},
|
||||
remote_timeline_client::remote_heatmap_path,
|
||||
secondary::CommandResponse,
|
||||
Tenant,
|
||||
},
|
||||
};
|
||||
|
||||
use pageserver_api::models::TenantState;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Instrument;
|
||||
use utils::{backoff, completion::Barrier, id::TenantId};
|
||||
|
||||
use super::{heatmap::HeatMapTenant, CommandRequest, UploadCommand};
|
||||
|
||||
/// Period between heatmap writer walking Tenants to look for work to do
|
||||
const HEATMAP_WAKE_INTERVAL: Duration = Duration::from_millis(1000);
|
||||
|
||||
/// Periodic between heatmap writes for each Tenant
|
||||
const HEATMAP_UPLOAD_INTERVAL: Duration = Duration::from_millis(60000);
|
||||
|
||||
/// While we take a CancellationToken here, it is subordinate to the CancellationTokens
|
||||
/// of tenants: i.e. we expect all Tenants to have been shut down before we are shut down, otherwise
|
||||
/// we might block waiting on a Tenant.
|
||||
pub(super) async fn heatmap_writer_task(
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
remote_storage: GenericRemoteStorage,
|
||||
mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<UploadCommand>>,
|
||||
background_jobs_can_start: Barrier,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut writer = HeatmapWriter {
|
||||
tenant_manager,
|
||||
remote_storage,
|
||||
cancel: cancel.clone(),
|
||||
tasks: JoinSet::new(),
|
||||
tenants: HashMap::new(),
|
||||
tenants_writing: HashMap::new(),
|
||||
concurrent_writes: 8,
|
||||
};
|
||||
|
||||
tracing::info!("Waiting for background_jobs_can start...");
|
||||
background_jobs_can_start.wait().await;
|
||||
tracing::info!("background_jobs_can is ready, proceeding.");
|
||||
|
||||
while !cancel.is_cancelled() {
|
||||
writer.iteration().await?;
|
||||
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
tracing::info!("Heatmap writer joining tasks");
|
||||
|
||||
tracing::info!("Heatmap writer terminating");
|
||||
|
||||
break;
|
||||
},
|
||||
_ = tokio::time::sleep(HEATMAP_WAKE_INTERVAL) => {},
|
||||
cmd = command_queue.recv() => {
|
||||
let cmd = match cmd {
|
||||
Some(c) =>c,
|
||||
None => {
|
||||
// SecondaryController was destroyed, and this has raced with
|
||||
// our CancellationToken
|
||||
tracing::info!("Heatmap writer terminating");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
let CommandRequest{
|
||||
response_tx,
|
||||
payload
|
||||
} = cmd;
|
||||
let result = writer.handle_command(payload).await;
|
||||
if response_tx.send(CommandResponse{result}).is_err() {
|
||||
// Caller went away, e.g. because an HTTP request timed out
|
||||
tracing::info!("Dropping response to administrative command")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct WriteInProgress {
|
||||
barrier: Barrier,
|
||||
}
|
||||
|
||||
struct WriteComplete {
|
||||
tenant_id: TenantId,
|
||||
completed_at: Instant,
|
||||
}
|
||||
|
||||
/// The heatmap writer keeps a little bit of per-tenant state, mainly to remember
|
||||
/// when we last did a write. We only populate this after doing at least one
|
||||
/// write for a tenant -- this avoids holding state for tenants that have
|
||||
/// uploads disabled.
|
||||
|
||||
struct WriterTenantState {
|
||||
// This Weak only exists to enable culling IdleTenant instances
|
||||
// when the Tenant has been deallocated.
|
||||
tenant: Weak<Tenant>,
|
||||
|
||||
last_write: Option<Instant>,
|
||||
}
|
||||
|
||||
struct HeatmapWriter {
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
remote_storage: GenericRemoteStorage,
|
||||
cancel: CancellationToken,
|
||||
|
||||
tenants: HashMap<TenantId, WriterTenantState>,
|
||||
|
||||
tenants_writing: HashMap<TenantId, WriteInProgress>,
|
||||
tasks: JoinSet<WriteComplete>,
|
||||
concurrent_writes: usize,
|
||||
}
|
||||
|
||||
impl HeatmapWriter {
|
||||
/// Periodic execution phase: check for new work to do, and run it with `spawn_write`
|
||||
async fn iteration(&mut self) -> anyhow::Result<()> {
|
||||
self.drain().await;
|
||||
|
||||
// Cull any entries in self.tenants whose Arc<Tenant> is gone
|
||||
self.tenants.retain(|_k, v| v.tenant.upgrade().is_some());
|
||||
|
||||
// Cannot spawn more work right now
|
||||
if self.tenants_writing.len() >= self.concurrent_writes {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Iterate over tenants looking for work to do.
|
||||
let tenants = self.tenant_manager.get_attached_tenants();
|
||||
for tenant in tenants {
|
||||
// Can't spawn any more work, drop out
|
||||
if self.tenants_writing.len() >= self.concurrent_writes {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Process is shutting down, drop out
|
||||
if self.cancel.is_cancelled() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Skip tenants that don't have heatmaps enabled
|
||||
if !tenant.get_enable_heatmap() {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Skip tenants that aren't in a stable active state
|
||||
if tenant.current_state() != TenantState::Active {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Skip tenants that already have a write in flight
|
||||
if self.tenants_writing.contains_key(&tenant.get_tenant_id()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// TODO: add a TenantConf for whether to upload at all. This is useful for
|
||||
// a single-location mode for cheap tenants that don't require HA.
|
||||
|
||||
// TODO: add a mechanism to check whether the active layer set has
|
||||
// changed since our last write
|
||||
|
||||
self.maybe_spawn_write(tenant);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn drain(&mut self) {
|
||||
// Drain any complete background operations
|
||||
loop {
|
||||
tokio::select!(
|
||||
biased;
|
||||
Some(r) = self.tasks.join_next() => {
|
||||
match r {
|
||||
Ok(r) => {
|
||||
self.on_completion(r);
|
||||
},
|
||||
Err(e) => {
|
||||
// This should not happen, but needn't be fatal.
|
||||
tracing::error!("Join error on heatmap writer JoinSet! {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
else => {
|
||||
break;
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn maybe_spawn_write(&mut self, tenant: Arc<Tenant>) {
|
||||
// Create an entry in self.tenants if one doesn't already exist: this will later be updated
|
||||
// with the completion time in on_completion.
|
||||
let state = self
|
||||
.tenants
|
||||
.entry(tenant.get_tenant_id())
|
||||
.or_insert_with(|| WriterTenantState {
|
||||
tenant: Arc::downgrade(&tenant),
|
||||
last_write: None,
|
||||
});
|
||||
|
||||
// Decline to do the upload if insufficient time has passed
|
||||
if let Some(last_write) = state.last_write {
|
||||
if Instant::now().duration_since(last_write) < HEATMAP_UPLOAD_INTERVAL {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
self.spawn_write(tenant)
|
||||
}
|
||||
|
||||
fn spawn_write(&mut self, tenant: Arc<Tenant>) {
|
||||
let remote_storage = self.remote_storage.clone();
|
||||
let tenant_id = tenant.get_tenant_id();
|
||||
let (completion, barrier) = utils::completion::channel();
|
||||
self.tasks.spawn(async move {
|
||||
// Guard for the barrier in [`WriteInProgress`]
|
||||
let _completion = completion;
|
||||
|
||||
match write_tenant(remote_storage, &tenant)
|
||||
.instrument(tracing::info_span!(
|
||||
"write_tenant",
|
||||
tenant_id = %tenant.get_tenant_id()
|
||||
))
|
||||
.await
|
||||
{
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
"Failed to upload heatmap for tenant {}: {e:#}",
|
||||
tenant.get_tenant_id(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
WriteComplete {
|
||||
tenant_id: tenant.get_tenant_id(),
|
||||
completed_at: Instant::now(),
|
||||
}
|
||||
});
|
||||
|
||||
self.tenants_writing
|
||||
.insert(tenant_id, WriteInProgress { barrier });
|
||||
}
|
||||
|
||||
fn on_completion(&mut self, completion: WriteComplete) {
|
||||
tracing::debug!(tenant_id=%completion.tenant_id, "Heatmap write task complete");
|
||||
self.tenants_writing.remove(&completion.tenant_id);
|
||||
tracing::debug!("Task completed for tenant {}", completion.tenant_id);
|
||||
use std::collections::hash_map::Entry;
|
||||
match self.tenants.entry(completion.tenant_id) {
|
||||
Entry::Vacant(_) => {
|
||||
// Tenant state was dropped, nothing to update.
|
||||
}
|
||||
Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().last_write = Some(completion.completed_at)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_command(&mut self, command: UploadCommand) -> anyhow::Result<()> {
|
||||
match command {
|
||||
UploadCommand::Upload(tenant_id) => {
|
||||
// If an upload was ongoing for this tenant, let it finish first.
|
||||
if let Some(writing_state) = self.tenants_writing.get(&tenant_id) {
|
||||
tracing::info!(%tenant_id, "Waiting for heatmap write to complete");
|
||||
writing_state.barrier.clone().wait().await;
|
||||
}
|
||||
|
||||
// Spawn the upload then immediately wait for it. This will block processing of other commands and
|
||||
// starting of other background work.
|
||||
tracing::info!(%tenant_id, "Starting heatmap write on command");
|
||||
let tenant = mgr::get_tenant(tenant_id, true)?;
|
||||
self.spawn_write(tenant);
|
||||
let writing_state = self
|
||||
.tenants_writing
|
||||
.get(&tenant_id)
|
||||
.expect("We just inserted this");
|
||||
tracing::info!(%tenant_id, "Waiting for heatmap write to complete");
|
||||
writing_state.barrier.clone().wait().await;
|
||||
tracing::info!(%tenant_id, "Heatmap write complete");
|
||||
|
||||
// This drain is not necessary for correctness, but it is polite to avoid intentionally leaving
|
||||
// our complete task in self.tenants_writing.
|
||||
self.drain().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn write_tenant(
|
||||
remote_storage: GenericRemoteStorage,
|
||||
tenant: &Arc<Tenant>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut heatmap = HeatMapTenant {
|
||||
timelines: Vec::new(),
|
||||
};
|
||||
let timelines = tenant.timelines.lock().unwrap().clone();
|
||||
|
||||
let tenant_cancel = tenant.cancel.clone();
|
||||
|
||||
// Ensure that Tenant::shutdown waits for any upload in flight: this is needed because otherwise
|
||||
// when we delete a tenant, we might race with an upload in flight and end up leaving a heatmap behind
|
||||
// in remote storage.
|
||||
let _guard = match tenant.gate.enter() {
|
||||
Ok(g) => g,
|
||||
Err(_) => {
|
||||
tracing::info!("Skipping heatmap upload for tenant which is shutting down");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
for (timeline_id, timeline) in timelines {
|
||||
let heatmap_timeline = timeline.generate_heatmap().await;
|
||||
match heatmap_timeline {
|
||||
None => {
|
||||
tracing::debug!(
|
||||
"Skipping heatmap upload because timeline {timeline_id} is not ready"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
Some(heatmap_timeline) => {
|
||||
heatmap.timelines.push(heatmap_timeline);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Serialize the heatmap
|
||||
let bytes = serde_json::to_vec(&heatmap)?;
|
||||
let size = bytes.len();
|
||||
|
||||
let path = remote_heatmap_path(&tenant.get_tenant_id());
|
||||
|
||||
// Write the heatmap.
|
||||
tracing::debug!("Uploading {size} byte heatmap to {path}");
|
||||
if let Err(e) = backoff::retry(
|
||||
|| async {
|
||||
let bytes = tokio::io::BufReader::new(std::io::Cursor::new(bytes.clone()));
|
||||
let bytes = Box::new(bytes);
|
||||
remote_storage
|
||||
.upload_storage_object(bytes, size, &path)
|
||||
.await
|
||||
},
|
||||
|_| false,
|
||||
3,
|
||||
u32::MAX,
|
||||
"Uploading heatmap",
|
||||
backoff::Cancel::new(tenant_cancel.clone(), || anyhow::anyhow!("Shutting down")),
|
||||
)
|
||||
.await
|
||||
{
|
||||
if tenant_cancel.is_cancelled() {
|
||||
return Ok(());
|
||||
} else {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
SECONDARY_MODE.upload_heatmap.inc();
|
||||
tracing::info!("Successfully uploaded {size} byte heatmap to {path}");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user