diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index d25448718f..a413bba3c9 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -126,6 +126,7 @@ pub(crate) enum DatabaseOperation { InsertTimelineReconcile, RemoveTimelineReconcile, ListTimelineReconcile, + ListTimelineReconcileStartup, } #[must_use] @@ -1521,23 +1522,41 @@ impl Persistence { .await } - /// Load pending operations from db. - pub(crate) async fn list_pending_ops( + /// Load pending operations from db, joined together with timeline data. + pub(crate) async fn list_pending_ops_with_timelines( &self, - ) -> DatabaseResult> { + ) -> DatabaseResult)>> { use crate::schema::safekeeper_timeline_pending_ops::dsl; + use crate::schema::timelines; let timeline_from_db = self - .with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| { - Box::pin(async move { - let from_db: Vec = - dsl::safekeeper_timeline_pending_ops.load(conn).await?; - Ok(from_db) - }) - }) + .with_measured_conn( + DatabaseOperation::ListTimelineReconcileStartup, + move |conn| { + Box::pin(async move { + let from_db: Vec<(TimelinePendingOpPersistence, Option)> = + dsl::safekeeper_timeline_pending_ops + .left_join( + timelines::table.on(timelines::tenant_id + .eq(dsl::tenant_id) + .and(timelines::timeline_id.eq(dsl::timeline_id))), + ) + .select(( + TimelinePendingOpPersistence::as_select(), + Option::::as_select(), + )) + .load(conn) + .await?; + Ok(from_db) + }) + }, + ) .await?; - Ok(timeline_from_db) + Ok(timeline_from_db + .into_iter() + .map(|(op, tl_opt)| (op, tl_opt.map(|tl_opt| tl_opt.into_persistence()))) + .collect()) } /// List pending operations for a given timeline (including tenant-global ones) pub(crate) async fn list_pending_ops_for_timeline( @@ -1580,7 +1599,7 @@ impl Persistence { let tenant_id = &tenant_id; let timeline_id = &timeline_id; - self.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| { + self.with_measured_conn(DatabaseOperation::RemoveTimelineReconcile, move |conn| { let timeline_id_str = timeline_id.map(|tid| tid.to_string()).unwrap_or_default(); Box::pin(async move { diesel::delete(dsl::safekeeper_timeline_pending_ops) diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index a021313474..860fc4f6ab 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -824,9 +824,13 @@ impl Service { let mut locked = self.inner.write().unwrap(); locked.become_leader(); + for (sk_id, _sk) in locked.safekeepers.clone().iter() { + locked.safekeeper_reconcilers.start_reconciler(*sk_id, self); + } + locked .safekeeper_reconcilers - .schedule_request_vec(self, sk_schedule_requests); + .schedule_request_vec(sk_schedule_requests); } // TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that diff --git a/storage_controller/src/service/safekeeper_reconciler.rs b/storage_controller/src/service/safekeeper_reconciler.rs index 76e3162617..b15772a36c 100644 --- a/storage_controller/src/service/safekeeper_reconciler.rs +++ b/storage_controller/src/service/safekeeper_reconciler.rs @@ -30,31 +30,35 @@ impl SafekeeperReconcilers { reconcilers: HashMap::new(), } } - pub(crate) fn schedule_request_vec( - &mut self, - service: &Arc, - reqs: Vec, - ) { + /// Adds a safekeeper-specific reconciler. + /// Can be called multiple times, but it needs to be called at least once + /// for every new safekeeper added. + pub(crate) fn start_reconciler(&mut self, node_id: NodeId, service: &Arc) { + self.reconcilers.entry(node_id).or_insert_with(|| { + SafekeeperReconciler::spawn(self.cancel.child_token(), service.clone()) + }); + } + /// Stop a safekeeper-specific reconciler. + /// Stops the reconciler, cancelling all ongoing tasks. + pub(crate) fn stop_reconciler(&mut self, node_id: NodeId) { + if let Some(handle) = self.reconcilers.remove(&node_id) { + handle.cancel.cancel(); + } + } + pub(crate) fn schedule_request_vec(&self, reqs: Vec) { tracing::info!( "Scheduling {} pending safekeeper ops loaded from db", reqs.len() ); for req in reqs { - self.schedule_request(service, req); + self.schedule_request(req); } } - pub(crate) fn schedule_request(&mut self, service: &Arc, req: ScheduleRequest) { + pub(crate) fn schedule_request(&self, req: ScheduleRequest) { let node_id = req.safekeeper.get_id(); - let reconciler_handle = self.reconcilers.entry(node_id).or_insert_with(|| { - SafekeeperReconciler::spawn(self.cancel.child_token(), service.clone()) - }); + let reconciler_handle = self.reconcilers.get(&node_id).unwrap(); reconciler_handle.schedule_reconcile(req); } - pub(crate) fn cancel_safekeeper(&mut self, node_id: NodeId) { - if let Some(handle) = self.reconcilers.remove(&node_id) { - handle.cancel.cancel(); - } - } /// Cancel ongoing reconciles for the given timeline /// /// Specifying `None` here only removes reconciles for the tenant-global reconciliation, @@ -78,9 +82,12 @@ pub(crate) async fn load_schedule_requests( service: &Arc, safekeepers: &HashMap, ) -> anyhow::Result> { - let pending_ops = service.persistence.list_pending_ops().await?; - let mut res = Vec::with_capacity(pending_ops.len()); - for op_persist in pending_ops { + let pending_ops_timelines = service + .persistence + .list_pending_ops_with_timelines() + .await?; + let mut res = Vec::with_capacity(pending_ops_timelines.len()); + for (op_persist, timeline_persist) in pending_ops_timelines { let node_id = NodeId(op_persist.sk_id as u64); let Some(sk) = safekeepers.get(&node_id) else { // This shouldn't happen, at least the safekeeper should exist as decomissioned. @@ -102,16 +109,12 @@ pub(crate) async fn load_schedule_requests( SafekeeperTimelineOpKind::Delete => Vec::new(), SafekeeperTimelineOpKind::Exclude => Vec::new(), SafekeeperTimelineOpKind::Pull => { - // TODO this code is super hacky, it doesn't take migrations into account - let Some(timeline_id) = timeline_id else { + if timeline_id.is_none() { + // We only do this extra check (outside of timeline_persist check) to give better error msgs anyhow::bail!( "timeline_id is empty for `pull` schedule request for {tenant_id}" ); }; - let timeline_persist = service - .persistence - .get_timeline(tenant_id, timeline_id) - .await?; let Some(timeline_persist) = timeline_persist else { // This shouldn't happen, the timeline should still exist tracing::warn!( @@ -163,6 +166,7 @@ pub(crate) struct ScheduleRequest { pub(crate) kind: SafekeeperTimelineOpKind, } +/// Handle to per safekeeper reconciler. struct ReconcilerHandle { tx: UnboundedSender<(ScheduleRequest, CancellationToken)>, ongoing_tokens: Arc), CancellationToken>>, @@ -170,7 +174,10 @@ struct ReconcilerHandle { } impl ReconcilerHandle { - /// Obtain a new token slot, cancelling any existing reconciliations for that timeline + /// Obtain a new token slot, cancelling any existing reconciliations for + /// that timeline. It is not useful to have >1 operation per , hence scheduling op cancels current one if it + /// exists. fn new_token_slot( &self, tenant_id: TenantId, @@ -305,15 +312,16 @@ impl SafekeeperReconciler { SafekeeperTimelineOpKind::Delete => { let tenant_id = req.tenant_id; if let Some(timeline_id) = req.timeline_id { - let deleted = self.reconcile_inner( - req, - async |client| client.delete_timeline(tenant_id, timeline_id).await, - |_resp| { - tracing::info!(%tenant_id, %timeline_id, "deleted timeline from {req_host}"); - }, - req_cancel, - ) - .await; + let deleted = self + .reconcile_inner( + req, + async |client| client.delete_timeline(tenant_id, timeline_id).await, + |_resp| { + tracing::info!("deleted timeline from {req_host}"); + }, + req_cancel, + ) + .await; if deleted { self.delete_timeline_from_db(tenant_id, timeline_id).await; } @@ -344,12 +352,13 @@ impl SafekeeperReconciler { { Ok(list) => { if !list.is_empty() { - tracing::info!(%tenant_id, %timeline_id, "not deleting timeline from db as there is {} open reconciles", list.len()); + // duplicate the timeline_id here because it might be None in the reconcile context + tracing::info!(%timeline_id, "not deleting timeline from db as there is {} open reconciles", list.len()); return; } } Err(e) => { - tracing::warn!(%tenant_id, %timeline_id, "couldn't query pending ops: {e}"); + tracing::warn!(%timeline_id, "couldn't query pending ops: {e}"); return; } } diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs index a23b9a4a02..8a13c6af23 100644 --- a/storage_controller/src/service/safekeeper_service.rs +++ b/storage_controller/src/service/safekeeper_service.rs @@ -46,6 +46,7 @@ impl Service { .map(SecretString::from); let mut joinset = JoinSet::new(); + // Prepare membership::Configuration from choosen safekeepers. let safekeepers = { let locked = self.inner.read().unwrap(); locked.safekeepers.clone() @@ -205,7 +206,7 @@ impl Service { tenant_id: tenant_id.to_string(), timeline_id: timeline_id.to_string(), start_lsn: start_lsn.into(), - generation: 0, + generation: 1, sk_set: sks_persistence.clone(), new_sk_set: None, cplane_notified_generation: 0, @@ -254,7 +255,7 @@ impl Service { self.persistence.insert_pending_op(pending_op).await?; } if !remaining.is_empty() { - let mut locked = self.inner.write().unwrap(); + let locked = self.inner.read().unwrap(); for remaining_id in remaining { let Some(sk) = locked.safekeepers.get(&remaining_id) else { return Err(ApiError::InternalServerError(anyhow::anyhow!( @@ -290,7 +291,7 @@ impl Service { generation: timeline_persist.generation as u32, kind: crate::persistence::SafekeeperTimelineOpKind::Pull, }; - locked.safekeeper_reconcilers.schedule_request(self, req); + locked.safekeeper_reconcilers.schedule_request(req); } } @@ -357,7 +358,7 @@ impl Service { let pending_op = TimelinePendingOpPersistence { tenant_id: tenant_id.to_string(), timeline_id: timeline_id.to_string(), - generation: tl.generation, + generation: i32::MAX, op_kind: SafekeeperTimelineOpKind::Delete, sk_id: *sk_id, }; @@ -365,7 +366,7 @@ impl Service { self.persistence.insert_pending_op(pending_op).await?; } { - let mut locked = self.inner.write().unwrap(); + let locked = self.inner.read().unwrap(); for sk_id in all_sks { let sk_id = NodeId(*sk_id as u64); let Some(sk) = locked.safekeepers.get(&sk_id) else { @@ -383,7 +384,7 @@ impl Service { generation: tl.generation as u32, kind: SafekeeperTimelineOpKind::Delete, }; - locked.safekeeper_reconcilers.schedule_request(self, req); + locked.safekeeper_reconcilers.schedule_request(req); } } Ok(()) @@ -482,7 +483,7 @@ impl Service { tenant_id, timeline_id: None, }; - locked.safekeeper_reconcilers.schedule_request(self, req); + locked.safekeeper_reconcilers.schedule_request(req); } Ok(()) } @@ -579,7 +580,7 @@ impl Service { } pub(crate) async fn upsert_safekeeper( - &self, + self: &Arc, record: crate::persistence::SafekeeperUpsert, ) -> Result<(), ApiError> { let node_id = NodeId(record.id as u64); @@ -618,6 +619,9 @@ impl Service { ); } } + locked + .safekeeper_reconcilers + .start_reconciler(node_id, self); locked.safekeepers = Arc::new(safekeepers); metrics::METRICS_REGISTRY .metrics_group @@ -638,7 +642,7 @@ impl Service { } pub(crate) async fn set_safekeeper_scheduling_policy( - &self, + self: &Arc, id: i64, scheduling_policy: SkSchedulingPolicy, ) -> Result<(), DatabaseError> { @@ -656,9 +660,13 @@ impl Service { sk.set_scheduling_policy(scheduling_policy); match scheduling_policy { - SkSchedulingPolicy::Active => (), + SkSchedulingPolicy::Active => { + locked + .safekeeper_reconcilers + .start_reconciler(node_id, self); + } SkSchedulingPolicy::Decomissioned | SkSchedulingPolicy::Pause => { - locked.safekeeper_reconcilers.cancel_safekeeper(node_id); + locked.safekeeper_reconcilers.stop_reconciler(node_id); } } diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index b2c8415e9a..26f745adb9 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -4114,13 +4114,29 @@ def test_storage_controller_location_conf_equivalence(neon_env_builder: NeonEnvB assert reconciles_after_restart == 0 +class RestartStorcon(Enum): + RESTART = "restart" + ONLINE = "online" + + +class DeletionSubject(Enum): + TIMELINE = "timeline" + TENANT = "tenant" + + @run_only_on_default_postgres("PG version is not interesting here") -@pytest.mark.parametrize("restart_storcon", [True, False]) -def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart_storcon: bool): +@pytest.mark.parametrize("restart_storcon", [RestartStorcon.RESTART, RestartStorcon.ONLINE]) +@pytest.mark.parametrize("deletetion_subject", [DeletionSubject.TENANT, DeletionSubject.TIMELINE]) +def test_storcon_create_delete_sk_down( + neon_env_builder: NeonEnvBuilder, + restart_storcon: RestartStorcon, + deletetion_subject: DeletionSubject, +): """ Test that the storcon can create and delete tenants and timelines with a safekeeper being down. - - restart_storcon: tests whether the pending ops are persisted. + - restart_storcon: tests that the pending ops are persisted. if we don't restart, we test that we don't require it to come from the db. + - deletion_subject: test that both single timeline and whole tenant deletion work. """ neon_env_builder.num_safekeepers = 3 @@ -4143,6 +4159,7 @@ def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart tenant_id = TenantId.generate() timeline_id = TimelineId.generate() env.create_tenant(tenant_id, timeline_id) + child_timeline_id = env.create_branch("child_of_main", tenant_id) env.safekeepers[1].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}") env.safekeepers[2].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}") @@ -4155,7 +4172,7 @@ def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart ] ) - if restart_storcon: + if restart_storcon == RestartStorcon.RESTART: # Restart the storcon to check that we persist operations env.storage_controller.stop() env.storage_controller.start() @@ -4168,6 +4185,13 @@ def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3]) ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)") + with env.endpoints.create( + "child_of_main", tenant_id=tenant_id, config_lines=config_lines + ) as ep: + # endpoint should start. + ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3]) + ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)") + env.storage_controller.assert_log_contains("writing pending op for sk id 1") env.safekeepers[0].start() @@ -4176,25 +4200,31 @@ def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart env.safekeepers[0].assert_log_contains( f"pulling timeline {tenant_id}/{timeline_id} from safekeeper" ) + env.safekeepers[0].assert_log_contains( + f"pulling timeline {tenant_id}/{child_timeline_id} from safekeeper" + ) wait_until(logged_contains_on_sk) env.safekeepers[1].stop() - env.storage_controller.pageserver_api().tenant_delete(tenant_id) + if deletetion_subject == DeletionSubject.TENANT: + env.storage_controller.pageserver_api().tenant_delete(tenant_id) + else: + env.storage_controller.pageserver_api().timeline_delete(tenant_id, child_timeline_id) # ensure the safekeeper deleted the timeline def timeline_deleted_on_active_sks(): env.safekeepers[0].assert_log_contains( - f"deleting timeline {tenant_id}/{timeline_id} from disk" + f"deleting timeline {tenant_id}/{child_timeline_id} from disk" ) env.safekeepers[2].assert_log_contains( - f"deleting timeline {tenant_id}/{timeline_id} from disk" + f"deleting timeline {tenant_id}/{child_timeline_id} from disk" ) wait_until(timeline_deleted_on_active_sks) - if restart_storcon: + if restart_storcon == RestartStorcon.RESTART: # Restart the storcon to check that we persist operations env.storage_controller.stop() env.storage_controller.start() @@ -4204,7 +4234,7 @@ def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart # ensure that there is log msgs for the third safekeeper too def timeline_deleted_on_sk(): env.safekeepers[1].assert_log_contains( - f"deleting timeline {tenant_id}/{timeline_id} from disk" + f"deleting timeline {tenant_id}/{child_timeline_id} from disk" ) wait_until(timeline_deleted_on_sk)