From 133f16e9b5de4743070638ae704501a6b6c6fd4a Mon Sep 17 00:00:00 2001
From: Dmitrii Kovalkov <34828390+DimasKovas@users.noreply.github.com>
Date: Tue, 22 Jul 2025 18:58:20 +0400
Subject: [PATCH] storcon: finish safekeeper migration gracefully (#12528)

## Problem

We don't detect if a safekeeper migration fails after committing the membership
configuration to the database. As a result, we might leave stale timelines on
excluded safekeepers and fail to notify cplane/safekeepers about the new
configuration.

- Implements the solution proposed in https://github.com/neondatabase/neon/pull/12432
- Closes: https://github.com/neondatabase/neon/issues/12192
- Closes: [LKB-944](https://databricks.atlassian.net/browse/LKB-944)

## Summary of changes

- Add a `sk_set_notified_generation` column to the `timelines` table.
- Update `*_notified_generation` in the database during the finish step.
- Commit reconciliation requests to the database atomically with the membership configuration.
- Reload pending ops and retry the "finish" step if we detect a `*_notified_generation` mismatch.
- Add failpoints and a test that we handle failures well.
---
 safekeeper/src/timeline.rs                    |   3 +
 .../down.sql                                  |   1 +
 .../up.sql                                    |   1 +
 storage_controller/src/persistence.rs         | 142 +++++++-
 storage_controller/src/schema.rs              |   1 +
 .../src/service/safekeeper_service.rs         | 309 +++++++++++++++---
 .../regress/test_safekeeper_migration.py      | 109 +++++-
 7 files changed, 485 insertions(+), 81 deletions(-)
 create mode 100644 storage_controller/migrations/2025-07-08-114340_sk_set_notified_generation/down.sql
 create mode 100644 storage_controller/migrations/2025-07-08-114340_sk_set_notified_generation/up.sql

diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index a1a0aab9fd..b8774b30ea 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -427,6 +427,9 @@ impl From<TimelineError> for ApiError {
             TimelineError::NotFound(ttid) => {
                 ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
             }
+            TimelineError::Deleted(ttid) => {
+                ApiError::NotFound(anyhow!("timeline {} deleted", ttid).into())
+            }
             _ => ApiError::InternalServerError(anyhow!("{}", te)),
         }
     }
diff --git a/storage_controller/migrations/2025-07-08-114340_sk_set_notified_generation/down.sql b/storage_controller/migrations/2025-07-08-114340_sk_set_notified_generation/down.sql
new file mode 100644
index 0000000000..27d6048cd3
--- /dev/null
+++ b/storage_controller/migrations/2025-07-08-114340_sk_set_notified_generation/down.sql
@@ -0,0 +1 @@
+ALTER TABLE timelines DROP sk_set_notified_generation;
diff --git a/storage_controller/migrations/2025-07-08-114340_sk_set_notified_generation/up.sql b/storage_controller/migrations/2025-07-08-114340_sk_set_notified_generation/up.sql
new file mode 100644
index 0000000000..50178ab6a3
--- /dev/null
+++ b/storage_controller/migrations/2025-07-08-114340_sk_set_notified_generation/up.sql
@@ -0,0 +1 @@
+ALTER TABLE timelines ADD sk_set_notified_generation INTEGER NOT NULL DEFAULT 1;
diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs
index 2e3f8c6908..619b5f69b8 100644
--- a/storage_controller/src/persistence.rs
+++ b/storage_controller/src/persistence.rs
@@ -131,6 +131,8 @@ pub(crate) enum DatabaseOperation {
     InsertTimeline,
     UpdateTimeline,
     UpdateTimelineMembership,
+    UpdateCplaneNotifiedGeneration,
+    UpdateSkSetNotifiedGeneration,
     GetTimeline,
     InsertTimelineReconcile,
     RemoveTimelineReconcile,
@@ -1497,6 +1499,8 @@ impl Persistence {
     /// Update timeline membership configuration in the database.
/// Perform a compare-and-swap (CAS) operation on the timeline's generation. /// The `new_generation` must be the next (+1) generation after the one in the database. + /// Also inserts reconcile_requests to safekeeper_timeline_pending_ops table in the same + /// transaction. pub(crate) async fn update_timeline_membership( &self, tenant_id: TenantId, @@ -1504,8 +1508,11 @@ impl Persistence { new_generation: SafekeeperGeneration, sk_set: &[NodeId], new_sk_set: Option<&[NodeId]>, + reconcile_requests: &[TimelinePendingOpPersistence], ) -> DatabaseResult<()> { - use crate::schema::timelines::dsl; + use crate::schema::safekeeper_timeline_pending_ops as stpo; + use crate::schema::timelines; + use diesel::query_dsl::methods::FilterDsl; let prev_generation = new_generation.previous().unwrap(); @@ -1513,14 +1520,15 @@ impl Persistence { let timeline_id = &timeline_id; self.with_measured_conn(DatabaseOperation::UpdateTimelineMembership, move |conn| { Box::pin(async move { - let updated = diesel::update(dsl::timelines) - .filter(dsl::tenant_id.eq(&tenant_id.to_string())) - .filter(dsl::timeline_id.eq(&timeline_id.to_string())) - .filter(dsl::generation.eq(prev_generation.into_inner() as i32)) + let updated = diesel::update(timelines::table) + .filter(timelines::tenant_id.eq(&tenant_id.to_string())) + .filter(timelines::timeline_id.eq(&timeline_id.to_string())) + .filter(timelines::generation.eq(prev_generation.into_inner() as i32)) .set(( - dsl::generation.eq(new_generation.into_inner() as i32), - dsl::sk_set.eq(sk_set.iter().map(|id| id.0 as i64).collect::>()), - dsl::new_sk_set.eq(new_sk_set + timelines::generation.eq(new_generation.into_inner() as i32), + timelines::sk_set + .eq(sk_set.iter().map(|id| id.0 as i64).collect::>()), + timelines::new_sk_set.eq(new_sk_set .map(|set| set.iter().map(|id| id.0 as i64).collect::>())), )) .execute(conn) @@ -1530,20 +1538,123 @@ impl Persistence { 0 => { // TODO(diko): It makes sense to select the current generation // and include it in the error message for better debuggability. - Err(DatabaseError::Cas( + return Err(DatabaseError::Cas( "Failed to update membership configuration".to_string(), - )) + )); + } + 1 => {} + _ => { + return Err(DatabaseError::Logical(format!( + "unexpected number of rows ({updated})" + ))); + } + }; + + for req in reconcile_requests { + let inserted_updated = diesel::insert_into(stpo::table) + .values(req) + .on_conflict((stpo::tenant_id, stpo::timeline_id, stpo::sk_id)) + .do_update() + .set(req) + .filter(stpo::generation.lt(req.generation)) + .execute(conn) + .await?; + + if inserted_updated > 1 { + return Err(DatabaseError::Logical(format!( + "unexpected number of rows ({inserted_updated})" + ))); } - 1 => Ok(()), - _ => Err(DatabaseError::Logical(format!( - "unexpected number of rows ({updated})" - ))), } + + Ok(()) }) }) .await } + /// Update the cplane notified generation for a timeline. + /// Perform a compare-and-swap (CAS) operation on the timeline's cplane notified generation. + /// The update will fail if the specified generation is less than the cplane notified generation + /// in the database. 
+ pub(crate) async fn update_cplane_notified_generation( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + generation: SafekeeperGeneration, + ) -> DatabaseResult<()> { + use crate::schema::timelines::dsl; + + let tenant_id = &tenant_id; + let timeline_id = &timeline_id; + self.with_measured_conn( + DatabaseOperation::UpdateCplaneNotifiedGeneration, + move |conn| { + Box::pin(async move { + let updated = diesel::update(dsl::timelines) + .filter(dsl::tenant_id.eq(&tenant_id.to_string())) + .filter(dsl::timeline_id.eq(&timeline_id.to_string())) + .filter(dsl::cplane_notified_generation.le(generation.into_inner() as i32)) + .set(dsl::cplane_notified_generation.eq(generation.into_inner() as i32)) + .execute(conn) + .await?; + + match updated { + 0 => Err(DatabaseError::Cas( + "Failed to update cplane notified generation".to_string(), + )), + 1 => Ok(()), + _ => Err(DatabaseError::Logical(format!( + "unexpected number of rows ({updated})" + ))), + } + }) + }, + ) + .await + } + + /// Update the sk set notified generation for a timeline. + /// Perform a compare-and-swap (CAS) operation on the timeline's sk set notified generation. + /// The update will fail if the specified generation is less than the sk set notified generation + /// in the database. + pub(crate) async fn update_sk_set_notified_generation( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + generation: SafekeeperGeneration, + ) -> DatabaseResult<()> { + use crate::schema::timelines::dsl; + + let tenant_id = &tenant_id; + let timeline_id = &timeline_id; + self.with_measured_conn( + DatabaseOperation::UpdateSkSetNotifiedGeneration, + move |conn| { + Box::pin(async move { + let updated = diesel::update(dsl::timelines) + .filter(dsl::tenant_id.eq(&tenant_id.to_string())) + .filter(dsl::timeline_id.eq(&timeline_id.to_string())) + .filter(dsl::sk_set_notified_generation.le(generation.into_inner() as i32)) + .set(dsl::sk_set_notified_generation.eq(generation.into_inner() as i32)) + .execute(conn) + .await?; + + match updated { + 0 => Err(DatabaseError::Cas( + "Failed to update sk set notified generation".to_string(), + )), + 1 => Ok(()), + _ => Err(DatabaseError::Logical(format!( + "unexpected number of rows ({updated})" + ))), + } + }) + }, + ) + .await + } + /// Load timeline from db. Returns `None` if not present. pub(crate) async fn get_timeline( &self, @@ -2493,6 +2604,7 @@ pub(crate) struct TimelinePersistence { pub(crate) new_sk_set: Option>, pub(crate) cplane_notified_generation: i32, pub(crate) deleted_at: Option>, + pub(crate) sk_set_notified_generation: i32, } /// This is separate from [TimelinePersistence] only because postgres allows NULLs @@ -2511,6 +2623,7 @@ pub(crate) struct TimelineFromDb { pub(crate) new_sk_set: Option>>, pub(crate) cplane_notified_generation: i32, pub(crate) deleted_at: Option>, + pub(crate) sk_set_notified_generation: i32, } impl TimelineFromDb { @@ -2530,6 +2643,7 @@ impl TimelineFromDb { new_sk_set, cplane_notified_generation: self.cplane_notified_generation, deleted_at: self.deleted_at, + sk_set_notified_generation: self.sk_set_notified_generation, } } } diff --git a/storage_controller/src/schema.rs b/storage_controller/src/schema.rs index f3dcdaf798..def519c168 100644 --- a/storage_controller/src/schema.rs +++ b/storage_controller/src/schema.rs @@ -118,6 +118,7 @@ diesel::table! 
{ new_sk_set -> Nullable>>, cplane_notified_generation -> Int4, deleted_at -> Nullable, + sk_set_notified_generation -> Int4, } } diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs index 28c70e203a..bc77a1a6b8 100644 --- a/storage_controller/src/service/safekeeper_service.rs +++ b/storage_controller/src/service/safekeeper_service.rs @@ -312,6 +312,7 @@ impl Service { new_sk_set: None, cplane_notified_generation: 0, deleted_at: None, + sk_set_notified_generation: 0, }; let inserted = self .persistence @@ -461,6 +462,7 @@ impl Service { new_sk_set: None, cplane_notified_generation: 1, deleted_at: None, + sk_set_notified_generation: 1, }; let inserted = self .persistence @@ -894,17 +896,21 @@ impl Service { /// If min_position is not None, validates that majority of safekeepers /// reached at least min_position. /// + /// If update_notified_generation is set, also updates sk_set_notified_generation + /// in the timelines table. + /// /// Return responses from safekeepers in the input order. async fn tenant_timeline_set_membership_quorum( self: &Arc, tenant_id: TenantId, timeline_id: TimelineId, safekeepers: &[Safekeeper], - config: &membership::Configuration, + mconf: &membership::Configuration, min_position: Option<(Term, Lsn)>, + update_notified_generation: bool, ) -> Result>, ApiError> { let req = TimelineMembershipSwitchRequest { - mconf: config.clone(), + mconf: mconf.clone(), }; const SK_SET_MEM_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30); @@ -945,28 +951,34 @@ impl Service { .await?; for res in results.iter().flatten() { - if res.current_conf.generation > config.generation { + if res.current_conf.generation > mconf.generation { // Antoher switch_membership raced us. return Err(ApiError::Conflict(format!( "received configuration with generation {} from safekeeper, but expected {}", - res.current_conf.generation, config.generation + res.current_conf.generation, mconf.generation ))); - } else if res.current_conf.generation < config.generation { + } else if res.current_conf.generation < mconf.generation { // Note: should never happen. // If we get a response, it should be at least the sent generation. tracing::error!( "received configuration with generation {} from safekeeper, but expected {}", res.current_conf.generation, - config.generation + mconf.generation ); return Err(ApiError::InternalServerError(anyhow::anyhow!( "received configuration with generation {} from safekeeper, but expected {}", res.current_conf.generation, - config.generation + mconf.generation ))); } } + if update_notified_generation { + self.persistence + .update_sk_set_notified_generation(tenant_id, timeline_id, mconf.generation) + .await?; + } + Ok(results) } @@ -1035,17 +1047,22 @@ impl Service { } /// Exclude a timeline from safekeepers in parallel with retries. - /// If an exclude request is unsuccessful, it will be added to - /// the reconciler, and after that the function will succeed. - async fn tenant_timeline_safekeeper_exclude( + /// + /// Assumes that the exclude requests are already persistent in the database. + /// + /// The function does best effort: if an exclude request is unsuccessful, + /// it will be added to the in-memory reconciler, and the function will succeed anyway. + /// + /// Might fail if there is error accessing the database. 
+ async fn tenant_timeline_safekeeper_exclude_reconcile( self: &Arc, tenant_id: TenantId, timeline_id: TimelineId, safekeepers: &[Safekeeper], - config: &membership::Configuration, + mconf: &membership::Configuration, ) -> Result<(), ApiError> { let req = TimelineMembershipSwitchRequest { - mconf: config.clone(), + mconf: mconf.clone(), }; const SK_EXCLUDE_TIMELINE_TIMEOUT: Duration = Duration::from_secs(30); @@ -1063,25 +1080,32 @@ impl Service { let mut reconcile_requests = Vec::new(); - for (idx, res) in results.iter().enumerate() { - if res.is_err() { - let sk_id = safekeepers[idx].skp.id; - let pending_op = TimelinePendingOpPersistence { - tenant_id: tenant_id.to_string(), - timeline_id: timeline_id.to_string(), - generation: config.generation.into_inner() as i32, - op_kind: SafekeeperTimelineOpKind::Exclude, - sk_id, - }; - tracing::info!("writing pending exclude op for sk id {sk_id}"); - self.persistence.insert_pending_op(pending_op).await?; + fail::fail_point!("sk-migration-step-9-mid-exclude", |_| { + Err(ApiError::BadRequest(anyhow::anyhow!( + "failpoint sk-migration-step-9-mid-exclude" + ))) + }); + for (idx, res) in results.iter().enumerate() { + let sk_id = safekeepers[idx].skp.id; + let generation = mconf.generation.into_inner(); + + if res.is_ok() { + self.persistence + .remove_pending_op( + tenant_id, + Some(timeline_id), + NodeId(sk_id as u64), + generation, + ) + .await?; + } else { let req = ScheduleRequest { safekeeper: Box::new(safekeepers[idx].clone()), host_list: Vec::new(), tenant_id, timeline_id: Some(timeline_id), - generation: config.generation.into_inner(), + generation, kind: SafekeeperTimelineOpKind::Exclude, }; reconcile_requests.push(req); @@ -1208,6 +1232,22 @@ impl Service { } // It it is the same new_sk_set, we can continue the migration (retry). } else { + let prev_finished = timeline.cplane_notified_generation == timeline.generation + && timeline.sk_set_notified_generation == timeline.generation; + + if !prev_finished { + // The previous migration is committed, but the finish step failed. + // Safekeepers/cplane might not know about the last membership configuration. + // Retry the finish step to ensure smooth migration. + self.finish_safekeeper_migration_retry(tenant_id, timeline_id, &timeline) + .await?; + } + + if cur_sk_set == new_sk_set { + tracing::info!("timeline is already at the desired safekeeper set"); + return Ok(()); + } + // 3. No active migration yet. // Increment current generation and put desired_set to new_sk_set. generation = generation.next(); @@ -1219,8 +1259,15 @@ impl Service { generation, &cur_sk_set, Some(&new_sk_set), + &[], ) .await?; + + fail::fail_point!("sk-migration-after-step-3", |_| { + Err(ApiError::BadRequest(anyhow::anyhow!( + "failpoint sk-migration-after-step-3" + ))) + }); } let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?; @@ -1249,6 +1296,7 @@ impl Service { &cur_safekeepers, &joint_config, None, // no min position + true, // update notified generation ) .await?; @@ -1266,6 +1314,12 @@ impl Service { "safekeepers set membership updated", ); + fail::fail_point!("sk-migration-after-step-4", |_| { + Err(ApiError::BadRequest(anyhow::anyhow!( + "failpoint sk-migration-after-step-4" + ))) + }); + // 5. Initialize timeline on safekeeper(s) from new_sk_set where it doesn't exist yet // by doing pull_timeline from the majority of the current set. 
@@ -1285,6 +1339,12 @@
         )
         .await?;
 
+        fail::fail_point!("sk-migration-after-step-5", |_| {
+            Err(ApiError::BadRequest(anyhow::anyhow!(
+                "failpoint sk-migration-after-step-5"
+            )))
+        });
+
         // 6. Call POST bump_term(sync_term) on safekeepers from the new set. Success on majority is enough.
 
         // TODO(diko): do we need to bump timeline term?
@@ -1300,9 +1360,16 @@
             &new_safekeepers,
             &joint_config,
             Some(sync_position),
+            false, // we're just waiting for sync position, don't update notified generation
         )
         .await?;
 
+        fail::fail_point!("sk-migration-after-step-7", |_| {
+            Err(ApiError::BadRequest(anyhow::anyhow!(
+                "failpoint sk-migration-after-step-7"
+            )))
+        });
+
         // 8. Create new_conf: Configuration incrementing joint_conf generation and
         // having new safekeeper set as sk_set and None new_sk_set.
 
@@ -1314,45 +1381,55 @@
             new_members: None,
         };
 
-        self.persistence
-            .update_timeline_membership(tenant_id, timeline_id, generation, &new_sk_set, None)
-            .await?;
-
-        // TODO(diko): at this point we have already updated the timeline in the database,
-        // but we still need to notify safekeepers and cplane about the new configuration,
-        // and put delition of the timeline from the old safekeepers into the reconciler.
-        // Ideally it should be done atomically, but now it's not.
-        // Worst case: the timeline is not deleted from old safekeepers,
-        // the compute may require both quorums till the migration is retried and completed.
-
-        self.tenant_timeline_set_membership_quorum(
-            tenant_id,
-            timeline_id,
-            &new_safekeepers,
-            &new_conf,
-            None, // no min position
-        )
-        .await?;
-
         let new_ids: HashSet<NodeId> = new_safekeepers.iter().map(|sk| sk.get_id()).collect();
 
         let exclude_safekeepers = cur_safekeepers
             .into_iter()
             .filter(|sk| !new_ids.contains(&sk.get_id()))
             .collect::<Vec<_>>();
-        self.tenant_timeline_safekeeper_exclude(
+
+        let exclude_requests = exclude_safekeepers
+            .iter()
+            .map(|sk| TimelinePendingOpPersistence {
+                sk_id: sk.skp.id,
+                tenant_id: tenant_id.to_string(),
+                timeline_id: timeline_id.to_string(),
+                generation: generation.into_inner() as i32,
+                op_kind: SafekeeperTimelineOpKind::Exclude,
+            })
+            .collect::<Vec<_>>();
+
+        self.persistence
+            .update_timeline_membership(
+                tenant_id,
+                timeline_id,
+                generation,
+                &new_sk_set,
+                None,
+                &exclude_requests,
+            )
+            .await?;
+
+        fail::fail_point!("sk-migration-after-step-8", |_| {
+            Err(ApiError::BadRequest(anyhow::anyhow!(
+                "failpoint sk-migration-after-step-8"
+            )))
+        });
+
+        // At this point we have already updated the timeline in the database, so the final
+        // membership configuration is committed and the migration is not abortable anymore.
+        // But safekeepers and cplane/compute still need to be notified about the new configuration.
+        // [`Self::finish_safekeeper_migration`] does exactly that: it notifies everyone about
+        // the new configuration and reconciles excluded safekeepers.
+        // If it fails, the safekeeper migration call should be retried.
+
+        self.finish_safekeeper_migration(
             tenant_id,
             timeline_id,
-            &exclude_safekeepers,
+            &new_safekeepers,
             &new_conf,
+            &exclude_safekeepers,
         )
         .await?;
 
-        // Notify cplane/compute about the membership change AFTER changing the membership on safekeepers.
-        // This way the compute will stop talking to excluded safekeepers only after we stop requiring to
-        // collect a quorum from them.
-        self.cplane_notify_safekeepers(tenant_id, timeline_id, &new_conf)
-            .await?;
-
         Ok(())
     }
 
@@ -1396,6 +1473,130 @@ impl Service {
                 ApiError::InternalServerError(anyhow::anyhow!(
                     "failed to notify cplane about safekeeper membership change: {err}"
                 ))
-            })
+            })?;
+
+        self.persistence
+            .update_cplane_notified_generation(tenant_id, timeline_id, mconf.generation)
+            .await?;
+
+        Ok(())
+    }
+
+    /// Finish safekeeper migration.
+    ///
+    /// It is the last step of the safekeeper migration.
+    ///
+    /// Notifies safekeepers and cplane about the final membership configuration,
+    /// reconciles excluded safekeepers and updates *_notified_generation in the database.
+    async fn finish_safekeeper_migration(
+        self: &Arc<Self>,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        new_safekeepers: &[Safekeeper],
+        new_conf: &membership::Configuration,
+        exclude_safekeepers: &[Safekeeper],
+    ) -> Result<(), ApiError> {
+        // 9. Call PUT configuration on safekeepers from the new set, delivering them new_conf.
+        // Also try to exclude safekeepers and notify cplane about the membership change.
+
+        self.tenant_timeline_set_membership_quorum(
+            tenant_id,
+            timeline_id,
+            new_safekeepers,
+            new_conf,
+            None, // no min position
+            true, // update notified generation
+        )
+        .await?;
+
+        fail::fail_point!("sk-migration-step-9-after-set-membership", |_| {
+            Err(ApiError::BadRequest(anyhow::anyhow!(
+                "failpoint sk-migration-step-9-after-set-membership"
+            )))
+        });
+
+        self.tenant_timeline_safekeeper_exclude_reconcile(
+            tenant_id,
+            timeline_id,
+            exclude_safekeepers,
+            new_conf,
+        )
+        .await?;
+
+        fail::fail_point!("sk-migration-step-9-after-exclude", |_| {
+            Err(ApiError::BadRequest(anyhow::anyhow!(
+                "failpoint sk-migration-step-9-after-exclude"
+            )))
+        });
+
+        // Notify cplane/compute about the membership change AFTER changing the membership on safekeepers.
+        // This way the compute will stop talking to excluded safekeepers only after we stop requiring to
+        // collect a quorum from them.
+        self.cplane_notify_safekeepers(tenant_id, timeline_id, new_conf)
+            .await?;
+
+        fail::fail_point!("sk-migration-after-step-9", |_| {
+            Err(ApiError::BadRequest(anyhow::anyhow!(
+                "failpoint sk-migration-after-step-9"
+            )))
+        });
+
+        Ok(())
+    }
+
+    /// Same as [`Self::finish_safekeeper_migration`], but restores the migration state from the database.
+    /// It's used when the migration failed during the finish step and we need to retry it.
+    async fn finish_safekeeper_migration_retry(
+        self: &Arc<Self>,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        timeline: &TimelinePersistence,
+    ) -> Result<(), ApiError> {
+        if timeline.new_sk_set.is_some() {
+            // Logical error, should never happen.
+            return Err(ApiError::InternalServerError(anyhow::anyhow!(
+                "can't finish timeline migration for {tenant_id}/{timeline_id}: new_sk_set is not None"
+            )));
+        }
+
+        let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
+        let cur_sk_member_set =
+            Self::make_member_set(&cur_safekeepers).map_err(ApiError::InternalServerError)?;
+
+        let mconf = membership::Configuration {
+            generation: SafekeeperGeneration::new(timeline.generation as u32),
+            members: cur_sk_member_set,
+            new_members: None,
+        };
+
+        // We might have failed between committing reconciliation requests and adding them to the in-memory reconciler.
+        // Reload them from the database.
+ let pending_ops = self + .persistence + .list_pending_ops_for_timeline(tenant_id, timeline_id) + .await?; + + let mut exclude_sk_ids = Vec::new(); + + for op in pending_ops { + if op.op_kind == SafekeeperTimelineOpKind::Exclude + && op.generation == timeline.generation + { + exclude_sk_ids.push(op.sk_id); + } + } + + let exclude_safekeepers = self.get_safekeepers(&exclude_sk_ids)?; + + self.finish_safekeeper_migration( + tenant_id, + timeline_id, + &cur_safekeepers, + &mconf, + &exclude_safekeepers, + ) + .await?; + + Ok(()) } } diff --git a/test_runner/regress/test_safekeeper_migration.py b/test_runner/regress/test_safekeeper_migration.py index 170c1a3650..371bec0c62 100644 --- a/test_runner/regress/test_safekeeper_migration.py +++ b/test_runner/regress/test_safekeeper_migration.py @@ -3,11 +3,22 @@ from __future__ import annotations from typing import TYPE_CHECKING import pytest +import requests +from fixtures.log_helper import log from fixtures.neon_fixtures import StorageControllerApiException if TYPE_CHECKING: from fixtures.neon_fixtures import NeonEnvBuilder +# TODO(diko): pageserver spams with various errors during safekeeper migration. +# Fix the code so it handles the migration better. +ALLOWED_PAGESERVER_ERRORS = [ + ".*Timeline .* was cancelled and cannot be used anymore.*", + ".*Timeline .* has been deleted.*", + ".*Timeline .* was not found in global map.*", + ".*wal receiver task finished with an error.*", +] + def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder): """ @@ -24,16 +35,7 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder): "timeline_safekeeper_count": 1, } env = neon_env_builder.init_start() - # TODO(diko): pageserver spams with various errors during safekeeper migration. - # Fix the code so it handles the migration better. - env.pageserver.allowed_errors.extend( - [ - ".*Timeline .* was cancelled and cannot be used anymore.*", - ".*Timeline .* has been deleted.*", - ".*Timeline .* was not found in global map.*", - ".*wal receiver task finished with an error.*", - ] - ) + env.pageserver.allowed_errors.extend(ALLOWED_PAGESERVER_ERRORS) ep = env.endpoints.create("main", tenant_id=env.initial_tenant) @@ -42,15 +44,23 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder): assert len(mconf["sk_set"]) == 1 assert mconf["generation"] == 1 + current_sk = mconf["sk_set"][0] + ep.start(safekeeper_generation=1, safekeepers=mconf["sk_set"]) ep.safe_psql("CREATE EXTENSION neon_test_utils;") ep.safe_psql("CREATE TABLE t(a int)") + expected_gen = 1 + for active_sk in range(1, 4): env.storage_controller.migrate_safekeepers( env.initial_tenant, env.initial_timeline, [active_sk] ) + if active_sk != current_sk: + expected_gen += 2 + current_sk = active_sk + other_sks = [sk for sk in range(1, 4) if sk != active_sk] for sk in other_sks: @@ -65,9 +75,6 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder): assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(1, 4)] - # 1 initial generation + 2 migrations on each loop iteration. 
-    expected_gen = 1 + 2 * 3
-
     mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
     assert mconf["generation"] == expected_gen
@@ -113,3 +120,79 @@ def test_new_sk_set_validation(neon_env_builder: NeonEnvBuilder):
     env.storage_controller.safekeeper_scheduling_policy(decom_sk, "Decomissioned")
 
     expect_fail([sk_set[0], decom_sk], "decomissioned")
+
+
+def test_safekeeper_migration_common_set_failpoints(neon_env_builder: NeonEnvBuilder):
+    """
+    Test that safekeeper migration handles failures well.
+
+    Two main conditions are checked:
+    1. the safekeeper migration handler can be retried after different failures.
+    2. writes do not get stuck if sk_set and new_sk_set have a quorum in common.
+    """
+    neon_env_builder.num_safekeepers = 4
+    neon_env_builder.storage_controller_config = {
+        "timelines_onto_safekeepers": True,
+        "timeline_safekeeper_count": 3,
+    }
+    env = neon_env_builder.init_start()
+    env.pageserver.allowed_errors.extend(ALLOWED_PAGESERVER_ERRORS)
+
+    mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
+    assert len(mconf["sk_set"]) == 3
+    assert mconf["generation"] == 1
+
+    ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
+    ep.start(safekeeper_generation=1, safekeepers=mconf["sk_set"])
+    ep.safe_psql("CREATE EXTENSION neon_test_utils;")
+    ep.safe_psql("CREATE TABLE t(a int)")
+
+    excluded_sk = mconf["sk_set"][-1]
+    added_sk = [sk.id for sk in env.safekeepers if sk.id not in mconf["sk_set"]][0]
+    new_sk_set = mconf["sk_set"][:-1] + [added_sk]
+    log.info(f"migrating sk set from {mconf['sk_set']} to {new_sk_set}")
+
+    failpoints = [
+        "sk-migration-after-step-3",
+        "sk-migration-after-step-4",
+        "sk-migration-after-step-5",
+        "sk-migration-after-step-7",
+        "sk-migration-after-step-8",
+        "sk-migration-step-9-after-set-membership",
+        "sk-migration-step-9-mid-exclude",
+        "sk-migration-step-9-after-exclude",
+        "sk-migration-after-step-9",
+    ]
+
+    for i, fp in enumerate(failpoints):
+        env.storage_controller.configure_failpoints((fp, "return(1)"))
+
+        with pytest.raises(StorageControllerApiException, match=f"failpoint {fp}"):
+            env.storage_controller.migrate_safekeepers(
+                env.initial_tenant, env.initial_timeline, new_sk_set
+            )
+        ep.safe_psql(f"INSERT INTO t VALUES ({i})")
+
+        env.storage_controller.configure_failpoints((fp, "off"))
+
+    # No failpoints, migration should succeed.
+    env.storage_controller.migrate_safekeepers(env.initial_tenant, env.initial_timeline, new_sk_set)
+
+    mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
+    assert mconf["new_sk_set"] is None
+    assert mconf["sk_set"] == new_sk_set
+    assert mconf["generation"] == 3
+
+    ep.clear_buffers()
+    assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(len(failpoints))]
+    assert ep.safe_psql("SHOW neon.safekeepers")[0][0].startswith("g#3:")
+
+    # Check that we didn't forget to remove the timeline on the excluded safekeeper.
+    with pytest.raises(requests.exceptions.HTTPError) as exc:
+        env.safekeepers[excluded_sk - 1].http_client().timeline_status(
+            env.initial_tenant, env.initial_timeline
+        )
+    assert exc.value.response.status_code == 404
+    assert (
+        f"timeline {env.initial_tenant}/{env.initial_timeline} deleted" in exc.value.response.text
+    )
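For reviewers who want the database semantics without reading the diesel code: the new `update_timeline_membership` call performs, in one transaction, a generation compare-and-swap on `timelines` plus an upsert of the exclude requests into `safekeeper_timeline_pending_ops`. The sketch below is a rough SQL equivalent of that transaction; the table and column names are taken from this patch, while the parameter placeholders and the `'exclude'` op-kind literal are illustrative rather than the exact statements diesel generates.

```sql
BEGIN;

-- CAS on the membership configuration: only advance if the stored generation is
-- exactly the previous one. 0 updated rows means a concurrent migration won the
-- race and the caller must bail out (DatabaseError::Cas).
UPDATE timelines
SET generation = :new_generation,
    sk_set     = :sk_set,
    new_sk_set = :new_sk_set   -- NULL when committing the final (non-joint) configuration
WHERE tenant_id   = :tenant_id
  AND timeline_id = :timeline_id
  AND generation  = :new_generation - 1;

-- In the same transaction, persist the exclude requests so that a crash right after
-- the commit can still find and replay them during the "finish" retry. The generation
-- guard keeps a newer pending op from being overwritten by an older one.
INSERT INTO safekeeper_timeline_pending_ops (tenant_id, timeline_id, sk_id, generation, op_kind)
VALUES (:tenant_id, :timeline_id, :sk_id, :new_generation, 'exclude')
ON CONFLICT (tenant_id, timeline_id, sk_id) DO UPDATE
    SET generation = EXCLUDED.generation,
        op_kind    = EXCLUDED.op_kind
    WHERE safekeeper_timeline_pending_ops.generation < EXCLUDED.generation;

COMMIT;
```

The `cplane_notified_generation` and `sk_set_notified_generation` columns are then advanced with the same "not less than" CAS pattern after each notification step, which is what makes the finish step idempotent and safe to retry.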