storcon: finish safekeeper migration gracefully (#12528)

## Problem
We don't detect when a safekeeper migration fails after committing the
membership configuration to the database. As a result, we might leave
stale timelines on excluded safekeepers and fail to notify
cplane/safekeepers about the new configuration.

- Implements the solution proposed in https://github.com/neondatabase/neon/pull/12432
- Closes: https://github.com/neondatabase/neon/issues/12192
- Closes: [LKB-944](https://databricks.atlassian.net/browse/LKB-944)

## Summary of changes
- Add a `sk_set_notified_generation` column to the `timelines` table.
- Update `*_notified_generation` in the database during the finish step.
- Commit reconciliation requests to the database atomically with the membership
configuration.
- Reload pending ops and retry the "finish" step if we detect a
`*_notified_generation` mismatch (see the sketch after this list).
- Add failpoints and a test that verifies failures are handled well.
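
A minimal, self-contained sketch of the finish/retry decision described above. `TimelineRow` and `finish_migration` are hypothetical stand-ins for the storage controller's `TimelinePersistence` record and finish step, not the actual code:

```rust
// Sketch only: simplified stand-ins for the storage controller's timeline row
// and finish step, assuming the semantics described in this PR.
#[derive(Debug)]
struct TimelineRow {
    generation: u32,
    cplane_notified_generation: u32,
    sk_set_notified_generation: u32,
}

/// The committed membership configuration still has to be delivered to the
/// safekeepers and/or cplane if either notified generation lags behind.
fn finish_is_pending(t: &TimelineRow) -> bool {
    t.cplane_notified_generation != t.generation || t.sk_set_notified_generation != t.generation
}

/// Hypothetical finish step: notify everyone about the already-committed
/// configuration and bump the notified generations so a later retry is a no-op.
fn finish_migration(t: &mut TimelineRow) {
    // 1. Deliver the committed mconf to the safekeepers in sk_set.
    t.sk_set_notified_generation = t.generation;
    // 2. Re-schedule pending exclude ops reloaded from the database.
    // 3. Notify cplane about the new configuration.
    t.cplane_notified_generation = t.generation;
}

fn main() {
    // Generation 3 was committed, but the controller failed before notifying anyone.
    let mut timeline = TimelineRow {
        generation: 3,
        cplane_notified_generation: 1,
        sk_set_notified_generation: 1,
    };

    if finish_is_pending(&timeline) {
        finish_migration(&mut timeline);
    }
    assert!(!finish_is_pending(&timeline));
    println!("migration finished: {timeline:?}");
}
```

The real code performs the same comparison against the committed `generation` before deciding whether the finish step has to be retried.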
Author: Dmitrii Kovalkov
Committed by GitHub on 2025-07-22 18:58:20 +04:00
Parent: 88391ce069
Commit: 133f16e9b5
7 changed files with 485 additions and 81 deletions

View File

@@ -427,6 +427,9 @@ impl From<TimelineError> for ApiError {
             TimelineError::NotFound(ttid) => {
                 ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
             }
+            TimelineError::Deleted(ttid) => {
+                ApiError::NotFound(anyhow!("timeline {} deleted", ttid).into())
+            }
             _ => ApiError::InternalServerError(anyhow!("{}", te)),
         }
     }

View File

@@ -0,0 +1 @@
+ALTER TABLE timelines DROP sk_set_notified_generation;

View File

@@ -0,0 +1 @@
+ALTER TABLE timelines ADD sk_set_notified_generation INTEGER NOT NULL DEFAULT 1;

View File

@@ -131,6 +131,8 @@ pub(crate) enum DatabaseOperation {
     InsertTimeline,
     UpdateTimeline,
     UpdateTimelineMembership,
+    UpdateCplaneNotifiedGeneration,
+    UpdateSkSetNotifiedGeneration,
     GetTimeline,
     InsertTimelineReconcile,
     RemoveTimelineReconcile,
@@ -1497,6 +1499,8 @@ impl Persistence {
     /// Update timeline membership configuration in the database.
     /// Perform a compare-and-swap (CAS) operation on the timeline's generation.
     /// The `new_generation` must be the next (+1) generation after the one in the database.
+    /// Also inserts reconcile_requests to safekeeper_timeline_pending_ops table in the same
+    /// transaction.
     pub(crate) async fn update_timeline_membership(
         &self,
         tenant_id: TenantId,
@@ -1504,8 +1508,11 @@
         new_generation: SafekeeperGeneration,
         sk_set: &[NodeId],
         new_sk_set: Option<&[NodeId]>,
+        reconcile_requests: &[TimelinePendingOpPersistence],
     ) -> DatabaseResult<()> {
-        use crate::schema::timelines::dsl;
+        use crate::schema::safekeeper_timeline_pending_ops as stpo;
+        use crate::schema::timelines;
+        use diesel::query_dsl::methods::FilterDsl;

         let prev_generation = new_generation.previous().unwrap();
@@ -1513,14 +1520,15 @@
         let timeline_id = &timeline_id;
         self.with_measured_conn(DatabaseOperation::UpdateTimelineMembership, move |conn| {
             Box::pin(async move {
-                let updated = diesel::update(dsl::timelines)
-                    .filter(dsl::tenant_id.eq(&tenant_id.to_string()))
-                    .filter(dsl::timeline_id.eq(&timeline_id.to_string()))
-                    .filter(dsl::generation.eq(prev_generation.into_inner() as i32))
+                let updated = diesel::update(timelines::table)
+                    .filter(timelines::tenant_id.eq(&tenant_id.to_string()))
+                    .filter(timelines::timeline_id.eq(&timeline_id.to_string()))
+                    .filter(timelines::generation.eq(prev_generation.into_inner() as i32))
                     .set((
-                        dsl::generation.eq(new_generation.into_inner() as i32),
-                        dsl::sk_set.eq(sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>()),
-                        dsl::new_sk_set.eq(new_sk_set
+                        timelines::generation.eq(new_generation.into_inner() as i32),
+                        timelines::sk_set
+                            .eq(sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>()),
+                        timelines::new_sk_set.eq(new_sk_set
                             .map(|set| set.iter().map(|id| id.0 as i64).collect::<Vec<_>>())),
                     ))
                     .execute(conn)
@@ -1530,17 +1538,120 @@
                     0 => {
                         // TODO(diko): It makes sense to select the current generation
                         // and include it in the error message for better debuggability.
-                        Err(DatabaseError::Cas(
+                        return Err(DatabaseError::Cas(
                             "Failed to update membership configuration".to_string(),
-                        ))
+                        ));
                     }
+                    1 => {}
+                    _ => {
+                        return Err(DatabaseError::Logical(format!(
+                            "unexpected number of rows ({updated})"
+                        )));
+                    }
+                };
+
+                for req in reconcile_requests {
+                    let inserted_updated = diesel::insert_into(stpo::table)
+                        .values(req)
+                        .on_conflict((stpo::tenant_id, stpo::timeline_id, stpo::sk_id))
+                        .do_update()
+                        .set(req)
+                        .filter(stpo::generation.lt(req.generation))
+                        .execute(conn)
+                        .await?;
+
+                    if inserted_updated > 1 {
+                        return Err(DatabaseError::Logical(format!(
+                            "unexpected number of rows ({inserted_updated})"
+                        )));
+                    }
+                }
+
+                Ok(())
+            })
+        })
+        .await
+    }
+
+    /// Update the cplane notified generation for a timeline.
+    /// Perform a compare-and-swap (CAS) operation on the timeline's cplane notified generation.
+    /// The update will fail if the specified generation is less than the cplane notified generation
+    /// in the database.
+    pub(crate) async fn update_cplane_notified_generation(
+        &self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        generation: SafekeeperGeneration,
+    ) -> DatabaseResult<()> {
+        use crate::schema::timelines::dsl;
+        let tenant_id = &tenant_id;
+        let timeline_id = &timeline_id;
+        self.with_measured_conn(
+            DatabaseOperation::UpdateCplaneNotifiedGeneration,
+            move |conn| {
+                Box::pin(async move {
+                    let updated = diesel::update(dsl::timelines)
+                        .filter(dsl::tenant_id.eq(&tenant_id.to_string()))
+                        .filter(dsl::timeline_id.eq(&timeline_id.to_string()))
+                        .filter(dsl::cplane_notified_generation.le(generation.into_inner() as i32))
+                        .set(dsl::cplane_notified_generation.eq(generation.into_inner() as i32))
+                        .execute(conn)
+                        .await?;
+
+                    match updated {
+                        0 => Err(DatabaseError::Cas(
+                            "Failed to update cplane notified generation".to_string(),
+                        )),
                         1 => Ok(()),
                         _ => Err(DatabaseError::Logical(format!(
                             "unexpected number of rows ({updated})"
                         ))),
                     }
                 })
+            },
+        )
+        .await
+    }
+
+    /// Update the sk set notified generation for a timeline.
+    /// Perform a compare-and-swap (CAS) operation on the timeline's sk set notified generation.
+    /// The update will fail if the specified generation is less than the sk set notified generation
+    /// in the database.
+    pub(crate) async fn update_sk_set_notified_generation(
+        &self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        generation: SafekeeperGeneration,
+    ) -> DatabaseResult<()> {
+        use crate::schema::timelines::dsl;
+        let tenant_id = &tenant_id;
+        let timeline_id = &timeline_id;
+        self.with_measured_conn(
+            DatabaseOperation::UpdateSkSetNotifiedGeneration,
+            move |conn| {
+                Box::pin(async move {
+                    let updated = diesel::update(dsl::timelines)
+                        .filter(dsl::tenant_id.eq(&tenant_id.to_string()))
+                        .filter(dsl::timeline_id.eq(&timeline_id.to_string()))
+                        .filter(dsl::sk_set_notified_generation.le(generation.into_inner() as i32))
+                        .set(dsl::sk_set_notified_generation.eq(generation.into_inner() as i32))
+                        .execute(conn)
+                        .await?;
+
+                    match updated {
+                        0 => Err(DatabaseError::Cas(
+                            "Failed to update sk set notified generation".to_string(),
+                        )),
+                        1 => Ok(()),
+                        _ => Err(DatabaseError::Logical(format!(
+                            "unexpected number of rows ({updated})"
+                        ))),
+                    }
+                })
             },
         )
         .await
     }
@@ -2493,6 +2604,7 @@ pub(crate) struct TimelinePersistence {
     pub(crate) new_sk_set: Option<Vec<i64>>,
     pub(crate) cplane_notified_generation: i32,
     pub(crate) deleted_at: Option<chrono::DateTime<chrono::Utc>>,
+    pub(crate) sk_set_notified_generation: i32,
 }

 /// This is separate from [TimelinePersistence] only because postgres allows NULLs
@@ -2511,6 +2623,7 @@ pub(crate) struct TimelineFromDb {
     pub(crate) new_sk_set: Option<Vec<Option<i64>>>,
     pub(crate) cplane_notified_generation: i32,
     pub(crate) deleted_at: Option<chrono::DateTime<chrono::Utc>>,
+    pub(crate) sk_set_notified_generation: i32,
 }

 impl TimelineFromDb {
@@ -2530,6 +2643,7 @@ impl TimelineFromDb {
             new_sk_set,
             cplane_notified_generation: self.cplane_notified_generation,
             deleted_at: self.deleted_at,
+            sk_set_notified_generation: self.sk_set_notified_generation,
         }
     }
 }
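
As an illustration of the generation guard used above (the `ON CONFLICT ... DO UPDATE` upsert filtered by `stpo::generation.lt(req.generation)`), here is a standalone sketch in which a `HashMap` stands in for the `safekeeper_timeline_pending_ops` table; `PendingOp` and `upsert_pending_op` are hypothetical simplifications, not the diesel code:

```rust
// Sketch only: a HashMap models the pending-ops table; PendingOp/upsert_pending_op
// are simplified stand-ins for TimelinePendingOpPersistence and the upsert above.
use std::collections::HashMap;
use std::collections::hash_map::Entry;

#[derive(Debug)]
struct PendingOp {
    generation: i32,
    op_kind: &'static str, // e.g. "exclude"
}

type Key = (String, String, i64); // (tenant_id, timeline_id, sk_id)

/// Insert-or-update guarded the same way as
/// `ON CONFLICT ... DO UPDATE ... WHERE generation < excluded.generation`.
fn upsert_pending_op(table: &mut HashMap<Key, PendingOp>, key: Key, op: PendingOp) {
    match table.entry(key) {
        Entry::Occupied(mut e) => {
            // Only a request with a strictly higher generation replaces the row.
            if e.get().generation < op.generation {
                e.insert(op);
            }
        }
        Entry::Vacant(e) => {
            e.insert(op);
        }
    }
}

fn main() {
    let mut ops = HashMap::new();
    let key = ("tenant".to_string(), "timeline".to_string(), 7);

    upsert_pending_op(&mut ops, key.clone(), PendingOp { generation: 4, op_kind: "exclude" });
    // A retry carrying an older generation does not clobber the newer op.
    upsert_pending_op(&mut ops, key.clone(), PendingOp { generation: 3, op_kind: "exclude" });

    assert_eq!(ops[&key].generation, 4);
    println!("{ops:?}");
}
```

This keeps retries idempotent: a stale request with an older generation can never overwrite a pending op recorded by a newer one.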

View File

@@ -118,6 +118,7 @@ diesel::table! {
         new_sk_set -> Nullable<Array<Nullable<Int8>>>,
         cplane_notified_generation -> Int4,
         deleted_at -> Nullable<Timestamptz>,
+        sk_set_notified_generation -> Int4,
     }
 }

View File

@@ -312,6 +312,7 @@ impl Service {
             new_sk_set: None,
             cplane_notified_generation: 0,
             deleted_at: None,
+            sk_set_notified_generation: 0,
         };
         let inserted = self
             .persistence
@@ -461,6 +462,7 @@ impl Service {
             new_sk_set: None,
             cplane_notified_generation: 1,
             deleted_at: None,
+            sk_set_notified_generation: 1,
         };
         let inserted = self
             .persistence
@@ -894,17 +896,21 @@ impl Service {
     /// If min_position is not None, validates that majority of safekeepers
     /// reached at least min_position.
     ///
+    /// If update_notified_generation is set, also updates sk_set_notified_generation
+    /// in the timelines table.
+    ///
     /// Return responses from safekeepers in the input order.
     async fn tenant_timeline_set_membership_quorum(
         self: &Arc<Self>,
         tenant_id: TenantId,
         timeline_id: TimelineId,
         safekeepers: &[Safekeeper],
-        config: &membership::Configuration,
+        mconf: &membership::Configuration,
         min_position: Option<(Term, Lsn)>,
+        update_notified_generation: bool,
     ) -> Result<Vec<mgmt_api::Result<TimelineMembershipSwitchResponse>>, ApiError> {
         let req = TimelineMembershipSwitchRequest {
-            mconf: config.clone(),
+            mconf: mconf.clone(),
         };

         const SK_SET_MEM_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
@@ -945,28 +951,34 @@
         .await?;

         for res in results.iter().flatten() {
-            if res.current_conf.generation > config.generation {
+            if res.current_conf.generation > mconf.generation {
                 // Antoher switch_membership raced us.
                 return Err(ApiError::Conflict(format!(
                     "received configuration with generation {} from safekeeper, but expected {}",
-                    res.current_conf.generation, config.generation
+                    res.current_conf.generation, mconf.generation
                 )));
-            } else if res.current_conf.generation < config.generation {
+            } else if res.current_conf.generation < mconf.generation {
                 // Note: should never happen.
                 // If we get a response, it should be at least the sent generation.
                 tracing::error!(
                     "received configuration with generation {} from safekeeper, but expected {}",
                     res.current_conf.generation,
-                    config.generation
+                    mconf.generation
                 );
                 return Err(ApiError::InternalServerError(anyhow::anyhow!(
                     "received configuration with generation {} from safekeeper, but expected {}",
                     res.current_conf.generation,
-                    config.generation
+                    mconf.generation
                 )));
             }
         }

+        if update_notified_generation {
+            self.persistence
+                .update_sk_set_notified_generation(tenant_id, timeline_id, mconf.generation)
+                .await?;
+        }
+
         Ok(results)
     }
@@ -1035,17 +1047,22 @@
     }

     /// Exclude a timeline from safekeepers in parallel with retries.
-    /// If an exclude request is unsuccessful, it will be added to
-    /// the reconciler, and after that the function will succeed.
-    async fn tenant_timeline_safekeeper_exclude(
+    ///
+    /// Assumes that the exclude requests are already persistent in the database.
+    ///
+    /// The function does best effort: if an exclude request is unsuccessful,
+    /// it will be added to the in-memory reconciler, and the function will succeed anyway.
+    ///
+    /// Might fail if there is error accessing the database.
+    async fn tenant_timeline_safekeeper_exclude_reconcile(
         self: &Arc<Self>,
         tenant_id: TenantId,
         timeline_id: TimelineId,
         safekeepers: &[Safekeeper],
-        config: &membership::Configuration,
+        mconf: &membership::Configuration,
     ) -> Result<(), ApiError> {
         let req = TimelineMembershipSwitchRequest {
-            mconf: config.clone(),
+            mconf: mconf.clone(),
         };

         const SK_EXCLUDE_TIMELINE_TIMEOUT: Duration = Duration::from_secs(30);
@@ -1063,25 +1080,32 @@
         let mut reconcile_requests = Vec::new();

-        for (idx, res) in results.iter().enumerate() {
-            if res.is_err() {
-                let sk_id = safekeepers[idx].skp.id;
-                let pending_op = TimelinePendingOpPersistence {
-                    tenant_id: tenant_id.to_string(),
-                    timeline_id: timeline_id.to_string(),
-                    generation: config.generation.into_inner() as i32,
-                    op_kind: SafekeeperTimelineOpKind::Exclude,
-                    sk_id,
-                };
-                tracing::info!("writing pending exclude op for sk id {sk_id}");
-                self.persistence.insert_pending_op(pending_op).await?;
+        fail::fail_point!("sk-migration-step-9-mid-exclude", |_| {
+            Err(ApiError::BadRequest(anyhow::anyhow!(
+                "failpoint sk-migration-step-9-mid-exclude"
+            )))
+        });
+
+        for (idx, res) in results.iter().enumerate() {
+            let sk_id = safekeepers[idx].skp.id;
+            let generation = mconf.generation.into_inner();
+
+            if res.is_ok() {
+                self.persistence
+                    .remove_pending_op(
+                        tenant_id,
+                        Some(timeline_id),
+                        NodeId(sk_id as u64),
+                        generation,
+                    )
+                    .await?;
+            } else {
                 let req = ScheduleRequest {
                     safekeeper: Box::new(safekeepers[idx].clone()),
                     host_list: Vec::new(),
                     tenant_id,
                     timeline_id: Some(timeline_id),
-                    generation: config.generation.into_inner(),
+                    generation,
                     kind: SafekeeperTimelineOpKind::Exclude,
                 };
                 reconcile_requests.push(req);
@@ -1208,6 +1232,22 @@
             }
             // It it is the same new_sk_set, we can continue the migration (retry).
         } else {
+            let prev_finished = timeline.cplane_notified_generation == timeline.generation
+                && timeline.sk_set_notified_generation == timeline.generation;
+            if !prev_finished {
+                // The previous migration is committed, but the finish step failed.
+                // Safekeepers/cplane might not know about the last membership configuration.
+                // Retry the finish step to ensure smooth migration.
+                self.finish_safekeeper_migration_retry(tenant_id, timeline_id, &timeline)
+                    .await?;
+            }
+
+            if cur_sk_set == new_sk_set {
+                tracing::info!("timeline is already at the desired safekeeper set");
+                return Ok(());
+            }
+
             // 3. No active migration yet.
             // Increment current generation and put desired_set to new_sk_set.
             generation = generation.next();
@@ -1219,8 +1259,15 @@
                     generation,
                     &cur_sk_set,
                     Some(&new_sk_set),
+                    &[],
                 )
                 .await?;
+
+            fail::fail_point!("sk-migration-after-step-3", |_| {
+                Err(ApiError::BadRequest(anyhow::anyhow!(
+                    "failpoint sk-migration-after-step-3"
+                )))
+            });
         }

         let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
@@ -1249,6 +1296,7 @@
             &cur_safekeepers,
             &joint_config,
             None, // no min position
+            true, // update notified generation
         )
         .await?;
@@ -1266,6 +1314,12 @@
             "safekeepers set membership updated",
         );

+        fail::fail_point!("sk-migration-after-step-4", |_| {
+            Err(ApiError::BadRequest(anyhow::anyhow!(
+                "failpoint sk-migration-after-step-4"
+            )))
+        });
+
         // 5. Initialize timeline on safekeeper(s) from new_sk_set where it doesn't exist yet
         // by doing pull_timeline from the majority of the current set.
@@ -1285,6 +1339,12 @@
         )
         .await?;

+        fail::fail_point!("sk-migration-after-step-5", |_| {
+            Err(ApiError::BadRequest(anyhow::anyhow!(
+                "failpoint sk-migration-after-step-5"
+            )))
+        });
+
         // 6. Call POST bump_term(sync_term) on safekeepers from the new set. Success on majority is enough.
         // TODO(diko): do we need to bump timeline term?
@@ -1300,9 +1360,16 @@
             &new_safekeepers,
             &joint_config,
             Some(sync_position),
+            false, // we're just waiting for sync position, don't update notified generation
         )
         .await?;

+        fail::fail_point!("sk-migration-after-step-7", |_| {
+            Err(ApiError::BadRequest(anyhow::anyhow!(
+                "failpoint sk-migration-after-step-7"
+            )))
+        });
+
         // 8. Create new_conf: Configuration incrementing joint_conf generation and
         // having new safekeeper set as sk_set and None new_sk_set.
@@ -1314,43 +1381,53 @@
             new_members: None,
         };

-        self.persistence
-            .update_timeline_membership(tenant_id, timeline_id, generation, &new_sk_set, None)
-            .await?;
-
-        // TODO(diko): at this point we have already updated the timeline in the database,
-        // but we still need to notify safekeepers and cplane about the new configuration,
-        // and put delition of the timeline from the old safekeepers into the reconciler.
-        // Ideally it should be done atomically, but now it's not.
-        // Worst case: the timeline is not deleted from old safekeepers,
-        // the compute may require both quorums till the migration is retried and completed.
-        self.tenant_timeline_set_membership_quorum(
-            tenant_id,
-            timeline_id,
-            &new_safekeepers,
-            &new_conf,
-            None, // no min position
-        )
-        .await?;
-
         let new_ids: HashSet<NodeId> = new_safekeepers.iter().map(|sk| sk.get_id()).collect();
         let exclude_safekeepers = cur_safekeepers
             .into_iter()
             .filter(|sk| !new_ids.contains(&sk.get_id()))
             .collect::<Vec<_>>();

-        self.tenant_timeline_safekeeper_exclude(
-            tenant_id,
-            timeline_id,
-            &exclude_safekeepers,
-            &new_conf,
-        )
-        .await?;
-
-        // Notify cplane/compute about the membership change AFTER changing the membership on safekeepers.
-        // This way the compute will stop talking to excluded safekeepers only after we stop requiring to
-        // collect a quorum from them.
-        self.cplane_notify_safekeepers(tenant_id, timeline_id, &new_conf)
-            .await?;
+        let exclude_requests = exclude_safekeepers
+            .iter()
+            .map(|sk| TimelinePendingOpPersistence {
+                sk_id: sk.skp.id,
+                tenant_id: tenant_id.to_string(),
+                timeline_id: timeline_id.to_string(),
+                generation: generation.into_inner() as i32,
+                op_kind: SafekeeperTimelineOpKind::Exclude,
+            })
+            .collect::<Vec<_>>();
+
+        self.persistence
+            .update_timeline_membership(
+                tenant_id,
+                timeline_id,
+                generation,
+                &new_sk_set,
+                None,
+                &exclude_requests,
+            )
+            .await?;
+
+        fail::fail_point!("sk-migration-after-step-8", |_| {
+            Err(ApiError::BadRequest(anyhow::anyhow!(
+                "failpoint sk-migration-after-step-8"
+            )))
+        });
+
+        // At this point we have already updated the timeline in the database, so the final
+        // membership configuration is commited and the migration is not abortable anymore.
+        // But safekeepers and cplane/compute still need to be notified about the new configuration.
+        // The [`Self::finish_safekeeper_migration`] does exactly that: notifies everyone about
+        // the new configuration and reconciles excluded safekeepers.
+        // If it fails, the safkeeper migration call should be retried.
+        self.finish_safekeeper_migration(
+            tenant_id,
+            timeline_id,
+            &new_safekeepers,
+            &new_conf,
+            &exclude_safekeepers,
+        )
+        .await?;

         Ok(())
@@ -1396,6 +1473,130 @@
                 ApiError::InternalServerError(anyhow::anyhow!(
                     "failed to notify cplane about safekeeper membership change: {err}"
                 ))
-            })
+            })?;
+
+        self.persistence
+            .update_cplane_notified_generation(tenant_id, timeline_id, mconf.generation)
+            .await?;
+
+        Ok(())
+    }
+
+    /// Finish safekeeper migration.
+    ///
+    /// It is the last step of the safekeeper migration.
+    ///
+    /// Notifies safekeepers and cplane about the final membership configuration,
+    /// reconciles excluded safekeepers and updates *_notified_generation in the database.
+    async fn finish_safekeeper_migration(
+        self: &Arc<Self>,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        new_safekeepers: &[Safekeeper],
+        new_conf: &membership::Configuration,
+        exclude_safekeepers: &[Safekeeper],
+    ) -> Result<(), ApiError> {
+        // 9. Call PUT configuration on safekeepers from the new set, delivering them new_conf.
+        // Also try to exclude safekeepers and notify cplane about the membership change.
+        self.tenant_timeline_set_membership_quorum(
+            tenant_id,
+            timeline_id,
+            new_safekeepers,
+            new_conf,
+            None, // no min position
+            true, // update notified generation
+        )
+        .await?;
+
+        fail::fail_point!("sk-migration-step-9-after-set-membership", |_| {
+            Err(ApiError::BadRequest(anyhow::anyhow!(
+                "failpoint sk-migration-step-9-after-set-membership"
+            )))
+        });
+
+        self.tenant_timeline_safekeeper_exclude_reconcile(
+            tenant_id,
+            timeline_id,
+            exclude_safekeepers,
+            new_conf,
+        )
+        .await?;
+
+        fail::fail_point!("sk-migration-step-9-after-exclude", |_| {
+            Err(ApiError::BadRequest(anyhow::anyhow!(
+                "failpoint sk-migration-step-9-after-exclude"
+            )))
+        });
+
+        // Notify cplane/compute about the membership change AFTER changing the membership on safekeepers.
+        // This way the compute will stop talking to excluded safekeepers only after we stop requiring to
+        // collect a quorum from them.
+        self.cplane_notify_safekeepers(tenant_id, timeline_id, new_conf)
+            .await?;
+
+        fail::fail_point!("sk-migration-after-step-9", |_| {
+            Err(ApiError::BadRequest(anyhow::anyhow!(
+                "failpoint sk-migration-after-step-9"
+            )))
+        });
+
+        Ok(())
+    }
+
+    /// Same as [`Self::finish_safekeeper_migration`], but restores the migration state from the database.
+    /// It's used when the migration failed during the finish step and we need to retry it.
+    async fn finish_safekeeper_migration_retry(
+        self: &Arc<Self>,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        timeline: &TimelinePersistence,
+    ) -> Result<(), ApiError> {
+        if timeline.new_sk_set.is_some() {
+            // Logical error, should never happen.
+            return Err(ApiError::InternalServerError(anyhow::anyhow!(
+                "can't finish timeline migration for {tenant_id}/{timeline_id}: new_sk_set is not None"
+            )));
+        }
+
+        let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
+        let cur_sk_member_set =
+            Self::make_member_set(&cur_safekeepers).map_err(ApiError::InternalServerError)?;
+
+        let mconf = membership::Configuration {
+            generation: SafekeeperGeneration::new(timeline.generation as u32),
+            members: cur_sk_member_set,
+            new_members: None,
+        };
+
+        // We might have failed between commiting reconciliation requests and adding them to the in-memory reconciler.
+        // Reload them from the database.
+        let pending_ops = self
+            .persistence
+            .list_pending_ops_for_timeline(tenant_id, timeline_id)
+            .await?;
+
+        let mut exclude_sk_ids = Vec::new();
+        for op in pending_ops {
+            if op.op_kind == SafekeeperTimelineOpKind::Exclude
+                && op.generation == timeline.generation
+            {
+                exclude_sk_ids.push(op.sk_id);
+            }
+        }
+        let exclude_safekeepers = self.get_safekeepers(&exclude_sk_ids)?;
+
+        self.finish_safekeeper_migration(
+            tenant_id,
+            timeline_id,
+            &cur_safekeepers,
+            &mconf,
+            &exclude_safekeepers,
+        )
+        .await?;
+
+        Ok(())
     }
 }

View File

@@ -3,11 +3,22 @@ from __future__ import annotations

 from typing import TYPE_CHECKING

 import pytest
+import requests
+from fixtures.log_helper import log
 from fixtures.neon_fixtures import StorageControllerApiException

 if TYPE_CHECKING:
     from fixtures.neon_fixtures import NeonEnvBuilder

+# TODO(diko): pageserver spams with various errors during safekeeper migration.
+# Fix the code so it handles the migration better.
+ALLOWED_PAGESERVER_ERRORS = [
+    ".*Timeline .* was cancelled and cannot be used anymore.*",
+    ".*Timeline .* has been deleted.*",
+    ".*Timeline .* was not found in global map.*",
+    ".*wal receiver task finished with an error.*",
+]
+

 def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder):
     """
@@ -24,16 +35,7 @@
         "timeline_safekeeper_count": 1,
     }
     env = neon_env_builder.init_start()

-    # TODO(diko): pageserver spams with various errors during safekeeper migration.
-    # Fix the code so it handles the migration better.
-    env.pageserver.allowed_errors.extend(
-        [
-            ".*Timeline .* was cancelled and cannot be used anymore.*",
-            ".*Timeline .* has been deleted.*",
-            ".*Timeline .* was not found in global map.*",
-            ".*wal receiver task finished with an error.*",
-        ]
-    )
+    env.pageserver.allowed_errors.extend(ALLOWED_PAGESERVER_ERRORS)

     ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
@@ -42,15 +44,23 @@
     assert len(mconf["sk_set"]) == 1
     assert mconf["generation"] == 1

+    current_sk = mconf["sk_set"][0]
+
     ep.start(safekeeper_generation=1, safekeepers=mconf["sk_set"])
     ep.safe_psql("CREATE EXTENSION neon_test_utils;")
     ep.safe_psql("CREATE TABLE t(a int)")

+    expected_gen = 1
     for active_sk in range(1, 4):
         env.storage_controller.migrate_safekeepers(
             env.initial_tenant, env.initial_timeline, [active_sk]
         )
+        if active_sk != current_sk:
+            expected_gen += 2
+        current_sk = active_sk

         other_sks = [sk for sk in range(1, 4) if sk != active_sk]
         for sk in other_sks:
assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(1, 4)] assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(1, 4)]
# 1 initial generation + 2 migrations on each loop iteration.
expected_gen = 1 + 2 * 3
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline) mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["generation"] == expected_gen assert mconf["generation"] == expected_gen
@@ -113,3 +120,79 @@
     env.storage_controller.safekeeper_scheduling_policy(decom_sk, "Decomissioned")
     expect_fail([sk_set[0], decom_sk], "decomissioned")
+
+
+def test_safekeeper_migration_common_set_failpoints(neon_env_builder: NeonEnvBuilder):
+    """
+    Test that safekeeper migration handles failures well.
+    Two main conditions are checked:
+    1. safekeeper migration handler can be retried on different failures.
+    2. writes do not stuck if sk_set and new_sk_set have a quorum in common.
+    """
+    neon_env_builder.num_safekeepers = 4
+    neon_env_builder.storage_controller_config = {
+        "timelines_onto_safekeepers": True,
+        "timeline_safekeeper_count": 3,
+    }
+    env = neon_env_builder.init_start()
+
+    env.pageserver.allowed_errors.extend(ALLOWED_PAGESERVER_ERRORS)
+
+    mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
+    assert len(mconf["sk_set"]) == 3
+    assert mconf["generation"] == 1
+
+    ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
+    ep.start(safekeeper_generation=1, safekeepers=mconf["sk_set"])
+    ep.safe_psql("CREATE EXTENSION neon_test_utils;")
+    ep.safe_psql("CREATE TABLE t(a int)")
+
+    excluded_sk = mconf["sk_set"][-1]
+    added_sk = [sk.id for sk in env.safekeepers if sk.id not in mconf["sk_set"]][0]
+    new_sk_set = mconf["sk_set"][:-1] + [added_sk]
+    log.info(f"migrating sk set from {mconf['sk_set']} to {new_sk_set}")
+
+    failpoints = [
+        "sk-migration-after-step-3",
+        "sk-migration-after-step-4",
+        "sk-migration-after-step-5",
+        "sk-migration-after-step-7",
+        "sk-migration-after-step-8",
+        "sk-migration-step-9-after-set-membership",
+        "sk-migration-step-9-mid-exclude",
+        "sk-migration-step-9-after-exclude",
+        "sk-migration-after-step-9",
+    ]
+
+    for i, fp in enumerate(failpoints):
+        env.storage_controller.configure_failpoints((fp, "return(1)"))
+
+        with pytest.raises(StorageControllerApiException, match=f"failpoint {fp}"):
+            env.storage_controller.migrate_safekeepers(
+                env.initial_tenant, env.initial_timeline, new_sk_set
+            )
+
+        ep.safe_psql(f"INSERT INTO t VALUES ({i})")
+
+        env.storage_controller.configure_failpoints((fp, "off"))
+
+    # No failpoints, migration should succeed.
+    env.storage_controller.migrate_safekeepers(env.initial_tenant, env.initial_timeline, new_sk_set)
+
+    mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
+    assert mconf["new_sk_set"] is None
+    assert mconf["sk_set"] == new_sk_set
+    assert mconf["generation"] == 3
+
+    ep.clear_buffers()
+    assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(len(failpoints))]
+    assert ep.safe_psql("SHOW neon.safekeepers")[0][0].startswith("g#3:")
+
+    # Check that we didn't forget to remove the timeline on the excluded safekeeper.
+    with pytest.raises(requests.exceptions.HTTPError) as exc:
+        env.safekeepers[excluded_sk - 1].http_client().timeline_status(
+            env.initial_tenant, env.initial_timeline
+        )
+    assert exc.value.response.status_code == 404
+    assert (
+        f"timeline {env.initial_tenant}/{env.initial_timeline} deleted" in exc.value.response.text
+    )