mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 17:02:56 +00:00
pagesever: include visible layers in heatmaps after unarchival (#10880)
## Problem https://github.com/neondatabase/neon/pull/10788 introduced an API for warming up attached locations by downloading all layers in the heatmap. We intend to use it for warming up timelines after unarchival too, but it doesn't work. Any heatmap generated after the unarchival will not include our timeline, so we've lost all those layers. ## Summary of changes Generate a cheeky heatmap on unarchival. It includes all the visible layers. Use that as the `PreviousHeatmap` which inputs into actual heatmap generation. Closes: https://github.com/neondatabase/neon/issues/10541
This commit is contained in:
@@ -1189,6 +1189,39 @@ impl Tenant {
|
||||
format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}")
|
||||
})?;
|
||||
|
||||
// When unarchiving, we've mostly likely lost the heatmap generated prior
|
||||
// to the archival operation. To allow warming this timeline up, generate
|
||||
// a previous heatmap which contains all visible layers in the layer map.
|
||||
// This previous heatmap will be used whenever a fresh heatmap is generated
|
||||
// for the timeline.
|
||||
if matches!(cause, LoadTimelineCause::Unoffload) {
|
||||
let mut tline_ending_at = Some((&timeline, timeline.get_last_record_lsn()));
|
||||
while let Some((tline, end_lsn)) = tline_ending_at {
|
||||
let unarchival_heatmap = tline.generate_unarchival_heatmap(end_lsn).await;
|
||||
if !tline.is_previous_heatmap_active() {
|
||||
tline
|
||||
.previous_heatmap
|
||||
.store(Some(Arc::new(unarchival_heatmap)));
|
||||
} else {
|
||||
tracing::info!("Previous heatmap still active. Dropping unarchival heatmap.")
|
||||
}
|
||||
|
||||
match tline.ancestor_timeline() {
|
||||
Some(ancestor) => {
|
||||
if ancestor.update_layer_visibility().await.is_err() {
|
||||
// Ancestor timeline is shutting down.
|
||||
break;
|
||||
}
|
||||
|
||||
tline_ending_at = Some((ancestor, tline.get_ancestor_lsn()));
|
||||
}
|
||||
None => {
|
||||
tline_ending_at = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match import_pgdata {
|
||||
Some(import_pgdata) if !import_pgdata.is_done() => {
|
||||
match cause {
|
||||
|
||||
@@ -468,7 +468,7 @@ pub struct Timeline {
|
||||
/// If Some, collects GetPage metadata for an ongoing PageTrace.
|
||||
pub(crate) page_trace: ArcSwapOption<Sender<PageTraceEvent>>,
|
||||
|
||||
previous_heatmap: ArcSwapOption<PreviousHeatmap>,
|
||||
pub(super) previous_heatmap: ArcSwapOption<PreviousHeatmap>,
|
||||
|
||||
/// May host a background Tokio task which downloads all the layers from the current
|
||||
/// heatmap on demand.
|
||||
@@ -3524,6 +3524,14 @@ impl Timeline {
|
||||
Ok(layer)
|
||||
}
|
||||
|
||||
pub(super) fn is_previous_heatmap_active(&self) -> bool {
|
||||
self.previous_heatmap
|
||||
.load()
|
||||
.as_ref()
|
||||
.map(|prev| matches!(**prev, PreviousHeatmap::Active { .. }))
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
/// The timeline heatmap is a hint to secondary locations from the primary location,
|
||||
/// indicating which layers are currently on-disk on the primary.
|
||||
///
|
||||
@@ -3596,6 +3604,7 @@ impl Timeline {
|
||||
Some(non_resident) => {
|
||||
let mut non_resident = non_resident.peekable();
|
||||
if non_resident.peek().is_none() {
|
||||
tracing::info!(timeline_id=%self.timeline_id, "Previous heatmap now obsolete");
|
||||
self.previous_heatmap
|
||||
.store(Some(PreviousHeatmap::Obsolete.into()));
|
||||
}
|
||||
@@ -3627,6 +3636,36 @@ impl Timeline {
|
||||
Some(HeatMapTimeline::new(self.timeline_id, layers))
|
||||
}
|
||||
|
||||
pub(super) async fn generate_unarchival_heatmap(&self, end_lsn: Lsn) -> PreviousHeatmap {
|
||||
let guard = self.layers.read().await;
|
||||
|
||||
let now = SystemTime::now();
|
||||
let mut heatmap_layers = Vec::default();
|
||||
for vl in guard.visible_layers() {
|
||||
if vl.layer_desc().get_lsn_range().start >= end_lsn {
|
||||
continue;
|
||||
}
|
||||
|
||||
let hl = HeatMapLayer {
|
||||
name: vl.layer_desc().layer_name(),
|
||||
metadata: vl.metadata(),
|
||||
access_time: now,
|
||||
};
|
||||
heatmap_layers.push(hl);
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
"Generating unarchival heatmap with {} layers",
|
||||
heatmap_layers.len()
|
||||
);
|
||||
|
||||
let heatmap = HeatMapTimeline::new(self.timeline_id, heatmap_layers);
|
||||
PreviousHeatmap::Active {
|
||||
heatmap,
|
||||
read_at: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if the given lsn is or was an ancestor branchpoint.
|
||||
pub(crate) fn is_ancestor_lsn(&self, lsn: Lsn) -> bool {
|
||||
// upon timeline detach, we set the ancestor_lsn to Lsn::INVALID and the store the original
|
||||
|
||||
@@ -1020,7 +1020,7 @@ impl Timeline {
|
||||
///
|
||||
/// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
|
||||
/// that we know won't be needed for reads.
|
||||
pub(super) async fn update_layer_visibility(
|
||||
pub(crate) async fn update_layer_visibility(
|
||||
&self,
|
||||
) -> Result<(), super::layer_manager::Shutdown> {
|
||||
let head_lsn = self.get_last_record_lsn();
|
||||
|
||||
@@ -15,8 +15,8 @@ use crate::{
|
||||
tenant::{
|
||||
layer_map::{BatchedUpdates, LayerMap},
|
||||
storage_layer::{
|
||||
AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
|
||||
ResidentLayer,
|
||||
AsLayerDesc, InMemoryLayer, Layer, LayerVisibilityHint, PersistentLayerDesc,
|
||||
PersistentLayerKey, ResidentLayer,
|
||||
},
|
||||
},
|
||||
};
|
||||
@@ -118,6 +118,12 @@ impl LayerManager {
|
||||
self.layers().values().filter(|l| l.is_likely_resident())
|
||||
}
|
||||
|
||||
pub(crate) fn visible_layers(&self) -> impl Iterator<Item = &'_ Layer> + '_ {
|
||||
self.layers()
|
||||
.values()
|
||||
.filter(|l| l.visibility() == LayerVisibilityHint::Visible)
|
||||
}
|
||||
|
||||
pub(crate) fn contains(&self, layer: &Layer) -> bool {
|
||||
self.contains_key(&layer.layer_desc().key())
|
||||
}
|
||||
|
||||
@@ -8,9 +8,10 @@ from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import pytest
|
||||
from fixtures.common_types import TenantId, TenantShardId, TimelineId
|
||||
from fixtures.common_types import TenantId, TenantShardId, TimelineArchivalState, TimelineId
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
DEFAULT_BRANCH_NAME,
|
||||
NeonEnvBuilder,
|
||||
NeonPageserver,
|
||||
StorageControllerMigrationConfig,
|
||||
@@ -927,8 +928,12 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
|
||||
workload.write_rows(128, upload=True)
|
||||
workload.write_rows(128, upload=True)
|
||||
workload.write_rows(128, upload=True)
|
||||
|
||||
child_timeline_id = env.create_branch(
|
||||
"foo", tenant_id, ancestor_branch_name=DEFAULT_BRANCH_NAME
|
||||
)
|
||||
|
||||
workload.write_rows(128, upload=True)
|
||||
workload.stop()
|
||||
|
||||
# Expect lots of layers
|
||||
assert len(ps_attached.list_layers(tenant_id, timeline_id)) > 10
|
||||
@@ -937,9 +942,19 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
|
||||
for ps in env.pageservers:
|
||||
ps.http_client().configure_failpoints([("secondary-layer-download-sleep", "return(1000)")])
|
||||
|
||||
def timeline_heatmap(tlid):
|
||||
assert env.pageserver_remote_storage is not None
|
||||
|
||||
heatmap = env.pageserver_remote_storage.heatmap_content(tenant_id)
|
||||
for htl in heatmap["timelines"]:
|
||||
if htl["timeline_id"] == str(tlid):
|
||||
return htl
|
||||
|
||||
raise RuntimeError(f"No heatmap for timeline: {tlid}")
|
||||
|
||||
# Upload a heatmap, so that secondaries have something to download
|
||||
ps_attached.http_client().tenant_heatmap_upload(tenant_id)
|
||||
heatmap_before_migration = env.pageserver_remote_storage.heatmap_content(tenant_id)
|
||||
heatmap_before_migration = timeline_heatmap(timeline_id)
|
||||
|
||||
# This has no chance to succeed: we have lots of layers and each one takes at least 1000ms.
|
||||
# However, it pulls the heatmap, which will be important later.
|
||||
@@ -971,17 +986,12 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
|
||||
assert env.storage_controller.locate(tenant_id)[0]["node_id"] == ps_secondary.id
|
||||
|
||||
ps_secondary.http_client().tenant_heatmap_upload(tenant_id)
|
||||
heatmap_after_migration = env.pageserver_remote_storage.heatmap_content(tenant_id)
|
||||
heatmap_after_migration = timeline_heatmap(timeline_id)
|
||||
|
||||
assert len(heatmap_before_migration["timelines"][0]["layers"]) > 0
|
||||
assert len(heatmap_before_migration["layers"]) > 0
|
||||
|
||||
# The new layer map should contain all the layers in the pre-migration one
|
||||
# and a new in memory layer
|
||||
after_migration_heatmap_layers_count = len(heatmap_after_migration["timelines"][0]["layers"])
|
||||
assert (
|
||||
len(heatmap_before_migration["timelines"][0]["layers"]) + 1
|
||||
== after_migration_heatmap_layers_count
|
||||
)
|
||||
after_migration_heatmap_layers_count = len(heatmap_after_migration["layers"])
|
||||
assert len(heatmap_before_migration["layers"]) <= after_migration_heatmap_layers_count
|
||||
|
||||
log.info(f"Heatmap size after cold migration is {after_migration_heatmap_layers_count}")
|
||||
|
||||
@@ -989,10 +999,71 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
|
||||
TenantShardId(tenant_id, shard_number=0, shard_count=0), timeline_id
|
||||
)
|
||||
|
||||
def all_layers_downloaded():
|
||||
# Now simulate the case where a child timeline is archived, parent layers
|
||||
# are evicted and the child is unarchived. When the child is unarchived,
|
||||
# itself and the parent update their heatmaps to contain layers needed by the
|
||||
# child. One can warm up the timeline hierarchy since the heatmaps are ready.
|
||||
|
||||
def all_layers_downloaded(expected_layer_count: int):
|
||||
local_layers_count = len(ps_secondary.list_layers(tenant_id, timeline_id))
|
||||
|
||||
log.info(f"{local_layers_count=} {after_migration_heatmap_layers_count=}")
|
||||
assert local_layers_count == after_migration_heatmap_layers_count
|
||||
assert local_layers_count >= expected_layer_count
|
||||
|
||||
wait_until(all_layers_downloaded)
|
||||
wait_until(lambda: all_layers_downloaded(after_migration_heatmap_layers_count))
|
||||
ps_secondary.http_client().tenant_heatmap_upload(tenant_id)
|
||||
|
||||
before = (
|
||||
ps_secondary.http_client()
|
||||
.get_metrics()
|
||||
.query_one("pageserver_remote_ondemand_downloaded_layers_total")
|
||||
.value
|
||||
)
|
||||
workload.validate()
|
||||
after = (
|
||||
ps_secondary.http_client()
|
||||
.get_metrics()
|
||||
.query_one("pageserver_remote_ondemand_downloaded_layers_total")
|
||||
.value
|
||||
)
|
||||
|
||||
workload.stop()
|
||||
assert before == after
|
||||
|
||||
def check_archival_state(state: TimelineArchivalState, tline):
|
||||
timelines = (
|
||||
timeline["timeline_id"]
|
||||
for timeline in ps_secondary.http_client().timeline_list(tenant_id=tenant_id)
|
||||
)
|
||||
|
||||
if state == TimelineArchivalState.ARCHIVED:
|
||||
assert str(tline) not in timelines
|
||||
elif state == TimelineArchivalState.UNARCHIVED:
|
||||
assert str(tline) in timelines
|
||||
|
||||
ps_secondary.http_client().timeline_archival_config(
|
||||
tenant_id, child_timeline_id, TimelineArchivalState.ARCHIVED
|
||||
)
|
||||
ps_secondary.http_client().timeline_offload(tenant_id, child_timeline_id)
|
||||
wait_until(lambda: check_archival_state(TimelineArchivalState.ARCHIVED, child_timeline_id))
|
||||
|
||||
ps_secondary.http_client().evict_all_layers(tenant_id, timeline_id)
|
||||
ps_secondary.http_client().tenant_heatmap_upload(tenant_id)
|
||||
assert len(timeline_heatmap(timeline_id)["layers"]) == 0
|
||||
|
||||
ps_secondary.http_client().timeline_archival_config(
|
||||
tenant_id, child_timeline_id, TimelineArchivalState.UNARCHIVED
|
||||
)
|
||||
wait_until(lambda: check_archival_state(TimelineArchivalState.UNARCHIVED, child_timeline_id))
|
||||
|
||||
ps_secondary.http_client().tenant_heatmap_upload(tenant_id)
|
||||
log.info(f"Parent timeline heatmap size: {len(timeline_heatmap(timeline_id)['layers'])}")
|
||||
log.info(f"Child timeline heatmap size: {len(timeline_heatmap(child_timeline_id)['layers'])}")
|
||||
|
||||
expected_locally = len(timeline_heatmap(timeline_id)["layers"])
|
||||
assert expected_locally > 0
|
||||
|
||||
env.storage_controller.download_heatmap_layers(
|
||||
TenantShardId(tenant_id, shard_number=0, shard_count=0), timeline_id
|
||||
)
|
||||
wait_until(lambda: all_layers_downloaded(expected_locally))
|
||||
|
||||
Reference in New Issue
Block a user