storage: add APIs for warming up location after cold migrations (#10788)
## Problem

We lack an API for warming up attached locations based on the heatmap contents. This is problematic in two places:
1. If we manually migrate and cut over while the secondary is still cold
2. When we re-attach a previously offloaded tenant

## Summary of changes

https://github.com/neondatabase/neon/pull/10597 made heatmap generation additive across migrations, so we won't clobber it after a cold migration. This allows us to implement:
1. A pageserver endpoint for downloading all missing heatmap layers: `/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers`. Only one such operation per timeline is allowed at any given time. The granularity is the tenant shard.
2. A storage controller endpoint that triggers the downloads on the pageserver: `/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers`. This works at both the tenant and tenant shard level. If an unsharded tenant ID is provided, the operation is started on all shards; otherwise it runs only on the specified shard.
3. A storcon CLI command. Again, tenant and tenant-shard level granularities are supported.

Cplane will call into storcon and trigger the downloads for all shards. When we want to rescue a migration, we will use the storcon CLI targeting the specific tenant shard.

Related: https://github.com/neondatabase/neon/issues/10541
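For illustration only, here is a minimal sketch of how a caller (for example cplane tooling) might hit the new storage controller endpoint over HTTP. It is not part of this change: the controller base URL and JWT handling are placeholders, while the path and the optional `concurrency` query parameter are the ones added here.

```rust
use reqwest::{Client, StatusCode};

/// Hypothetical helper, not part of this PR: ask the storage controller to warm up
/// every shard of a tenant by downloading the heatmap layers of one timeline.
async fn warm_up_timeline(
    controller_base: &str,      // placeholder base URL, e.g. "http://127.0.0.1:1234"
    jwt: Option<&str>,          // deployment-specific auth token, if any
    tenant_id: &str,            // unsharded tenant ID: all shards are targeted
    timeline_id: &str,
    concurrency: Option<usize>, // optional cap; the pageserver defaults to 16
) -> anyhow::Result<()> {
    let mut url = format!(
        "{controller_base}/v1/tenant/{tenant_id}/timeline/{timeline_id}/download_heatmap_layers"
    );
    if let Some(c) = concurrency {
        url = format!("{url}?concurrency={c}");
    }

    let mut request = Client::new().post(&url);
    if let Some(token) = jwt {
        request = request.bearer_auth(token);
    }

    let status = request.send().await?.status();
    anyhow::ensure!(status == StatusCode::OK, "unexpected status: {status}");
    Ok(())
}
```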
@@ -22,7 +22,7 @@ use pageserver_api::{
};
use pageserver_client::mgmt_api::{self};
use reqwest::{Method, StatusCode, Url};
use utils::id::{NodeId, TenantId};
use utils::id::{NodeId, TenantId, TimelineId};

use pageserver_api::controller_api::{
    NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
@@ -239,6 +239,19 @@ enum Command {
        #[arg(long)]
        scheduling_policy: SkSchedulingPolicyArg,
    },
    /// Downloads any missing heatmap layers for all shards of a given timeline
    DownloadHeatmapLayers {
        /// Tenant ID or tenant shard ID. When an unsharded tenant ID is specified,
        /// the operation is performed on all shards. When a sharded tenant ID is
        /// specified, the operation is only performed on the specified shard.
        #[arg(long)]
        tenant_shard_id: TenantShardId,
        #[arg(long)]
        timeline_id: TimelineId,
        /// Optional: Maximum download concurrency (default is 16)
        #[arg(long)]
        concurrency: Option<usize>,
    },
}

#[derive(Parser)]
@@ -1247,6 +1260,24 @@ async fn main() -> anyhow::Result<()> {
                String::from(scheduling_policy)
            );
        }
        Command::DownloadHeatmapLayers {
            tenant_shard_id,
            timeline_id,
            concurrency,
        } => {
            let mut path = format!(
                "/v1/tenant/{}/timeline/{}/download_heatmap_layers",
                tenant_shard_id, timeline_id,
            );

            if let Some(c) = concurrency {
                path = format!("{path}?concurrency={c}");
            }

            storcon_client
                .dispatch::<(), ()>(Method::POST, path, None)
                .await?;
        }
    }

    Ok(())
@@ -117,6 +117,10 @@ impl TenantShardId {
        )
    }

    pub fn range(&self) -> RangeInclusive<Self> {
        RangeInclusive::new(*self, *self)
    }

    pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
        ShardSlug(self)
    }
@@ -477,6 +477,26 @@ impl Client {
        self.request(Method::POST, &uri, ()).await.map(|_| ())
    }

    pub async fn timeline_download_heatmap_layers(
        &self,
        tenant_shard_id: TenantShardId,
        timeline_id: TimelineId,
        concurrency: Option<usize>,
    ) -> Result<()> {
        let mut path = reqwest::Url::parse(&format!(
            "{}/v1/tenant/{}/timeline/{}/download_heatmap_layers",
            self.mgmt_api_endpoint, tenant_shard_id, timeline_id
        ))
        .expect("Cannot build URL");

        if let Some(concurrency) = concurrency {
            path.query_pairs_mut()
                .append_pair("concurrency", &format!("{}", concurrency));
        }

        self.request(Method::POST, path, ()).await.map(|_| ())
    }

    pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
        let uri = format!(
            "{}/v1/tenant/{}/reset",
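As a usage note rather than part of the diff: a component that already holds the `pageserver_client::mgmt_api::Client` for a pageserver could trigger the warm-up of a single shard roughly as below. This is a sketch; the import paths mirror the crates used elsewhere in this change and are indicative, and error handling is reduced to logging.

```rust
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use utils::id::TimelineId;

/// Sketch only: warm up one tenant shard's timeline via the new mgmt API wrapper.
/// Client construction is elided; passing `None` lets the pageserver pick its
/// default concurrency (16), capped by the remote storage concurrency limit.
async fn warm_up_shard(
    client: &mgmt_api::Client,
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
) {
    if let Err(err) = client
        .timeline_download_heatmap_layers(tenant_shard_id, timeline_id, None)
        .await
    {
        eprintln!("heatmap layers download request failed: {err}");
    }
}
```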
@@ -824,6 +824,38 @@ paths:
        schema:
          $ref: "#/components/schemas/TenantConfigResponse"

  /v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers:
    parameters:
      - name: tenant_shard_id
        in: path
        required: true
        schema:
          type: string
      - name: timeline_id
        in: path
        required: true
        schema:
          type: string
      - name: concurrency
        description: Maximum number of concurrent downloads (capped at remote storage concurrency)
        in: query
        required: false
        schema:
          type: integer
    post:
      description: |
        Download all layers in the specified timeline's heatmap. The `tenant_shard_id` parameter
        may be used to target all shards of a tenant when the unsharded form is used, or a specific
        tenant shard with the sharded form.
      responses:
        "200":
          description: Success
    delete:
      description: Stop any on-going background downloads of heatmap layers for the specified timeline.
      responses:
        "200":
          description: Success

  /v1/utilization:
    get:
      description: |
@@ -1463,6 +1463,59 @@ async fn timeline_layer_scan_disposable_keys(
    )
}

async fn timeline_download_heatmap_layers_handler(
    request: Request<Body>,
    _cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    // Only used in the case where remote storage is not configured.
    const DEFAULT_MAX_CONCURRENCY: usize = 100;
    // A conservative default.
    const DEFAULT_CONCURRENCY: usize = 16;

    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;

    let desired_concurrency =
        parse_query_param(&request, "concurrency")?.unwrap_or(DEFAULT_CONCURRENCY);

    check_permission(&request, Some(tenant_shard_id.tenant_id))?;

    let state = get_state(&request);
    let timeline =
        active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
            .await?;

    let max_concurrency = get_config(&request)
        .remote_storage_config
        .as_ref()
        .map(|c| c.concurrency_limit())
        .unwrap_or(DEFAULT_MAX_CONCURRENCY);
    let concurrency = std::cmp::min(max_concurrency, desired_concurrency);

    timeline.start_heatmap_layers_download(concurrency).await?;

    json_response(StatusCode::ACCEPTED, ())
}

async fn timeline_shutdown_download_heatmap_layers_handler(
    request: Request<Body>,
    _cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;

    check_permission(&request, Some(tenant_shard_id.tenant_id))?;

    let state = get_state(&request);
    let timeline =
        active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
            .await?;

    timeline.stop_and_drain_heatmap_layers_download().await;

    json_response(StatusCode::OK, ())
}

async fn layer_download_handler(
    request: Request<Body>,
    _cancel: CancellationToken,
@@ -3626,6 +3679,14 @@ pub fn make_router(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer",
            |r| api_handler(r, layer_map_info_handler),
        )
        .post(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers",
            |r| api_handler(r, timeline_download_heatmap_layers_handler),
        )
        .delete(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers",
            |r| api_handler(r, timeline_shutdown_download_heatmap_layers_handler),
        )
        .get(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name",
            |r| api_handler(r, layer_download_handler),
@@ -4,6 +4,7 @@ pub mod delete;
pub(crate) mod detach_ancestor;
mod eviction_task;
pub(crate) mod handle;
mod heatmap_layers_downloader;
pub(crate) mod import_pgdata;
mod init;
pub mod layer_manager;
@@ -467,6 +468,10 @@ pub struct Timeline {
    pub(crate) page_trace: ArcSwapOption<Sender<PageTraceEvent>>,

    previous_heatmap: ArcSwapOption<PreviousHeatmap>,

    /// May host a background Tokio task which downloads all the layers from the current
    /// heatmap on demand.
    heatmap_layers_downloader: Mutex<Option<heatmap_layers_downloader::HeatmapLayersDownloader>>,
}

pub(crate) enum PreviousHeatmap {
@@ -2039,6 +2044,11 @@ impl Timeline {
        tracing::debug!("Cancelling CancellationToken");
        self.cancel.cancel();

        // If we have a background task downloading heatmap layers, stop it.
        // The background downloads are sensitive to timeline cancellation (done above),
        // so the drain will be immediate.
        self.stop_and_drain_heatmap_layers_download().await;

        // Prevent new page service requests from starting.
        self.handles.shutdown();

@@ -2752,6 +2762,8 @@ impl Timeline {
            page_trace: Default::default(),

            previous_heatmap: ArcSwapOption::from_pointee(previous_heatmap),

            heatmap_layers_downloader: Mutex::new(None),
        };

        result.repartition_threshold =
pageserver/src/tenant/timeline/heatmap_layers_downloader.rs (new file, 162 lines)
@@ -0,0 +1,162 @@
//! Timeline utility module to hydrate everything from the current heatmap.
//!
//! Provides utilities to spawn and abort a background task where the downloads happen.
//! See /v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers.

use futures::StreamExt;
use http_utils::error::ApiError;
use std::sync::{Arc, Mutex};
use tokio_util::sync::CancellationToken;
use utils::sync::gate::Gate;

use super::Timeline;

// This status is not strictly necessary now, but gives us a nice place
// to store progress information if we ever wish to expose it.
pub(super) enum HeatmapLayersDownloadStatus {
    InProgress,
    Complete,
}

pub(super) struct HeatmapLayersDownloader {
    handle: tokio::task::JoinHandle<()>,
    status: Arc<Mutex<HeatmapLayersDownloadStatus>>,
    cancel: CancellationToken,
    downloads_guard: Arc<Gate>,
}

impl HeatmapLayersDownloader {
    fn new(
        timeline: Arc<Timeline>,
        concurrency: usize,
    ) -> Result<HeatmapLayersDownloader, ApiError> {
        let tl_guard = timeline.gate.enter().map_err(|_| ApiError::Cancelled)?;

        let cancel = timeline.cancel.child_token();
        let downloads_guard = Arc::new(Gate::default());

        let status = Arc::new(Mutex::new(HeatmapLayersDownloadStatus::InProgress));

        let handle = tokio::task::spawn({
            let status = status.clone();
            let downloads_guard = downloads_guard.clone();
            let cancel = cancel.clone();

            async move {
                let _guard = tl_guard;

                scopeguard::defer! {
                    *status.lock().unwrap() = HeatmapLayersDownloadStatus::Complete;
                }

                let Some(heatmap) = timeline.generate_heatmap().await else {
                    tracing::info!("Heatmap layers download failed to generate heatmap");
                    return;
                };

                tracing::info!(
                    resident_size=%timeline.resident_physical_size(),
                    heatmap_layers=%heatmap.layers.len(),
                    "Starting heatmap layers download"
                );

                let stream = futures::stream::iter(heatmap.layers.into_iter().filter_map(
                    |layer| {
                        let tl = timeline.clone();
                        let dl_guard = match downloads_guard.enter() {
                            Ok(g) => g,
                            Err(_) => {
                                // [`Self::shutdown`] was called. Don't spawn any more downloads.
                                return None;
                            }
                        };

                        Some(async move {
                            let _dl_guard = dl_guard;

                            let res = tl.download_layer(&layer.name).await;
                            if let Err(err) = res {
                                if !err.is_cancelled() {
                                    tracing::warn!(layer=%layer.name, "Failed to download heatmap layer: {err}")
                                }
                            }
                        })
                    }
                )).buffered(concurrency);

                tokio::select! {
                    _ = stream.collect::<()>() => {
                        tracing::info!(
                            resident_size=%timeline.resident_physical_size(),
                            "Heatmap layers download completed"
                        );
                    },
                    _ = cancel.cancelled() => {
                        tracing::info!("Heatmap layers download cancelled");
                    }
                }
            }
        });

        Ok(Self {
            status,
            handle,
            cancel,
            downloads_guard,
        })
    }

    fn is_complete(&self) -> bool {
        matches!(
            *self.status.lock().unwrap(),
            HeatmapLayersDownloadStatus::Complete
        )
    }

    /// Drive any in-progress downloads to completion and stop spawning any new ones.
    ///
    /// This has two callers and they behave differently:
    /// 1. [`Timeline::shutdown`]: the drain will be immediate since downloads themselves
    ///    are sensitive to timeline cancellation.
    ///
    /// 2. Endpoint handler in [`crate::http::routes`]: the drain will wait for any in-progress
    ///    downloads to complete.
    async fn stop_and_drain(self) {
        // Counterintuitive: close the guard before cancelling.
        // Something needs to poll the already created download futures to completion.
        // If we cancel first, then the underlying task exits and we lose
        // the poller.
        self.downloads_guard.close().await;
        self.cancel.cancel();
        if let Err(err) = self.handle.await {
            tracing::warn!("Failed to join heatmap layer downloader task: {err}");
        }
    }
}

impl Timeline {
    pub(crate) async fn start_heatmap_layers_download(
        self: &Arc<Self>,
        concurrency: usize,
    ) -> Result<(), ApiError> {
        let mut locked = self.heatmap_layers_downloader.lock().unwrap();
        if locked.as_ref().map(|dl| dl.is_complete()).unwrap_or(true) {
            let dl = HeatmapLayersDownloader::new(self.clone(), concurrency)?;
            *locked = Some(dl);
            Ok(())
        } else {
            Err(ApiError::Conflict("Already running".to_string()))
        }
    }

    pub(crate) async fn stop_and_drain_heatmap_layers_download(&self) {
        // This can race with the start of a new downloader and lead to a situation
        // where one downloader is shutting down and another one is in-flight.
        // The only impact is that we'd end up using more remote storage semaphore
        // units than expected.
        let downloader = self.heatmap_layers_downloader.lock().unwrap().take();
        if let Some(dl) = downloader {
            dl.stop_and_drain().await;
        }
    }
}
@@ -516,6 +516,24 @@ async fn handle_tenant_timeline_block_unblock_gc(
    json_response(StatusCode::OK, ())
}

async fn handle_tenant_timeline_download_heatmap_layers(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;

    check_permissions(&req, Scope::PageServerApi)?;

    let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
    let concurrency: Option<usize> = parse_query_param(&req, "concurrency")?;

    service
        .tenant_timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency)
        .await?;

    json_response(StatusCode::OK, ())
}

// For metric labels where we would like to include the approximate path, but exclude high-cardinality fields like query parameters
// and tenant/timeline IDs. Since we are proxying to arbitrary paths, we don't have routing templates to
// compare to, so we can just filter out our well known ID format with regexes.
@@ -2078,6 +2096,16 @@ pub fn make_router(
                )
            },
        )
        .post(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers",
            |r| {
                tenant_service_handler(
                    r,
                    handle_tenant_timeline_download_heatmap_layers,
                    RequestName("v1_tenant_timeline_download_heatmap_layers"),
                )
            },
        )
        // Tenant detail GET passthrough to shard zero:
        .get("/v1/tenant/:tenant_id", |r| {
            tenant_service_handler(
@@ -280,6 +280,22 @@ impl PageserverClient {
        )
    }

    pub(crate) async fn timeline_download_heatmap_layers(
        &self,
        tenant_shard_id: TenantShardId,
        timeline_id: TimelineId,
        concurrency: Option<usize>,
    ) -> Result<()> {
        measured_request!(
            "download_heatmap_layers",
            crate::metrics::Method::Post,
            &self.node_id_label,
            self.inner
                .timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency)
                .await
        )
    }

    pub(crate) async fn get_utilization(&self) -> Result<PageserverUtilization> {
        measured_request!(
            "utilization",
@@ -162,6 +162,7 @@ enum TenantOperations {
    TimelineDetachAncestor,
    TimelineGcBlockUnblock,
    DropDetached,
    DownloadHeatmapLayers,
}

#[derive(Clone, strum_macros::Display)]
@@ -3757,6 +3758,61 @@ impl Service {
        Ok(())
    }

    pub(crate) async fn tenant_timeline_download_heatmap_layers(
        &self,
        tenant_shard_id: TenantShardId,
        timeline_id: TimelineId,
        concurrency: Option<usize>,
    ) -> Result<(), ApiError> {
        let _tenant_lock = trace_shared_lock(
            &self.tenant_op_locks,
            tenant_shard_id.tenant_id,
            TenantOperations::DownloadHeatmapLayers,
        )
        .await;

        let targets = {
            let locked = self.inner.read().unwrap();
            let mut targets = Vec::new();

            // If the request got an unsharded tenant id, then apply
            // the operation to all shards. Otherwise, apply it to a specific shard.
            let shards_range = if tenant_shard_id.is_unsharded() {
                TenantShardId::tenant_range(tenant_shard_id.tenant_id)
            } else {
                tenant_shard_id.range()
            };

            for (tenant_shard_id, shard) in locked.tenants.range(shards_range) {
                if let Some(node_id) = shard.intent.get_attached() {
                    let node = locked
                        .nodes
                        .get(node_id)
                        .expect("Pageservers may not be deleted while referenced");

                    targets.push((*tenant_shard_id, node.clone()));
                }
            }
            targets
        };

        self.tenant_for_shards_api(
            targets,
            |tenant_shard_id, client| async move {
                client
                    .timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency)
                    .await
            },
            1,
            1,
            SHORT_RECONCILE_TIMEOUT,
            &self.cancel,
        )
        .await;

        Ok(())
    }

    /// Helper for concurrently calling a pageserver API on a number of shards, such as timeline creation.
    ///
    /// On success, the returned vector contains exactly the same number of elements as the input `locations`.
@@ -2467,6 +2467,14 @@ class NeonStorageController(MetricsGetter, LogUtils):
        response.raise_for_status()
        return [TenantShardId.parse(tid) for tid in response.json()["updated"]]

    def download_heatmap_layers(self, tenant_shard_id: TenantShardId, timeline_id: TimelineId):
        response = self.request(
            "POST",
            f"{self.api}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers",
            headers=self.headers(TokenScope.ADMIN),
        )
        response.raise_for_status()

    def __enter__(self) -> Self:
        return self
@@ -974,12 +974,22 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):

    # The new layer map should contain all the layers in the pre-migration one
    # and a new in memory layer
    assert len(heatmap_before_migration["timelines"][0]["layers"]) + 1 == len(
        heatmap_after_migration["timelines"][0]["layers"]
    after_migration_heatmap_layers_count = len(heatmap_after_migration["timelines"][0]["layers"])
    assert (
        len(heatmap_before_migration["timelines"][0]["layers"]) + 1
        == after_migration_heatmap_layers_count
    )

    log.info(
        f'Heatmap size after cold migration is {len(heatmap_after_migration["timelines"][0]["layers"])}'
    log.info(f"Heatmap size after cold migration is {after_migration_heatmap_layers_count}")

    env.storage_controller.download_heatmap_layers(
        TenantShardId(tenant_id, shard_number=0, shard_count=0), timeline_id
    )

    # TODO: Once we have an endpoint for rescuing the cold location, exercise it here.
    def all_layers_downloaded():
        local_layers_count = len(ps_secondary.list_layers(tenant_id, timeline_id))

        log.info(f"{local_layers_count=} {after_migration_heatmap_layers_count=}")
        assert local_layers_count == after_migration_heatmap_layers_count

    wait_until(all_layers_downloaded)