pageserver: do fewer heatmap uploads for tiny tenants (#7731)

## Problem

Currently we do a large number of heatmap uploads for tiny tenants.
"tiny" in this context is defined as being less than a single layer in
size. These uploads are triggered by atime changes rather than changes
in the set of layers.

Uploading heatmaps for atime changes on small tenants isn't useful,
because even without bumping these atimes, disk usage eviction still
avoids evicting the largest resident layer of a tenant, which in
practice keeps tiny/empty tenants mostly resident irrespective of
atimes.

## Summary of changes

- For tenants smaller than one checkpoint interval, only upload heatmap
if the set of layers has changed, not if only the atimes have changed.
- Include the heatmap period in the uploaded heatmap, as a precursor to
implementing https://github.com/neondatabase/neon/issues/6200
(auto-adjusting download intervals to match upload intervals)
This commit is contained in:
John Spray
2024-05-14 09:31:26 +01:00
committed by GitHub
parent 22afaea6e1
commit cd0e344938
2 changed files with 81 additions and 26 deletions

View File

@@ -15,6 +15,14 @@ pub(super) struct HeatMapTenant {
pub(super) generation: Generation,
pub(super) timelines: Vec<HeatMapTimeline>,
/// Uploaders provide their own upload period in the heatmap, as a hint to downloaders
/// of how frequently it is worthwhile to check for updates.
///
/// This is optional for backward compat, and because we sometimes might upload
/// a heatmap explicitly via API for a tenant that has no periodic upload configured.
#[serde(default)]
pub(super) upload_period_ms: Option<u128>,
}
#[serde_as]
@@ -81,4 +89,21 @@ impl HeatMapTenant {
stats
}
pub(crate) fn strip_atimes(self) -> Self {
Self {
timelines: self
.timelines
.into_iter()
.map(|mut tl| {
for layer in &mut tl.layers {
layer.access_time = SystemTime::UNIX_EPOCH;
}
tl
})
.collect(),
generation: self.generation,
upload_period_ms: self.upload_period_ms,
}
}
}

View File

