pageserver: make heatmap layer download API more cplane friendly (#10957)

## Problem

We intend for cplane to use the heatmap layer download API to warm up
timelines after unarchival. It's tricky for them to recurse in the
ancestors,
and the current implementation doesn't work well when unarchiving a
chain
of branches and warming them up.

## Summary of changes

* Add a `recurse` flag to the API. When the flag is set, the operation
recurses into the parent
timeline after the current one is done.
* Be resilient to warming up a chain of unarchived branches. Let's say
we unarchived `B` and `C` from
the `A -> B -> C` branch hierarchy. `B` got unarchived first. We
generated the unarchival heatmaps
and stashed them in `A` and `B`. When `C` was unarchived, it dropped its
unarchival heatmap since `A` and `B`
already had one. If `C` needed layers from `A` and `B`, it was out of
luck. Now, when choosing whether
to keep an unarchival heatmap we look at its end LSN. If it's more
inclusive than what we currently have,
keep it.
This commit is contained in:
Vlad Lazar
2025-02-28 10:36:53 +00:00
committed by GitHub
parent 55633ebe3a
commit 0d6d58bd3e
11 changed files with 111 additions and 60 deletions

View File

@@ -480,6 +480,7 @@ impl Client {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
concurrency: Option<usize>,
recurse: bool,
) -> Result<()> {
let mut path = reqwest::Url::parse(&format!(
"{}/v1/tenant/{}/timeline/{}/download_heatmap_layers",
@@ -487,6 +488,9 @@ impl Client {
))
.expect("Cannot build URL");
path.query_pairs_mut()
.append_pair("recurse", &format!("{}", recurse));
if let Some(concurrency) = concurrency {
path.query_pairs_mut()
.append_pair("concurrency", &format!("{}", concurrency));

View File

@@ -1435,6 +1435,7 @@ async fn timeline_download_heatmap_layers_handler(
let desired_concurrency =
parse_query_param(&request, "concurrency")?.unwrap_or(DEFAULT_CONCURRENCY);
let recurse = parse_query_param(&request, "recurse")?.unwrap_or(false);
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
@@ -1451,9 +1452,7 @@ async fn timeline_download_heatmap_layers_handler(
.unwrap_or(DEFAULT_MAX_CONCURRENCY);
let concurrency = std::cmp::min(max_concurrency, desired_concurrency);
timeline
.start_heatmap_layers_download(concurrency, &ctx)
.await?;
timeline.start_heatmap_layers_download(concurrency, recurse, &ctx)?;
json_response(StatusCode::ACCEPTED, ())
}

View File

@@ -1052,6 +1052,8 @@ impl Timeline {
) -> Result<u64, CalculateLogicalSizeError> {
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
fail::fail_point!("skip-logical-size-calculation", |_| { Ok(0) });
// Fetch list of database dirs and iterate them
let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
let dbdir = DbDirectory::des(&buf)?;

View File

@@ -1153,12 +1153,15 @@ impl Tenant {
let mut tline_ending_at = Some((&timeline, timeline.get_last_record_lsn()));
while let Some((tline, end_lsn)) = tline_ending_at {
let unarchival_heatmap = tline.generate_unarchival_heatmap(end_lsn).await;
if !tline.is_previous_heatmap_active() {
// Another unarchived timeline might have generated a heatmap for this ancestor.
// If the current branch point is greater than the previous one, use the heatmap
// we just generated - it should include more layers.
if !tline.should_keep_previous_heatmap(end_lsn) {
tline
.previous_heatmap
.store(Some(Arc::new(unarchival_heatmap)));
} else {
tracing::info!("Previous heatmap still active. Dropping unarchival heatmap.")
tracing::info!("Previous heatmap preferred. Dropping unarchival heatmap.")
}
match tline.ancestor_timeline() {
@@ -1939,6 +1942,7 @@ impl Tenant {
hs.0.remove(&timeline_id).map(|h| PreviousHeatmap::Active {
heatmap: h,
read_at: hs.1,
end_lsn: None,
})
});
part_downloads.spawn(

View File

@@ -442,6 +442,8 @@ pub(crate) enum PreviousHeatmap {
Active {
heatmap: HeatMapTimeline,
read_at: std::time::Instant,
// End LSN covered by the heatmap if known
end_lsn: Option<Lsn>,
},
Obsolete,
}
@@ -3570,12 +3572,16 @@ impl Timeline {
Ok(layer)
}
pub(super) fn is_previous_heatmap_active(&self) -> bool {
self.previous_heatmap
.load()
.as_ref()
.map(|prev| matches!(**prev, PreviousHeatmap::Active { .. }))
.unwrap_or(false)
pub(super) fn should_keep_previous_heatmap(&self, new_heatmap_end_lsn: Lsn) -> bool {
let crnt = self.previous_heatmap.load();
match crnt.as_deref() {
Some(PreviousHeatmap::Active { end_lsn, .. }) => match end_lsn {
Some(crnt_end_lsn) => *crnt_end_lsn > new_heatmap_end_lsn,
None => true,
},
Some(PreviousHeatmap::Obsolete) => false,
None => false,
}
}
/// The timeline heatmap is a hint to secondary locations from the primary location,
@@ -3603,26 +3609,26 @@ impl Timeline {
// heatmap.
let previous_heatmap = self.previous_heatmap.load();
let visible_non_resident = match previous_heatmap.as_deref() {
Some(PreviousHeatmap::Active { heatmap, read_at }) => {
Some(heatmap.layers.iter().filter_map(|hl| {
let desc: PersistentLayerDesc = hl.name.clone().into();
let layer = guard.try_get_from_key(&desc.key())?;
Some(PreviousHeatmap::Active {
heatmap, read_at, ..
}) => Some(heatmap.layers.iter().filter_map(|hl| {
let desc: PersistentLayerDesc = hl.name.clone().into();
let layer = guard.try_get_from_key(&desc.key())?;
if layer.visibility() == LayerVisibilityHint::Covered {
return None;
}
if layer.visibility() == LayerVisibilityHint::Covered {
return None;
}
if layer.is_likely_resident() {
return None;
}
if layer.is_likely_resident() {
return None;
}
if layer.last_evicted_at().happened_after(*read_at) {
return None;
}
if layer.last_evicted_at().happened_after(*read_at) {
return None;
}
Some((desc, hl.metadata.clone(), hl.access_time))
}))
}
Some((desc, hl.metadata.clone(), hl.access_time))
})),
Some(PreviousHeatmap::Obsolete) => None,
None => None,
};
@@ -3709,6 +3715,7 @@ impl Timeline {
PreviousHeatmap::Active {
heatmap,
read_at: Instant::now(),
end_lsn: Some(end_lsn),
}
}
@@ -7046,6 +7053,7 @@ mod tests {
.store(Some(Arc::new(PreviousHeatmap::Active {
heatmap: heatmap.clone(),
read_at: std::time::Instant::now(),
end_lsn: None,
})));
// Generate a new heatmap and assert that it contains the same layers as the old one.
@@ -7148,6 +7156,7 @@ mod tests {
.store(Some(Arc::new(PreviousHeatmap::Active {
heatmap: heatmap.clone(),
read_at: std::time::Instant::now(),
end_lsn: None,
})));
// Evict all the layers in the previous heatmap

View File

@@ -32,6 +32,7 @@ impl HeatmapLayersDownloader {
fn new(
timeline: Arc<Timeline>,
concurrency: usize,
recurse: bool,
ctx: RequestContext,
) -> Result<HeatmapLayersDownloader, ApiError> {
let tl_guard = timeline.gate.enter().map_err(|_| ApiError::Cancelled)?;
@@ -98,6 +99,20 @@ impl HeatmapLayersDownloader {
},
_ = cancel.cancelled() => {
tracing::info!("Heatmap layers download cancelled");
return;
}
}
if recurse {
if let Some(ancestor) = timeline.ancestor_timeline() {
let ctx = ctx.attached_child();
let res =
ancestor.start_heatmap_layers_download(concurrency, recurse, &ctx);
if let Err(err) = res {
tracing::info!(
"Failed to start heatmap layers download for ancestor: {err}"
);
}
}
}
}
@@ -140,14 +155,20 @@ impl HeatmapLayersDownloader {
}
impl Timeline {
pub(crate) async fn start_heatmap_layers_download(
pub(crate) fn start_heatmap_layers_download(
self: &Arc<Self>,
concurrency: usize,
recurse: bool,
ctx: &RequestContext,
) -> Result<(), ApiError> {
let mut locked = self.heatmap_layers_downloader.lock().unwrap();
if locked.as_ref().map(|dl| dl.is_complete()).unwrap_or(true) {
let dl = HeatmapLayersDownloader::new(self.clone(), concurrency, ctx.attached_child())?;
let dl = HeatmapLayersDownloader::new(
self.clone(),
concurrency,
recurse,
ctx.attached_child(),
)?;
*locked = Some(dl);
Ok(())
} else {

View File

@@ -524,9 +524,10 @@ async fn handle_tenant_timeline_download_heatmap_layers(
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
let concurrency: Option<usize> = parse_query_param(&req, "concurrency")?;
let recurse = parse_query_param(&req, "recurse")?.unwrap_or(false);
service
.tenant_timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency)
.tenant_timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency, recurse)
.await?;
json_response(StatusCode::OK, ())

View File

@@ -281,13 +281,19 @@ impl PageserverClient {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
concurrency: Option<usize>,
recurse: bool,
) -> Result<()> {
measured_request!(
"download_heatmap_layers",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner
.timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency)
.timeline_download_heatmap_layers(
tenant_shard_id,
timeline_id,
concurrency,
recurse
)
.await
)
}

View File

@@ -3774,6 +3774,7 @@ impl Service {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
concurrency: Option<usize>,
recurse: bool,
) -> Result<(), ApiError> {
let _tenant_lock = trace_shared_lock(
&self.tenant_op_locks,
@@ -3811,7 +3812,12 @@ impl Service {
targets,
|tenant_shard_id, client| async move {
client
.timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency)
.timeline_download_heatmap_layers(
tenant_shard_id,
timeline_id,
concurrency,
recurse,
)
.await
},
1,

View File

@@ -2469,12 +2469,21 @@ class NeonStorageController(MetricsGetter, LogUtils):
response.raise_for_status()
return [TenantShardId.parse(tid) for tid in response.json()["updated"]]
def download_heatmap_layers(self, tenant_shard_id: TenantShardId, timeline_id: TimelineId):
def download_heatmap_layers(
self, tenant_shard_id: TenantShardId, timeline_id: TimelineId, recurse: bool | None = None
):
url = (
f"{self.api}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers"
)
if recurse is not None:
url = url + f"?recurse={str(recurse).lower()}"
response = self.request(
"POST",
f"{self.api}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers",
url,
headers=self.headers(TokenScope.ADMIN),
)
response.raise_for_status()
def __enter__(self) -> Self:

View File

@@ -938,9 +938,12 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
# Expect lots of layers
assert len(ps_attached.list_layers(tenant_id, timeline_id)) > 10
# Simulate large data by making layer downloads artificially slow
for ps in env.pageservers:
# Simulate large data by making layer downloads artificially slow
ps.http_client().configure_failpoints([("secondary-layer-download-sleep", "return(1000)")])
# Make the initial logical size calculation lie. Otherwise it downloads
# layers on demand and makes accounting difficult.
ps.http_client().configure_failpoints(("skip-logical-size-calculation", "return"))
def timeline_heatmap(tlid):
assert env.pageserver_remote_storage is not None
@@ -952,21 +955,6 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
raise RuntimeError(f"No heatmap for timeline: {tlid}")
# Upload a heatmap, so that secondaries have something to download
ps_attached.http_client().tenant_heatmap_upload(tenant_id)
heatmap_before_migration = timeline_heatmap(timeline_id)
# This has no chance to succeed: we have lots of layers and each one takes at least 1000ms.
# However, it pulls the heatmap, which will be important later.
http_client = env.storage_controller.pageserver_api()
(status, progress) = http_client.tenant_secondary_download(tenant_id, wait_ms=4000)
assert status == 202
assert progress["heatmap_mtime"] is not None
assert progress["layers_downloaded"] > 0
assert progress["bytes_downloaded"] > 0
assert progress["layers_total"] > progress["layers_downloaded"]
assert progress["bytes_total"] > progress["bytes_downloaded"]
env.storage_controller.allowed_errors.extend(
[
".*Timed out.*downloading layers.*",
@@ -975,6 +963,7 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
# Use a custom configuration that gives up earlier than usual.
# We can't hydrate everything anyway because of the failpoints.
# Implicitly, this also uploads a heatmap from the current attached location.
config = StorageControllerMigrationConfig(
secondary_warmup_timeout="5s", secondary_download_request_timeout="2s"
)
@@ -988,22 +977,17 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
ps_secondary.http_client().tenant_heatmap_upload(tenant_id)
heatmap_after_migration = timeline_heatmap(timeline_id)
assert len(heatmap_before_migration["layers"]) > 0
local_layers = ps_secondary.list_layers(tenant_id, timeline_id)
# We download 1 layer per second and give up within 5 seconds.
assert len(local_layers) < 10
after_migration_heatmap_layers_count = len(heatmap_after_migration["layers"])
assert len(heatmap_before_migration["layers"]) <= after_migration_heatmap_layers_count
log.info(f"Heatmap size after cold migration is {after_migration_heatmap_layers_count}")
env.storage_controller.download_heatmap_layers(
TenantShardId(tenant_id, shard_number=0, shard_count=0), timeline_id
)
# Now simulate the case where a child timeline is archived, parent layers
# are evicted and the child is unarchived. When the child is unarchived,
# itself and the parent update their heatmaps to contain layers needed by the
# child. One can warm up the timeline hierarchy since the heatmaps are ready.
def all_layers_downloaded(expected_layer_count: int):
local_layers_count = len(ps_secondary.list_layers(tenant_id, timeline_id))
@@ -1011,8 +995,9 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
assert local_layers_count >= expected_layer_count
wait_until(lambda: all_layers_downloaded(after_migration_heatmap_layers_count))
ps_secondary.http_client().tenant_heatmap_upload(tenant_id)
# Read everything and make sure that we're not downloading anything extra.
# All hot layers should be available locally now.
before = (
ps_secondary.http_client()
.get_metrics()
@@ -1030,6 +1015,11 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
workload.stop()
assert before == after
# Now simulate the case where a child timeline is archived, parent layers
# are evicted and the child is unarchived. When the child is unarchived,
# itself and the parent update their heatmaps to contain layers needed by the
# child. One can warm up the timeline hierarchy since the heatmaps are ready.
def check_archival_state(state: TimelineArchivalState, tline):
timelines = (
timeline["timeline_id"]
@@ -1064,6 +1054,6 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
assert expected_locally > 0
env.storage_controller.download_heatmap_layers(
TenantShardId(tenant_id, shard_number=0, shard_count=0), timeline_id
TenantShardId(tenant_id, shard_number=0, shard_count=0), child_timeline_id, recurse=True
)
wait_until(lambda: all_layers_downloaded(expected_locally))