Add new activating scheduling policy for safekeepers (#12441)

When deploying new safekeepers, we don't want to send traffic to
them immediately: they may not be ready yet by the time the deploy
script registers them with the storage controller.

For pageservers, the storcon solves this by not scheduling anything
to a node until there has been a positive heartbeat response. We
can't do the same for safekeepers, though: a single down safekeeper
would then block new timeline creation in smaller regions where
there are only three safekeepers in total.

So far we have created safekeepers as `pause`, but this adds a
manual step to safekeeper deployment that is prone to oversight. We
want things to be automated. So we introduce a new policy
`activating` that acts just like `pause`, except that we
automatically transition it to `active` once we get a positive
heartbeat from the safekeeper. For `pause`, we always keep the
safekeeper paused.
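
The intended transition in a nutshell, as a minimal self-contained sketch (not the actual storcon types; the variant names are taken from the diffs below):

```rust
// Sketch only: the scheduling policies this commit deals with.
#[derive(Clone, Copy, PartialEq, Debug)]
enum SkSchedulingPolicy {
    Activating,
    Active,
    Pause,
    Decomissioned, // spelling as in the codebase
}

// What a positive heartbeat is supposed to do to the policy.
fn on_positive_heartbeat(policy: SkSchedulingPolicy) -> SkSchedulingPolicy {
    match policy {
        // `activating` gets promoted automatically on the first good heartbeat.
        SkSchedulingPolicy::Activating => SkSchedulingPolicy::Active,
        // `pause` and `decomissioned` only change when an operator says so.
        other => other,
    }
}

fn main() {
    use SkSchedulingPolicy::*;
    assert_eq!(on_positive_heartbeat(Activating), Active);
    assert_eq!(on_positive_heartbeat(Pause), Pause);
}
```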
Author: Arpad Müller
Date:   2025-07-03 18:27:43 +02:00 (committed by GitHub)
Commit: a852bc5e39 (parent: b96983a31c)

7 changed files with 187 additions and 44 deletions

@@ -0,0 +1 @@
+ALTER TABLE safekeepers ALTER COLUMN scheduling_policy SET DEFAULT 'pause';

@@ -0,0 +1 @@
+ALTER TABLE safekeepers ALTER COLUMN scheduling_policy SET DEFAULT 'activating';

(These two new single-line files read as the down and up halves of a
migration pair: the column default becomes 'activating', and rolling
back restores the previous default of 'pause'.)

@@ -1388,6 +1388,48 @@ impl Persistence {
         .await
     }
 
+    /// Activate the given safekeeper, ensuring that there is no TOCTOU race:
+    /// the policy check and the update happen in one conditional UPDATE.
+    /// Returns `Some` if the safekeeper was indeed `activating` (or already
+    /// `active`); any other state returns `None`.
+    pub(crate) async fn activate_safekeeper(&self, id_: i64) -> Result<Option<()>, DatabaseError> {
+        use crate::schema::safekeepers::dsl::*;
+        self.with_conn(move |conn| {
+            Box::pin(async move {
+                #[derive(Insertable, AsChangeset)]
+                #[diesel(table_name = crate::schema::safekeepers)]
+                struct UpdateSkSchedulingPolicy<'a> {
+                    id: i64,
+                    scheduling_policy: &'a str,
+                }
+                let scheduling_policy_active = String::from(SkSchedulingPolicy::Active);
+                let scheduling_policy_activating = String::from(SkSchedulingPolicy::Activating);
+
+                let rows_affected = diesel::update(
+                    safekeepers.filter(id.eq(id_)).filter(
+                        scheduling_policy
+                            .eq(scheduling_policy_activating)
+                            .or(scheduling_policy.eq(&scheduling_policy_active)),
+                    ),
+                )
+                .set(scheduling_policy.eq(&scheduling_policy_active))
+                .execute(conn)
+                .await?;
+                if rows_affected == 0 {
+                    // No row matched the filter: the safekeeper is gone or in
+                    // some other scheduling policy.
+                    return Ok(None);
+                }
+                if rows_affected != 1 {
+                    return Err(DatabaseError::Logical(format!(
+                        "unexpected number of rows ({rows_affected})",
+                    )));
+                }
+                Ok(Some(()))
+            })
+        })
+        .await
+    }
+
     /// Persist timeline. Returns if the timeline was newly inserted. If it wasn't, we haven't done any writes.
     pub(crate) async fn insert_timeline(&self, entry: TimelinePersistence) -> DatabaseResult<bool> {
         use crate::schema::timelines;
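
The single conditional UPDATE is what closes the TOCTOU window: checking the current policy and writing the new one happen atomically in the database, so a concurrent `set_scheduling_policy` cannot interleave between a separate read and write. A self-contained sketch of the same check-and-set contract, with a `HashMap` standing in for the `safekeepers` table:

```rust
use std::collections::HashMap;

// Sketch only: the check ("is it activating or active?") and the write
// ("set it to active") happen as one step while the entry is borrowed,
// mirroring what the conditional UPDATE guarantees at the SQL level.
fn activate(table: &mut HashMap<i64, String>, id: i64) -> Option<()> {
    let policy = table.get_mut(&id)?;
    if policy.as_str() == "activating" || policy.as_str() == "active" {
        *policy = "active".to_string();
        Some(()) // promoted (or already active)
    } else {
        None // any other policy: the caller must not flip in-memory state
    }
}

fn main() {
    let mut table = HashMap::from([
        (1, "activating".to_string()),
        (2, "pause".to_string()),
    ]);
    assert_eq!(activate(&mut table, 1), Some(()));
    assert_eq!(activate(&mut table, 2), None); // paused stays paused
}
```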

@@ -31,8 +31,8 @@ use pageserver_api::controller_api::{
     AvailabilityZone, MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability,
     NodeRegisterRequest, NodeSchedulingPolicy, NodeShard, NodeShardResponse, PlacementPolicy,
     ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse,
-    TenantCreateRequest, TenantCreateResponse, TenantCreateResponseShard, TenantDescribeResponse,
-    TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest,
+    SkSchedulingPolicy, TenantCreateRequest, TenantCreateResponse, TenantCreateResponseShard,
+    TenantDescribeResponse, TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest,
     TenantShardMigrateRequest, TenantShardMigrateResponse,
 };
 use pageserver_api::models::{
@@ -1249,7 +1249,7 @@ impl Service {
     }
 
     /// Heartbeat all storage nodes once in a while.
     #[instrument(skip_all)]
-    async fn spawn_heartbeat_driver(&self) {
+    async fn spawn_heartbeat_driver(self: &Arc<Self>) {
         self.startup_complete.clone().wait().await;
 
         let mut interval = tokio::time::interval(self.config.heartbeat_interval);
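
The receiver change from `&self` to `self: &Arc<Self>` is needed because the driver now calls `set_safekeeper_scheduling_policy_in_mem`, which itself takes `self: &Arc<Service>` (last hunk below). A minimal, self-contained illustration of why a method wants an `&Arc<Self>` receiver:

```rust
use std::sync::Arc;

struct Service {
    name: String,
}

impl Service {
    // With `self: &Arc<Self>` the method can clone the Arc and hand shared
    // ownership to a background task; a plain `&self` cannot recover the Arc.
    fn spawn_background(self: &Arc<Self>) {
        let this = Arc::clone(self); // cheap refcount bump
        std::thread::spawn(move || println!("tick from {}", this.name))
            .join()
            .unwrap();
    }
}

fn main() {
    let svc = Arc::new(Service { name: "storcon".into() });
    svc.spawn_background();
}
```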
@@ -1376,18 +1376,51 @@ impl Service {
                 }
             }
             if let Ok(deltas) = res_sk {
-                let mut locked = self.inner.write().unwrap();
-                let mut safekeepers = (*locked.safekeepers).clone();
-                for (id, state) in deltas.0 {
-                    let Some(sk) = safekeepers.get_mut(&id) else {
-                        tracing::info!(
-                            "Couldn't update safekeeper state for id {id} from heartbeat={state:?}"
-                        );
-                        continue;
-                    };
-                    sk.set_availability(state);
-                }
-                locked.safekeepers = Arc::new(safekeepers);
+                let mut to_activate = Vec::new();
+                {
+                    let mut locked = self.inner.write().unwrap();
+                    let mut safekeepers = (*locked.safekeepers).clone();
+                    for (id, state) in deltas.0 {
+                        let Some(sk) = safekeepers.get_mut(&id) else {
+                            tracing::info!(
+                                "Couldn't update safekeeper state for id {id} from heartbeat={state:?}"
+                            );
+                            continue;
+                        };
+                        if sk.scheduling_policy() == SkSchedulingPolicy::Activating
+                            && let SafekeeperState::Available { .. } = state
+                        {
+                            // Only record the ID here; the DB write happens
+                            // below, after the write lock has been dropped.
+                            to_activate.push(id);
+                        }
+                        sk.set_availability(state);
+                    }
+                    locked.safekeepers = Arc::new(safekeepers);
+                }
+                for sk_id in to_activate {
+                    // TODO this can race with set_scheduling_policy (can create disjoint DB <-> in-memory state)
+                    tracing::info!("Activating safekeeper {sk_id}");
+                    match self.persistence.activate_safekeeper(sk_id.0 as i64).await {
+                        Ok(Some(())) => {}
+                        Ok(None) => {
+                            tracing::info!(
+                                "safekeeper {sk_id} has been removed from db or has a scheduling policy other than active or activating"
+                            );
+                        }
+                        Err(e) => {
+                            tracing::warn!("couldn't apply activation of {sk_id} to db: {e}");
+                            continue;
+                        }
+                    }
+                    if let Err(e) = self
+                        .set_safekeeper_scheduling_policy_in_mem(sk_id, SkSchedulingPolicy::Active)
+                        .await
+                    {
+                        tracing::info!("couldn't activate safekeeper {sk_id} in memory: {e}");
+                        continue;
+                    }
+                    tracing::info!("Activation of safekeeper {sk_id} done");
+                }
             }
         }
     }
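
Note the shape of the new code: candidate IDs are collected into `to_activate` while the write lock is held, and the async database calls happen only after the guard has been dropped, since a `std::sync` lock guard held across an `.await` point makes the future non-`Send` and risks blocking the runtime. A self-contained sketch of this collect-then-await pattern (names are illustrative, not the storcon's):

```rust
use std::sync::{Arc, RwLock};

// Collect work items under the lock, drop the guard, then do async I/O.
async fn promote_ready(ids: Arc<RwLock<Vec<u64>>>) {
    let to_activate: Vec<u64> = {
        let locked = ids.read().unwrap(); // guard lives only in this block
        locked.iter().copied().collect()
    }; // guard dropped here, before any .await
    for id in to_activate {
        fake_db_update(id).await; // async work with no lock held
    }
}

// Stand-in for a real database call such as `activate_safekeeper`.
async fn fake_db_update(_id: u64) {}

#[tokio::main]
async fn main() {
    let ids = Arc::new(RwLock::new(vec![1, 2, 3]));
    promote_ready(ids).await;
}
```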

@@ -805,7 +805,7 @@ impl Service {
             Safekeeper::from_persistence(
                 crate::persistence::SafekeeperPersistence::from_upsert(
                     record,
-                    SkSchedulingPolicy::Pause,
+                    SkSchedulingPolicy::Activating,
                 ),
                 CancellationToken::new(),
                 use_https,
@@ -846,27 +846,36 @@ impl Service {
             .await?;
 
         let node_id = NodeId(id as u64);
 
         // After the change has been persisted successfully, update the in-memory state
-        {
-            let mut locked = self.inner.write().unwrap();
-            let mut safekeepers = (*locked.safekeepers).clone();
-            let sk = safekeepers
-                .get_mut(&node_id)
-                .ok_or(DatabaseError::Logical("Not found".to_string()))?;
-            sk.set_scheduling_policy(scheduling_policy);
-            match scheduling_policy {
-                SkSchedulingPolicy::Active => {
-                    locked
-                        .safekeeper_reconcilers
-                        .start_reconciler(node_id, self);
-                }
-                SkSchedulingPolicy::Decomissioned | SkSchedulingPolicy::Pause => {
-                    locked.safekeeper_reconcilers.stop_reconciler(node_id);
-                }
-            }
-            locked.safekeepers = Arc::new(safekeepers);
-        }
-        Ok(())
-    }
+        self.set_safekeeper_scheduling_policy_in_mem(node_id, scheduling_policy)
+            .await
+    }
+
+    pub(crate) async fn set_safekeeper_scheduling_policy_in_mem(
+        self: &Arc<Service>,
+        node_id: NodeId,
+        scheduling_policy: SkSchedulingPolicy,
+    ) -> Result<(), DatabaseError> {
+        let mut locked = self.inner.write().unwrap();
+        let mut safekeepers = (*locked.safekeepers).clone();
+        let sk = safekeepers
+            .get_mut(&node_id)
+            .ok_or(DatabaseError::Logical("Not found".to_string()))?;
+        sk.set_scheduling_policy(scheduling_policy);
+        match scheduling_policy {
+            SkSchedulingPolicy::Active => {
+                locked
+                    .safekeeper_reconcilers
+                    .start_reconciler(node_id, self);
+            }
+            SkSchedulingPolicy::Decomissioned
+            | SkSchedulingPolicy::Pause
+            | SkSchedulingPolicy::Activating => {
+                locked.safekeeper_reconcilers.stop_reconciler(node_id);
+            }
+        }
+        locked.safekeepers = Arc::new(safekeepers);
+        Ok(())
+    }
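
Both the heartbeat driver and this helper update `locked.safekeepers` with the same copy-on-write pattern: clone the map out of the `Arc`, mutate the private copy, then swap a fresh `Arc` in, so readers holding the old `Arc` keep a consistent snapshot. A self-contained sketch, with `String` standing in for the per-safekeeper state:

```rust
use std::collections::HashMap;
use std::sync::Arc;

fn set_policy(
    safekeepers: &mut Arc<HashMap<u64, String>>,
    node_id: u64,
    policy: &str,
) -> Result<(), String> {
    // Clone the current map out of the Arc...
    let mut map = (**safekeepers).clone();
    // ...mutate the private copy...
    let entry = map.get_mut(&node_id).ok_or_else(|| "Not found".to_string())?;
    *entry = policy.to_string();
    // ...and publish it by swapping in a fresh Arc.
    *safekeepers = Arc::new(map);
    Ok(())
}

fn main() {
    let mut sks = Arc::new(HashMap::from([(1, "activating".to_string())]));
    let snapshot = Arc::clone(&sks); // a reader's view from before the update
    set_policy(&mut sks, 1, "active").unwrap();
    assert_eq!(sks[&1], "active");
    assert_eq!(snapshot[&1], "activating"); // old snapshot is untouched
}
```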