@@ -80,7 +80,7 @@ impl RunningJob for WriteInProgress {
struct UploadPending {
tenant: Arc<Tenant>,
last_digest: Option<md5::Digest>,
last_upload: Option<LastUploadState>,
target_time: Option<Instant>,
period: Option<Duration>,
}
@@ -94,7 +94,7 @@ impl scheduler::PendingJob for UploadPending {
struct WriteComplete {
tenant_shard_id: TenantShardId,
completed_at: Instant,
digest: Option<md5::Digest>,
uploaded: Option<LastUploadState>,
next_upload: Option<Instant>,
}
@@ -115,10 +115,7 @@ struct UploaderTenantState {
tenant: Weak<Tenant>,
/// Digest of the serialized heatmap that we last successfully uploaded
///
/// md5 is generally a bad hash. We use it because it's convenient for interop with AWS S3's ETag,
/// which is also an md5sum.
last_digest: Option<md5::Digest>,
last_upload_state: Option<LastUploadState>,
/// When the last upload attempt completed (may have been successful or failed)
last_upload: Option<Instant>,
@@ -187,7 +184,7 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
tenant: Arc::downgrade(&tenant),
last_upload: None,
next_upload: Some(now.checked_add(period_warmup(period)).unwrap_or(now)),
last_digest: None,
last_upload_state: None,
});
// Decline to do the upload if insufficient time has passed
@@ -195,10 +192,10 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
return;
}
let last_digest = state.last_digest;
let last_upload = state.last_upload_state.clone();
result.jobs.push(UploadPending {
tenant,
last_digest,
last_upload,
target_time: state.next_upload,
period: Some(period),
});
@@ -218,7 +215,7 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
) {
let UploadPending {
tenant,
last_digest,
last_upload,
target_time,
period,
} = job;
@@ -231,16 +228,16 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
let _completion = completion;
let started_at = Instant::now();
let digest = match upload_tenant_heatmap(remote_storage, &tenant, last_digest).await {
Ok(UploadHeatmapOutcome::Uploaded(digest)) => {
let uploaded = match upload_tenant_heatmap(remote_storage, &tenant, last_upload.clone()).await {
Ok(UploadHeatmapOutcome::Uploaded(uploaded)) => {
let duration = Instant::now().duration_since(started_at);
SECONDARY_MODE
.upload_heatmap_duration
.observe(duration.as_secs_f64());
SECONDARY_MODE.upload_heatmap.inc();
Some(digest)
Some(uploaded)
}
Ok(UploadHeatmapOutcome::NoChange | UploadHeatmapOutcome::Skipped) => last_digest,
Ok(UploadHeatmapOutcome::NoChange | UploadHeatmapOutcome::Skipped) => last_upload,
Err(UploadHeatmapError::Upload(e)) => {
tracing::warn!(
"Failed to upload heatmap for tenant {}: {e:#}",
@@ -251,11 +248,11 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
.upload_heatmap_duration
.observe(duration.as_secs_f64());
SECONDARY_MODE.upload_heatmap_errors.inc();
last_digest
last_upload
}
Err(UploadHeatmapError::Cancelled) => {
tracing::info!("Cancelled heatmap upload, shutting down");
last_digest
last_upload
}
};
@@ -277,7 +274,7 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
WriteComplete {
tenant_shard_id: *tenant.get_tenant_shard_id(),
completed_at: now,
digest,
uploaded,
next_upload,
}
}.instrument(info_span!(parent: None, "heatmap_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
@@ -299,7 +296,7 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
Ok(UploadPending {
// Ignore our state for last digest: this forces an upload even if nothing has changed
last_digest: None,
last_upload: None,
tenant,
target_time: None,
period: None,
@@ -312,7 +309,7 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
let WriteComplete {
tenant_shard_id,
completed_at,
digest,
uploaded,
next_upload,
} = completion;
use std::collections::hash_map::Entry;
@@ -322,7 +319,7 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
}
Entry::Occupied(mut entry) => {
entry.get_mut().last_upload = Some(completed_at);
entry.get_mut().last_digest = digest;
entry.get_mut().last_upload_state = uploaded;
entry.get_mut().next_upload = next_upload
}
}
@@ -331,7 +328,7 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
enum UploadHeatmapOutcome {
/// We successfully wrote to remote storage, with this digest.
Uploaded(md5::Digest),
Uploaded(LastUploadState),
/// We did not upload because the heatmap digest was unchanged since the last upload
NoChange,
/// We skipped the upload for some reason, such as tenant/timeline not ready
@@ -347,12 +344,25 @@ enum UploadHeatmapError {
Upload(#[from] anyhow::Error),
}
/// Digests describing the heatmap we most recently uploaded successfully.
///
/// md5 is generally a bad hash. We use it because it's convenient for interop with AWS S3's ETag,
/// which is also an md5sum.
#[derive(Clone)]
struct LastUploadState {
// Digest of json-encoded HeatMapTenant
uploaded_digest: md5::Digest,
// Digest without atimes set.
layers_only_digest: md5::Digest,
}
/// The inner upload operation. This will skip if `last_digest` is Some and matches the digest
/// of the object we would have uploaded.
async fn upload_tenant_heatmap(
remote_storage: GenericRemoteStorage,
tenant: &Arc<Tenant>,
last_digest: Option<md5::Digest>,
last_upload: Option<LastUploadState>,
) -> Result<UploadHeatmapOutcome, UploadHeatmapError> {
debug_assert_current_span_has_tenant_id();
@@ -368,6 +378,7 @@ async fn upload_tenant_heatmap(
let mut heatmap = HeatMapTenant {
timelines: Vec::new(),
generation,
upload_period_ms: tenant.get_heatmap_period().map(|p| p.as_millis()),
};
let timelines = tenant.timelines.lock().unwrap().clone();
@@ -396,15 +407,31 @@ async fn upload_tenant_heatmap(
// Serialize the heatmap
let bytes = serde_json::to_vec(&heatmap).map_err(|e| anyhow::anyhow!(e))?;
let bytes = bytes::Bytes::from(bytes);
let size = bytes.len();
// Drop out early if nothing changed since our last upload
let digest = md5::compute(&bytes);
if Some(digest) == last_digest {
if Some(&digest) == last_upload.as_ref().map(|d| &d.uploaded_digest) {
return Ok(UploadHeatmapOutcome::NoChange);
}
// Calculate a digest that omits atimes, so that we can distinguish actual changes in
// layers from changes only in atimes.
let heatmap_size_bytes = heatmap.get_stats().bytes;
let layers_only_bytes =
serde_json::to_vec(&heatmap.strip_atimes()).map_err(|e| anyhow::anyhow!(e))?;
let layers_only_digest = md5::compute(&layers_only_bytes);
if heatmap_size_bytes < tenant.get_checkpoint_distance() {
// For small tenants, skip upload if only atimes changed. This avoids doing frequent
// uploads from long-idle tenants whose atimes are just incremented by periodic
// size calculations.
if Some(&layers_only_digest) == last_upload.as_ref().map(|d| &d.layers_only_digest) {
return Ok(UploadHeatmapOutcome::NoChange);
}
}
let bytes = bytes::Bytes::from(bytes);
let size = bytes.len();
let path = remote_heatmap_path(tenant.get_tenant_shard_id());
let cancel = &tenant.cancel;
@@ -436,5 +463,8 @@ async fn upload_tenant_heatmap(
tracing::info!("Successfully uploaded {size} byte heatmap to {path}");
Ok(UploadHeatmapOutcome::Uploaded(digest))
Ok(UploadHeatmapOutcome::Uploaded(LastUploadState {
uploaded_digest: digest,
layers_only_digest,
}))
}