diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index 9a2d30c861..0fadb9c5fe 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -838,7 +838,10 @@ impl StorageController { self.dispatch( Method::PUT, format!("control/v1/tenant/{tenant_shard_id}/migrate"), - Some(TenantShardMigrateRequest { node_id }), + Some(TenantShardMigrateRequest { + node_id, + migration_config: None, + }), ) .await } diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs index 985fe6b3b1..83faf6b4af 100644 --- a/control_plane/storcon_cli/src/main.rs +++ b/control_plane/storcon_cli/src/main.rs @@ -609,7 +609,10 @@ async fn main() -> anyhow::Result<()> { tenant_shard_id, node, } => { - let req = TenantShardMigrateRequest { node_id: node }; + let req = TenantShardMigrateRequest { + node_id: node, + migration_config: None, + }; storcon_client .dispatch::( @@ -623,7 +626,10 @@ async fn main() -> anyhow::Result<()> { tenant_shard_id, node, } => { - let req = TenantShardMigrateRequest { node_id: node }; + let req = TenantShardMigrateRequest { + node_id: node, + migration_config: None, + }; storcon_client .dispatch::( @@ -1082,7 +1088,10 @@ async fn main() -> anyhow::Result<()> { .dispatch::( Method::PUT, format!("control/v1/tenant/{}/migrate", mv.tenant_shard_id), - Some(TenantShardMigrateRequest { node_id: mv.to }), + Some(TenantShardMigrateRequest { + node_id: mv.to, + migration_config: None, + }), ) .await .map_err(|e| (mv.tenant_shard_id, mv.from, mv.to, e)) diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index 78e080981a..42f6e47e63 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -182,6 +182,18 @@ pub struct TenantDescribeResponseShard { #[derive(Serialize, Deserialize, Debug)] pub struct TenantShardMigrateRequest { pub node_id: NodeId, + #[serde(default)] + pub migration_config: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct MigrationConfig { + #[serde(default)] + #[serde(with = "humantime_serde")] + pub secondary_warmup_timeout: Option, + #[serde(default)] + #[serde(with = "humantime_serde")] + pub secondary_download_request_timeout: Option, } #[derive(Serialize, Clone, Debug)] diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 605bfac2b3..dec585ff65 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -40,6 +40,8 @@ use remote_timeline_client::manifest::{ use remote_timeline_client::UploadQueueNotReadyError; use remote_timeline_client::FAILED_REMOTE_OP_RETRIES; use remote_timeline_client::FAILED_UPLOAD_WARN_THRESHOLD; +use secondary::heatmap::HeatMapTenant; +use secondary::heatmap::HeatMapTimeline; use std::collections::BTreeMap; use std::fmt; use std::future::Future; @@ -55,6 +57,7 @@ use timeline::offload::OffloadError; use timeline::CompactFlags; use timeline::CompactOptions; use timeline::CompactionError; +use timeline::PreviousHeatmap; use timeline::ShutdownMode; use tokio::io::BufReader; use tokio::sync::watch; @@ -262,6 +265,7 @@ struct TimelinePreload { timeline_id: TimelineId, client: RemoteTimelineClient, index_part: Result, + previous_heatmap: Option, } pub(crate) struct TenantPreload { @@ -1128,6 +1132,7 @@ impl Tenant { resources: TimelineResources, mut index_part: IndexPart, metadata: TimelineMetadata, + previous_heatmap: Option, ancestor: Option>, cause: LoadTimelineCause, ctx: &RequestContext, @@ -1158,6 
+1163,7 @@ impl Tenant { let timeline = self.create_timeline_struct( timeline_id, &metadata, + previous_heatmap, ancestor.clone(), resources, CreateTimelineCause::Load, @@ -1557,8 +1563,18 @@ impl Tenant { } } + // TODO(vlad): Could go to S3 if the secondary is freezing cold and hasn't even + // pulled the first heatmap. Not entirely necessary since the storage controller + // will kick the secondary in any case and cause a download. + let maybe_heatmap_at = self.read_on_disk_heatmap().await; + let timelines = self - .load_timelines_metadata(remote_timeline_ids, remote_storage, cancel) + .load_timelines_metadata( + remote_timeline_ids, + remote_storage, + maybe_heatmap_at, + cancel, + ) .await?; Ok(TenantPreload { @@ -1571,6 +1587,26 @@ impl Tenant { }) } + async fn read_on_disk_heatmap(&self) -> Option<(HeatMapTenant, std::time::Instant)> { + let on_disk_heatmap_path = self.conf.tenant_heatmap_path(&self.tenant_shard_id); + match tokio::fs::read_to_string(on_disk_heatmap_path).await { + Ok(heatmap) => match serde_json::from_str::(&heatmap) { + Ok(heatmap) => Some((heatmap, std::time::Instant::now())), + Err(err) => { + error!("Failed to deserialize old heatmap: {err}"); + None + } + }, + Err(err) => match err.kind() { + std::io::ErrorKind::NotFound => None, + _ => { + error!("Unexpected IO error reading old heatmap: {err}"); + None + } + }, + } + } + /// /// Background task that downloads all data for a tenant and brings it to Active state. /// @@ -1658,7 +1694,10 @@ impl Tenant { match index_part { MaybeDeletedIndexPart::IndexPart(index_part) => { timeline_ancestors.insert(timeline_id, index_part.metadata.clone()); - remote_index_and_client.insert(timeline_id, (index_part, preload.client)); + remote_index_and_client.insert( + timeline_id, + (index_part, preload.client, preload.previous_heatmap), + ); } MaybeDeletedIndexPart::Deleted(index_part) => { info!( @@ -1677,7 +1716,7 @@ impl Tenant { // layer file. 
let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?; for (timeline_id, remote_metadata) in sorted_timelines { - let (index_part, remote_client) = remote_index_and_client + let (index_part, remote_client, previous_heatmap) = remote_index_and_client .remove(&timeline_id) .expect("just put it in above"); @@ -1697,6 +1736,7 @@ impl Tenant { timeline_id, index_part, remote_metadata, + previous_heatmap, self.get_timeline_resources_for(remote_client), LoadTimelineCause::Attach, ctx, @@ -1846,11 +1886,13 @@ impl Tenant { } #[instrument(skip_all, fields(timeline_id=%timeline_id))] + #[allow(clippy::too_many_arguments)] async fn load_remote_timeline( self: &Arc, timeline_id: TimelineId, index_part: IndexPart, remote_metadata: TimelineMetadata, + previous_heatmap: Option, resources: TimelineResources, cause: LoadTimelineCause, ctx: &RequestContext, @@ -1880,6 +1922,7 @@ impl Tenant { resources, index_part, remote_metadata, + previous_heatmap, ancestor, cause, ctx, @@ -1891,14 +1934,29 @@ impl Tenant { self: &Arc, timeline_ids: HashSet, remote_storage: &GenericRemoteStorage, + heatmap: Option<(HeatMapTenant, std::time::Instant)>, cancel: CancellationToken, ) -> anyhow::Result> { + let mut timeline_heatmaps = heatmap.map(|h| (h.0.into_timelines_index(), h.1)); + let mut part_downloads = JoinSet::new(); for timeline_id in timeline_ids { let cancel_clone = cancel.clone(); + + let previous_timeline_heatmap = timeline_heatmaps.as_mut().and_then(|hs| { + hs.0.remove(&timeline_id).map(|h| PreviousHeatmap::Active { + heatmap: h, + read_at: hs.1, + }) + }); part_downloads.spawn( - self.load_timeline_metadata(timeline_id, remote_storage.clone(), cancel_clone) - .instrument(info_span!("download_index_part", %timeline_id)), + self.load_timeline_metadata( + timeline_id, + remote_storage.clone(), + previous_timeline_heatmap, + cancel_clone, + ) + .instrument(info_span!("download_index_part", %timeline_id)), ); } @@ -1946,6 +2004,7 @@ impl Tenant { self: &Arc, timeline_id: TimelineId, remote_storage: GenericRemoteStorage, + previous_heatmap: Option, cancel: CancellationToken, ) -> impl Future { let client = self.build_timeline_client(timeline_id, remote_storage); @@ -1961,6 +2020,7 @@ impl Tenant { client, timeline_id, index_part, + previous_heatmap, } } } @@ -2072,7 +2132,12 @@ impl Tenant { })?; let timeline_preload = self - .load_timeline_metadata(timeline_id, self.remote_storage.clone(), cancel.clone()) + .load_timeline_metadata( + timeline_id, + self.remote_storage.clone(), + None, + cancel.clone(), + ) .await; let index_part = match timeline_preload.index_part { @@ -2106,6 +2171,7 @@ impl Tenant { timeline_id, index_part, remote_metadata, + None, timeline_resources, LoadTimelineCause::Unoffload, &ctx, @@ -2821,7 +2887,7 @@ impl Tenant { }; let metadata = index_part.metadata.clone(); self - .load_remote_timeline(timeline_id, index_part, metadata, resources, LoadTimelineCause::ImportPgdata{ + .load_remote_timeline(timeline_id, index_part, metadata, None, resources, LoadTimelineCause::ImportPgdata{ create_guard: timeline_create_guard, activate, }, &ctx) .await? 
.ready_to_activate() @@ -4030,6 +4096,7 @@ impl Tenant { &self, new_timeline_id: TimelineId, new_metadata: &TimelineMetadata, + previous_heatmap: Option, ancestor: Option>, resources: TimelineResources, cause: CreateTimelineCause, @@ -4053,6 +4120,7 @@ impl Tenant { self.conf, Arc::clone(&self.tenant_conf), new_metadata, + previous_heatmap, ancestor, new_timeline_id, self.tenant_shard_id, @@ -5124,6 +5192,7 @@ impl Tenant { .create_timeline_struct( new_timeline_id, new_metadata, + None, ancestor, resources, CreateTimelineCause::Load, diff --git a/pageserver/src/tenant/secondary/heatmap.rs b/pageserver/src/tenant/secondary/heatmap.rs index 4a8e66d38a..0fa10ca294 100644 --- a/pageserver/src/tenant/secondary/heatmap.rs +++ b/pageserver/src/tenant/secondary/heatmap.rs @@ -1,4 +1,4 @@ -use std::time::SystemTime; +use std::{collections::HashMap, time::SystemTime}; use crate::tenant::{remote_timeline_client::index::LayerFileMetadata, storage_layer::LayerName}; @@ -8,7 +8,7 @@ use serde_with::{serde_as, DisplayFromStr, TimestampSeconds}; use utils::{generation::Generation, id::TimelineId}; #[derive(Serialize, Deserialize)] -pub(super) struct HeatMapTenant { +pub(crate) struct HeatMapTenant { /// Generation of the attached location that uploaded the heatmap: this is not required /// for correctness, but acts as a hint to secondary locations in order to detect thrashing /// in the unlikely event that two attached locations are both uploading conflicting heatmaps. @@ -25,8 +25,17 @@ pub(super) struct HeatMapTenant { pub(super) upload_period_ms: Option, } +impl HeatMapTenant { + pub(crate) fn into_timelines_index(self) -> HashMap { + self.timelines + .into_iter() + .map(|htl| (htl.timeline_id, htl)) + .collect() + } +} + #[serde_as] -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub(crate) struct HeatMapTimeline { #[serde_as(as = "DisplayFromStr")] pub(crate) timeline_id: TimelineId, @@ -35,13 +44,13 @@ pub(crate) struct HeatMapTimeline { } #[serde_as] -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub(crate) struct HeatMapLayer { pub(crate) name: LayerName, pub(crate) metadata: LayerFileMetadata, #[serde_as(as = "TimestampSeconds")] - pub(super) access_time: SystemTime, + pub(crate) access_time: SystemTime, // TODO: an actual 'heat' score that would let secondary locations prioritize downloading // the hottest layers, rather than trying to simply mirror whatever layers are on-disk on the primary. } diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 40282defd4..0bf606cf0a 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -136,6 +136,22 @@ pub(crate) fn local_layer_path( } } +pub(crate) enum LastEviction { + Never, + At(std::time::Instant), + Evicting, +} + +impl LastEviction { + pub(crate) fn happened_after(&self, timepoint: std::time::Instant) -> bool { + match self { + LastEviction::Never => false, + LastEviction::At(evicted_at) => evicted_at > &timepoint, + LastEviction::Evicting => true, + } + } +} + impl Layer { /// Creates a layer value for a file we know to not be resident. 
pub(crate) fn for_evicted( @@ -405,6 +421,17 @@ impl Layer { self.0.metadata() } + pub(crate) fn last_evicted_at(&self) -> LastEviction { + match self.0.last_evicted_at.try_lock() { + Ok(lock) => match *lock { + None => LastEviction::Never, + Some(at) => LastEviction::At(at), + }, + Err(std::sync::TryLockError::WouldBlock) => LastEviction::Evicting, + Err(std::sync::TryLockError::Poisoned(p)) => panic!("Lock poisoned: {p}"), + } + } + pub(crate) fn get_timeline_id(&self) -> Option { self.0 .timeline @@ -656,7 +683,9 @@ struct LayerInner { /// When the Layer was last evicted but has not been downloaded since. /// - /// This is used solely for updating metrics. See [`LayerImplMetrics::redownload_after`]. + /// This is used for skipping evicted layers from the previous heatmap (see + /// `[Timeline::generate_heatmap]`) and for updating metrics + /// (see [`LayerImplMetrics::redownload_after`]). last_evicted_at: std::sync::Mutex>, #[cfg(test)] diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b211af4eff..782b7d88b0 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -150,16 +150,15 @@ use super::{ config::TenantConf, storage_layer::LayerVisibilityHint, upload_queue::NotInitialized, MaybeOffloaded, }; -use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf}; +use super::{ + debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf, HeatMapTimeline, +}; use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe}; use super::{ remote_timeline_client::RemoteTimelineClient, remote_timeline_client::WaitCompletionError, storage_layer::ReadableLayer, }; -use super::{ - secondary::heatmap::{HeatMapLayer, HeatMapTimeline}, - GcError, -}; +use super::{secondary::heatmap::HeatMapLayer, GcError}; #[cfg(test)] use pageserver_api::value::Value; @@ -465,6 +464,16 @@ pub struct Timeline { /// If Some, collects GetPage metadata for an ongoing PageTrace. pub(crate) page_trace: ArcSwapOption>, + + previous_heatmap: ArcSwapOption, +} + +pub(crate) enum PreviousHeatmap { + Active { + heatmap: HeatMapTimeline, + read_at: std::time::Instant, + }, + Obsolete, } pub type TimelineDeleteProgress = Arc>; @@ -2568,6 +2577,7 @@ impl Timeline { conf: &'static PageServerConf, tenant_conf: Arc>, metadata: &TimelineMetadata, + previous_heatmap: Option, ancestor: Option>, timeline_id: TimelineId, tenant_shard_id: TenantShardId, @@ -2730,6 +2740,8 @@ impl Timeline { create_idempotency, page_trace: Default::default(), + + previous_heatmap: ArcSwapOption::from_pointee(previous_heatmap), }; result.repartition_threshold = @@ -3468,12 +3480,52 @@ impl Timeline { let guard = self.layers.read().await; + // Firstly, if there's any heatmap left over from when this location + // was a secondary, take that into account. Keep layers that are: + // * present in the layer map + // * visible + // * non-resident + // * not evicted since we read the heatmap + // + // Without this, a new cold, attached location would clobber the previous + // heatamp. 
+ let previous_heatmap = self.previous_heatmap.load(); + let visible_non_resident = match previous_heatmap.as_deref() { + Some(PreviousHeatmap::Active { heatmap, read_at }) => { + Some(heatmap.layers.iter().filter_map(|hl| { + let desc: PersistentLayerDesc = hl.name.clone().into(); + let layer = guard.try_get_from_key(&desc.key())?; + + if layer.visibility() == LayerVisibilityHint::Covered { + return None; + } + + if layer.is_likely_resident() { + return None; + } + + if layer.last_evicted_at().happened_after(*read_at) { + return None; + } + + Some((desc, hl.metadata.clone(), hl.access_time)) + })) + } + Some(PreviousHeatmap::Obsolete) => None, + None => None, + }; + + // Secondly, all currently visible, resident layers are included. let resident = guard.likely_resident_layers().filter_map(|layer| { match layer.visibility() { LayerVisibilityHint::Visible => { // Layer is visible to one or more read LSNs: elegible for inclusion in layer map let last_activity_ts = layer.latest_activity(); - Some((layer.layer_desc(), layer.metadata(), last_activity_ts)) + Some(( + layer.layer_desc().clone(), + layer.metadata(), + last_activity_ts, + )) } LayerVisibilityHint::Covered => { // Layer is resident but unlikely to be read: not elegible for inclusion in heatmap. @@ -3482,7 +3534,18 @@ impl Timeline { } }); - let mut layers = resident.collect::>(); + let mut layers = match visible_non_resident { + Some(non_resident) => { + let mut non_resident = non_resident.peekable(); + if non_resident.peek().is_none() { + self.previous_heatmap + .store(Some(PreviousHeatmap::Obsolete.into())); + } + + non_resident.chain(resident).collect::>() + } + None => resident.collect::>(), + }; // Sort layers in order of which to download first. For a large set of layers to download, we // want to prioritize those layers which are most likely to still be in the resident many minutes @@ -6661,18 +6724,32 @@ fn is_send() { #[cfg(test)] mod tests { + use std::sync::Arc; + use pageserver_api::key::Key; use pageserver_api::value::Value; + use tracing::Instrument; use utils::{id::TimelineId, lsn::Lsn}; use crate::tenant::{ harness::{test_img, TenantHarness}, layer_map::LayerMap, - storage_layer::{Layer, LayerName}, + storage_layer::{Layer, LayerName, LayerVisibilityHint}, timeline::{DeltaLayerTestDesc, EvictionError}, - Timeline, + PreviousHeatmap, Timeline, }; + use super::HeatMapTimeline; + + fn assert_heatmaps_have_same_layers(lhs: &HeatMapTimeline, rhs: &HeatMapTimeline) { + assert_eq!(lhs.layers.len(), rhs.layers.len()); + let lhs_rhs = lhs.layers.iter().zip(rhs.layers.iter()); + for (l, r) in lhs_rhs { + assert_eq!(l.name, r.name); + assert_eq!(l.metadata, r.metadata); + } + } + #[tokio::test] async fn test_heatmap_generation() { let harness = TenantHarness::create("heatmap_generation").await.unwrap(); @@ -6746,7 +6823,7 @@ mod tests { assert_eq!(heatmap.layers.last().unwrap().name, l0_delta.layer_name()); let mut last_lsn = Lsn::MAX; - for layer in heatmap.layers { + for layer in &heatmap.layers { // Covered layer should be omitted assert!(layer.name != covered_delta.layer_name()); @@ -6761,6 +6838,144 @@ mod tests { last_lsn = layer_lsn; } } + + // Evict all the layers and stash the old heatmap in the timeline. + // This simulates a migration to a cold secondary location. 
+ + let guard = timeline.layers.read().await; + let mut all_layers = Vec::new(); + let forever = std::time::Duration::from_secs(120); + for layer in guard.likely_resident_layers() { + all_layers.push(layer.clone()); + layer.evict_and_wait(forever).await.unwrap(); + } + drop(guard); + + timeline + .previous_heatmap + .store(Some(Arc::new(PreviousHeatmap::Active { + heatmap: heatmap.clone(), + read_at: std::time::Instant::now(), + }))); + + // Generate a new heatmap and assert that it contains the same layers as the old one. + let post_migration_heatmap = timeline.generate_heatmap().await.unwrap(); + assert_heatmaps_have_same_layers(&heatmap, &post_migration_heatmap); + + // Download each layer one by one. Generate the heatmap at each step and check + // that it's stable. + for layer in all_layers { + if layer.visibility() == LayerVisibilityHint::Covered { + continue; + } + + eprintln!("Downloading {layer} and re-generating heatmap"); + + let _resident = layer + .download_and_keep_resident() + .instrument(tracing::info_span!( + parent: None, + "download_layer", + tenant_id = %timeline.tenant_shard_id.tenant_id, + shard_id = %timeline.tenant_shard_id.shard_slug(), + timeline_id = %timeline.timeline_id + )) + .await + .unwrap(); + + let post_download_heatmap = timeline.generate_heatmap().await.unwrap(); + assert_heatmaps_have_same_layers(&heatmap, &post_download_heatmap); + } + + // Everything from the post-migration heatmap is now resident. + // Check that we drop it from memory. + assert!(matches!( + timeline.previous_heatmap.load().as_deref(), + Some(PreviousHeatmap::Obsolete) + )); + } + + #[tokio::test] + async fn test_previous_heatmap_obsoletion() { + let harness = TenantHarness::create("heatmap_previous_heatmap_obsoletion") + .await + .unwrap(); + + let l0_delta = DeltaLayerTestDesc::new( + Lsn(0x20)..Lsn(0x30), + Key::from_hex("000000000000000000000000000000000000").unwrap() + ..Key::from_hex("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF").unwrap(), + vec![( + Key::from_hex("720000000033333333444444445500000000").unwrap(), + Lsn(0x25), + Value::Image(test_img("foo")), + )], + ); + + let image_layer = ( + Lsn(0x40), + vec![( + Key::from_hex("620000000033333333444444445500000000").unwrap(), + test_img("bar"), + )], + ); + + let delta_layers = vec![l0_delta]; + let image_layers = vec![image_layer]; + + let (tenant, ctx) = harness.load().await; + let timeline = tenant + .create_test_timeline_with_layers( + TimelineId::generate(), + Lsn(0x10), + 14, + &ctx, + delta_layers, + image_layers, + Lsn(0x100), + ) + .await + .unwrap(); + + // Layer visibility is an input to heatmap generation, so refresh it first + timeline.update_layer_visibility().await.unwrap(); + + let heatmap = timeline + .generate_heatmap() + .await + .expect("Infallible while timeline is not shut down"); + + // Both layers should be in the heatmap + assert!(!heatmap.layers.is_empty()); + + // Now simulate a migration. + timeline + .previous_heatmap + .store(Some(Arc::new(PreviousHeatmap::Active { + heatmap: heatmap.clone(), + read_at: std::time::Instant::now(), + }))); + + // Evict all the layers in the previous heatmap + let guard = timeline.layers.read().await; + let forever = std::time::Duration::from_secs(120); + for layer in guard.likely_resident_layers() { + layer.evict_and_wait(forever).await.unwrap(); + } + drop(guard); + + // Generate a new heatmap and check that the previous heatmap + // has been marked obsolete. 
+ let post_eviction_heatmap = timeline + .generate_heatmap() + .await + .expect("Infallible while timeline is not shut down"); + + assert!(post_eviction_heatmap.layers.is_empty()); + assert!(matches!( + timeline.previous_heatmap.load().as_deref(), + Some(PreviousHeatmap::Obsolete) + )); } #[tokio::test] diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index 93b7efedb8..841b2fa1c7 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -294,6 +294,7 @@ impl DeleteTimelineFlow { timeline_id, local_metadata, None, // Ancestor is not needed for deletion. + None, // Previous heatmap is not needed for deletion tenant.get_timeline_resources_for(remote_client), // Important. We dont pass ancestor above because it can be missing. // Thus we need to skip the validation here. diff --git a/storage_controller/src/reconciler.rs b/storage_controller/src/reconciler.rs index 58bc0ba1cd..8c7e9b1726 100644 --- a/storage_controller/src/reconciler.rs +++ b/storage_controller/src/reconciler.rs @@ -1,7 +1,7 @@ use crate::pageserver_client::PageserverClient; use crate::persistence::Persistence; use crate::{compute_hook, service}; -use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy}; +use pageserver_api::controller_api::{AvailabilityZone, MigrationConfig, PlacementPolicy}; use pageserver_api::models::{ LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, TenantWaitLsnRequest, }; @@ -162,6 +162,22 @@ impl ReconcilerConfig { } } +impl From<&MigrationConfig> for ReconcilerConfig { + fn from(value: &MigrationConfig) -> Self { + let mut builder = ReconcilerConfigBuilder::new(); + + if let Some(timeout) = value.secondary_warmup_timeout { + builder = builder.secondary_warmup_timeout(timeout) + } + + if let Some(timeout) = value.secondary_download_request_timeout { + builder = builder.secondary_download_request_timeout(timeout) + } + + builder.build() + } +} + /// RAII resource units granted to a Reconciler, which it should keep alive until it finishes doing I/O pub(crate) struct ReconcileUnits { _sem_units: tokio::sync::OwnedSemaphorePermit, diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index b9db46fe4a..c1da9374e4 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -5213,7 +5213,12 @@ impl Service { shard.sequence = shard.sequence.next(); } - self.maybe_reconcile_shard(shard, nodes) + let reconciler_config = match migrate_req.migration_config { + Some(cfg) => (&cfg).into(), + None => ReconcilerConfig::default(), + }; + + self.maybe_configured_reconcile_shard(shard, nodes, reconciler_config) }; if let Some(waiter) = waiter { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 2fa82754ef..b7afbec403 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -3,6 +3,7 @@ from __future__ import annotations import abc import asyncio import concurrent.futures +import dataclasses import filecmp import json import os @@ -1675,6 +1676,12 @@ class StorageControllerLeadershipStatus(StrEnum): CANDIDATE = "candidate" +@dataclass +class StorageControllerMigrationConfig: + secondary_warmup_timeout: str | None + secondary_download_request_timeout: str | None + + class NeonStorageController(MetricsGetter, LogUtils): def __init__(self, env: NeonEnv, port: int, auth_enabled: bool): self.env = env @@ -2068,11 +2075,20 @@ class 
NeonStorageController(MetricsGetter, LogUtils): shards: list[TenantShardId] = body["new_shards"] return shards - def tenant_shard_migrate(self, tenant_shard_id: TenantShardId, dest_ps_id: int): + def tenant_shard_migrate( + self, + tenant_shard_id: TenantShardId, + dest_ps_id: int, + config: StorageControllerMigrationConfig | None = None, + ): + payload = {"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id} + if config is not None: + payload["migration_config"] = dataclasses.asdict(config) + self.request( "PUT", f"{self.api}/control/v1/tenant/{tenant_shard_id}/migrate", - json={"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id}, + json=payload, headers=self.headers(TokenScope.ADMIN), ) log.info(f"Migrated tenant {tenant_shard_id} to pageserver {dest_ps_id}") diff --git a/test_runner/regress/test_pageserver_secondary.py b/test_runner/regress/test_pageserver_secondary.py index 590093d23c..8a91a255d8 100644 --- a/test_runner/regress/test_pageserver_secondary.py +++ b/test_runner/regress/test_pageserver_secondary.py @@ -10,14 +10,18 @@ from typing import TYPE_CHECKING import pytest from fixtures.common_types import TenantId, TenantShardId, TimelineId from fixtures.log_helper import log -from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver +from fixtures.neon_fixtures import ( + NeonEnvBuilder, + NeonPageserver, + StorageControllerMigrationConfig, +) from fixtures.pageserver.common_types import parse_layer_file_name from fixtures.pageserver.utils import ( assert_prefix_empty, wait_for_upload_queue_empty, ) from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, S3Storage, s3_storage -from fixtures.utils import skip_in_debug_build, wait_until +from fixtures.utils import run_only_on_default_postgres, skip_in_debug_build, wait_until from fixtures.workload import Workload from werkzeug.wrappers.request import Request from werkzeug.wrappers.response import Response @@ -889,3 +893,93 @@ def test_slow_secondary_downloads(neon_env_builder: NeonEnvBuilder, via_controll assert progress_3["heatmap_mtime"] is not None assert progress_3["layers_total"] == progress_3["layers_downloaded"] assert progress_3["bytes_total"] == progress_3["bytes_downloaded"] + + +@skip_in_debug_build("only run with release build") +@run_only_on_default_postgres("PG version is not interesting here") +def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder): + neon_env_builder.num_pageservers = 2 + neon_env_builder.enable_pageserver_remote_storage( + remote_storage_kind=RemoteStorageKind.MOCK_S3, + ) + + env = neon_env_builder.init_configs() + env.start() + + assert isinstance(env.pageserver_remote_storage, S3Storage) # Satisfy linter + + tenant_id = TenantId.generate() + timeline_id = TimelineId.generate() + env.create_tenant(tenant_id, timeline_id, conf=TENANT_CONF, placement_policy='{"Attached":1}') + + env.storage_controller.reconcile_until_idle() + + attached_to_id = env.storage_controller.locate(tenant_id)[0]["node_id"] + ps_attached = env.get_pageserver(attached_to_id) + ps_secondary = next(p for p in env.pageservers if p != ps_attached) + + # Generate a bunch of small layers (we will apply a slowdown failpoint that works on a per-layer basis) + workload = Workload(env, tenant_id, timeline_id) + workload.init() + workload.write_rows(128, upload=True) + workload.write_rows(128, upload=True) + workload.write_rows(128, upload=True) + workload.write_rows(128, upload=True) + workload.stop() + + # Expect lots of layers + assert 
len(ps_attached.list_layers(tenant_id, timeline_id)) > 10 + + # Simulate large data by making layer downloads artifically slow + for ps in env.pageservers: + ps.http_client().configure_failpoints([("secondary-layer-download-sleep", "return(1000)")]) + + # Upload a heatmap, so that secondaries have something to download + ps_attached.http_client().tenant_heatmap_upload(tenant_id) + heatmap_before_migration = env.pageserver_remote_storage.heatmap_content(tenant_id) + + # This has no chance to succeed: we have lots of layers and each one takes at least 1000ms. + # However, it pulls the heatmap, which will be important later. + http_client = env.storage_controller.pageserver_api() + (status, progress) = http_client.tenant_secondary_download(tenant_id, wait_ms=4000) + assert status == 202 + assert progress["heatmap_mtime"] is not None + assert progress["layers_downloaded"] > 0 + assert progress["bytes_downloaded"] > 0 + assert progress["layers_total"] > progress["layers_downloaded"] + assert progress["bytes_total"] > progress["bytes_downloaded"] + + env.storage_controller.allowed_errors.extend( + [ + ".*Timed out.*downloading layers.*", + ] + ) + + # Use a custom configuration that gives up earlier than usual. + # We can't hydrate everything anyway because of the failpoints. + config = StorageControllerMigrationConfig( + secondary_warmup_timeout="5s", secondary_download_request_timeout="2s" + ) + env.storage_controller.tenant_shard_migrate( + TenantShardId(tenant_id, shard_number=0, shard_count=0), ps_secondary.id, config + ) + + env.storage_controller.reconcile_until_idle() + assert env.storage_controller.locate(tenant_id)[0]["node_id"] == ps_secondary.id + + ps_secondary.http_client().tenant_heatmap_upload(tenant_id) + heatmap_after_migration = env.pageserver_remote_storage.heatmap_content(tenant_id) + + assert len(heatmap_before_migration["timelines"][0]["layers"]) > 0 + + # The new layer map should contain all the layers in the pre-migration one + # and a new in memory layer + assert len(heatmap_before_migration["timelines"][0]["layers"]) + 1 == len( + heatmap_after_migration["timelines"][0]["layers"] + ) + + log.info( + f'Heatmap size after cold migration is {len(heatmap_after_migration["timelines"][0]["layers"])}' + ) + + # TODO: Once we have an endpoint for rescuing the cold location, exercise it here.
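
A few illustrative notes follow the patch. The code in them is sketch-only and is not part of the change.

First, the wire format of the extended migrate request. Because migration_config is marked #[serde(default)] and its duration fields go through humantime_serde, existing callers that omit the field keep working, while new callers (such as the StorageControllerMigrationConfig dataclass added to the Python fixtures) can pass human-readable durations like "5s". A minimal round-trip sketch, using simplified mirrors of the structs in libs/pageserver_api/src/controller_api.rs (node_id is shown as a bare u64 rather than NodeId) and assuming serde, serde_json and humantime_serde as dependencies:

    use std::time::Duration;

    use serde::{Deserialize, Serialize};

    // Simplified mirror of pageserver_api::controller_api::TenantShardMigrateRequest.
    #[derive(Serialize, Deserialize, Debug)]
    struct TenantShardMigrateRequest {
        node_id: u64,
        #[serde(default)]
        migration_config: Option<MigrationConfig>,
    }

    // Simplified mirror of pageserver_api::controller_api::MigrationConfig.
    #[derive(Serialize, Deserialize, Debug)]
    struct MigrationConfig {
        #[serde(default)]
        #[serde(with = "humantime_serde")]
        secondary_warmup_timeout: Option<Duration>,
        #[serde(default)]
        #[serde(with = "humantime_serde")]
        secondary_download_request_timeout: Option<Duration>,
    }

    fn main() {
        // An old-style body without the new field still deserializes: the config is None
        // and the reconciler falls back to ReconcilerConfig::default().
        let legacy: TenantShardMigrateRequest =
            serde_json::from_str(r#"{"node_id": 1}"#).unwrap();
        assert!(legacy.migration_config.is_none());

        // A new-style body with human-readable durations, equivalent to what the
        // cold-migration test sends through the fixture.
        let body = r#"{
            "node_id": 1,
            "migration_config": {
                "secondary_warmup_timeout": "5s",
                "secondary_download_request_timeout": "2s"
            }
        }"#;
        let req: TenantShardMigrateRequest = serde_json::from_str(body).unwrap();
        let cfg = req.migration_config.unwrap();
        assert_eq!(cfg.secondary_warmup_timeout, Some(Duration::from_secs(5)));
        assert_eq!(cfg.secondary_download_request_timeout, Some(Duration::from_secs(2)));
    }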
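
Second, where the previous heatmap comes from at attach time. Tenant::preload now reads the heatmap file that this location wrote to local disk while it was a secondary (conf.tenant_heatmap_path), notes when it was read, splits it per timeline with HeatMapTenant::into_timelines_index, and hands each preloaded timeline a PreviousHeatmap::Active { heatmap, read_at }. The sketch below condenses those steps into one function over simplified stand-in types (string timeline ids, raw JSON layer entries); it assumes tokio and serde_json and is not the pageserver's actual code path:

    use std::collections::HashMap;
    use std::path::Path;
    use std::time::Instant;

    // Stand-ins for the heatmap types; the real ones live in
    // pageserver/src/tenant/secondary/heatmap.rs.
    #[derive(serde::Deserialize)]
    struct HeatMapTimeline {
        timeline_id: String,
        layers: Vec<serde_json::Value>,
    }

    #[derive(serde::Deserialize)]
    struct HeatMapTenant {
        timelines: Vec<HeatMapTimeline>,
    }

    #[allow(dead_code)]
    enum PreviousHeatmap {
        Active { heatmap: HeatMapTimeline, read_at: Instant },
        Obsolete,
    }

    // Read the on-disk heatmap (if any) and index it by timeline, so that each
    // timeline can be constructed with its own PreviousHeatmap::Active entry.
    async fn read_previous_heatmaps(path: &Path) -> HashMap<String, PreviousHeatmap> {
        let Ok(raw) = tokio::fs::read_to_string(path).await else {
            // No heatmap on disk: this location was never a warm secondary.
            return HashMap::new();
        };
        let Ok(heatmap) = serde_json::from_str::<HeatMapTenant>(&raw) else {
            // A corrupt or incompatible heatmap is ignored rather than failing the attach.
            return HashMap::new();
        };
        // Record the read time once; later eviction timestamps are compared against it.
        let read_at = Instant::now();
        heatmap
            .timelines
            .into_iter()
            .map(|tl| (tl.timeline_id.clone(), PreviousHeatmap::Active { heatmap: tl, read_at }))
            .collect()
    }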
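
Finally, the carry-over rule in Timeline::generate_heatmap. An entry from the previous heatmap survives only while it is still present in the layer map, not Covered, not resident, and not evicted after the heatmap was read; a layer whose eviction is currently in flight counts as evicted (LastEviction::Evicting). Surviving entries are chained in front of the usual visible-and-resident set, and once nothing from the old heatmap survives, previous_heatmap is flipped to Obsolete, which is what the new obsoletion test asserts. A standalone restatement of that predicate over simplified types, for illustration only:

    use std::time::Instant;

    #[derive(PartialEq)]
    enum Visibility {
        Visible,
        Covered,
    }

    // Simplified view of a layer as seen by the heatmap generator.
    struct LayerState {
        visibility: Visibility,
        resident: bool,
        // None = never evicted; Some(t) = last eviction finished at t. The real code
        // also has an "eviction in progress" state, treated like a very recent eviction.
        last_evicted_at: Option<Instant>,
    }

    // Decide whether a previous-heatmap entry should still be advertised by the
    // newly attached location. `layer` is the layer-map lookup result for the entry.
    fn keep_from_previous_heatmap(layer: Option<&LayerState>, heatmap_read_at: Instant) -> bool {
        let Some(layer) = layer else {
            return false; // Gone from the layer map entirely.
        };
        if layer.visibility == Visibility::Covered {
            return false; // Unlikely to be read again; not heatmap-worthy.
        }
        if layer.resident {
            return false; // Already local; the resident-layer pass will include it.
        }
        if let Some(evicted_at) = layer.last_evicted_at {
            if evicted_at > heatmap_read_at {
                return false; // Evicted since the heatmap was read: the entry is stale.
            }
        }
        true
    }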