Additional fixes and improvements to storcon safekeeper timelines (#11477)

This delivers some additional fixes and improvements to storcon managed
safekeeper timelines:

* use `i32::MAX` for the generation number of timeline deletion
* start the generation for new timelines at 1 instead of 0: this ensures
that the other components actually are generation enabled
* fix database operations we use for metrics
* use join in list_pending_ops to prevent the classical ORM issue where
one does many db queries
* use enums in `test_storcon_create_delete_sk_down`. we are adding a
second parameter, and having two bool parameters is weird.
* extend `test_storcon_create_delete_sk_down` with a test of whole
tenant deletion. this hasn't been tested before.
* remove some redundant logging contexts
* Don't require mutable access to the service lock for scheduling
pending ops in memory. In order to pull this off, create reconcilers
eagerly. The advantage is that we don't need mutable access to the
service lock that way any more.

Part of #9011

---------

Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
This commit is contained in:
Arpad Müller
2025-04-17 22:25:30 +02:00
committed by GitHub
parent 6c2e5c044c
commit c1e4befd56
5 changed files with 139 additions and 69 deletions

View File

@@ -126,6 +126,7 @@ pub(crate) enum DatabaseOperation {
InsertTimelineReconcile,
RemoveTimelineReconcile,
ListTimelineReconcile,
ListTimelineReconcileStartup,
}
#[must_use]
@@ -1521,23 +1522,41 @@ impl Persistence {
.await
}
/// Load pending operations from db.
pub(crate) async fn list_pending_ops(
/// Load pending operations from db, joined together with timeline data.
pub(crate) async fn list_pending_ops_with_timelines(
&self,
) -> DatabaseResult<Vec<TimelinePendingOpPersistence>> {
) -> DatabaseResult<Vec<(TimelinePendingOpPersistence, Option<TimelinePersistence>)>> {
use crate::schema::safekeeper_timeline_pending_ops::dsl;
use crate::schema::timelines;
let timeline_from_db = self
.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
Box::pin(async move {
let from_db: Vec<TimelinePendingOpPersistence> =
dsl::safekeeper_timeline_pending_ops.load(conn).await?;
Ok(from_db)
})
})
.with_measured_conn(
DatabaseOperation::ListTimelineReconcileStartup,
move |conn| {
Box::pin(async move {
let from_db: Vec<(TimelinePendingOpPersistence, Option<TimelineFromDb>)> =
dsl::safekeeper_timeline_pending_ops
.left_join(
timelines::table.on(timelines::tenant_id
.eq(dsl::tenant_id)
.and(timelines::timeline_id.eq(dsl::timeline_id))),
)
.select((
TimelinePendingOpPersistence::as_select(),
Option::<TimelineFromDb>::as_select(),
))
.load(conn)
.await?;
Ok(from_db)
})
},
)
.await?;
Ok(timeline_from_db)
Ok(timeline_from_db
.into_iter()
.map(|(op, tl_opt)| (op, tl_opt.map(|tl_opt| tl_opt.into_persistence())))
.collect())
}
/// List pending operations for a given timeline (including tenant-global ones)
pub(crate) async fn list_pending_ops_for_timeline(
@@ -1580,7 +1599,7 @@ impl Persistence {
let tenant_id = &tenant_id;
let timeline_id = &timeline_id;
self.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
self.with_measured_conn(DatabaseOperation::RemoveTimelineReconcile, move |conn| {
let timeline_id_str = timeline_id.map(|tid| tid.to_string()).unwrap_or_default();
Box::pin(async move {
diesel::delete(dsl::safekeeper_timeline_pending_ops)

View File

@@ -824,9 +824,13 @@ impl Service {
let mut locked = self.inner.write().unwrap();
locked.become_leader();
for (sk_id, _sk) in locked.safekeepers.clone().iter() {
locked.safekeeper_reconcilers.start_reconciler(*sk_id, self);
}
locked
.safekeeper_reconcilers
.schedule_request_vec(self, sk_schedule_requests);
.schedule_request_vec(sk_schedule_requests);
}
// TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that

View File

@@ -30,31 +30,35 @@ impl SafekeeperReconcilers {
reconcilers: HashMap::new(),
}
}
pub(crate) fn schedule_request_vec(
&mut self,
service: &Arc<Service>,
reqs: Vec<ScheduleRequest>,
) {
/// Adds a safekeeper-specific reconciler.
/// Can be called multiple times, but it needs to be called at least once
/// for every new safekeeper added.
pub(crate) fn start_reconciler(&mut self, node_id: NodeId, service: &Arc<Service>) {
self.reconcilers.entry(node_id).or_insert_with(|| {
SafekeeperReconciler::spawn(self.cancel.child_token(), service.clone())
});
}
/// Stop a safekeeper-specific reconciler.
/// Stops the reconciler, cancelling all ongoing tasks.
pub(crate) fn stop_reconciler(&mut self, node_id: NodeId) {
if let Some(handle) = self.reconcilers.remove(&node_id) {
handle.cancel.cancel();
}
}
pub(crate) fn schedule_request_vec(&self, reqs: Vec<ScheduleRequest>) {
tracing::info!(
"Scheduling {} pending safekeeper ops loaded from db",
reqs.len()
);
for req in reqs {
self.schedule_request(service, req);
self.schedule_request(req);
}
}
pub(crate) fn schedule_request(&mut self, service: &Arc<Service>, req: ScheduleRequest) {
pub(crate) fn schedule_request(&self, req: ScheduleRequest) {
let node_id = req.safekeeper.get_id();
let reconciler_handle = self.reconcilers.entry(node_id).or_insert_with(|| {
SafekeeperReconciler::spawn(self.cancel.child_token(), service.clone())
});
let reconciler_handle = self.reconcilers.get(&node_id).unwrap();
reconciler_handle.schedule_reconcile(req);
}
pub(crate) fn cancel_safekeeper(&mut self, node_id: NodeId) {
if let Some(handle) = self.reconcilers.remove(&node_id) {
handle.cancel.cancel();
}
}
/// Cancel ongoing reconciles for the given timeline
///
/// Specifying `None` here only removes reconciles for the tenant-global reconciliation,
@@ -78,9 +82,12 @@ pub(crate) async fn load_schedule_requests(
service: &Arc<Service>,
safekeepers: &HashMap<NodeId, Safekeeper>,
) -> anyhow::Result<Vec<ScheduleRequest>> {
let pending_ops = service.persistence.list_pending_ops().await?;
let mut res = Vec::with_capacity(pending_ops.len());
for op_persist in pending_ops {
let pending_ops_timelines = service
.persistence
.list_pending_ops_with_timelines()
.await?;
let mut res = Vec::with_capacity(pending_ops_timelines.len());
for (op_persist, timeline_persist) in pending_ops_timelines {
let node_id = NodeId(op_persist.sk_id as u64);
let Some(sk) = safekeepers.get(&node_id) else {
// This shouldn't happen, at least the safekeeper should exist as decomissioned.
@@ -102,16 +109,12 @@ pub(crate) async fn load_schedule_requests(
SafekeeperTimelineOpKind::Delete => Vec::new(),
SafekeeperTimelineOpKind::Exclude => Vec::new(),
SafekeeperTimelineOpKind::Pull => {
// TODO this code is super hacky, it doesn't take migrations into account
let Some(timeline_id) = timeline_id else {
if timeline_id.is_none() {
// We only do this extra check (outside of timeline_persist check) to give better error msgs
anyhow::bail!(
"timeline_id is empty for `pull` schedule request for {tenant_id}"
);
};
let timeline_persist = service
.persistence
.get_timeline(tenant_id, timeline_id)
.await?;
let Some(timeline_persist) = timeline_persist else {
// This shouldn't happen, the timeline should still exist
tracing::warn!(
@@ -163,6 +166,7 @@ pub(crate) struct ScheduleRequest {
pub(crate) kind: SafekeeperTimelineOpKind,
}
/// Handle to per safekeeper reconciler.
struct ReconcilerHandle {
tx: UnboundedSender<(ScheduleRequest, CancellationToken)>,
ongoing_tokens: Arc<ClashMap<(TenantId, Option<TimelineId>), CancellationToken>>,
@@ -170,7 +174,10 @@ struct ReconcilerHandle {
}
impl ReconcilerHandle {
/// Obtain a new token slot, cancelling any existing reconciliations for that timeline
/// Obtain a new token slot, cancelling any existing reconciliations for
/// that timeline. It is not useful to have >1 operation per <tenant_id,
/// timeline_id, safekeeper>, hence scheduling op cancels current one if it
/// exists.
fn new_token_slot(
&self,
tenant_id: TenantId,
@@ -305,15 +312,16 @@ impl SafekeeperReconciler {
SafekeeperTimelineOpKind::Delete => {
let tenant_id = req.tenant_id;
if let Some(timeline_id) = req.timeline_id {
let deleted = self.reconcile_inner(
req,
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|_resp| {
tracing::info!(%tenant_id, %timeline_id, "deleted timeline from {req_host}");
},
req_cancel,
)
.await;
let deleted = self
.reconcile_inner(
req,
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|_resp| {
tracing::info!("deleted timeline from {req_host}");
},
req_cancel,
)
.await;
if deleted {
self.delete_timeline_from_db(tenant_id, timeline_id).await;
}
@@ -344,12 +352,13 @@ impl SafekeeperReconciler {
{
Ok(list) => {
if !list.is_empty() {
tracing::info!(%tenant_id, %timeline_id, "not deleting timeline from db as there is {} open reconciles", list.len());
// duplicate the timeline_id here because it might be None in the reconcile context
tracing::info!(%timeline_id, "not deleting timeline from db as there is {} open reconciles", list.len());
return;
}
}
Err(e) => {
tracing::warn!(%tenant_id, %timeline_id, "couldn't query pending ops: {e}");
tracing::warn!(%timeline_id, "couldn't query pending ops: {e}");
return;
}
}

View File

@@ -46,6 +46,7 @@ impl Service {
.map(SecretString::from);
let mut joinset = JoinSet::new();
// Prepare membership::Configuration from choosen safekeepers.
let safekeepers = {
let locked = self.inner.read().unwrap();
locked.safekeepers.clone()
@@ -205,7 +206,7 @@ impl Service {
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
start_lsn: start_lsn.into(),
generation: 0,
generation: 1,
sk_set: sks_persistence.clone(),
new_sk_set: None,
cplane_notified_generation: 0,
@@ -254,7 +255,7 @@ impl Service {
self.persistence.insert_pending_op(pending_op).await?;
}
if !remaining.is_empty() {
let mut locked = self.inner.write().unwrap();
let locked = self.inner.read().unwrap();
for remaining_id in remaining {
let Some(sk) = locked.safekeepers.get(&remaining_id) else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
@@ -290,7 +291,7 @@ impl Service {
generation: timeline_persist.generation as u32,
kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
};
locked.safekeeper_reconcilers.schedule_request(self, req);
locked.safekeeper_reconcilers.schedule_request(req);
}
}
@@ -357,7 +358,7 @@ impl Service {
let pending_op = TimelinePendingOpPersistence {
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
generation: tl.generation,
generation: i32::MAX,
op_kind: SafekeeperTimelineOpKind::Delete,
sk_id: *sk_id,
};
@@ -365,7 +366,7 @@ impl Service {
self.persistence.insert_pending_op(pending_op).await?;
}
{
let mut locked = self.inner.write().unwrap();
let locked = self.inner.read().unwrap();
for sk_id in all_sks {
let sk_id = NodeId(*sk_id as u64);
let Some(sk) = locked.safekeepers.get(&sk_id) else {
@@ -383,7 +384,7 @@ impl Service {
generation: tl.generation as u32,
kind: SafekeeperTimelineOpKind::Delete,
};
locked.safekeeper_reconcilers.schedule_request(self, req);
locked.safekeeper_reconcilers.schedule_request(req);
}
}
Ok(())
@@ -482,7 +483,7 @@ impl Service {
tenant_id,
timeline_id: None,
};
locked.safekeeper_reconcilers.schedule_request(self, req);
locked.safekeeper_reconcilers.schedule_request(req);
}
Ok(())
}
@@ -579,7 +580,7 @@ impl Service {
}
pub(crate) async fn upsert_safekeeper(
&self,
self: &Arc<Service>,
record: crate::persistence::SafekeeperUpsert,
) -> Result<(), ApiError> {
let node_id = NodeId(record.id as u64);
@@ -618,6 +619,9 @@ impl Service {
);
}
}
locked
.safekeeper_reconcilers
.start_reconciler(node_id, self);
locked.safekeepers = Arc::new(safekeepers);
metrics::METRICS_REGISTRY
.metrics_group
@@ -638,7 +642,7 @@ impl Service {
}
pub(crate) async fn set_safekeeper_scheduling_policy(
&self,
self: &Arc<Service>,
id: i64,
scheduling_policy: SkSchedulingPolicy,
) -> Result<(), DatabaseError> {
@@ -656,9 +660,13 @@ impl Service {
sk.set_scheduling_policy(scheduling_policy);
match scheduling_policy {
SkSchedulingPolicy::Active => (),
SkSchedulingPolicy::Active => {
locked
.safekeeper_reconcilers
.start_reconciler(node_id, self);
}
SkSchedulingPolicy::Decomissioned | SkSchedulingPolicy::Pause => {
locked.safekeeper_reconcilers.cancel_safekeeper(node_id);
locked.safekeeper_reconcilers.stop_reconciler(node_id);
}
}

View File

@@ -4114,13 +4114,29 @@ def test_storage_controller_location_conf_equivalence(neon_env_builder: NeonEnvB
assert reconciles_after_restart == 0
class RestartStorcon(Enum):
RESTART = "restart"
ONLINE = "online"
class DeletionSubject(Enum):
TIMELINE = "timeline"
TENANT = "tenant"
@run_only_on_default_postgres("PG version is not interesting here")
@pytest.mark.parametrize("restart_storcon", [True, False])
def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart_storcon: bool):
@pytest.mark.parametrize("restart_storcon", [RestartStorcon.RESTART, RestartStorcon.ONLINE])
@pytest.mark.parametrize("deletetion_subject", [DeletionSubject.TENANT, DeletionSubject.TIMELINE])
def test_storcon_create_delete_sk_down(
neon_env_builder: NeonEnvBuilder,
restart_storcon: RestartStorcon,
deletetion_subject: DeletionSubject,
):
"""
Test that the storcon can create and delete tenants and timelines with a safekeeper being down.
- restart_storcon: tests whether the pending ops are persisted.
- restart_storcon: tests that the pending ops are persisted.
if we don't restart, we test that we don't require it to come from the db.
- deletion_subject: test that both single timeline and whole tenant deletion work.
"""
neon_env_builder.num_safekeepers = 3
@@ -4143,6 +4159,7 @@ def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.create_tenant(tenant_id, timeline_id)
child_timeline_id = env.create_branch("child_of_main", tenant_id)
env.safekeepers[1].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
env.safekeepers[2].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
@@ -4155,7 +4172,7 @@ def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart
]
)
if restart_storcon:
if restart_storcon == RestartStorcon.RESTART:
# Restart the storcon to check that we persist operations
env.storage_controller.stop()
env.storage_controller.start()
@@ -4168,6 +4185,13 @@ def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
with env.endpoints.create(
"child_of_main", tenant_id=tenant_id, config_lines=config_lines
) as ep:
# endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
env.storage_controller.assert_log_contains("writing pending op for sk id 1")
env.safekeepers[0].start()
@@ -4176,25 +4200,31 @@ def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart
env.safekeepers[0].assert_log_contains(
f"pulling timeline {tenant_id}/{timeline_id} from safekeeper"
)
env.safekeepers[0].assert_log_contains(
f"pulling timeline {tenant_id}/{child_timeline_id} from safekeeper"
)
wait_until(logged_contains_on_sk)
env.safekeepers[1].stop()
env.storage_controller.pageserver_api().tenant_delete(tenant_id)
if deletetion_subject == DeletionSubject.TENANT:
env.storage_controller.pageserver_api().tenant_delete(tenant_id)
else:
env.storage_controller.pageserver_api().timeline_delete(tenant_id, child_timeline_id)
# ensure the safekeeper deleted the timeline
def timeline_deleted_on_active_sks():
env.safekeepers[0].assert_log_contains(
f"deleting timeline {tenant_id}/{timeline_id} from disk"
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
)
env.safekeepers[2].assert_log_contains(
f"deleting timeline {tenant_id}/{timeline_id} from disk"
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
)
wait_until(timeline_deleted_on_active_sks)
if restart_storcon:
if restart_storcon == RestartStorcon.RESTART:
# Restart the storcon to check that we persist operations
env.storage_controller.stop()
env.storage_controller.start()
@@ -4204,7 +4234,7 @@ def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart
# ensure that there is log msgs for the third safekeeper too
def timeline_deleted_on_sk():
env.safekeepers[1].assert_log_contains(
f"deleting timeline {tenant_id}/{timeline_id} from disk"
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
)
wait_until(timeline_deleted_on_sk)