From 43eae17f0d2e84b0c88e34f3fff6bfe515008b89 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 8 Feb 2024 17:31:15 +0200 Subject: [PATCH] Drop unused replication slots (#6655) ## Problem See #6626 If there is an inactive replication slot, then Postgres will not be able to shrink WAL and delete unused snapshots. If some other active subscription is present, then snapshots created every 15 seconds will overflow AUX_DIR. Setting `max_slot_wal_keep_size` doesn't solve the problem, because even a small WAL segment will be enough to overflow AUX_DIR if there is no other activity on the system. ## Summary of changes If there are active subscriptions and some logical replication slots are not used during the `neon.logical_replication_max_time_lag` interval, then the unused slots are dropped. ## Checklist before requesting a review - [ ] I have performed a self-review of my code. - [ ] If it is a core feature, I have added thorough tests. - [ ] Do we need to implement analytics? If so, did you add the relevant metrics to the dashboard? - [ ] If this PR requires public announcement, mark it with the /release-notes label and add several sentences in this section. 
## Checklist before merging - [ ] Do not forget to reformat commit message to not include the above checklist Co-authored-by: Konstantin Knizhnik --- pgxn/neon/neon.c | 133 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index b930fdb3ca..799f88751c 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -11,16 +11,23 @@ #include "postgres.h" #include "fmgr.h" +#include "miscadmin.h" #include "access/xact.h" #include "access/xlog.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "catalog/pg_type.h" +#include "postmaster/bgworker.h" +#include "postmaster/interrupt.h" +#include "replication/slot.h" #include "replication/walsender.h" +#include "storage/procsignal.h" +#include "tcop/tcopprot.h" #include "funcapi.h" #include "access/htup_details.h" #include "utils/pg_lsn.h" #include "utils/guc.h" +#include "utils/wait_event.h" #include "neon.h" #include "walproposer.h" @@ -30,6 +37,130 @@ PG_MODULE_MAGIC; void _PG_init(void); +static int logical_replication_max_time_lag = 3600; /* backing variable of the neon.logical_replication_max_time_lag GUC (declared with GUC_UNIT_S, default 3600) */ + +static void +InitLogicalReplicationMonitor(void) +{ + BackgroundWorker bgw; /* filled in below and passed to RegisterBackgroundWorker */ + + DefineCustomIntVariable( + "neon.logical_replication_max_time_lag", + "Threshold for dropping unused logical replication slots", + NULL, + &logical_replication_max_time_lag, + 3600, 0, INT_MAX, + PGC_SIGHUP, + GUC_UNIT_S, + NULL, NULL, NULL); + + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS; + bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "LogicalSlotsMonitorMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, "Logical replication monitor"); + snprintf(bgw.bgw_type, BGW_MAXLEN, "Logical replication monitor"); + bgw.bgw_restart_time = 5; + bgw.bgw_notify_pid = 0; + bgw.bgw_main_arg = (Datum) 0; + + RegisterBackgroundWorker(&bgw); /* the worker's entry point is LogicalSlotsMonitorMain in library "neon" */ +} + +typedef struct +{ + NameData name; /* copy of the slot name, taken when the slot is flagged for dropping */ + bool dropped; /* set when the slot is flagged as stale; reset after ReplicationSlotDrop is called */ 
XLogRecPtr confirmed_flush_lsn; /* confirmed_flush value seen at the most recent check that observed a change */ + TimestampTz last_updated; /* timestamp of the check at which confirmed_flush was last seen to change */ +} SlotStatus; + +/* + * Background worker main loop. Unused logical replication slots pin WAL and prevent deletion of snapshots, so this worker periodically scans the replication slots and drops inactive logical slots whose confirmed_flush has not changed for more than neon.logical_replication_max_time_lag seconds, but only when at least one logical slot is active. main_arg is not used. + */ +PGDLLEXPORT void +LogicalSlotsMonitorMain(Datum main_arg) +{ + SlotStatus* slots; + TimestampTz now, last_checked; + + /* Establish signal handlers. */ + pqsignal(SIGUSR1, procsignal_sigusr1_handler); + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGTERM, die); + + BackgroundWorkerUnblockSignals(); + + slots = (SlotStatus*)calloc(max_replication_slots, sizeof(SlotStatus)); /* per-slot bookkeeping indexed like ReplicationSlotCtl->replication_slots, zero-initialized; NOTE(review): the calloc result is not checked for NULL */ + last_checked = GetCurrentTimestamp(); + + for (;;) + { + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT, + logical_replication_max_time_lag*1000/2, /* half of the configured lag, assuming WaitLatch takes milliseconds; NOTE(review): this is 0 when the GUC is set to its minimum of 0, and the int multiplication is not guarded against overflow */ + PG_WAIT_EXTENSION); + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + + now = GetCurrentTimestamp(); + + if (now - last_checked > logical_replication_max_time_lag*USECS_PER_SEC) + { + int n_active_slots = 0; + last_checked = now; + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + /* Consider only logical replication slots that are in use */ + if (!s->in_use || !SlotIsLogical(s)) + continue; + + if (s->active_pid != 0) + { + n_active_slots += 1; + continue; + } + + /* Any change of confirmed_flush since the previous check counts as activity on the slot */ + if (s->data.confirmed_flush != slots[i].confirmed_flush_lsn) + { + slots[i].confirmed_flush_lsn = s->data.confirmed_flush; + slots[i].last_updated = now; + } + else if (now - slots[i].last_updated > logical_replication_max_time_lag*USECS_PER_SEC) + { + slots[i].name = s->data.name; + slots[i].dropped = true; /* only flagged here; the drop itself happens after the lock is released. NOTE(review): the flag is cleared only after an actual drop below, so it persists if no slot is active at this check */ + } + } + LWLockRelease(ReplicationSlotControlLock); + + /* + * If there are no active subscriptions (no logical slot with a nonzero active_pid was seen above), then no new snapshots are generated + * and so there is no need to force slot deletion. 
+ */ + if (n_active_slots != 0) + { + for (int i = 0; i < max_replication_slots; i++) + { + if (slots[i].dropped) + { + elog(LOG, "Drop logical replication slot because it was not update more than %ld seconds", + (now - slots[i].last_updated)/USECS_PER_SEC); /* NOTE(review): %ld is paired with a TimestampTz expression (confirm the types match on all platforms); the message does not include the slot name */ + ReplicationSlotDrop(slots[i].name.data, true); + slots[i].dropped = false; + } + } + } + } + } +} + void _PG_init(void) { @@ -44,6 +175,8 @@ _PG_init(void) pg_init_libpagestore(); pg_init_walproposer(); + InitLogicalReplicationMonitor(); + InitControlPlaneConnector(); pg_init_extension_server();