mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-06 21:12:55 +00:00
pageserver: mark unarchival heatmap layers as cold (#11098)
## Problem On unarchival, we update the previous heatmap with all visible layers. When the primary generates a new heatmap it includes all those layers, so the secondary will download them. Since they're not actually resident on the primary (we didn't call the warm up API), they'll never be evicted, so they remain in the heatmap. We want these layers in the heatmap, since we might wish to warm-up an unarchived timeline after a shard migration. However, we don't want them to be downloaded on the secondary until we've warmed up the primary. ## Summary of Changes Include these layers in the heatmap and mark them as cold. All heatmap operations act on non-cold layers apart from the attached location warming up API, which will download the cold layers. Once the cold layers are downloaded on the primary, they'll be included in the next heatmap as hot and the secondary starts fetching them too.
This commit is contained in:
@@ -869,8 +869,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
let heatmap_timeline = heatmap.timelines.get(heatmap_timeline_index).unwrap();
|
||||
|
||||
let layers_in_heatmap = heatmap_timeline
|
||||
.layers
|
||||
.iter()
|
||||
.hot_layers()
|
||||
.map(|l| (&l.name, l.metadata.generation))
|
||||
.collect::<HashSet<_>>();
|
||||
let layers_on_disk = timeline_state
|
||||
@@ -1015,7 +1014,8 @@ impl<'a> TenantDownloader<'a> {
|
||||
// Accumulate updates to the state
|
||||
let mut touched = Vec::new();
|
||||
|
||||
for layer in timeline.layers {
|
||||
let timeline_id = timeline.timeline_id;
|
||||
for layer in timeline.into_hot_layers() {
|
||||
if self.secondary_state.cancel.is_cancelled() {
|
||||
tracing::debug!("Cancelled -- dropping out of layer loop");
|
||||
return (Err(UpdateError::Cancelled), touched);
|
||||
@@ -1040,7 +1040,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
}
|
||||
|
||||
match self
|
||||
.download_layer(tenant_shard_id, &timeline.timeline_id, layer, ctx)
|
||||
.download_layer(tenant_shard_id, &timeline_id, layer, ctx)
|
||||
.await
|
||||
{
|
||||
Ok(Some(layer)) => touched.push(layer),
|
||||
@@ -1148,7 +1148,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
|
||||
let timeline_id = timeline.timeline_id;
|
||||
|
||||
tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());
|
||||
tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.hot_layers().count());
|
||||
|
||||
let (result, touched) = self
|
||||
.download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline, ctx)
|
||||
@@ -1316,11 +1316,11 @@ async fn init_timeline_state(
|
||||
// As we iterate through layers found on disk, we will look up their metadata from this map.
|
||||
// Layers not present in metadata will be discarded.
|
||||
let heatmap_metadata: HashMap<&LayerName, &HeatMapLayer> =
|
||||
heatmap.layers.iter().map(|l| (&l.name, l)).collect();
|
||||
heatmap.hot_layers().map(|l| (&l.name, l)).collect();
|
||||
|
||||
let last_heatmap_metadata: HashMap<&LayerName, &HeatMapLayer> =
|
||||
if let Some(last_heatmap) = last_heatmap {
|
||||
last_heatmap.layers.iter().map(|l| (&l.name, l)).collect()
|
||||
last_heatmap.hot_layers().map(|l| (&l.name, l)).collect()
|
||||
} else {
|
||||
HashMap::new()
|
||||
};
|
||||
|
||||
@@ -42,7 +42,7 @@ pub(crate) struct HeatMapTimeline {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub(crate) timeline_id: TimelineId,
|
||||
|
||||
pub(crate) layers: Vec<HeatMapLayer>,
|
||||
layers: Vec<HeatMapLayer>,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
@@ -53,8 +53,10 @@ pub(crate) struct HeatMapLayer {
|
||||
|
||||
#[serde_as(as = "TimestampSeconds<i64>")]
|
||||
pub(crate) access_time: SystemTime,
|
||||
// TODO: an actual 'heat' score that would let secondary locations prioritize downloading
|
||||
// the hottest layers, rather than trying to simply mirror whatever layers are on-disk on the primary.
|
||||
|
||||
#[serde(default)]
|
||||
pub(crate) cold: bool, // TODO: an actual 'heat' score that would let secondary locations prioritize downloading
|
||||
// the hottest layers, rather than trying to simply mirror whatever layers are on-disk on the primary.
|
||||
}
|
||||
|
||||
impl HeatMapLayer {
|
||||
@@ -62,11 +64,13 @@ impl HeatMapLayer {
|
||||
name: LayerName,
|
||||
metadata: LayerFileMetadata,
|
||||
access_time: SystemTime,
|
||||
cold: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
name,
|
||||
metadata,
|
||||
access_time,
|
||||
cold,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -78,6 +82,18 @@ impl HeatMapTimeline {
|
||||
layers,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn into_hot_layers(self) -> impl Iterator<Item = HeatMapLayer> {
|
||||
self.layers.into_iter().filter(|l| !l.cold)
|
||||
}
|
||||
|
||||
pub(crate) fn hot_layers(&self) -> impl Iterator<Item = &HeatMapLayer> {
|
||||
self.layers.iter().filter(|l| !l.cold)
|
||||
}
|
||||
|
||||
pub(crate) fn all_layers(&self) -> impl Iterator<Item = &HeatMapLayer> {
|
||||
self.layers.iter()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct HeatMapStats {
|
||||
@@ -92,7 +108,7 @@ impl HeatMapTenant {
|
||||
layers: 0,
|
||||
};
|
||||
for timeline in &self.timelines {
|
||||
for layer in &timeline.layers {
|
||||
for layer in timeline.hot_layers() {
|
||||
stats.layers += 1;
|
||||
stats.bytes += layer.metadata.file_size;
|
||||
}
|
||||
|
||||
@@ -1563,10 +1563,10 @@ impl LayerInner {
|
||||
|
||||
self.access_stats.record_residence_event();
|
||||
|
||||
self.status.as_ref().unwrap().send_replace(Status::Evicted);
|
||||
|
||||
*self.last_evicted_at.lock().unwrap() = Some(std::time::Instant::now());
|
||||
|
||||
self.status.as_ref().unwrap().send_replace(Status::Evicted);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -3648,7 +3648,7 @@ impl Timeline {
|
||||
let visible_non_resident = match previous_heatmap.as_deref() {
|
||||
Some(PreviousHeatmap::Active {
|
||||
heatmap, read_at, ..
|
||||
}) => Some(heatmap.layers.iter().filter_map(|hl| {
|
||||
}) => Some(heatmap.all_layers().filter_map(|hl| {
|
||||
let desc: PersistentLayerDesc = hl.name.clone().into();
|
||||
let layer = guard.try_get_from_key(&desc.key())?;
|
||||
|
||||
@@ -3664,7 +3664,7 @@ impl Timeline {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some((desc, hl.metadata.clone(), hl.access_time))
|
||||
Some((desc, hl.metadata.clone(), hl.access_time, hl.cold))
|
||||
})),
|
||||
Some(PreviousHeatmap::Obsolete) => None,
|
||||
None => None,
|
||||
@@ -3680,6 +3680,7 @@ impl Timeline {
|
||||
layer.layer_desc().clone(),
|
||||
layer.metadata(),
|
||||
last_activity_ts,
|
||||
false, // these layers are not cold
|
||||
))
|
||||
}
|
||||
LayerVisibilityHint::Covered => {
|
||||
@@ -3706,12 +3707,14 @@ impl Timeline {
|
||||
// Sort layers in order of which to download first. For a large set of layers to download, we
|
||||
// want to prioritize those layers which are most likely to still be in the resident many minutes
|
||||
// or hours later:
|
||||
// - Cold layers go last for convenience when a human inspects the heatmap.
|
||||
// - Download L0s last, because they churn the fastest: L0s on a fast-writing tenant might
|
||||
// only exist for a few minutes before being compacted into L1s.
|
||||
// - For L1 & image layers, download most recent LSNs first: the older the LSN, the sooner
|
||||
// the layer is likely to be covered by an image layer during compaction.
|
||||
layers.sort_by_key(|(desc, _meta, _atime)| {
|
||||
layers.sort_by_key(|(desc, _meta, _atime, cold)| {
|
||||
std::cmp::Reverse((
|
||||
*cold,
|
||||
!LayerMap::is_l0(&desc.key_range, desc.is_delta),
|
||||
desc.lsn_range.end,
|
||||
))
|
||||
@@ -3719,7 +3722,9 @@ impl Timeline {
|
||||
|
||||
let layers = layers
|
||||
.into_iter()
|
||||
.map(|(desc, meta, atime)| HeatMapLayer::new(desc.layer_name(), meta, atime))
|
||||
.map(|(desc, meta, atime, cold)| {
|
||||
HeatMapLayer::new(desc.layer_name(), meta, atime, cold)
|
||||
})
|
||||
.collect();
|
||||
|
||||
Some(HeatMapTimeline::new(self.timeline_id, layers))
|
||||
@@ -3739,6 +3744,7 @@ impl Timeline {
|
||||
name: vl.layer_desc().layer_name(),
|
||||
metadata: vl.metadata(),
|
||||
access_time: now,
|
||||
cold: true,
|
||||
};
|
||||
heatmap_layers.push(hl);
|
||||
}
|
||||
@@ -7040,6 +7046,7 @@ mod tests {
|
||||
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::value::Value;
|
||||
use std::iter::Iterator;
|
||||
use tracing::Instrument;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
@@ -7053,8 +7060,8 @@ mod tests {
|
||||
use crate::tenant::{PreviousHeatmap, Timeline};
|
||||
|
||||
fn assert_heatmaps_have_same_layers(lhs: &HeatMapTimeline, rhs: &HeatMapTimeline) {
|
||||
assert_eq!(lhs.layers.len(), rhs.layers.len());
|
||||
let lhs_rhs = lhs.layers.iter().zip(rhs.layers.iter());
|
||||
assert_eq!(lhs.all_layers().count(), rhs.all_layers().count());
|
||||
let lhs_rhs = lhs.all_layers().zip(rhs.all_layers());
|
||||
for (l, r) in lhs_rhs {
|
||||
assert_eq!(l.name, r.name);
|
||||
assert_eq!(l.metadata, r.metadata);
|
||||
@@ -7132,10 +7139,11 @@ mod tests {
|
||||
assert_eq!(heatmap.timeline_id, timeline.timeline_id);
|
||||
|
||||
// L0 should come last
|
||||
assert_eq!(heatmap.layers.last().unwrap().name, l0_delta.layer_name());
|
||||
let heatmap_layers = heatmap.all_layers().collect::<Vec<_>>();
|
||||
assert_eq!(heatmap_layers.last().unwrap().name, l0_delta.layer_name());
|
||||
|
||||
let mut last_lsn = Lsn::MAX;
|
||||
for layer in &heatmap.layers {
|
||||
for layer in heatmap_layers {
|
||||
// Covered layer should be omitted
|
||||
assert!(layer.name != covered_delta.layer_name());
|
||||
|
||||
@@ -7264,7 +7272,7 @@ mod tests {
|
||||
.expect("Infallible while timeline is not shut down");
|
||||
|
||||
// Both layers should be in the heatmap
|
||||
assert!(!heatmap.layers.is_empty());
|
||||
assert!(heatmap.all_layers().count() > 0);
|
||||
|
||||
// Now simulate a migration.
|
||||
timeline
|
||||
@@ -7290,7 +7298,7 @@ mod tests {
|
||||
.await
|
||||
.expect("Infallible while timeline is not shut down");
|
||||
|
||||
assert!(post_eviction_heatmap.layers.is_empty());
|
||||
assert_eq!(post_eviction_heatmap.all_layers().count(), 0);
|
||||
assert!(matches!(
|
||||
timeline.previous_heatmap.load().as_deref(),
|
||||
Some(PreviousHeatmap::Obsolete)
|
||||
|
||||
@@ -61,11 +61,11 @@ impl HeatmapLayersDownloader {
|
||||
|
||||
tracing::info!(
|
||||
resident_size=%timeline.resident_physical_size(),
|
||||
heatmap_layers=%heatmap.layers.len(),
|
||||
heatmap_layers=%heatmap.all_layers().count(),
|
||||
"Starting heatmap layers download"
|
||||
);
|
||||
|
||||
let stream = futures::stream::iter(heatmap.layers.into_iter().filter_map(
|
||||
let stream = futures::stream::iter(heatmap.all_layers().cloned().filter_map(
|
||||
|layer| {
|
||||
let ctx = ctx.attached_child();
|
||||
let tl = timeline.clone();
|
||||
|
||||
@@ -955,6 +955,17 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
raise RuntimeError(f"No heatmap for timeline: {tlid}")
|
||||
|
||||
def count_timeline_heatmap_layers(tlid) -> tuple[int, int]:
|
||||
cold, hot = 0, 0
|
||||
layers = timeline_heatmap(tlid)["layers"]
|
||||
for layer in layers:
|
||||
if layer["cold"]:
|
||||
cold += 1
|
||||
else:
|
||||
hot += 1
|
||||
|
||||
return cold, hot
|
||||
|
||||
env.storage_controller.allowed_errors.extend(
|
||||
[
|
||||
".*Timed out.*downloading layers.*",
|
||||
@@ -988,13 +999,19 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
|
||||
TenantShardId(tenant_id, shard_number=0, shard_count=0), timeline_id
|
||||
)
|
||||
|
||||
def all_layers_downloaded(expected_layer_count: int):
|
||||
local_layers_count = len(ps_secondary.list_layers(tenant_id, timeline_id))
|
||||
def all_layers_downloaded(node, expected_layer_count: int):
|
||||
local_layers_count = len(node.list_layers(tenant_id, timeline_id))
|
||||
|
||||
log.info(f"{local_layers_count=} {after_migration_heatmap_layers_count=}")
|
||||
assert local_layers_count >= expected_layer_count
|
||||
|
||||
wait_until(lambda: all_layers_downloaded(after_migration_heatmap_layers_count))
|
||||
def no_layers_downloaded(node):
|
||||
local_layers_count = len(node.list_layers(tenant_id, timeline_id))
|
||||
|
||||
log.info(f"{local_layers_count=} {after_migration_heatmap_layers_count=}")
|
||||
assert local_layers_count == 0
|
||||
|
||||
wait_until(lambda: all_layers_downloaded(ps_secondary, after_migration_heatmap_layers_count))
|
||||
|
||||
# Read everything and make sure that we're not downloading anything extra.
|
||||
# All hot layers should be available locally now.
|
||||
@@ -1047,13 +1064,35 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
|
||||
wait_until(lambda: check_archival_state(TimelineArchivalState.UNARCHIVED, child_timeline_id))
|
||||
|
||||
ps_secondary.http_client().tenant_heatmap_upload(tenant_id)
|
||||
log.info(f"Parent timeline heatmap size: {len(timeline_heatmap(timeline_id)['layers'])}")
|
||||
log.info(f"Child timeline heatmap size: {len(timeline_heatmap(child_timeline_id)['layers'])}")
|
||||
|
||||
expected_locally = len(timeline_heatmap(timeline_id)["layers"])
|
||||
assert expected_locally > 0
|
||||
parent_cold, parent_hot = count_timeline_heatmap_layers(timeline_id)
|
||||
child_cold, child_hot = count_timeline_heatmap_layers(child_timeline_id)
|
||||
|
||||
log.info(f"Parent timeline heatmap size: cold={parent_cold}, hot={parent_hot}")
|
||||
log.info(f"Child timeline heatmap size: cold={child_cold}, hot={child_hot}")
|
||||
|
||||
# All layers in the heatmap should come from the generation on unarchival.
|
||||
# Hence, they should be cold.
|
||||
assert parent_cold > 0
|
||||
assert parent_hot == 0
|
||||
|
||||
expected_locally = parent_cold
|
||||
|
||||
env.storage_controller.download_heatmap_layers(
|
||||
TenantShardId(tenant_id, shard_number=0, shard_count=0), child_timeline_id, recurse=True
|
||||
)
|
||||
wait_until(lambda: all_layers_downloaded(expected_locally))
|
||||
wait_until(lambda: all_layers_downloaded(ps_secondary, expected_locally))
|
||||
|
||||
for ps in env.pageservers:
|
||||
ps.http_client().configure_failpoints([("secondary-layer-download-sleep", "off")])
|
||||
|
||||
# The uploaded heatmap is still empty. Clean up all layers on the secondary.
|
||||
ps_attached.http_client().tenant_secondary_download(tenant_id, wait_ms=100)
|
||||
wait_until(lambda: no_layers_downloaded(ps_attached))
|
||||
|
||||
# Upload a new heatmap. The previously cold layers become hot since they're now resident.
|
||||
ps_secondary.http_client().tenant_heatmap_upload(tenant_id)
|
||||
|
||||
# Warm up the current secondary.
|
||||
ps_attached.http_client().tenant_secondary_download(tenant_id, wait_ms=100)
|
||||
wait_until(lambda: all_layers_downloaded(ps_secondary, expected_locally))
|
||||
|
||||
Reference in New Issue
Block a user