From c1e4befd561c594bd64818508128203684b54423 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 17 Apr 2025 22:25:30 +0200 Subject: [PATCH] Additional fixes and improvements to storcon safekeeper timelines (#11477) This delivers some additional fixes and improvements to storcon managed safekeeper timelines: * use `i32::MAX` for the generation number of timeline deletion * start the generation for new timelines at 1 instead of 0: this ensures that the other components actually are generation enabled * fix database operations we use for metrics * use join in list_pending_ops to prevent the classical ORM issue where one does many db queries * use enums in `test_storcon_create_delete_sk_down`. we are adding a second parameter, and having two bool parameters is weird. * extend `test_storcon_create_delete_sk_down` with a test of whole tenant deletion. this hasn't been tested before. * remove some redundant logging contexts * Don't require mutable access to the service lock for scheduling pending ops in memory. In order to pull this off, create reconcilers eagerly. The advantage is that we don't need mutable access to the service lock that way any more. Part of #9011 --------- Co-authored-by: Arseny Sher --- storage_controller/src/persistence.rs | 43 +++++++--- storage_controller/src/service.rs | 6 +- .../src/service/safekeeper_reconciler.rs | 81 ++++++++++--------- .../src/service/safekeeper_service.rs | 30 ++++--- .../regress/test_storage_controller.py | 48 ++++++++--- 5 files changed, 139 insertions(+), 69 deletions(-) diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index d25448718f..a413bba3c9 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -126,6 +126,7 @@ pub(crate) enum DatabaseOperation { InsertTimelineReconcile, RemoveTimelineReconcile, ListTimelineReconcile, + ListTimelineReconcileStartup, } #[must_use] @@ -1521,23 +1522,41 @@ impl Persistence { .await } - /// Load pending operations from db. - pub(crate) async fn list_pending_ops( + /// Load pending operations from db, joined together with timeline data. + pub(crate) async fn list_pending_ops_with_timelines( &self, - ) -> DatabaseResult> { + ) -> DatabaseResult)>> { use crate::schema::safekeeper_timeline_pending_ops::dsl; + use crate::schema::timelines; let timeline_from_db = self - .with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| { - Box::pin(async move { - let from_db: Vec = - dsl::safekeeper_timeline_pending_ops.load(conn).await?; - Ok(from_db) - }) - }) + .with_measured_conn( + DatabaseOperation::ListTimelineReconcileStartup, + move |conn| { + Box::pin(async move { + let from_db: Vec<(TimelinePendingOpPersistence, Option)> = + dsl::safekeeper_timeline_pending_ops + .left_join( + timelines::table.on(timelines::tenant_id + .eq(dsl::tenant_id) + .and(timelines::timeline_id.eq(dsl::timeline_id))), + ) + .select(( + TimelinePendingOpPersistence::as_select(), + Option::::as_select(), + )) + .load(conn) + .await?; + Ok(from_db) + }) + }, + ) .await?; - Ok(timeline_from_db) + Ok(timeline_from_db + .into_iter() + .map(|(op, tl_opt)| (op, tl_opt.map(|tl_opt| tl_opt.into_persistence()))) + .collect()) } /// List pending operations for a given timeline (including tenant-global ones) pub(crate) async fn list_pending_ops_for_timeline( @@ -1580,7 +1599,7 @@ impl Persistence { let tenant_id = &tenant_id; let timeline_id = &timeline_id; - self.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| { + self.with_measured_conn(DatabaseOperation::RemoveTimelineReconcile, move |conn| { let timeline_id_str = timeline_id.map(|tid| tid.to_string()).unwrap_or_default(); Box::pin(async move { diesel::delete(dsl::safekeeper_timeline_pending_ops) diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index a021313474..860fc4f6ab 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -824,9 +824,13 @@ impl Service { let mut locked = self.inner.write().unwrap(); locked.become_leader(); + for (sk_id, _sk) in locked.safekeepers.clone().iter() { + locked.safekeeper_reconcilers.start_reconciler(*sk_id, self); + } + locked .safekeeper_reconcilers - .schedule_request_vec(self, sk_schedule_requests); + .schedule_request_vec(sk_schedule_requests); } // TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that diff --git a/storage_controller/src/service/safekeeper_reconciler.rs b/storage_controller/src/service/safekeeper_reconciler.rs index 76e3162617..b15772a36c 100644 --- a/storage_controller/src/service/safekeeper_reconciler.rs +++ b/storage_controller/src/service/safekeeper_reconciler.rs @@ -30,31 +30,35 @@ impl SafekeeperReconcilers { reconcilers: HashMap::new(), } } - pub(crate) fn schedule_request_vec( - &mut self, - service: &Arc, - reqs: Vec, - ) { + /// Adds a safekeeper-specific reconciler. + /// Can be called multiple times, but it needs to be called at least once + /// for every new safekeeper added. + pub(crate) fn start_reconciler(&mut self, node_id: NodeId, service: &Arc) { + self.reconcilers.entry(node_id).or_insert_with(|| { + SafekeeperReconciler::spawn(self.cancel.child_token(), service.clone()) + }); + } + /// Stop a safekeeper-specific reconciler. + /// Stops the reconciler, cancelling all ongoing tasks. + pub(crate) fn stop_reconciler(&mut self, node_id: NodeId) { + if let Some(handle) = self.reconcilers.remove(&node_id) { + handle.cancel.cancel(); + } + } + pub(crate) fn schedule_request_vec(&self, reqs: Vec) { tracing::info!( "Scheduling {} pending safekeeper ops loaded from db", reqs.len() ); for req in reqs { - self.schedule_request(service, req); + self.schedule_request(req); } } - pub(crate) fn schedule_request(&mut self, service: &Arc, req: ScheduleRequest) { + pub(crate) fn schedule_request(&self, req: ScheduleRequest) { let node_id = req.safekeeper.get_id(); - let reconciler_handle = self.reconcilers.entry(node_id).or_insert_with(|| { - SafekeeperReconciler::spawn(self.cancel.child_token(), service.clone()) - }); + let reconciler_handle = self.reconcilers.get(&node_id).unwrap(); reconciler_handle.schedule_reconcile(req); } - pub(crate) fn cancel_safekeeper(&mut self, node_id: NodeId) { - if let Some(handle) = self.reconcilers.remove(&node_id) { - handle.cancel.cancel(); - } - } /// Cancel ongoing reconciles for the given timeline /// /// Specifying `None` here only removes reconciles for the tenant-global reconciliation, @@ -78,9 +82,12 @@ pub(crate) async fn load_schedule_requests( service: &Arc, safekeepers: &HashMap, ) -> anyhow::Result> { - let pending_ops = service.persistence.list_pending_ops().await?; - let mut res = Vec::with_capacity(pending_ops.len()); - for op_persist in pending_ops { + let pending_ops_timelines = service + .persistence + .list_pending_ops_with_timelines() + .await?; + let mut res = Vec::with_capacity(pending_ops_timelines.len()); + for (op_persist, timeline_persist) in pending_ops_timelines { let node_id = NodeId(op_persist.sk_id as u64); let Some(sk) = safekeepers.get(&node_id) else { // This shouldn't happen, at least the safekeeper should exist as decomissioned. @@ -102,16 +109,12 @@ pub(crate) async fn load_schedule_requests( SafekeeperTimelineOpKind::Delete => Vec::new(), SafekeeperTimelineOpKind::Exclude => Vec::new(), SafekeeperTimelineOpKind::Pull => { - // TODO this code is super hacky, it doesn't take migrations into account - let Some(timeline_id) = timeline_id else { + if timeline_id.is_none() { + // We only do this extra check (outside of timeline_persist check) to give better error msgs anyhow::bail!( "timeline_id is empty for `pull` schedule request for {tenant_id}" ); }; - let timeline_persist = service - .persistence - .get_timeline(tenant_id, timeline_id) - .await?; let Some(timeline_persist) = timeline_persist else { // This shouldn't happen, the timeline should still exist tracing::warn!( @@ -163,6 +166,7 @@ pub(crate) struct ScheduleRequest { pub(crate) kind: SafekeeperTimelineOpKind, } +/// Handle to per safekeeper reconciler. struct ReconcilerHandle { tx: UnboundedSender<(ScheduleRequest, CancellationToken)>, ongoing_tokens: Arc), CancellationToken>>, @@ -170,7 +174,10 @@ struct ReconcilerHandle { } impl ReconcilerHandle { - /// Obtain a new token slot, cancelling any existing reconciliations for that timeline + /// Obtain a new token slot, cancelling any existing reconciliations for + /// that timeline. It is not useful to have >1 operation per , hence scheduling op cancels current one if it + /// exists. fn new_token_slot( &self, tenant_id: TenantId, @@ -305,15 +312,16 @@ impl SafekeeperReconciler { SafekeeperTimelineOpKind::Delete => { let tenant_id = req.tenant_id; if let Some(timeline_id) = req.timeline_id { - let deleted = self.reconcile_inner( - req, - async |client| client.delete_timeline(tenant_id, timeline_id).await, - |_resp| { - tracing::info!(%tenant_id, %timeline_id, "deleted timeline from {req_host}"); - }, - req_cancel, - ) - .await; + let deleted = self + .reconcile_inner( + req, + async |client| client.delete_timeline(tenant_id, timeline_id).await, + |_resp| { + tracing::info!("deleted timeline from {req_host}"); + }, + req_cancel, + ) + .await; if deleted { self.delete_timeline_from_db(tenant_id, timeline_id).await; } @@ -344,12 +352,13 @@ impl SafekeeperReconciler { { Ok(list) => { if !list.is_empty() { - tracing::info!(%tenant_id, %timeline_id, "not deleting timeline from db as there is {} open reconciles", list.len()); + // duplicate the timeline_id here because it might be None in the reconcile context + tracing::info!(%timeline_id, "not deleting timeline from db as there is {} open reconciles", list.len()); return; } } Err(e) => { - tracing::warn!(%tenant_id, %timeline_id, "couldn't query pending ops: {e}"); + tracing::warn!(%timeline_id, "couldn't query pending ops: {e}"); return; } } diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs index a23b9a4a02..8a13c6af23 100644 --- a/storage_controller/src/service/safekeeper_service.rs +++ b/storage_controller/src/service/safekeeper_service.rs @@ -46,6 +46,7 @@ impl Service { .map(SecretString::from); let mut joinset = JoinSet::new(); + // Prepare membership::Configuration from choosen safekeepers. let safekeepers = { let locked = self.inner.read().unwrap(); locked.safekeepers.clone() @@ -205,7 +206,7 @@ impl Service { tenant_id: tenant_id.to_string(), timeline_id: timeline_id.to_string(), start_lsn: start_lsn.into(), - generation: 0, + generation: 1, sk_set: sks_persistence.clone(), new_sk_set: None, cplane_notified_generation: 0, @@ -254,7 +255,7 @@ impl Service { self.persistence.insert_pending_op(pending_op).await?; } if !remaining.is_empty() { - let mut locked = self.inner.write().unwrap(); + let locked = self.inner.read().unwrap(); for remaining_id in remaining { let Some(sk) = locked.safekeepers.get(&remaining_id) else { return Err(ApiError::InternalServerError(anyhow::anyhow!( @@ -290,7 +291,7 @@ impl Service { generation: timeline_persist.generation as u32, kind: crate::persistence::SafekeeperTimelineOpKind::Pull, }; - locked.safekeeper_reconcilers.schedule_request(self, req); + locked.safekeeper_reconcilers.schedule_request(req); } } @@ -357,7 +358,7 @@ impl Service { let pending_op = TimelinePendingOpPersistence { tenant_id: tenant_id.to_string(), timeline_id: timeline_id.to_string(), - generation: tl.generation, + generation: i32::MAX, op_kind: SafekeeperTimelineOpKind::Delete, sk_id: *sk_id, }; @@ -365,7 +366,7 @@ impl Service { self.persistence.insert_pending_op(pending_op).await?; } { - let mut locked = self.inner.write().unwrap(); + let locked = self.inner.read().unwrap(); for sk_id in all_sks { let sk_id = NodeId(*sk_id as u64); let Some(sk) = locked.safekeepers.get(&sk_id) else { @@ -383,7 +384,7 @@ impl Service { generation: tl.generation as u32, kind: SafekeeperTimelineOpKind::Delete, }; - locked.safekeeper_reconcilers.schedule_request(self, req); + locked.safekeeper_reconcilers.schedule_request(req); } } Ok(()) @@ -482,7 +483,7 @@ impl Service { tenant_id, timeline_id: None, }; - locked.safekeeper_reconcilers.schedule_request(self, req); + locked.safekeeper_reconcilers.schedule_request(req); } Ok(()) } @@ -579,7 +580,7 @@ impl Service { } pub(crate) async fn upsert_safekeeper( - &self, + self: &Arc, record: crate::persistence::SafekeeperUpsert, ) -> Result<(), ApiError> { let node_id = NodeId(record.id as u64); @@ -618,6 +619,9 @@ impl Service { ); } } + locked + .safekeeper_reconcilers + .start_reconciler(node_id, self); locked.safekeepers = Arc::new(safekeepers); metrics::METRICS_REGISTRY .metrics_group @@ -638,7 +642,7 @@ impl Service { } pub(crate) async fn set_safekeeper_scheduling_policy( - &self, + self: &Arc, id: i64, scheduling_policy: SkSchedulingPolicy, ) -> Result<(), DatabaseError> { @@ -656,9 +660,13 @@ impl Service { sk.set_scheduling_policy(scheduling_policy); match scheduling_policy { - SkSchedulingPolicy::Active => (), + SkSchedulingPolicy::Active => { + locked + .safekeeper_reconcilers + .start_reconciler(node_id, self); + } SkSchedulingPolicy::Decomissioned | SkSchedulingPolicy::Pause => { - locked.safekeeper_reconcilers.cancel_safekeeper(node_id); + locked.safekeeper_reconcilers.stop_reconciler(node_id); } } diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index b2c8415e9a..26f745adb9 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -4114,13 +4114,29 @@ def test_storage_controller_location_conf_equivalence(neon_env_builder: NeonEnvB assert reconciles_after_restart == 0 +class RestartStorcon(Enum): + RESTART = "restart" + ONLINE = "online" + + +class DeletionSubject(Enum): + TIMELINE = "timeline" + TENANT = "tenant" + + @run_only_on_default_postgres("PG version is not interesting here") -@pytest.mark.parametrize("restart_storcon", [True, False]) -def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart_storcon: bool): +@pytest.mark.parametrize("restart_storcon", [RestartStorcon.RESTART, RestartStorcon.ONLINE]) +@pytest.mark.parametrize("deletetion_subject", [DeletionSubject.TENANT, DeletionSubject.TIMELINE]) +def test_storcon_create_delete_sk_down( + neon_env_builder: NeonEnvBuilder, + restart_storcon: RestartStorcon, + deletetion_subject: DeletionSubject, +): """ Test that the storcon can create and delete tenants and timelines with a safekeeper being down. - - restart_storcon: tests whether the pending ops are persisted. + - restart_storcon: tests that the pending ops are persisted. if we don't restart, we test that we don't require it to come from the db. + - deletion_subject: test that both single timeline and whole tenant deletion work. """ neon_env_builder.num_safekeepers = 3 @@ -4143,6 +4159,7 @@ def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart tenant_id = TenantId.generate() timeline_id = TimelineId.generate() env.create_tenant(tenant_id, timeline_id) + child_timeline_id = env.create_branch("child_of_main", tenant_id) env.safekeepers[1].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}") env.safekeepers[2].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}") @@ -4155,7 +4172,7 @@ def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart ] ) - if restart_storcon: + if restart_storcon == RestartStorcon.RESTART: # Restart the storcon to check that we persist operations env.storage_controller.stop() env.storage_controller.start() @@ -4168,6 +4185,13 @@ def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3]) ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)") + with env.endpoints.create( + "child_of_main", tenant_id=tenant_id, config_lines=config_lines + ) as ep: + # endpoint should start. + ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3]) + ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)") + env.storage_controller.assert_log_contains("writing pending op for sk id 1") env.safekeepers[0].start() @@ -4176,25 +4200,31 @@ def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart env.safekeepers[0].assert_log_contains( f"pulling timeline {tenant_id}/{timeline_id} from safekeeper" ) + env.safekeepers[0].assert_log_contains( + f"pulling timeline {tenant_id}/{child_timeline_id} from safekeeper" + ) wait_until(logged_contains_on_sk) env.safekeepers[1].stop() - env.storage_controller.pageserver_api().tenant_delete(tenant_id) + if deletetion_subject == DeletionSubject.TENANT: + env.storage_controller.pageserver_api().tenant_delete(tenant_id) + else: + env.storage_controller.pageserver_api().timeline_delete(tenant_id, child_timeline_id) # ensure the safekeeper deleted the timeline def timeline_deleted_on_active_sks(): env.safekeepers[0].assert_log_contains( - f"deleting timeline {tenant_id}/{timeline_id} from disk" + f"deleting timeline {tenant_id}/{child_timeline_id} from disk" ) env.safekeepers[2].assert_log_contains( - f"deleting timeline {tenant_id}/{timeline_id} from disk" + f"deleting timeline {tenant_id}/{child_timeline_id} from disk" ) wait_until(timeline_deleted_on_active_sks) - if restart_storcon: + if restart_storcon == RestartStorcon.RESTART: # Restart the storcon to check that we persist operations env.storage_controller.stop() env.storage_controller.start() @@ -4204,7 +4234,7 @@ def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart # ensure that there is log msgs for the third safekeeper too def timeline_deleted_on_sk(): env.safekeepers[1].assert_log_contains( - f"deleting timeline {tenant_id}/{timeline_id} from disk" + f"deleting timeline {tenant_id}/{child_timeline_id} from disk" ) wait_until(timeline_deleted_on_sk)