From 5bcae3a86e52b806f48e1c747353ad9cb7fb06d1 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Tue, 13 Feb 2024 12:23:38 +0300 Subject: [PATCH] Drop LR slots if too many .snap files are found. PR #6655 turned out to be not enough to prevent .snap files bloat; some subscribers just don't ack flushed position, thus never advancing the slot. Probably other bloating scenarios are also possible, so add a more direct restriction -- drop all slots if too many .snap files has been discovered. --- pgxn/neon/neon.c | 226 +++++++++++++----- .../regress/test_logical_replication.py | 48 +++- 2 files changed, 213 insertions(+), 61 deletions(-) diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 799f88751c..24ec909c79 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -37,7 +37,7 @@ PG_MODULE_MAGIC; void _PG_init(void); -static int logical_replication_max_time_lag = 3600; +static int logical_replication_max_snap_files = 300; static void InitLogicalReplicationMonitor(void) @@ -45,14 +45,14 @@ InitLogicalReplicationMonitor(void) BackgroundWorker bgw; DefineCustomIntVariable( - "neon.logical_replication_max_time_lag", - "Threshold for dropping unused logical replication slots", - NULL, - &logical_replication_max_time_lag, - 3600, 0, INT_MAX, - PGC_SIGHUP, - GUC_UNIT_S, - NULL, NULL, NULL); + "neon.logical_replication_max_snap_files", + "Maximum allowed logical replication .snap files", + NULL, + &logical_replication_max_snap_files, + 300, 0, INT_MAX, + PGC_SIGHUP, + 0, + NULL, NULL, NULL); memset(&bgw, 0, sizeof(bgw)); bgw.bgw_flags = BGWORKER_SHMEM_ACCESS; @@ -68,22 +68,99 @@ InitLogicalReplicationMonitor(void) RegisterBackgroundWorker(&bgw); } -typedef struct +static int +LsnDescComparator(const void *a, const void *b) { - NameData name; - bool dropped; - XLogRecPtr confirmed_flush_lsn; - TimestampTz last_updated; -} SlotStatus; + XLogRecPtr lsn1 = *((const XLogRecPtr *) a); + XLogRecPtr lsn2 = *((const XLogRecPtr *) b); + + if (lsn1 < lsn2) + return 1; + else if (lsn1 == lsn2) + return 0; + else + return -1; +} + +/* + * Look at .snap files and calculate minimum allowed restart_lsn of slot so that + * next gc would leave not more than logical_replication_max_snap_files; all + * slots having lower restart_lsn should be dropped. + */ +static XLogRecPtr +get_num_snap_files_lsn_threshold(void) +{ + DIR *dirdesc; + struct dirent *de; + char *snap_path = "pg_logical/snapshots/"; + int cnt = 0; + int lsns_allocated = 1024; + int lsns_num = 0; + XLogRecPtr *lsns; + XLogRecPtr cutoff; + + if (logical_replication_max_snap_files < 0) + return 0; + + lsns = palloc(sizeof(XLogRecPtr) * lsns_allocated); + + /* find all .snap files and get their lsns */ + dirdesc = AllocateDir(snap_path); + while ((de = ReadDir(dirdesc, snap_path)) != NULL) + { + XLogRecPtr lsn; + uint32 hi; + uint32 lo; + + if (strcmp(de->d_name, ".") == 0 || + strcmp(de->d_name, "..") == 0) + continue; + + if (sscanf(de->d_name, "%X-%X.snap", &hi, &lo) != 2) + { + ereport(LOG, + (errmsg("could not parse file name as .snap file \"%s\"", de->d_name))); + continue; + } + + lsn = ((uint64) hi) << 32 | lo; + elog(DEBUG5, "found snap file %X/%X", LSN_FORMAT_ARGS(lsn)); + if (lsns_allocated == lsns_num) + { + lsns_allocated *= 2; + lsns = repalloc(lsns, sizeof(XLogRecPtr) * lsns_allocated); + } + lsns[lsns_num++] = lsn; + } + /* sort by lsn desc */ + qsort(lsns, lsns_num, sizeof(XLogRecPtr), LsnDescComparator); + /* and take cutoff at logical_replication_max_snap_files */ + if (logical_replication_max_snap_files > lsns_num) + cutoff = 0; + /* have less files than cutoff */ + else + { + cutoff = lsns[logical_replication_max_snap_files - 1]; + elog(LOG, "ls_monitor: dropping logical slots with restart_lsn lower %X/%X, found %d .snap files, limit is %d", + LSN_FORMAT_ARGS(cutoff), lsns_num, logical_replication_max_snap_files); + } + pfree(lsns); + FreeDir(dirdesc); + return cutoff; +} + +#define LS_MONITOR_CHECK_INTERVAL 10000 /* ms */ /* * Unused logical replication slots pins WAL and prevents deletion of snapshots. + * WAL bloat is guarded by max_slot_wal_keep_size; this bgw removes slots which + * need too many .snap files. */ PGDLLEXPORT void LogicalSlotsMonitorMain(Datum main_arg) { - SlotStatus* slots; - TimestampTz now, last_checked; + TimestampTz now, + last_checked; /* Establish signal handlers. */ pqsignal(SIGUSR1, procsignal_sigusr1_handler); @@ -92,72 +169,101 @@ LogicalSlotsMonitorMain(Datum main_arg) BackgroundWorkerUnblockSignals(); - slots = (SlotStatus*)calloc(max_replication_slots, sizeof(SlotStatus)); - last_checked = GetCurrentTimestamp(); - for (;;) { - (void) WaitLatch(MyLatch, - WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT, - logical_replication_max_time_lag*1000/2, - PG_WAIT_EXTENSION); - ResetLatch(MyLatch); - CHECK_FOR_INTERRUPTS(); + XLogRecPtr cutoff_lsn; - now = GetCurrentTimestamp(); - - if (now - last_checked > logical_replication_max_time_lag*USECS_PER_SEC) + /* + * If there are too many .snap files, just drop all logical slots to + * prevent aux files bloat. + */ + cutoff_lsn = get_num_snap_files_lsn_threshold(); + if (cutoff_lsn > 0) { - int n_active_slots = 0; - last_checked = now; - - LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (int i = 0; i < max_replication_slots; i++) { + char slot_name[NAMEDATALEN]; ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + XLogRecPtr restart_lsn; + /* find the name */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); /* Consider only logical repliction slots */ if (!s->in_use || !SlotIsLogical(s)) - continue; - - if (s->active_pid != 0) { - n_active_slots += 1; + LWLockRelease(ReplicationSlotControlLock); continue; } - /* Check if there was some activity with the slot since last check */ - if (s->data.confirmed_flush != slots[i].confirmed_flush_lsn) + /* do we need to drop it? */ + SpinLockAcquire(&s->mutex); + restart_lsn = s->data.restart_lsn; + SpinLockRelease(&s->mutex); + if (restart_lsn >= cutoff_lsn) { - slots[i].confirmed_flush_lsn = s->data.confirmed_flush; - slots[i].last_updated = now; + LWLockRelease(ReplicationSlotControlLock); + continue; } - else if (now - slots[i].last_updated > logical_replication_max_time_lag*USECS_PER_SEC) - { - slots[i].name = s->data.name; - slots[i].dropped = true; - } - } - LWLockRelease(ReplicationSlotControlLock); - /* - * If there are no active subscriptions, then no new snapshots are generated - * and so no need to force slot deletion. - */ - if (n_active_slots != 0) - { - for (int i = 0; i < max_replication_slots; i++) + strlcpy(slot_name, s->data.name.data, NAMEDATALEN); + elog(LOG, "ls_monitor: dropping slot %s with restart_lsn %X/%X below horizon %X/%X", + slot_name, LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(cutoff_lsn)); + LWLockRelease(ReplicationSlotControlLock); + + /* now try to drop it, killing owner before if any */ + for (;;) { - if (slots[i].dropped) + pid_t active_pid; + + SpinLockAcquire(&s->mutex); + active_pid = s->active_pid; + SpinLockRelease(&s->mutex); + + if (active_pid == 0) { - elog(LOG, "Drop logical replication slot because it was not update more than %ld seconds", - (now - slots[i].last_updated)/USECS_PER_SEC); - ReplicationSlotDrop(slots[i].name.data, true); - slots[i].dropped = false; + /* + * Slot is releasted, try to drop it. Though of course + * it could have been reacquired, so drop can ERROR + * out. Similarly it could have been dropped in the + * meanwhile. + * + * In principle we could remove pg_try/pg_catch, that + * would restart the whole bgworker. + */ + ConditionVariableCancelSleep(); + PG_TRY(); + { + ReplicationSlotDrop(slot_name, true); + elog(LOG, "ls_monitor: slot %s dropped", slot_name); + } + PG_CATCH(); + { + /* log ERROR and reset elog stack */ + EmitErrorReport(); + FlushErrorState(); + elog(LOG, "ls_monitor: failed to drop slot %s", slot_name); + } + PG_END_TRY(); + break; + } + else + { + /* kill the owner and wait for release */ + elog(LOG, "ls_monitor: killing slot %s owner %d", slot_name, active_pid); + (void) kill(active_pid, SIGTERM); + /* We shouldn't get stuck, but to be safe add timeout. */ + ConditionVariableTimedSleep(&s->active_cv, 1000, WAIT_EVENT_REPLICATION_SLOT_DROP); } } } } + + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT, + LS_MONITOR_CHECK_INTERVAL, + PG_WAIT_EXTENSION); + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); } } diff --git a/test_runner/regress/test_logical_replication.py b/test_runner/regress/test_logical_replication.py index eff0b124d3..3f4ca8070d 100644 --- a/test_runner/regress/test_logical_replication.py +++ b/test_runner/regress/test_logical_replication.py @@ -1,4 +1,5 @@ import time +from functools import partial from random import choice from string import ascii_lowercase @@ -10,7 +11,7 @@ from fixtures.neon_fixtures import ( wait_for_last_flush_lsn, ) from fixtures.types import Lsn -from fixtures.utils import query_scalar +from fixtures.utils import query_scalar, wait_until def random_string(n: int): @@ -157,6 +158,51 @@ COMMIT; assert endpoint.safe_psql("select count(*) from pg_replication_slots")[0][0] == 1 +# Test that neon.logical_replication_max_snap_files works +def test_obsolete_slot_drop(neon_simple_env: NeonEnv, vanilla_pg): + def slot_removed(ep): + assert ( + endpoint.safe_psql( + "select count(*) from pg_replication_slots where slot_name = 'stale_slot'" + )[0][0] + == 0 + ) + + env = neon_simple_env + + env.neon_cli.create_branch("test_logical_replication", "empty") + # set low neon.logical_replication_max_snap_files + endpoint = env.endpoints.create_start( + "test_logical_replication", + config_lines=["log_statement=all", "neon.logical_replication_max_snap_files=1"], + ) + + pg_conn = endpoint.connect() + cur = pg_conn.cursor() + + # create obsolete slot + cur.execute("select pg_create_logical_replication_slot('stale_slot', 'pgoutput');") + assert ( + endpoint.safe_psql( + "select count(*) from pg_replication_slots where slot_name = 'stale_slot'" + )[0][0] + == 1 + ) + + # now insert some data and create and start live subscriber to create more .snap files + # (in most cases this is not needed as stale_slot snap will have higher LSN than restart_lsn anyway) + cur.execute("create table t(pk integer primary key, payload integer)") + cur.execute("create publication pub1 for table t") + + vanilla_pg.start() + vanilla_pg.safe_psql("create table t(pk integer primary key, payload integer)") + connstr = endpoint.connstr().replace("'", "''") + log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}") + vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1") + + wait_until(number_of_iterations=10, interval=2, func=partial(slot_removed, endpoint)) + + # Test compute start at LSN page of which starts with contrecord # https://github.com/neondatabase/neon/issues/5749 def test_wal_page_boundary_start(neon_simple_env: NeonEnv, vanilla_pg):