Merge remote-tracking branch 'origin' into vlad/hadron-jwt

This commit is contained in:
Vlad Lazar
2025-07-23 19:15:31 +01:00
187 changed files with 5704 additions and 1034 deletions

View File

@@ -8,10 +8,10 @@ static CHARSET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz01
/// Generate a random string of `length` that can be used as a password. The generated string
/// contains alphanumeric characters and special characters (!@#$%^&*())
pub fn generate_random_password(length: usize) -> String {
let mut rng = rand::thread_rng();
let mut rng = rand::rng();
(0..length)
.map(|_| {
let idx = rng.gen_range(0..CHARSET.len());
let idx = rng.random_range(0..CHARSET.len());
CHARSET[idx] as char
})
.collect()

View File

@@ -242,6 +242,10 @@ struct Cli {
#[arg(long)]
shard_split_request_timeout: Option<humantime::Duration>,
/// **Feature Flag** Whether the storage controller should act to rectify pageserver-reported local disk loss.
#[arg(long, default_value = "false")]
handle_ps_local_disk_loss: bool,
}
enum StrictMode {
@@ -508,6 +512,7 @@ async fn async_main() -> anyhow::Result<()> {
.shard_split_request_timeout
.map(humantime::Duration::into)
.unwrap_or(Duration::MAX),
handle_ps_local_disk_loss: args.handle_ps_local_disk_loss,
};
// Validate that we can connect to the database

View File

@@ -129,7 +129,10 @@ pub(crate) enum DatabaseOperation {
UpdateLeader,
SetPreferredAzs,
InsertTimeline,
UpdateTimeline,
UpdateTimelineMembership,
UpdateCplaneNotifiedGeneration,
UpdateSkSetNotifiedGeneration,
GetTimeline,
InsertTimelineReconcile,
RemoveTimelineReconcile,
@@ -1463,9 +1466,41 @@ impl Persistence {
.await
}
/// Overwrite the stored fields of a timeline that is already present in the `timelines` table.
///
/// The row is selected by `entry.tenant_id` and `entry.timeline_id`, and `entry` itself is
/// applied as the changeset.
///
/// Returns `Ok(true)` when exactly one row was updated and `Ok(false)` when no row matched.
/// More than one affected row is reported as [`DatabaseError::Logical`].
///
/// VERY UNSAFE FUNCTION: this overrides in-progress migrations. Don't use this unless necessary.
pub(crate) async fn update_timeline_unsafe(
    &self,
    entry: TimelineUpdate,
) -> DatabaseResult<bool> {
    use crate::schema::timelines;

    // Borrow the entry so that the `move` closure only captures a (copyable) reference.
    let entry = &entry;
    self.with_measured_conn(DatabaseOperation::UpdateTimeline, move |conn| {
        Box::pin(async move {
            // Narrow the statement down to the single timeline row first ...
            let target = diesel::update(timelines::table)
                .filter(timelines::tenant_id.eq(&entry.tenant_id))
                .filter(timelines::timeline_id.eq(&entry.timeline_id));
            // ... then apply the changeset and count the affected rows.
            let inserted_updated = target.set(entry).execute(conn).await?;

            if inserted_updated > 1 {
                return Err(DatabaseError::Logical(format!(
                    "unexpected number of rows ({inserted_updated})"
                )));
            }
            Ok(inserted_updated == 1)
        })
    })
    .await
}
/// Update timeline membership configuration in the database.
/// Perform a compare-and-swap (CAS) operation on the timeline's generation.
/// The `new_generation` must be the next (+1) generation after the one in the database.
/// Also inserts reconcile_requests to safekeeper_timeline_pending_ops table in the same
/// transaction.
pub(crate) async fn update_timeline_membership(
&self,
tenant_id: TenantId,
@@ -1473,8 +1508,11 @@ impl Persistence {
new_generation: SafekeeperGeneration,
sk_set: &[NodeId],
new_sk_set: Option<&[NodeId]>,
reconcile_requests: &[TimelinePendingOpPersistence],
) -> DatabaseResult<()> {
use crate::schema::timelines::dsl;
use crate::schema::safekeeper_timeline_pending_ops as stpo;
use crate::schema::timelines;
use diesel::query_dsl::methods::FilterDsl;
let prev_generation = new_generation.previous().unwrap();
@@ -1482,14 +1520,15 @@ impl Persistence {
let timeline_id = &timeline_id;
self.with_measured_conn(DatabaseOperation::UpdateTimelineMembership, move |conn| {
Box::pin(async move {
let updated = diesel::update(dsl::timelines)
.filter(dsl::tenant_id.eq(&tenant_id.to_string()))
.filter(dsl::timeline_id.eq(&timeline_id.to_string()))
.filter(dsl::generation.eq(prev_generation.into_inner() as i32))
let updated = diesel::update(timelines::table)
.filter(timelines::tenant_id.eq(&tenant_id.to_string()))
.filter(timelines::timeline_id.eq(&timeline_id.to_string()))
.filter(timelines::generation.eq(prev_generation.into_inner() as i32))
.set((
dsl::generation.eq(new_generation.into_inner() as i32),
dsl::sk_set.eq(sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>()),
dsl::new_sk_set.eq(new_sk_set
timelines::generation.eq(new_generation.into_inner() as i32),
timelines::sk_set
.eq(sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>()),
timelines::new_sk_set.eq(new_sk_set
.map(|set| set.iter().map(|id| id.0 as i64).collect::<Vec<_>>())),
))
.execute(conn)
@@ -1499,20 +1538,123 @@ impl Persistence {
0 => {
// TODO(diko): It makes sense to select the current generation
// and include it in the error message for better debuggability.
Err(DatabaseError::Cas(
return Err(DatabaseError::Cas(
"Failed to update membership configuration".to_string(),
))
));
}
1 => {}
_ => {
return Err(DatabaseError::Logical(format!(
"unexpected number of rows ({updated})"
)));
}
};
for req in reconcile_requests {
let inserted_updated = diesel::insert_into(stpo::table)
.values(req)
.on_conflict((stpo::tenant_id, stpo::timeline_id, stpo::sk_id))
.do_update()
.set(req)
.filter(stpo::generation.lt(req.generation))
.execute(conn)
.await?;
if inserted_updated > 1 {
return Err(DatabaseError::Logical(format!(
"unexpected number of rows ({inserted_updated})"
)));
}
1 => Ok(()),
_ => Err(DatabaseError::Logical(format!(
"unexpected number of rows ({updated})"
))),
}
Ok(())
})
})
.await
}
/// Record the generation that the control plane has been notified about for a timeline.
///
/// This is a compare-and-swap on `cplane_notified_generation`: the row is only written when
/// the stored value is less than or equal to `generation`.
///
/// Returns [`DatabaseError::Cas`] when no row was updated (the stored generation is newer, or
/// no row matched the tenant/timeline) and [`DatabaseError::Logical`] when more than one row
/// was affected.
pub(crate) async fn update_cplane_notified_generation(
    &self,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    generation: SafekeeperGeneration,
) -> DatabaseResult<()> {
    use crate::schema::timelines::dsl;

    let tenant_id = &tenant_id;
    let timeline_id = &timeline_id;
    self.with_measured_conn(
        DatabaseOperation::UpdateCplaneNotifiedGeneration,
        move |conn| {
            Box::pin(async move {
                // Values bound into the statement.
                let tenant = tenant_id.to_string();
                let timeline = timeline_id.to_string();
                let notified = generation.into_inner() as i32;

                let updated = diesel::update(dsl::timelines)
                    .filter(dsl::tenant_id.eq(&tenant))
                    .filter(dsl::timeline_id.eq(&timeline))
                    .filter(dsl::cplane_notified_generation.le(notified))
                    .set(dsl::cplane_notified_generation.eq(notified))
                    .execute(conn)
                    .await?;

                if updated == 0 {
                    return Err(DatabaseError::Cas(
                        "Failed to update cplane notified generation".to_string(),
                    ));
                }
                if updated > 1 {
                    return Err(DatabaseError::Logical(format!(
                        "unexpected number of rows ({updated})"
                    )));
                }
                Ok(())
            })
        },
    )
    .await
}
/// Record the generation that the safekeeper set has been notified about for a timeline.
///
/// This is a compare-and-swap on `sk_set_notified_generation`: the row is only written when
/// the stored value is less than or equal to `generation`.
///
/// Returns [`DatabaseError::Cas`] when no row was updated (the stored generation is newer, or
/// no row matched the tenant/timeline) and [`DatabaseError::Logical`] when more than one row
/// was affected.
pub(crate) async fn update_sk_set_notified_generation(
    &self,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    generation: SafekeeperGeneration,
) -> DatabaseResult<()> {
    use crate::schema::timelines::dsl;

    let tenant_id = &tenant_id;
    let timeline_id = &timeline_id;
    self.with_measured_conn(
        DatabaseOperation::UpdateSkSetNotifiedGeneration,
        move |conn| {
            Box::pin(async move {
                // Values bound into the statement.
                let tenant = tenant_id.to_string();
                let timeline = timeline_id.to_string();
                let notified = generation.into_inner() as i32;

                let updated = diesel::update(dsl::timelines)
                    .filter(dsl::tenant_id.eq(&tenant))
                    .filter(dsl::timeline_id.eq(&timeline))
                    .filter(dsl::sk_set_notified_generation.le(notified))
                    .set(dsl::sk_set_notified_generation.eq(notified))
                    .execute(conn)
                    .await?;

                if updated == 0 {
                    return Err(DatabaseError::Cas(
                        "Failed to update sk set notified generation".to_string(),
                    ));
                }
                if updated > 1 {
                    return Err(DatabaseError::Logical(format!(
                        "unexpected number of rows ({updated})"
                    )));
                }
                Ok(())
            })
        },
    )
    .await
}
/// Load timeline from db. Returns `None` if not present.
pub(crate) async fn get_timeline(
&self,
@@ -2462,6 +2604,7 @@ pub(crate) struct TimelinePersistence {
pub(crate) new_sk_set: Option<Vec<i64>>,
pub(crate) cplane_notified_generation: i32,
pub(crate) deleted_at: Option<chrono::DateTime<chrono::Utc>>,
pub(crate) sk_set_notified_generation: i32,
}
/// This is separate from [TimelinePersistence] only because postgres allows NULLs
@@ -2480,6 +2623,7 @@ pub(crate) struct TimelineFromDb {
pub(crate) new_sk_set: Option<Vec<Option<i64>>>,
pub(crate) cplane_notified_generation: i32,
pub(crate) deleted_at: Option<chrono::DateTime<chrono::Utc>>,
pub(crate) sk_set_notified_generation: i32,
}
impl TimelineFromDb {
@@ -2499,10 +2643,23 @@ impl TimelineFromDb {
new_sk_set,
cplane_notified_generation: self.cplane_notified_generation,
deleted_at: self.deleted_at,
sk_set_notified_generation: self.sk_set_notified_generation,
}
}
}
/// Changeset used to overwrite an existing row of the `timelines` table.
///
/// This is separate from TimelinePersistence because we don't want to touch generation and
/// deleted_at values for the update.
///
/// `treat_none_as_null = true` makes a `None` field write SQL `NULL` instead of leaving the
/// column untouched, so `new_sk_set: None` clears the column.
#[derive(AsChangeset)]
#[diesel(table_name = crate::schema::timelines)]
#[diesel(treat_none_as_null = true)]
pub(crate) struct TimelineUpdate {
// Key of the row to update; `update_timeline_unsafe` filters on `tenant_id` and `timeline_id`.
// NOTE(review): whether these two also appear in the generated SET clause depends on the
// table's primary key definition -- confirm against the schema.
pub(crate) tenant_id: String,
pub(crate) timeline_id: String,
// New start LSN for the timeline.
pub(crate) start_lsn: LsnWrapper,
// New safekeeper set, stored as raw safekeeper ids.
pub(crate) sk_set: Vec<i64>,
// New value for `new_sk_set`; `None` is written as NULL (see the struct-level note).
pub(crate) new_sk_set: Option<Vec<i64>>,
}
#[derive(Insertable, AsChangeset, Queryable, Selectable, Clone)]
#[diesel(table_name = crate::schema::safekeeper_timeline_pending_ops)]
pub(crate) struct TimelinePendingOpPersistence {

View File

@@ -118,6 +118,7 @@ diesel::table! {
new_sk_set -> Nullable<Array<Nullable<Int8>>>,
cplane_notified_generation -> Int4,
deleted_at -> Nullable<Timestamptz>,
sk_set_notified_generation -> Int4,
}
}

View File

@@ -488,6 +488,9 @@ pub struct Config {
/// Timeout used for HTTP client of split requests. [`Duration::MAX`] if None.
pub shard_split_request_timeout: Duration,
// Feature flag: Whether the storage controller should act to rectify pageserver-reported local disk loss.
pub handle_ps_local_disk_loss: bool,
}
impl From<DatabaseError> for ApiError {
@@ -2399,6 +2402,33 @@ impl Service {
tenants: Vec::new(),
};
// [Hadron] If the pageserver reports in the reattach message that it has an empty disk, it's possible that it just
// recovered from a local disk failure. The response of the reattach request will contain a list of tenants but it
// will not be honored by the pageserver in this case (disk failure). We should make sure we clear any observed
// locations of tenants attached to the node so that the reconciler will discover the discrepancy and reconfigure the
// missing tenants on the node properly.
if self.config.handle_ps_local_disk_loss && reattach_req.empty_local_disk.unwrap_or(false) {
tracing::info!(
"Pageserver {node_id} reports empty local disk, clearing observed locations referencing the pageserver for all tenants",
node_id = reattach_req.node_id
);
let mut num_tenant_shards_affected = 0;
for (tenant_shard_id, shard) in tenants.iter_mut() {
if shard
.observed
.locations
.remove(&reattach_req.node_id)
.is_some()
{
tracing::info!("Cleared observed location for tenant shard {tenant_shard_id}");
num_tenant_shards_affected += 1;
}
}
tracing::info!(
"Cleared observed locations for {num_tenant_shards_affected} tenant shards"
);
}
// TODO: cancel/restart any running reconciliation for this tenant, it might be trying
// to call location_conf API with an old generation. Wait for cancellation to complete
// before responding to this request. Requires well implemented CancellationToken logic

View File

@@ -3,8 +3,8 @@ use std::sync::Arc;
use std::time::Duration;
use pageserver_api::controller_api::ShardSchedulingPolicy;
use rand::seq::SliceRandom;
use rand::{Rng, thread_rng};
use rand::Rng;
use rand::seq::{IndexedRandom, SliceRandom};
use tokio_util::sync::CancellationToken;
use utils::id::NodeId;
use utils::shard::TenantShardId;
@@ -72,7 +72,7 @@ impl ChaosInjector {
let cron_interval = self.get_cron_interval_sleep_future();
let chaos_type = tokio::select! {
_ = interval.tick() => {
if thread_rng().gen_bool(0.5) {
if rand::rng().random_bool(0.5) {
ChaosEvent::MigrationsToSecondary
} else {
ChaosEvent::GracefulMigrationsAnywhere
@@ -134,7 +134,7 @@ impl ChaosInjector {
let Some(new_location) = shard
.intent
.get_secondary()
.choose(&mut thread_rng())
.choose(&mut rand::rng())
.cloned()
else {
tracing::info!(
@@ -190,7 +190,7 @@ impl ChaosInjector {
// Pick our victims: use a hand-rolled loop rather than choose_multiple() because we want
// to take the mutable refs from our candidates rather than ref'ing them.
while !candidates.is_empty() && victims.len() < batch_size {
let i = thread_rng().gen_range(0..candidates.len());
let i = rand::rng().random_range(0..candidates.len());
victims.push(candidates.swap_remove(i));
}
@@ -210,7 +210,7 @@ impl ChaosInjector {
})
.collect::<Vec<_>>();
let Some(victim_node) = candidate_nodes.choose(&mut thread_rng()) else {
let Some(victim_node) = candidate_nodes.choose(&mut rand::rng()) else {
// This can happen if e.g. we are in a small region with only one pageserver per AZ.
tracing::info!(
"no candidate nodes found for migrating shard {tenant_shard_id} within its home AZ",
@@ -264,7 +264,7 @@ impl ChaosInjector {
out_of_home_az.len()
);
out_of_home_az.shuffle(&mut thread_rng());
out_of_home_az.shuffle(&mut rand::rng());
victims.extend(out_of_home_az.into_iter().take(batch_size));
} else {
tracing::info!(
@@ -274,7 +274,7 @@ impl ChaosInjector {
);
victims.extend(out_of_home_az);
in_home_az.shuffle(&mut thread_rng());
in_home_az.shuffle(&mut rand::rng());
victims.extend(in_home_az.into_iter().take(batch_size - victims.len()));
}

View File

@@ -364,7 +364,12 @@ impl SafekeeperReconcilerInner {
http_hosts,
tenant_id: req.tenant_id,
timeline_id,
ignore_tombstone: Some(false),
// TODO(diko): get mconf from "timelines" table and pass it here.
// Now we use pull_timeline reconciliation only for the timeline creation,
// so it's not critical right now.
// It could be fixed together with other reconciliation issues:
// https://github.com/neondatabase/neon/issues/12189
mconf: None,
};
success = self
.reconcile_inner(

View File

@@ -10,6 +10,7 @@ use crate::id_lock_map::trace_shared_lock;
use crate::metrics;
use crate::persistence::{
DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence,
TimelineUpdate,
};
use crate::safekeeper::Safekeeper;
use crate::safekeeper_client::SafekeeperClient;
@@ -311,6 +312,7 @@ impl Service {
new_sk_set: None,
cplane_notified_generation: 0,
deleted_at: None,
sk_set_notified_generation: 0,
};
let inserted = self
.persistence
@@ -454,19 +456,34 @@ impl Service {
let persistence = TimelinePersistence {
tenant_id: req.tenant_id.to_string(),
timeline_id: req.timeline_id.to_string(),
start_lsn: Lsn::INVALID.into(),
start_lsn: req.start_lsn.into(),
generation: 1,
sk_set: req.sk_set.iter().map(|sk_id| sk_id.0 as i64).collect(),
new_sk_set: None,
cplane_notified_generation: 1,
deleted_at: None,
sk_set_notified_generation: 1,
};
let inserted = self.persistence.insert_timeline(persistence).await?;
let inserted = self
.persistence
.insert_timeline(persistence.clone())
.await?;
if inserted {
tracing::info!("imported timeline into db");
} else {
tracing::info!("didn't import timeline into db, as it is already present in db");
return Ok(());
}
tracing::info!("timeline already present in db, updating");
let update = TimelineUpdate {
tenant_id: persistence.tenant_id,
timeline_id: persistence.timeline_id,
start_lsn: persistence.start_lsn,
sk_set: persistence.sk_set,
new_sk_set: persistence.new_sk_set,
};
self.persistence.update_timeline_unsafe(update).await?;
tracing::info!("timeline updated");
Ok(())
}
@@ -879,17 +896,21 @@ impl Service {
/// If min_position is not None, validates that majority of safekeepers
/// reached at least min_position.
///
/// If update_notified_generation is set, also updates sk_set_notified_generation
/// in the timelines table.
///
/// Return responses from safekeepers in the input order.
async fn tenant_timeline_set_membership_quorum(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_id: TimelineId,
safekeepers: &[Safekeeper],
config: &membership::Configuration,
mconf: &membership::Configuration,
min_position: Option<(Term, Lsn)>,
update_notified_generation: bool,
) -> Result<Vec<mgmt_api::Result<TimelineMembershipSwitchResponse>>, ApiError> {
let req = TimelineMembershipSwitchRequest {
mconf: config.clone(),
mconf: mconf.clone(),
};
const SK_SET_MEM_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
@@ -930,28 +951,34 @@ impl Service {
.await?;
for res in results.iter().flatten() {
if res.current_conf.generation > config.generation {
if res.current_conf.generation > mconf.generation {
// Another switch_membership raced us.
return Err(ApiError::Conflict(format!(
"received configuration with generation {} from safekeeper, but expected {}",
res.current_conf.generation, config.generation
res.current_conf.generation, mconf.generation
)));
} else if res.current_conf.generation < config.generation {
} else if res.current_conf.generation < mconf.generation {
// Note: should never happen.
// If we get a response, it should be at least the sent generation.
tracing::error!(
"received configuration with generation {} from safekeeper, but expected {}",
res.current_conf.generation,
config.generation
mconf.generation
);
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"received configuration with generation {} from safekeeper, but expected {}",
res.current_conf.generation,
config.generation
mconf.generation
)));
}
}
if update_notified_generation {
self.persistence
.update_sk_set_notified_generation(tenant_id, timeline_id, mconf.generation)
.await?;
}
Ok(results)
}
@@ -964,6 +991,7 @@ impl Service {
timeline_id: TimelineId,
to_safekeepers: &[Safekeeper],
from_safekeepers: &[Safekeeper],
mconf: membership::Configuration,
) -> Result<(), ApiError> {
let http_hosts = from_safekeepers
.iter()
@@ -982,14 +1010,11 @@ impl Service {
.collect::<Vec<_>>()
);
// TODO(diko): need to pass mconf/generation with the request
// to properly handle tombstones. Ignore tombstones for now.
// Worst case: we leave a timeline on a safekeeper which is not in the current set.
let req = PullTimelineRequest {
tenant_id,
timeline_id,
http_hosts,
ignore_tombstone: Some(true),
mconf: Some(mconf),
};
const SK_PULL_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
@@ -1020,17 +1045,22 @@ impl Service {
}
/// Exclude a timeline from safekeepers in parallel with retries.
/// If an exclude request is unsuccessful, it will be added to
/// the reconciler, and after that the function will succeed.
async fn tenant_timeline_safekeeper_exclude(
///
/// Assumes that the exclude requests are already persistent in the database.
///
/// The function does best effort: if an exclude request is unsuccessful,
/// it will be added to the in-memory reconciler, and the function will succeed anyway.
///
/// Might fail if there is error accessing the database.
async fn tenant_timeline_safekeeper_exclude_reconcile(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_id: TimelineId,
safekeepers: &[Safekeeper],
config: &membership::Configuration,
mconf: &membership::Configuration,
) -> Result<(), ApiError> {
let req = TimelineMembershipSwitchRequest {
mconf: config.clone(),
mconf: mconf.clone(),
};
const SK_EXCLUDE_TIMELINE_TIMEOUT: Duration = Duration::from_secs(30);
@@ -1048,25 +1078,32 @@ impl Service {
let mut reconcile_requests = Vec::new();
for (idx, res) in results.iter().enumerate() {
if res.is_err() {
let sk_id = safekeepers[idx].skp.id;
let pending_op = TimelinePendingOpPersistence {
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
generation: config.generation.into_inner() as i32,
op_kind: SafekeeperTimelineOpKind::Exclude,
sk_id,
};
tracing::info!("writing pending exclude op for sk id {sk_id}");
self.persistence.insert_pending_op(pending_op).await?;
fail::fail_point!("sk-migration-step-9-mid-exclude", |_| {
Err(ApiError::BadRequest(anyhow::anyhow!(
"failpoint sk-migration-step-9-mid-exclude"
)))
});
for (idx, res) in results.iter().enumerate() {
let sk_id = safekeepers[idx].skp.id;
let generation = mconf.generation.into_inner();
if res.is_ok() {
self.persistence
.remove_pending_op(
tenant_id,
Some(timeline_id),
NodeId(sk_id as u64),
generation,
)
.await?;
} else {
let req = ScheduleRequest {
safekeeper: Box::new(safekeepers[idx].clone()),
host_list: Vec::new(),
tenant_id,
timeline_id: Some(timeline_id),
generation: config.generation.into_inner(),
generation,
kind: SafekeeperTimelineOpKind::Exclude,
};
reconcile_requests.push(req);
@@ -1193,6 +1230,22 @@ impl Service {
}
// If it is the same new_sk_set, we can continue the migration (retry).
} else {
let prev_finished = timeline.cplane_notified_generation == timeline.generation
&& timeline.sk_set_notified_generation == timeline.generation;
if !prev_finished {
// The previous migration is committed, but the finish step failed.
// Safekeepers/cplane might not know about the last membership configuration.
// Retry the finish step to ensure smooth migration.
self.finish_safekeeper_migration_retry(tenant_id, timeline_id, &timeline)
.await?;
}
if cur_sk_set == new_sk_set {
tracing::info!("timeline is already at the desired safekeeper set");
return Ok(());
}
// 3. No active migration yet.
// Increment current generation and put desired_set to new_sk_set.
generation = generation.next();
@@ -1204,8 +1257,15 @@ impl Service {
generation,
&cur_sk_set,
Some(&new_sk_set),
&[],
)
.await?;
fail::fail_point!("sk-migration-after-step-3", |_| {
Err(ApiError::BadRequest(anyhow::anyhow!(
"failpoint sk-migration-after-step-3"
)))
});
}
let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
@@ -1234,6 +1294,7 @@ impl Service {
&cur_safekeepers,
&joint_config,
None, // no min position
true, // update notified generation
)
.await?;
@@ -1251,6 +1312,12 @@ impl Service {
"safekeepers set membership updated",
);
fail::fail_point!("sk-migration-after-step-4", |_| {
Err(ApiError::BadRequest(anyhow::anyhow!(
"failpoint sk-migration-after-step-4"
)))
});
// 5. Initialize timeline on safekeeper(s) from new_sk_set where it doesn't exist yet
// by doing pull_timeline from the majority of the current set.
@@ -1267,9 +1334,16 @@ impl Service {
timeline_id,
&pull_to_safekeepers,
&cur_safekeepers,
joint_config.clone(),
)
.await?;
fail::fail_point!("sk-migration-after-step-5", |_| {
Err(ApiError::BadRequest(anyhow::anyhow!(
"failpoint sk-migration-after-step-5"
)))
});
// 6. Call POST bump_term(sync_term) on safekeepers from the new set. Success on majority is enough.
// TODO(diko): do we need to bump timeline term?
@@ -1285,9 +1359,16 @@ impl Service {
&new_safekeepers,
&joint_config,
Some(sync_position),
false, // we're just waiting for sync position, don't update notified generation
)
.await?;
fail::fail_point!("sk-migration-after-step-7", |_| {
Err(ApiError::BadRequest(anyhow::anyhow!(
"failpoint sk-migration-after-step-7"
)))
});
// 8. Create new_conf: Configuration incrementing joint_conf generation and
// having new safekeeper set as sk_set and None new_sk_set.
@@ -1299,45 +1380,55 @@ impl Service {
new_members: None,
};
self.persistence
.update_timeline_membership(tenant_id, timeline_id, generation, &new_sk_set, None)
.await?;
// TODO(diko): at this point we have already updated the timeline in the database,
// but we still need to notify safekeepers and cplane about the new configuration,
// and put delition of the timeline from the old safekeepers into the reconciler.
// Ideally it should be done atomically, but now it's not.
// Worst case: the timeline is not deleted from old safekeepers,
// the compute may require both quorums till the migration is retried and completed.
self.tenant_timeline_set_membership_quorum(
tenant_id,
timeline_id,
&new_safekeepers,
&new_conf,
None, // no min position
)
.await?;
let new_ids: HashSet<NodeId> = new_safekeepers.iter().map(|sk| sk.get_id()).collect();
let exclude_safekeepers = cur_safekeepers
.into_iter()
.filter(|sk| !new_ids.contains(&sk.get_id()))
.collect::<Vec<_>>();
self.tenant_timeline_safekeeper_exclude(
let exclude_requests = exclude_safekeepers
.iter()
.map(|sk| TimelinePendingOpPersistence {
sk_id: sk.skp.id,
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
generation: generation.into_inner() as i32,
op_kind: SafekeeperTimelineOpKind::Exclude,
})
.collect::<Vec<_>>();
self.persistence
.update_timeline_membership(
tenant_id,
timeline_id,
generation,
&new_sk_set,
None,
&exclude_requests,
)
.await?;
fail::fail_point!("sk-migration-after-step-8", |_| {
Err(ApiError::BadRequest(anyhow::anyhow!(
"failpoint sk-migration-after-step-8"
)))
});
// At this point we have already updated the timeline in the database, so the final
// membership configuration is committed and the migration is not abortable anymore.
// But safekeepers and cplane/compute still need to be notified about the new configuration.
// [`Self::finish_safekeeper_migration`] does exactly that: it notifies everyone about
// the new configuration and reconciles excluded safekeepers.
// If it fails, the safekeeper migration call should be retried.
self.finish_safekeeper_migration(
tenant_id,
timeline_id,
&exclude_safekeepers,
&new_safekeepers,
&new_conf,
&exclude_safekeepers,
)
.await?;
// Notify cplane/compute about the membership change AFTER changing the membership on safekeepers.
// This way the compute will stop talking to excluded safekeepers only after we stop requiring to
// collect a quorum from them.
self.cplane_notify_safekeepers(tenant_id, timeline_id, &new_conf)
.await?;
Ok(())
}
@@ -1381,6 +1472,130 @@ impl Service {
ApiError::InternalServerError(anyhow::anyhow!(
"failed to notify cplane about safekeeper membership change: {err}"
))
})
})?;
self.persistence
.update_cplane_notified_generation(tenant_id, timeline_id, mconf.generation)
.await?;
Ok(())
}
/// Finish safekeeper migration.
///
/// It is the last step of the safekeeper migration.
///
/// Notifies safekeepers and cplane about the final membership configuration,
/// reconciles excluded safekeepers and updates *_notified_generation in the database.
///
/// The work is done strictly in this order:
/// 1. deliver `new_conf` to `new_safekeepers` via
///    [`Self::tenant_timeline_set_membership_quorum`], asking it to also update the
///    notified generation;
/// 2. exclude the timeline from `exclude_safekeepers` via
///    [`Self::tenant_timeline_safekeeper_exclude_reconcile`];
/// 3. notify cplane via [`Self::cplane_notify_safekeepers`].
///
/// An error from any of these calls is returned immediately and the remaining steps are
/// not executed.
///
/// The `fail::fail_point!` invocations between the steps are fault-injection hooks: each one
/// is set up to return an [`ApiError::BadRequest`] naming the failpoint when it is activated.
async fn finish_safekeeper_migration(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_id: TimelineId,
new_safekeepers: &[Safekeeper],
new_conf: &membership::Configuration,
exclude_safekeepers: &[Safekeeper],
) -> Result<(), ApiError> {
// 9. Call PUT configuration on safekeepers from the new set, delivering them new_conf.
// Also try to exclude safekeepers and notify cplane about the membership change.
self.tenant_timeline_set_membership_quorum(
tenant_id,
timeline_id,
new_safekeepers,
new_conf,
None, // no min position
true, // update notified generation
)
.await?;
// Fault injection: stop right after the new set has accepted the final configuration.
fail::fail_point!("sk-migration-step-9-after-set-membership", |_| {
Err(ApiError::BadRequest(anyhow::anyhow!(
"failpoint sk-migration-step-9-after-set-membership"
)))
});
// Remove the timeline from the safekeepers that are no longer part of the set.
self.tenant_timeline_safekeeper_exclude_reconcile(
tenant_id,
timeline_id,
exclude_safekeepers,
new_conf,
)
.await?;
// Fault injection: stop after the exclusion step but before cplane is notified.
fail::fail_point!("sk-migration-step-9-after-exclude", |_| {
Err(ApiError::BadRequest(anyhow::anyhow!(
"failpoint sk-migration-step-9-after-exclude"
)))
});
// Notify cplane/compute about the membership change AFTER changing the membership on safekeepers.
// This way the compute will stop talking to excluded safekeepers only after we stop requiring to
// collect a quorum from them.
self.cplane_notify_safekeepers(tenant_id, timeline_id, new_conf)
.await?;
// Fault injection: fail the request even though every finish step has completed.
fail::fail_point!("sk-migration-after-step-9", |_| {
Err(ApiError::BadRequest(anyhow::anyhow!(
"failpoint sk-migration-after-step-9"
)))
});
Ok(())
}
/// Re-run [`Self::finish_safekeeper_migration`] using state restored from the database.
///
/// Used when a migration was committed but its finish step failed and has to be retried.
/// The final membership configuration is rebuilt from the persisted `timeline` row
/// (its generation and `sk_set`, with no new members), and the safekeepers to exclude are
/// taken from the pending `Exclude` ops stored for the timeline at that same generation.
///
/// Returns [`ApiError::InternalServerError`] if `timeline.new_sk_set` is set, because a
/// committed migration must not have one.
async fn finish_safekeeper_migration_retry(
    self: &Arc<Self>,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    timeline: &TimelinePersistence,
) -> Result<(), ApiError> {
    // Logical error, should never happen: a committed migration has no new_sk_set.
    if timeline.new_sk_set.is_some() {
        return Err(ApiError::InternalServerError(anyhow::anyhow!(
            "can't finish timeline migration for {tenant_id}/{timeline_id}: new_sk_set is not None"
        )));
    }

    // Rebuild the final configuration from the persisted safekeeper set.
    let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
    let mconf = membership::Configuration {
        generation: SafekeeperGeneration::new(timeline.generation as u32),
        members: Self::make_member_set(&cur_safekeepers)
            .map_err(ApiError::InternalServerError)?,
        new_members: None,
    };

    // We might have failed between committing reconciliation requests and adding them to the
    // in-memory reconciler, so reload them from the database and keep only the exclusions
    // that belong to the current generation.
    let exclude_sk_ids = self
        .persistence
        .list_pending_ops_for_timeline(tenant_id, timeline_id)
        .await?
        .into_iter()
        .filter(|op| {
            op.op_kind == SafekeeperTimelineOpKind::Exclude
                && op.generation == timeline.generation
        })
        .map(|op| op.sk_id)
        .collect::<Vec<_>>();
    let exclude_safekeepers = self.get_safekeepers(&exclude_sk_ids)?;

    self.finish_safekeeper_migration(
        tenant_id,
        timeline_id,
        &cur_safekeepers,
        &mconf,
        &exclude_safekeepers,
    )
    .await
}
}