Compare commits

..

3 Commits

Author SHA1 Message Date
Jan Christian Grünhage
4e3bdc5984 nomerge: simulate run-kind=compute-rc-pr 2025-02-28 11:25:20 +01:00
Alexey Masterov
04a33e8af0 Change TEST_EXTENSIONS_TAG variable 2025-02-28 11:13:15 +01:00
Alexey Masterov
71f0235600 TEst extensions upgrade should work correctly if some neon images of compute tag are not accessible 2025-02-28 11:09:06 +01:00
12 changed files with 61 additions and 112 deletions

View File

@@ -46,7 +46,7 @@ jobs:
env:
RUN_KIND: >-
${{
false
'compute-rc-pr'
|| (inputs.github-event-name == 'push' && github.ref_name == 'main') && 'push-main'
|| (inputs.github-event-name == 'push' && github.ref_name == 'release') && 'storage-release'
|| (inputs.github-event-name == 'push' && github.ref_name == 'release-compute') && 'compute-release'

View File

@@ -480,7 +480,6 @@ impl Client {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
concurrency: Option<usize>,
recurse: bool,
) -> Result<()> {
let mut path = reqwest::Url::parse(&format!(
"{}/v1/tenant/{}/timeline/{}/download_heatmap_layers",
@@ -488,9 +487,6 @@ impl Client {
))
.expect("Cannot build URL");
path.query_pairs_mut()
.append_pair("recurse", &format!("{}", recurse));
if let Some(concurrency) = concurrency {
path.query_pairs_mut()
.append_pair("concurrency", &format!("{}", concurrency));

View File

@@ -1435,7 +1435,6 @@ async fn timeline_download_heatmap_layers_handler(
let desired_concurrency =
parse_query_param(&request, "concurrency")?.unwrap_or(DEFAULT_CONCURRENCY);
let recurse = parse_query_param(&request, "recurse")?.unwrap_or(false);
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
@@ -1452,7 +1451,9 @@ async fn timeline_download_heatmap_layers_handler(
.unwrap_or(DEFAULT_MAX_CONCURRENCY);
let concurrency = std::cmp::min(max_concurrency, desired_concurrency);
timeline.start_heatmap_layers_download(concurrency, recurse, &ctx)?;
timeline
.start_heatmap_layers_download(concurrency, &ctx)
.await?;
json_response(StatusCode::ACCEPTED, ())
}

View File

@@ -1052,8 +1052,6 @@ impl Timeline {
) -> Result<u64, CalculateLogicalSizeError> {
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
fail::fail_point!("skip-logical-size-calculation", |_| { Ok(0) });
// Fetch list of database dirs and iterate them
let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
let dbdir = DbDirectory::des(&buf)?;

View File

@@ -1153,15 +1153,12 @@ impl Tenant {
let mut tline_ending_at = Some((&timeline, timeline.get_last_record_lsn()));
while let Some((tline, end_lsn)) = tline_ending_at {
let unarchival_heatmap = tline.generate_unarchival_heatmap(end_lsn).await;
// Another unarchived timeline might have generated a heatmap for this ancestor.
// If the current branch point is greater than the previous one, use the heatmap
// we just generated - it should include more layers.
if !tline.should_keep_previous_heatmap(end_lsn) {
if !tline.is_previous_heatmap_active() {
tline
.previous_heatmap
.store(Some(Arc::new(unarchival_heatmap)));
} else {
tracing::info!("Previous heatmap preferred. Dropping unarchival heatmap.")
tracing::info!("Previous heatmap still active. Dropping unarchival heatmap.")
}
match tline.ancestor_timeline() {
@@ -1942,7 +1939,6 @@ impl Tenant {
hs.0.remove(&timeline_id).map(|h| PreviousHeatmap::Active {
heatmap: h,
read_at: hs.1,
end_lsn: None,
})
});
part_downloads.spawn(

View File

@@ -442,8 +442,6 @@ pub(crate) enum PreviousHeatmap {
Active {
heatmap: HeatMapTimeline,
read_at: std::time::Instant,
// End LSN covered by the heatmap if known
end_lsn: Option<Lsn>,
},
Obsolete,
}
@@ -3572,16 +3570,12 @@ impl Timeline {
Ok(layer)
}
pub(super) fn should_keep_previous_heatmap(&self, new_heatmap_end_lsn: Lsn) -> bool {
let crnt = self.previous_heatmap.load();
match crnt.as_deref() {
Some(PreviousHeatmap::Active { end_lsn, .. }) => match end_lsn {
Some(crnt_end_lsn) => *crnt_end_lsn > new_heatmap_end_lsn,
None => true,
},
Some(PreviousHeatmap::Obsolete) => false,
None => false,
}
pub(super) fn is_previous_heatmap_active(&self) -> bool {
self.previous_heatmap
.load()
.as_ref()
.map(|prev| matches!(**prev, PreviousHeatmap::Active { .. }))
.unwrap_or(false)
}
/// The timeline heatmap is a hint to secondary locations from the primary location,
@@ -3609,26 +3603,26 @@ impl Timeline {
// heatmap.
let previous_heatmap = self.previous_heatmap.load();
let visible_non_resident = match previous_heatmap.as_deref() {
Some(PreviousHeatmap::Active {
heatmap, read_at, ..
}) => Some(heatmap.layers.iter().filter_map(|hl| {
let desc: PersistentLayerDesc = hl.name.clone().into();
let layer = guard.try_get_from_key(&desc.key())?;
Some(PreviousHeatmap::Active { heatmap, read_at }) => {
Some(heatmap.layers.iter().filter_map(|hl| {
let desc: PersistentLayerDesc = hl.name.clone().into();
let layer = guard.try_get_from_key(&desc.key())?;
if layer.visibility() == LayerVisibilityHint::Covered {
return None;
}
if layer.visibility() == LayerVisibilityHint::Covered {
return None;
}
if layer.is_likely_resident() {
return None;
}
if layer.is_likely_resident() {
return None;
}
if layer.last_evicted_at().happened_after(*read_at) {
return None;
}
if layer.last_evicted_at().happened_after(*read_at) {
return None;
}
Some((desc, hl.metadata.clone(), hl.access_time))
})),
Some((desc, hl.metadata.clone(), hl.access_time))
}))
}
Some(PreviousHeatmap::Obsolete) => None,
None => None,
};
@@ -3715,7 +3709,6 @@ impl Timeline {
PreviousHeatmap::Active {
heatmap,
read_at: Instant::now(),
end_lsn: Some(end_lsn),
}
}
@@ -7053,7 +7046,6 @@ mod tests {
.store(Some(Arc::new(PreviousHeatmap::Active {
heatmap: heatmap.clone(),
read_at: std::time::Instant::now(),
end_lsn: None,
})));
// Generate a new heatmap and assert that it contains the same layers as the old one.
@@ -7156,7 +7148,6 @@ mod tests {
.store(Some(Arc::new(PreviousHeatmap::Active {
heatmap: heatmap.clone(),
read_at: std::time::Instant::now(),
end_lsn: None,
})));
// Evict all the layers in the previous heatmap

View File

@@ -32,7 +32,6 @@ impl HeatmapLayersDownloader {
fn new(
timeline: Arc<Timeline>,
concurrency: usize,
recurse: bool,
ctx: RequestContext,
) -> Result<HeatmapLayersDownloader, ApiError> {
let tl_guard = timeline.gate.enter().map_err(|_| ApiError::Cancelled)?;
@@ -99,20 +98,6 @@ impl HeatmapLayersDownloader {
},
_ = cancel.cancelled() => {
tracing::info!("Heatmap layers download cancelled");
return;
}
}
if recurse {
if let Some(ancestor) = timeline.ancestor_timeline() {
let ctx = ctx.attached_child();
let res =
ancestor.start_heatmap_layers_download(concurrency, recurse, &ctx);
if let Err(err) = res {
tracing::info!(
"Failed to start heatmap layers download for ancestor: {err}"
);
}
}
}
}
@@ -155,20 +140,14 @@ impl HeatmapLayersDownloader {
}
impl Timeline {
pub(crate) fn start_heatmap_layers_download(
pub(crate) async fn start_heatmap_layers_download(
self: &Arc<Self>,
concurrency: usize,
recurse: bool,
ctx: &RequestContext,
) -> Result<(), ApiError> {
let mut locked = self.heatmap_layers_downloader.lock().unwrap();
if locked.as_ref().map(|dl| dl.is_complete()).unwrap_or(true) {
let dl = HeatmapLayersDownloader::new(
self.clone(),
concurrency,
recurse,
ctx.attached_child(),
)?;
let dl = HeatmapLayersDownloader::new(self.clone(), concurrency, ctx.attached_child())?;
*locked = Some(dl);
Ok(())
} else {

View File

@@ -524,10 +524,9 @@ async fn handle_tenant_timeline_download_heatmap_layers(
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
let concurrency: Option<usize> = parse_query_param(&req, "concurrency")?;
let recurse = parse_query_param(&req, "recurse")?.unwrap_or(false);
service
.tenant_timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency, recurse)
.tenant_timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency)
.await?;
json_response(StatusCode::OK, ())

View File

@@ -281,19 +281,13 @@ impl PageserverClient {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
concurrency: Option<usize>,
recurse: bool,
) -> Result<()> {
measured_request!(
"download_heatmap_layers",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner
.timeline_download_heatmap_layers(
tenant_shard_id,
timeline_id,
concurrency,
recurse
)
.timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency)
.await
)
}

View File

@@ -3774,7 +3774,6 @@ impl Service {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
concurrency: Option<usize>,
recurse: bool,
) -> Result<(), ApiError> {
let _tenant_lock = trace_shared_lock(
&self.tenant_op_locks,
@@ -3812,12 +3811,7 @@ impl Service {
targets,
|tenant_shard_id, client| async move {
client
.timeline_download_heatmap_layers(
tenant_shard_id,
timeline_id,
concurrency,
recurse,
)
.timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency)
.await
},
1,

View File

@@ -2469,21 +2469,12 @@ class NeonStorageController(MetricsGetter, LogUtils):
response.raise_for_status()
return [TenantShardId.parse(tid) for tid in response.json()["updated"]]
def download_heatmap_layers(
self, tenant_shard_id: TenantShardId, timeline_id: TimelineId, recurse: bool | None = None
):
url = (
f"{self.api}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers"
)
if recurse is not None:
url = url + f"?recurse={str(recurse).lower()}"
def download_heatmap_layers(self, tenant_shard_id: TenantShardId, timeline_id: TimelineId):
response = self.request(
"POST",
url,
f"{self.api}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers",
headers=self.headers(TokenScope.ADMIN),
)
response.raise_for_status()
def __enter__(self) -> Self:

View File

@@ -938,12 +938,9 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
# Expect lots of layers
assert len(ps_attached.list_layers(tenant_id, timeline_id)) > 10
# Simulate large data by making layer downloads artificially slow
for ps in env.pageservers:
# Simulate large data by making layer downloads artificially slow
ps.http_client().configure_failpoints([("secondary-layer-download-sleep", "return(1000)")])
# Make the initial logical size calculation lie. Otherwise it on demand downloads
# layers and makes accounting difficult.
ps.http_client().configure_failpoints(("skip-logical-size-calculation", "return"))
def timeline_heatmap(tlid):
assert env.pageserver_remote_storage is not None
@@ -955,6 +952,21 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
raise RuntimeError(f"No heatmap for timeline: {tlid}")
# Upload a heatmap, so that secondaries have something to download
ps_attached.http_client().tenant_heatmap_upload(tenant_id)
heatmap_before_migration = timeline_heatmap(timeline_id)
# This has no chance to succeed: we have lots of layers and each one takes at least 1000ms.
# However, it pulls the heatmap, which will be important later.
http_client = env.storage_controller.pageserver_api()
(status, progress) = http_client.tenant_secondary_download(tenant_id, wait_ms=4000)
assert status == 202
assert progress["heatmap_mtime"] is not None
assert progress["layers_downloaded"] > 0
assert progress["bytes_downloaded"] > 0
assert progress["layers_total"] > progress["layers_downloaded"]
assert progress["bytes_total"] > progress["bytes_downloaded"]
env.storage_controller.allowed_errors.extend(
[
".*Timed out.*downloading layers.*",
@@ -963,7 +975,6 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
# Use a custom configuration that gives up earlier than usual.
# We can't hydrate everything anyway because of the failpoints.
# Implicitly, this also uploads a heatmap from the current attached location.
config = StorageControllerMigrationConfig(
secondary_warmup_timeout="5s", secondary_download_request_timeout="2s"
)
@@ -977,17 +988,22 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
ps_secondary.http_client().tenant_heatmap_upload(tenant_id)
heatmap_after_migration = timeline_heatmap(timeline_id)
local_layers = ps_secondary.list_layers(tenant_id, timeline_id)
# We download 1 layer per second and give up within 5 seconds.
assert len(local_layers) < 10
assert len(heatmap_before_migration["layers"]) > 0
after_migration_heatmap_layers_count = len(heatmap_after_migration["layers"])
assert len(heatmap_before_migration["layers"]) <= after_migration_heatmap_layers_count
log.info(f"Heatmap size after cold migration is {after_migration_heatmap_layers_count}")
env.storage_controller.download_heatmap_layers(
TenantShardId(tenant_id, shard_number=0, shard_count=0), timeline_id
)
# Now simulate the case where a child timeline is archived, parent layers
# are evicted and the child is unarchived. When the child is unarchived,
# itself and the parent update their heatmaps to contain layers needed by the
# child. One can warm up the timeline hierarchy since the heatmaps are ready.
def all_layers_downloaded(expected_layer_count: int):
local_layers_count = len(ps_secondary.list_layers(tenant_id, timeline_id))
@@ -995,9 +1011,8 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
assert local_layers_count >= expected_layer_count
wait_until(lambda: all_layers_downloaded(after_migration_heatmap_layers_count))
ps_secondary.http_client().tenant_heatmap_upload(tenant_id)
# Read everything and make sure that we're not downloading anything extra.
# All hot layers should be available locally now.
before = (
ps_secondary.http_client()
.get_metrics()
@@ -1015,11 +1030,6 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
workload.stop()
assert before == after
# Now simulate the case where a child timeline is archived, parent layers
# are evicted and the child is unarchived. When the child is unarchived,
# itself and the parent update their heatmaps to contain layers needed by the
# child. One can warm up the timeline hierarchy since the heatmaps are ready.
def check_archival_state(state: TimelineArchivalState, tline):
timelines = (
timeline["timeline_id"]
@@ -1054,6 +1064,6 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
assert expected_locally > 0
env.storage_controller.download_heatmap_layers(
TenantShardId(tenant_id, shard_number=0, shard_count=0), child_timeline_id, recurse=True
TenantShardId(tenant_id, shard_number=0, shard_count=0), timeline_id
)
wait_until(lambda: all_layers_downloaded(expected_locally))