mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
pageserver: make heatmap generation additive (#10597)
## Problem Previously, when cutting over to cold secondary locations, we would clobber the previous, good, heatmap with a cold one. This is because heatmap generation used to include only resident layers. Once this merges, we can add an endpoint which triggers full heatmap hydration on attached locations to heal cold migrations. ## Summary of changes With this patch, heatmap generation becomes additive. If we have a heatmap from when this location was secondary, the new uploaded heatmap will be the result of a reconciliation between the old one and the on disk resident layers. More concretely, when we have the previous heatmap: 1. Filter the previous heatmap and keep layers that are (a) present in the current layer map, (b) visible, (c) not resident. Call this set of layers `visible_non_resident`. 2. From the layer map, select all layers that are resident and visible. Call this set of layers `resident`. 3. The new heatmap is the result of merging the two disjoint sets. Related https://github.com/neondatabase/neon/issues/10541
This commit is contained in:
@@ -838,7 +838,10 @@ impl StorageController {
|
||||
self.dispatch(
|
||||
Method::PUT,
|
||||
format!("control/v1/tenant/{tenant_shard_id}/migrate"),
|
||||
Some(TenantShardMigrateRequest { node_id }),
|
||||
Some(TenantShardMigrateRequest {
|
||||
node_id,
|
||||
migration_config: None,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -609,7 +609,10 @@ async fn main() -> anyhow::Result<()> {
|
||||
tenant_shard_id,
|
||||
node,
|
||||
} => {
|
||||
let req = TenantShardMigrateRequest { node_id: node };
|
||||
let req = TenantShardMigrateRequest {
|
||||
node_id: node,
|
||||
migration_config: None,
|
||||
};
|
||||
|
||||
storcon_client
|
||||
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
|
||||
@@ -623,7 +626,10 @@ async fn main() -> anyhow::Result<()> {
|
||||
tenant_shard_id,
|
||||
node,
|
||||
} => {
|
||||
let req = TenantShardMigrateRequest { node_id: node };
|
||||
let req = TenantShardMigrateRequest {
|
||||
node_id: node,
|
||||
migration_config: None,
|
||||
};
|
||||
|
||||
storcon_client
|
||||
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
|
||||
@@ -1082,7 +1088,10 @@ async fn main() -> anyhow::Result<()> {
|
||||
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
|
||||
Method::PUT,
|
||||
format!("control/v1/tenant/{}/migrate", mv.tenant_shard_id),
|
||||
Some(TenantShardMigrateRequest { node_id: mv.to }),
|
||||
Some(TenantShardMigrateRequest {
|
||||
node_id: mv.to,
|
||||
migration_config: None,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| (mv.tenant_shard_id, mv.from, mv.to, e))
|
||||
|
||||
@@ -182,6 +182,18 @@ pub struct TenantDescribeResponseShard {
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct TenantShardMigrateRequest {
|
||||
pub node_id: NodeId,
|
||||
#[serde(default)]
|
||||
pub migration_config: Option<MigrationConfig>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct MigrationConfig {
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub secondary_warmup_timeout: Option<Duration>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub secondary_download_request_timeout: Option<Duration>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone, Debug)]
|
||||
|
||||
@@ -40,6 +40,8 @@ use remote_timeline_client::manifest::{
|
||||
use remote_timeline_client::UploadQueueNotReadyError;
|
||||
use remote_timeline_client::FAILED_REMOTE_OP_RETRIES;
|
||||
use remote_timeline_client::FAILED_UPLOAD_WARN_THRESHOLD;
|
||||
use secondary::heatmap::HeatMapTenant;
|
||||
use secondary::heatmap::HeatMapTimeline;
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
@@ -55,6 +57,7 @@ use timeline::offload::OffloadError;
|
||||
use timeline::CompactFlags;
|
||||
use timeline::CompactOptions;
|
||||
use timeline::CompactionError;
|
||||
use timeline::PreviousHeatmap;
|
||||
use timeline::ShutdownMode;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::sync::watch;
|
||||
@@ -262,6 +265,7 @@ struct TimelinePreload {
|
||||
timeline_id: TimelineId,
|
||||
client: RemoteTimelineClient,
|
||||
index_part: Result<MaybeDeletedIndexPart, DownloadError>,
|
||||
previous_heatmap: Option<PreviousHeatmap>,
|
||||
}
|
||||
|
||||
pub(crate) struct TenantPreload {
|
||||
@@ -1128,6 +1132,7 @@ impl Tenant {
|
||||
resources: TimelineResources,
|
||||
mut index_part: IndexPart,
|
||||
metadata: TimelineMetadata,
|
||||
previous_heatmap: Option<PreviousHeatmap>,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
cause: LoadTimelineCause,
|
||||
ctx: &RequestContext,
|
||||
@@ -1158,6 +1163,7 @@ impl Tenant {
|
||||
let timeline = self.create_timeline_struct(
|
||||
timeline_id,
|
||||
&metadata,
|
||||
previous_heatmap,
|
||||
ancestor.clone(),
|
||||
resources,
|
||||
CreateTimelineCause::Load,
|
||||
@@ -1557,8 +1563,18 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(vlad): Could go to S3 if the secondary is freezing cold and hasn't even
|
||||
// pulled the first heatmap. Not entirely necessary since the storage controller
|
||||
// will kick the secondary in any case and cause a download.
|
||||
let maybe_heatmap_at = self.read_on_disk_heatmap().await;
|
||||
|
||||
let timelines = self
|
||||
.load_timelines_metadata(remote_timeline_ids, remote_storage, cancel)
|
||||
.load_timelines_metadata(
|
||||
remote_timeline_ids,
|
||||
remote_storage,
|
||||
maybe_heatmap_at,
|
||||
cancel,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(TenantPreload {
|
||||
@@ -1571,6 +1587,26 @@ impl Tenant {
|
||||
})
|
||||
}
|
||||
|
||||
async fn read_on_disk_heatmap(&self) -> Option<(HeatMapTenant, std::time::Instant)> {
|
||||
let on_disk_heatmap_path = self.conf.tenant_heatmap_path(&self.tenant_shard_id);
|
||||
match tokio::fs::read_to_string(on_disk_heatmap_path).await {
|
||||
Ok(heatmap) => match serde_json::from_str::<HeatMapTenant>(&heatmap) {
|
||||
Ok(heatmap) => Some((heatmap, std::time::Instant::now())),
|
||||
Err(err) => {
|
||||
error!("Failed to deserialize old heatmap: {err}");
|
||||
None
|
||||
}
|
||||
},
|
||||
Err(err) => match err.kind() {
|
||||
std::io::ErrorKind::NotFound => None,
|
||||
_ => {
|
||||
error!("Unexpected IO error reading old heatmap: {err}");
|
||||
None
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Background task that downloads all data for a tenant and brings it to Active state.
|
||||
///
|
||||
@@ -1658,7 +1694,10 @@ impl Tenant {
|
||||
match index_part {
|
||||
MaybeDeletedIndexPart::IndexPart(index_part) => {
|
||||
timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
|
||||
remote_index_and_client.insert(timeline_id, (index_part, preload.client));
|
||||
remote_index_and_client.insert(
|
||||
timeline_id,
|
||||
(index_part, preload.client, preload.previous_heatmap),
|
||||
);
|
||||
}
|
||||
MaybeDeletedIndexPart::Deleted(index_part) => {
|
||||
info!(
|
||||
@@ -1677,7 +1716,7 @@ impl Tenant {
|
||||
// layer file.
|
||||
let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
|
||||
for (timeline_id, remote_metadata) in sorted_timelines {
|
||||
let (index_part, remote_client) = remote_index_and_client
|
||||
let (index_part, remote_client, previous_heatmap) = remote_index_and_client
|
||||
.remove(&timeline_id)
|
||||
.expect("just put it in above");
|
||||
|
||||
@@ -1697,6 +1736,7 @@ impl Tenant {
|
||||
timeline_id,
|
||||
index_part,
|
||||
remote_metadata,
|
||||
previous_heatmap,
|
||||
self.get_timeline_resources_for(remote_client),
|
||||
LoadTimelineCause::Attach,
|
||||
ctx,
|
||||
@@ -1846,11 +1886,13 @@ impl Tenant {
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(timeline_id=%timeline_id))]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn load_remote_timeline(
|
||||
self: &Arc<Self>,
|
||||
timeline_id: TimelineId,
|
||||
index_part: IndexPart,
|
||||
remote_metadata: TimelineMetadata,
|
||||
previous_heatmap: Option<PreviousHeatmap>,
|
||||
resources: TimelineResources,
|
||||
cause: LoadTimelineCause,
|
||||
ctx: &RequestContext,
|
||||
@@ -1880,6 +1922,7 @@ impl Tenant {
|
||||
resources,
|
||||
index_part,
|
||||
remote_metadata,
|
||||
previous_heatmap,
|
||||
ancestor,
|
||||
cause,
|
||||
ctx,
|
||||
@@ -1891,14 +1934,29 @@ impl Tenant {
|
||||
self: &Arc<Tenant>,
|
||||
timeline_ids: HashSet<TimelineId>,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
heatmap: Option<(HeatMapTenant, std::time::Instant)>,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<HashMap<TimelineId, TimelinePreload>> {
|
||||
let mut timeline_heatmaps = heatmap.map(|h| (h.0.into_timelines_index(), h.1));
|
||||
|
||||
let mut part_downloads = JoinSet::new();
|
||||
for timeline_id in timeline_ids {
|
||||
let cancel_clone = cancel.clone();
|
||||
|
||||
let previous_timeline_heatmap = timeline_heatmaps.as_mut().and_then(|hs| {
|
||||
hs.0.remove(&timeline_id).map(|h| PreviousHeatmap::Active {
|
||||
heatmap: h,
|
||||
read_at: hs.1,
|
||||
})
|
||||
});
|
||||
part_downloads.spawn(
|
||||
self.load_timeline_metadata(timeline_id, remote_storage.clone(), cancel_clone)
|
||||
.instrument(info_span!("download_index_part", %timeline_id)),
|
||||
self.load_timeline_metadata(
|
||||
timeline_id,
|
||||
remote_storage.clone(),
|
||||
previous_timeline_heatmap,
|
||||
cancel_clone,
|
||||
)
|
||||
.instrument(info_span!("download_index_part", %timeline_id)),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1946,6 +2004,7 @@ impl Tenant {
|
||||
self: &Arc<Tenant>,
|
||||
timeline_id: TimelineId,
|
||||
remote_storage: GenericRemoteStorage,
|
||||
previous_heatmap: Option<PreviousHeatmap>,
|
||||
cancel: CancellationToken,
|
||||
) -> impl Future<Output = TimelinePreload> {
|
||||
let client = self.build_timeline_client(timeline_id, remote_storage);
|
||||
@@ -1961,6 +2020,7 @@ impl Tenant {
|
||||
client,
|
||||
timeline_id,
|
||||
index_part,
|
||||
previous_heatmap,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2072,7 +2132,12 @@ impl Tenant {
|
||||
})?;
|
||||
|
||||
let timeline_preload = self
|
||||
.load_timeline_metadata(timeline_id, self.remote_storage.clone(), cancel.clone())
|
||||
.load_timeline_metadata(
|
||||
timeline_id,
|
||||
self.remote_storage.clone(),
|
||||
None,
|
||||
cancel.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let index_part = match timeline_preload.index_part {
|
||||
@@ -2106,6 +2171,7 @@ impl Tenant {
|
||||
timeline_id,
|
||||
index_part,
|
||||
remote_metadata,
|
||||
None,
|
||||
timeline_resources,
|
||||
LoadTimelineCause::Unoffload,
|
||||
&ctx,
|
||||
@@ -2821,7 +2887,7 @@ impl Tenant {
|
||||
};
|
||||
let metadata = index_part.metadata.clone();
|
||||
self
|
||||
.load_remote_timeline(timeline_id, index_part, metadata, resources, LoadTimelineCause::ImportPgdata{
|
||||
.load_remote_timeline(timeline_id, index_part, metadata, None, resources, LoadTimelineCause::ImportPgdata{
|
||||
create_guard: timeline_create_guard, activate, }, &ctx)
|
||||
.await?
|
||||
.ready_to_activate()
|
||||
@@ -4030,6 +4096,7 @@ impl Tenant {
|
||||
&self,
|
||||
new_timeline_id: TimelineId,
|
||||
new_metadata: &TimelineMetadata,
|
||||
previous_heatmap: Option<PreviousHeatmap>,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
resources: TimelineResources,
|
||||
cause: CreateTimelineCause,
|
||||
@@ -4053,6 +4120,7 @@ impl Tenant {
|
||||
self.conf,
|
||||
Arc::clone(&self.tenant_conf),
|
||||
new_metadata,
|
||||
previous_heatmap,
|
||||
ancestor,
|
||||
new_timeline_id,
|
||||
self.tenant_shard_id,
|
||||
@@ -5124,6 +5192,7 @@ impl Tenant {
|
||||
.create_timeline_struct(
|
||||
new_timeline_id,
|
||||
new_metadata,
|
||||
None,
|
||||
ancestor,
|
||||
resources,
|
||||
CreateTimelineCause::Load,
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::time::SystemTime;
|
||||
use std::{collections::HashMap, time::SystemTime};
|
||||
|
||||
use crate::tenant::{remote_timeline_client::index::LayerFileMetadata, storage_layer::LayerName};
|
||||
|
||||
@@ -8,7 +8,7 @@ use serde_with::{serde_as, DisplayFromStr, TimestampSeconds};
|
||||
use utils::{generation::Generation, id::TimelineId};
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub(super) struct HeatMapTenant {
|
||||
pub(crate) struct HeatMapTenant {
|
||||
/// Generation of the attached location that uploaded the heatmap: this is not required
|
||||
/// for correctness, but acts as a hint to secondary locations in order to detect thrashing
|
||||
/// in the unlikely event that two attached locations are both uploading conflicting heatmaps.
|
||||
@@ -25,8 +25,17 @@ pub(super) struct HeatMapTenant {
|
||||
pub(super) upload_period_ms: Option<u128>,
|
||||
}
|
||||
|
||||
impl HeatMapTenant {
|
||||
pub(crate) fn into_timelines_index(self) -> HashMap<TimelineId, HeatMapTimeline> {
|
||||
self.timelines
|
||||
.into_iter()
|
||||
.map(|htl| (htl.timeline_id, htl))
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub(crate) struct HeatMapTimeline {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub(crate) timeline_id: TimelineId,
|
||||
@@ -35,13 +44,13 @@ pub(crate) struct HeatMapTimeline {
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub(crate) struct HeatMapLayer {
|
||||
pub(crate) name: LayerName,
|
||||
pub(crate) metadata: LayerFileMetadata,
|
||||
|
||||
#[serde_as(as = "TimestampSeconds<i64>")]
|
||||
pub(super) access_time: SystemTime,
|
||||
pub(crate) access_time: SystemTime,
|
||||
// TODO: an actual 'heat' score that would let secondary locations prioritize downloading
|
||||
// the hottest layers, rather than trying to simply mirror whatever layers are on-disk on the primary.
|
||||
}
|
||||
|
||||
@@ -136,6 +136,22 @@ pub(crate) fn local_layer_path(
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) enum LastEviction {
|
||||
Never,
|
||||
At(std::time::Instant),
|
||||
Evicting,
|
||||
}
|
||||
|
||||
impl LastEviction {
|
||||
pub(crate) fn happened_after(&self, timepoint: std::time::Instant) -> bool {
|
||||
match self {
|
||||
LastEviction::Never => false,
|
||||
LastEviction::At(evicted_at) => evicted_at > &timepoint,
|
||||
LastEviction::Evicting => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Layer {
|
||||
/// Creates a layer value for a file we know to not be resident.
|
||||
pub(crate) fn for_evicted(
|
||||
@@ -405,6 +421,17 @@ impl Layer {
|
||||
self.0.metadata()
|
||||
}
|
||||
|
||||
pub(crate) fn last_evicted_at(&self) -> LastEviction {
|
||||
match self.0.last_evicted_at.try_lock() {
|
||||
Ok(lock) => match *lock {
|
||||
None => LastEviction::Never,
|
||||
Some(at) => LastEviction::At(at),
|
||||
},
|
||||
Err(std::sync::TryLockError::WouldBlock) => LastEviction::Evicting,
|
||||
Err(std::sync::TryLockError::Poisoned(p)) => panic!("Lock poisoned: {p}"),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_timeline_id(&self) -> Option<TimelineId> {
|
||||
self.0
|
||||
.timeline
|
||||
@@ -656,7 +683,9 @@ struct LayerInner {
|
||||
|
||||
/// When the Layer was last evicted but has not been downloaded since.
|
||||
///
|
||||
/// This is used solely for updating metrics. See [`LayerImplMetrics::redownload_after`].
|
||||
/// This is used for skipping evicted layers from the previous heatmap (see
|
||||
/// `[Timeline::generate_heatmap]`) and for updating metrics
|
||||
/// (see [`LayerImplMetrics::redownload_after`]).
|
||||
last_evicted_at: std::sync::Mutex<Option<std::time::Instant>>,
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -150,16 +150,15 @@ use super::{
|
||||
config::TenantConf, storage_layer::LayerVisibilityHint, upload_queue::NotInitialized,
|
||||
MaybeOffloaded,
|
||||
};
|
||||
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
|
||||
use super::{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf, HeatMapTimeline,
|
||||
};
|
||||
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
|
||||
use super::{
|
||||
remote_timeline_client::RemoteTimelineClient, remote_timeline_client::WaitCompletionError,
|
||||
storage_layer::ReadableLayer,
|
||||
};
|
||||
use super::{
|
||||
secondary::heatmap::{HeatMapLayer, HeatMapTimeline},
|
||||
GcError,
|
||||
};
|
||||
use super::{secondary::heatmap::HeatMapLayer, GcError};
|
||||
|
||||
#[cfg(test)]
|
||||
use pageserver_api::value::Value;
|
||||
@@ -465,6 +464,16 @@ pub struct Timeline {
|
||||
|
||||
/// If Some, collects GetPage metadata for an ongoing PageTrace.
|
||||
pub(crate) page_trace: ArcSwapOption<Sender<PageTraceEvent>>,
|
||||
|
||||
previous_heatmap: ArcSwapOption<PreviousHeatmap>,
|
||||
}
|
||||
|
||||
pub(crate) enum PreviousHeatmap {
|
||||
Active {
|
||||
heatmap: HeatMapTimeline,
|
||||
read_at: std::time::Instant,
|
||||
},
|
||||
Obsolete,
|
||||
}
|
||||
|
||||
pub type TimelineDeleteProgress = Arc<tokio::sync::Mutex<DeleteTimelineFlow>>;
|
||||
@@ -2568,6 +2577,7 @@ impl Timeline {
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: Arc<ArcSwap<AttachedTenantConf>>,
|
||||
metadata: &TimelineMetadata,
|
||||
previous_heatmap: Option<PreviousHeatmap>,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
@@ -2730,6 +2740,8 @@ impl Timeline {
|
||||
create_idempotency,
|
||||
|
||||
page_trace: Default::default(),
|
||||
|
||||
previous_heatmap: ArcSwapOption::from_pointee(previous_heatmap),
|
||||
};
|
||||
|
||||
result.repartition_threshold =
|
||||
@@ -3468,12 +3480,52 @@ impl Timeline {
|
||||
|
||||
let guard = self.layers.read().await;
|
||||
|
||||
// Firstly, if there's any heatmap left over from when this location
|
||||
// was a secondary, take that into account. Keep layers that are:
|
||||
// * present in the layer map
|
||||
// * visible
|
||||
// * non-resident
|
||||
// * not evicted since we read the heatmap
|
||||
//
|
||||
// Without this, a new cold, attached location would clobber the previous
|
||||
// heatamp.
|
||||
let previous_heatmap = self.previous_heatmap.load();
|
||||
let visible_non_resident = match previous_heatmap.as_deref() {
|
||||
Some(PreviousHeatmap::Active { heatmap, read_at }) => {
|
||||
Some(heatmap.layers.iter().filter_map(|hl| {
|
||||
let desc: PersistentLayerDesc = hl.name.clone().into();
|
||||
let layer = guard.try_get_from_key(&desc.key())?;
|
||||
|
||||
if layer.visibility() == LayerVisibilityHint::Covered {
|
||||
return None;
|
||||
}
|
||||
|
||||
if layer.is_likely_resident() {
|
||||
return None;
|
||||
}
|
||||
|
||||
if layer.last_evicted_at().happened_after(*read_at) {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some((desc, hl.metadata.clone(), hl.access_time))
|
||||
}))
|
||||
}
|
||||
Some(PreviousHeatmap::Obsolete) => None,
|
||||
None => None,
|
||||
};
|
||||
|
||||
// Secondly, all currently visible, resident layers are included.
|
||||
let resident = guard.likely_resident_layers().filter_map(|layer| {
|
||||
match layer.visibility() {
|
||||
LayerVisibilityHint::Visible => {
|
||||
// Layer is visible to one or more read LSNs: elegible for inclusion in layer map
|
||||
let last_activity_ts = layer.latest_activity();
|
||||
Some((layer.layer_desc(), layer.metadata(), last_activity_ts))
|
||||
Some((
|
||||
layer.layer_desc().clone(),
|
||||
layer.metadata(),
|
||||
last_activity_ts,
|
||||
))
|
||||
}
|
||||
LayerVisibilityHint::Covered => {
|
||||
// Layer is resident but unlikely to be read: not elegible for inclusion in heatmap.
|
||||
@@ -3482,7 +3534,18 @@ impl Timeline {
|
||||
}
|
||||
});
|
||||
|
||||
let mut layers = resident.collect::<Vec<_>>();
|
||||
let mut layers = match visible_non_resident {
|
||||
Some(non_resident) => {
|
||||
let mut non_resident = non_resident.peekable();
|
||||
if non_resident.peek().is_none() {
|
||||
self.previous_heatmap
|
||||
.store(Some(PreviousHeatmap::Obsolete.into()));
|
||||
}
|
||||
|
||||
non_resident.chain(resident).collect::<Vec<_>>()
|
||||
}
|
||||
None => resident.collect::<Vec<_>>(),
|
||||
};
|
||||
|
||||
// Sort layers in order of which to download first. For a large set of layers to download, we
|
||||
// want to prioritize those layers which are most likely to still be in the resident many minutes
|
||||
@@ -6661,18 +6724,32 @@ fn is_send() {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::value::Value;
|
||||
use tracing::Instrument;
|
||||
use utils::{id::TimelineId, lsn::Lsn};
|
||||
|
||||
use crate::tenant::{
|
||||
harness::{test_img, TenantHarness},
|
||||
layer_map::LayerMap,
|
||||
storage_layer::{Layer, LayerName},
|
||||
storage_layer::{Layer, LayerName, LayerVisibilityHint},
|
||||
timeline::{DeltaLayerTestDesc, EvictionError},
|
||||
Timeline,
|
||||
PreviousHeatmap, Timeline,
|
||||
};
|
||||
|
||||
use super::HeatMapTimeline;
|
||||
|
||||
fn assert_heatmaps_have_same_layers(lhs: &HeatMapTimeline, rhs: &HeatMapTimeline) {
|
||||
assert_eq!(lhs.layers.len(), rhs.layers.len());
|
||||
let lhs_rhs = lhs.layers.iter().zip(rhs.layers.iter());
|
||||
for (l, r) in lhs_rhs {
|
||||
assert_eq!(l.name, r.name);
|
||||
assert_eq!(l.metadata, r.metadata);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_heatmap_generation() {
|
||||
let harness = TenantHarness::create("heatmap_generation").await.unwrap();
|
||||
@@ -6746,7 +6823,7 @@ mod tests {
|
||||
assert_eq!(heatmap.layers.last().unwrap().name, l0_delta.layer_name());
|
||||
|
||||
let mut last_lsn = Lsn::MAX;
|
||||
for layer in heatmap.layers {
|
||||
for layer in &heatmap.layers {
|
||||
// Covered layer should be omitted
|
||||
assert!(layer.name != covered_delta.layer_name());
|
||||
|
||||
@@ -6761,6 +6838,144 @@ mod tests {
|
||||
last_lsn = layer_lsn;
|
||||
}
|
||||
}
|
||||
|
||||
// Evict all the layers and stash the old heatmap in the timeline.
|
||||
// This simulates a migration to a cold secondary location.
|
||||
|
||||
let guard = timeline.layers.read().await;
|
||||
let mut all_layers = Vec::new();
|
||||
let forever = std::time::Duration::from_secs(120);
|
||||
for layer in guard.likely_resident_layers() {
|
||||
all_layers.push(layer.clone());
|
||||
layer.evict_and_wait(forever).await.unwrap();
|
||||
}
|
||||
drop(guard);
|
||||
|
||||
timeline
|
||||
.previous_heatmap
|
||||
.store(Some(Arc::new(PreviousHeatmap::Active {
|
||||
heatmap: heatmap.clone(),
|
||||
read_at: std::time::Instant::now(),
|
||||
})));
|
||||
|
||||
// Generate a new heatmap and assert that it contains the same layers as the old one.
|
||||
let post_migration_heatmap = timeline.generate_heatmap().await.unwrap();
|
||||
assert_heatmaps_have_same_layers(&heatmap, &post_migration_heatmap);
|
||||
|
||||
// Download each layer one by one. Generate the heatmap at each step and check
|
||||
// that it's stable.
|
||||
for layer in all_layers {
|
||||
if layer.visibility() == LayerVisibilityHint::Covered {
|
||||
continue;
|
||||
}
|
||||
|
||||
eprintln!("Downloading {layer} and re-generating heatmap");
|
||||
|
||||
let _resident = layer
|
||||
.download_and_keep_resident()
|
||||
.instrument(tracing::info_span!(
|
||||
parent: None,
|
||||
"download_layer",
|
||||
tenant_id = %timeline.tenant_shard_id.tenant_id,
|
||||
shard_id = %timeline.tenant_shard_id.shard_slug(),
|
||||
timeline_id = %timeline.timeline_id
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let post_download_heatmap = timeline.generate_heatmap().await.unwrap();
|
||||
assert_heatmaps_have_same_layers(&heatmap, &post_download_heatmap);
|
||||
}
|
||||
|
||||
// Everything from the post-migration heatmap is now resident.
|
||||
// Check that we drop it from memory.
|
||||
assert!(matches!(
|
||||
timeline.previous_heatmap.load().as_deref(),
|
||||
Some(PreviousHeatmap::Obsolete)
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_previous_heatmap_obsoletion() {
|
||||
let harness = TenantHarness::create("heatmap_previous_heatmap_obsoletion")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let l0_delta = DeltaLayerTestDesc::new(
|
||||
Lsn(0x20)..Lsn(0x30),
|
||||
Key::from_hex("000000000000000000000000000000000000").unwrap()
|
||||
..Key::from_hex("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF").unwrap(),
|
||||
vec![(
|
||||
Key::from_hex("720000000033333333444444445500000000").unwrap(),
|
||||
Lsn(0x25),
|
||||
Value::Image(test_img("foo")),
|
||||
)],
|
||||
);
|
||||
|
||||
let image_layer = (
|
||||
Lsn(0x40),
|
||||
vec![(
|
||||
Key::from_hex("620000000033333333444444445500000000").unwrap(),
|
||||
test_img("bar"),
|
||||
)],
|
||||
);
|
||||
|
||||
let delta_layers = vec![l0_delta];
|
||||
let image_layers = vec![image_layer];
|
||||
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let timeline = tenant
|
||||
.create_test_timeline_with_layers(
|
||||
TimelineId::generate(),
|
||||
Lsn(0x10),
|
||||
14,
|
||||
&ctx,
|
||||
delta_layers,
|
||||
image_layers,
|
||||
Lsn(0x100),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Layer visibility is an input to heatmap generation, so refresh it first
|
||||
timeline.update_layer_visibility().await.unwrap();
|
||||
|
||||
let heatmap = timeline
|
||||
.generate_heatmap()
|
||||
.await
|
||||
.expect("Infallible while timeline is not shut down");
|
||||
|
||||
// Both layers should be in the heatmap
|
||||
assert!(!heatmap.layers.is_empty());
|
||||
|
||||
// Now simulate a migration.
|
||||
timeline
|
||||
.previous_heatmap
|
||||
.store(Some(Arc::new(PreviousHeatmap::Active {
|
||||
heatmap: heatmap.clone(),
|
||||
read_at: std::time::Instant::now(),
|
||||
})));
|
||||
|
||||
// Evict all the layers in the previous heatmap
|
||||
let guard = timeline.layers.read().await;
|
||||
let forever = std::time::Duration::from_secs(120);
|
||||
for layer in guard.likely_resident_layers() {
|
||||
layer.evict_and_wait(forever).await.unwrap();
|
||||
}
|
||||
drop(guard);
|
||||
|
||||
// Generate a new heatmap and check that the previous heatmap
|
||||
// has been marked obsolete.
|
||||
let post_eviction_heatmap = timeline
|
||||
.generate_heatmap()
|
||||
.await
|
||||
.expect("Infallible while timeline is not shut down");
|
||||
|
||||
assert!(post_eviction_heatmap.layers.is_empty());
|
||||
assert!(matches!(
|
||||
timeline.previous_heatmap.load().as_deref(),
|
||||
Some(PreviousHeatmap::Obsolete)
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -294,6 +294,7 @@ impl DeleteTimelineFlow {
|
||||
timeline_id,
|
||||
local_metadata,
|
||||
None, // Ancestor is not needed for deletion.
|
||||
None, // Previous heatmap is not needed for deletion
|
||||
tenant.get_timeline_resources_for(remote_client),
|
||||
// Important. We dont pass ancestor above because it can be missing.
|
||||
// Thus we need to skip the validation here.
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::pageserver_client::PageserverClient;
|
||||
use crate::persistence::Persistence;
|
||||
use crate::{compute_hook, service};
|
||||
use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy};
|
||||
use pageserver_api::controller_api::{AvailabilityZone, MigrationConfig, PlacementPolicy};
|
||||
use pageserver_api::models::{
|
||||
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, TenantWaitLsnRequest,
|
||||
};
|
||||
@@ -162,6 +162,22 @@ impl ReconcilerConfig {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&MigrationConfig> for ReconcilerConfig {
|
||||
fn from(value: &MigrationConfig) -> Self {
|
||||
let mut builder = ReconcilerConfigBuilder::new();
|
||||
|
||||
if let Some(timeout) = value.secondary_warmup_timeout {
|
||||
builder = builder.secondary_warmup_timeout(timeout)
|
||||
}
|
||||
|
||||
if let Some(timeout) = value.secondary_download_request_timeout {
|
||||
builder = builder.secondary_download_request_timeout(timeout)
|
||||
}
|
||||
|
||||
builder.build()
|
||||
}
|
||||
}
|
||||
|
||||
/// RAII resource units granted to a Reconciler, which it should keep alive until it finishes doing I/O
|
||||
pub(crate) struct ReconcileUnits {
|
||||
_sem_units: tokio::sync::OwnedSemaphorePermit,
|
||||
|
||||
@@ -5213,7 +5213,12 @@ impl Service {
|
||||
shard.sequence = shard.sequence.next();
|
||||
}
|
||||
|
||||
self.maybe_reconcile_shard(shard, nodes)
|
||||
let reconciler_config = match migrate_req.migration_config {
|
||||
Some(cfg) => (&cfg).into(),
|
||||
None => ReconcilerConfig::default(),
|
||||
};
|
||||
|
||||
self.maybe_configured_reconcile_shard(shard, nodes, reconciler_config)
|
||||
};
|
||||
|
||||
if let Some(waiter) = waiter {
|
||||
|
||||
@@ -3,6 +3,7 @@ from __future__ import annotations
|
||||
import abc
|
||||
import asyncio
|
||||
import concurrent.futures
|
||||
import dataclasses
|
||||
import filecmp
|
||||
import json
|
||||
import os
|
||||
@@ -1675,6 +1676,12 @@ class StorageControllerLeadershipStatus(StrEnum):
|
||||
CANDIDATE = "candidate"
|
||||
|
||||
|
||||
@dataclass
|
||||
class StorageControllerMigrationConfig:
|
||||
secondary_warmup_timeout: str | None
|
||||
secondary_download_request_timeout: str | None
|
||||
|
||||
|
||||
class NeonStorageController(MetricsGetter, LogUtils):
|
||||
def __init__(self, env: NeonEnv, port: int, auth_enabled: bool):
|
||||
self.env = env
|
||||
@@ -2068,11 +2075,20 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
shards: list[TenantShardId] = body["new_shards"]
|
||||
return shards
|
||||
|
||||
def tenant_shard_migrate(self, tenant_shard_id: TenantShardId, dest_ps_id: int):
|
||||
def tenant_shard_migrate(
|
||||
self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
dest_ps_id: int,
|
||||
config: StorageControllerMigrationConfig | None = None,
|
||||
):
|
||||
payload = {"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id}
|
||||
if config is not None:
|
||||
payload["migration_config"] = dataclasses.asdict(config)
|
||||
|
||||
self.request(
|
||||
"PUT",
|
||||
f"{self.api}/control/v1/tenant/{tenant_shard_id}/migrate",
|
||||
json={"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id},
|
||||
json=payload,
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
log.info(f"Migrated tenant {tenant_shard_id} to pageserver {dest_ps_id}")
|
||||
|
||||
@@ -10,14 +10,18 @@ from typing import TYPE_CHECKING
|
||||
import pytest
|
||||
from fixtures.common_types import TenantId, TenantShardId, TimelineId
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
NeonPageserver,
|
||||
StorageControllerMigrationConfig,
|
||||
)
|
||||
from fixtures.pageserver.common_types import parse_layer_file_name
|
||||
from fixtures.pageserver.utils import (
|
||||
assert_prefix_empty,
|
||||
wait_for_upload_queue_empty,
|
||||
)
|
||||
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, S3Storage, s3_storage
|
||||
from fixtures.utils import skip_in_debug_build, wait_until
|
||||
from fixtures.utils import run_only_on_default_postgres, skip_in_debug_build, wait_until
|
||||
from fixtures.workload import Workload
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
@@ -889,3 +893,93 @@ def test_slow_secondary_downloads(neon_env_builder: NeonEnvBuilder, via_controll
|
||||
assert progress_3["heatmap_mtime"] is not None
|
||||
assert progress_3["layers_total"] == progress_3["layers_downloaded"]
|
||||
assert progress_3["bytes_total"] == progress_3["bytes_downloaded"]
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
@run_only_on_default_postgres("PG version is not interesting here")
|
||||
def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_pageservers = 2
|
||||
neon_env_builder.enable_pageserver_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.MOCK_S3,
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
assert isinstance(env.pageserver_remote_storage, S3Storage) # Satisfy linter
|
||||
|
||||
tenant_id = TenantId.generate()
|
||||
timeline_id = TimelineId.generate()
|
||||
env.create_tenant(tenant_id, timeline_id, conf=TENANT_CONF, placement_policy='{"Attached":1}')
|
||||
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
attached_to_id = env.storage_controller.locate(tenant_id)[0]["node_id"]
|
||||
ps_attached = env.get_pageserver(attached_to_id)
|
||||
ps_secondary = next(p for p in env.pageservers if p != ps_attached)
|
||||
|
||||
# Generate a bunch of small layers (we will apply a slowdown failpoint that works on a per-layer basis)
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload.init()
|
||||
workload.write_rows(128, upload=True)
|
||||
workload.write_rows(128, upload=True)
|
||||
workload.write_rows(128, upload=True)
|
||||
workload.write_rows(128, upload=True)
|
||||
workload.stop()
|
||||
|
||||
# Expect lots of layers
|
||||
assert len(ps_attached.list_layers(tenant_id, timeline_id)) > 10
|
||||
|
||||
# Simulate large data by making layer downloads artifically slow
|
||||
for ps in env.pageservers:
|
||||
ps.http_client().configure_failpoints([("secondary-layer-download-sleep", "return(1000)")])
|
||||
|
||||
# Upload a heatmap, so that secondaries have something to download
|
||||
ps_attached.http_client().tenant_heatmap_upload(tenant_id)
|
||||
heatmap_before_migration = env.pageserver_remote_storage.heatmap_content(tenant_id)
|
||||
|
||||
# This has no chance to succeed: we have lots of layers and each one takes at least 1000ms.
|
||||
# However, it pulls the heatmap, which will be important later.
|
||||
http_client = env.storage_controller.pageserver_api()
|
||||
(status, progress) = http_client.tenant_secondary_download(tenant_id, wait_ms=4000)
|
||||
assert status == 202
|
||||
assert progress["heatmap_mtime"] is not None
|
||||
assert progress["layers_downloaded"] > 0
|
||||
assert progress["bytes_downloaded"] > 0
|
||||
assert progress["layers_total"] > progress["layers_downloaded"]
|
||||
assert progress["bytes_total"] > progress["bytes_downloaded"]
|
||||
|
||||
env.storage_controller.allowed_errors.extend(
|
||||
[
|
||||
".*Timed out.*downloading layers.*",
|
||||
]
|
||||
)
|
||||
|
||||
# Use a custom configuration that gives up earlier than usual.
|
||||
# We can't hydrate everything anyway because of the failpoints.
|
||||
config = StorageControllerMigrationConfig(
|
||||
secondary_warmup_timeout="5s", secondary_download_request_timeout="2s"
|
||||
)
|
||||
env.storage_controller.tenant_shard_migrate(
|
||||
TenantShardId(tenant_id, shard_number=0, shard_count=0), ps_secondary.id, config
|
||||
)
|
||||
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
assert env.storage_controller.locate(tenant_id)[0]["node_id"] == ps_secondary.id
|
||||
|
||||
ps_secondary.http_client().tenant_heatmap_upload(tenant_id)
|
||||
heatmap_after_migration = env.pageserver_remote_storage.heatmap_content(tenant_id)
|
||||
|
||||
assert len(heatmap_before_migration["timelines"][0]["layers"]) > 0
|
||||
|
||||
# The new layer map should contain all the layers in the pre-migration one
|
||||
# and a new in memory layer
|
||||
assert len(heatmap_before_migration["timelines"][0]["layers"]) + 1 == len(
|
||||
heatmap_after_migration["timelines"][0]["layers"]
|
||||
)
|
||||
|
||||
log.info(
|
||||
f'Heatmap size after cold migration is {len(heatmap_after_migration["timelines"][0]["layers"])}'
|
||||
)
|
||||
|
||||
# TODO: Once we have an endpoint for rescuing the cold location, exercise it here.
|
||||
|
||||
Reference in New Issue
Block a user