mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 03:52:56 +00:00
Drop LR slots if too many .snap files are found.
PR #6655 turned out to be not enough to prevent .snap files bloat; some subscribers just don't ack flushed position, thus never advancing the slot. Probably other bloating scenarios are also possible, so add a more direct restriction -- drop all slots if too many .snap files has been discovered.
This commit is contained in:
226
pgxn/neon/neon.c
226
pgxn/neon/neon.c
@@ -37,7 +37,7 @@
|
||||
PG_MODULE_MAGIC;
|
||||
void _PG_init(void);
|
||||
|
||||
static int logical_replication_max_time_lag = 3600;
|
||||
static int logical_replication_max_snap_files = 300;
|
||||
|
||||
static void
|
||||
InitLogicalReplicationMonitor(void)
|
||||
@@ -45,14 +45,14 @@ InitLogicalReplicationMonitor(void)
|
||||
BackgroundWorker bgw;
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"neon.logical_replication_max_time_lag",
|
||||
"Threshold for dropping unused logical replication slots",
|
||||
NULL,
|
||||
&logical_replication_max_time_lag,
|
||||
3600, 0, INT_MAX,
|
||||
PGC_SIGHUP,
|
||||
GUC_UNIT_S,
|
||||
NULL, NULL, NULL);
|
||||
"neon.logical_replication_max_snap_files",
|
||||
"Maximum allowed logical replication .snap files",
|
||||
NULL,
|
||||
&logical_replication_max_snap_files,
|
||||
300, 0, INT_MAX,
|
||||
PGC_SIGHUP,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
memset(&bgw, 0, sizeof(bgw));
|
||||
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
|
||||
@@ -68,22 +68,99 @@ InitLogicalReplicationMonitor(void)
|
||||
RegisterBackgroundWorker(&bgw);
|
||||
}
|
||||
|
||||
typedef struct
|
||||
static int
|
||||
LsnDescComparator(const void *a, const void *b)
|
||||
{
|
||||
NameData name;
|
||||
bool dropped;
|
||||
XLogRecPtr confirmed_flush_lsn;
|
||||
TimestampTz last_updated;
|
||||
} SlotStatus;
|
||||
XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
|
||||
XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
|
||||
|
||||
if (lsn1 < lsn2)
|
||||
return 1;
|
||||
else if (lsn1 == lsn2)
|
||||
return 0;
|
||||
else
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* Look at .snap files and calculate minimum allowed restart_lsn of slot so that
|
||||
* next gc would leave not more than logical_replication_max_snap_files; all
|
||||
* slots having lower restart_lsn should be dropped.
|
||||
*/
|
||||
static XLogRecPtr
|
||||
get_num_snap_files_lsn_threshold(void)
|
||||
{
|
||||
DIR *dirdesc;
|
||||
struct dirent *de;
|
||||
char *snap_path = "pg_logical/snapshots/";
|
||||
int cnt = 0;
|
||||
int lsns_allocated = 1024;
|
||||
int lsns_num = 0;
|
||||
XLogRecPtr *lsns;
|
||||
XLogRecPtr cutoff;
|
||||
|
||||
if (logical_replication_max_snap_files < 0)
|
||||
return 0;
|
||||
|
||||
lsns = palloc(sizeof(XLogRecPtr) * lsns_allocated);
|
||||
|
||||
/* find all .snap files and get their lsns */
|
||||
dirdesc = AllocateDir(snap_path);
|
||||
while ((de = ReadDir(dirdesc, snap_path)) != NULL)
|
||||
{
|
||||
XLogRecPtr lsn;
|
||||
uint32 hi;
|
||||
uint32 lo;
|
||||
|
||||
if (strcmp(de->d_name, ".") == 0 ||
|
||||
strcmp(de->d_name, "..") == 0)
|
||||
continue;
|
||||
|
||||
if (sscanf(de->d_name, "%X-%X.snap", &hi, &lo) != 2)
|
||||
{
|
||||
ereport(LOG,
|
||||
(errmsg("could not parse file name as .snap file \"%s\"", de->d_name)));
|
||||
continue;
|
||||
}
|
||||
|
||||
lsn = ((uint64) hi) << 32 | lo;
|
||||
elog(DEBUG5, "found snap file %X/%X", LSN_FORMAT_ARGS(lsn));
|
||||
if (lsns_allocated == lsns_num)
|
||||
{
|
||||
lsns_allocated *= 2;
|
||||
lsns = repalloc(lsns, sizeof(XLogRecPtr) * lsns_allocated);
|
||||
}
|
||||
lsns[lsns_num++] = lsn;
|
||||
}
|
||||
/* sort by lsn desc */
|
||||
qsort(lsns, lsns_num, sizeof(XLogRecPtr), LsnDescComparator);
|
||||
/* and take cutoff at logical_replication_max_snap_files */
|
||||
if (logical_replication_max_snap_files > lsns_num)
|
||||
cutoff = 0;
|
||||
/* have less files than cutoff */
|
||||
else
|
||||
{
|
||||
cutoff = lsns[logical_replication_max_snap_files - 1];
|
||||
elog(LOG, "ls_monitor: dropping logical slots with restart_lsn lower %X/%X, found %d .snap files, limit is %d",
|
||||
LSN_FORMAT_ARGS(cutoff), lsns_num, logical_replication_max_snap_files);
|
||||
}
|
||||
pfree(lsns);
|
||||
FreeDir(dirdesc);
|
||||
return cutoff;
|
||||
}
|
||||
|
||||
#define LS_MONITOR_CHECK_INTERVAL 10000 /* ms */
|
||||
|
||||
/*
|
||||
* Unused logical replication slots pins WAL and prevents deletion of snapshots.
|
||||
* WAL bloat is guarded by max_slot_wal_keep_size; this bgw removes slots which
|
||||
* need too many .snap files.
|
||||
*/
|
||||
PGDLLEXPORT void
|
||||
LogicalSlotsMonitorMain(Datum main_arg)
|
||||
{
|
||||
SlotStatus* slots;
|
||||
TimestampTz now, last_checked;
|
||||
TimestampTz now,
|
||||
last_checked;
|
||||
|
||||
/* Establish signal handlers. */
|
||||
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
|
||||
@@ -92,72 +169,101 @@ LogicalSlotsMonitorMain(Datum main_arg)
|
||||
|
||||
BackgroundWorkerUnblockSignals();
|
||||
|
||||
slots = (SlotStatus*)calloc(max_replication_slots, sizeof(SlotStatus));
|
||||
last_checked = GetCurrentTimestamp();
|
||||
|
||||
for (;;)
|
||||
{
|
||||
(void) WaitLatch(MyLatch,
|
||||
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
|
||||
logical_replication_max_time_lag*1000/2,
|
||||
PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
XLogRecPtr cutoff_lsn;
|
||||
|
||||
now = GetCurrentTimestamp();
|
||||
|
||||
if (now - last_checked > logical_replication_max_time_lag*USECS_PER_SEC)
|
||||
/*
|
||||
* If there are too many .snap files, just drop all logical slots to
|
||||
* prevent aux files bloat.
|
||||
*/
|
||||
cutoff_lsn = get_num_snap_files_lsn_threshold();
|
||||
if (cutoff_lsn > 0)
|
||||
{
|
||||
int n_active_slots = 0;
|
||||
last_checked = now;
|
||||
|
||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||
for (int i = 0; i < max_replication_slots; i++)
|
||||
{
|
||||
char slot_name[NAMEDATALEN];
|
||||
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
||||
XLogRecPtr restart_lsn;
|
||||
|
||||
/* find the name */
|
||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||
/* Consider only logical repliction slots */
|
||||
if (!s->in_use || !SlotIsLogical(s))
|
||||
continue;
|
||||
|
||||
if (s->active_pid != 0)
|
||||
{
|
||||
n_active_slots += 1;
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Check if there was some activity with the slot since last check */
|
||||
if (s->data.confirmed_flush != slots[i].confirmed_flush_lsn)
|
||||
/* do we need to drop it? */
|
||||
SpinLockAcquire(&s->mutex);
|
||||
restart_lsn = s->data.restart_lsn;
|
||||
SpinLockRelease(&s->mutex);
|
||||
if (restart_lsn >= cutoff_lsn)
|
||||
{
|
||||
slots[i].confirmed_flush_lsn = s->data.confirmed_flush;
|
||||
slots[i].last_updated = now;
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
continue;
|
||||
}
|
||||
else if (now - slots[i].last_updated > logical_replication_max_time_lag*USECS_PER_SEC)
|
||||
{
|
||||
slots[i].name = s->data.name;
|
||||
slots[i].dropped = true;
|
||||
}
|
||||
}
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
|
||||
/*
|
||||
* If there are no active subscriptions, then no new snapshots are generated
|
||||
* and so no need to force slot deletion.
|
||||
*/
|
||||
if (n_active_slots != 0)
|
||||
{
|
||||
for (int i = 0; i < max_replication_slots; i++)
|
||||
strlcpy(slot_name, s->data.name.data, NAMEDATALEN);
|
||||
elog(LOG, "ls_monitor: dropping slot %s with restart_lsn %X/%X below horizon %X/%X",
|
||||
slot_name, LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(cutoff_lsn));
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
|
||||
/* now try to drop it, killing owner before if any */
|
||||
for (;;)
|
||||
{
|
||||
if (slots[i].dropped)
|
||||
pid_t active_pid;
|
||||
|
||||
SpinLockAcquire(&s->mutex);
|
||||
active_pid = s->active_pid;
|
||||
SpinLockRelease(&s->mutex);
|
||||
|
||||
if (active_pid == 0)
|
||||
{
|
||||
elog(LOG, "Drop logical replication slot because it was not update more than %ld seconds",
|
||||
(now - slots[i].last_updated)/USECS_PER_SEC);
|
||||
ReplicationSlotDrop(slots[i].name.data, true);
|
||||
slots[i].dropped = false;
|
||||
/*
|
||||
* Slot is releasted, try to drop it. Though of course
|
||||
* it could have been reacquired, so drop can ERROR
|
||||
* out. Similarly it could have been dropped in the
|
||||
* meanwhile.
|
||||
*
|
||||
* In principle we could remove pg_try/pg_catch, that
|
||||
* would restart the whole bgworker.
|
||||
*/
|
||||
ConditionVariableCancelSleep();
|
||||
PG_TRY();
|
||||
{
|
||||
ReplicationSlotDrop(slot_name, true);
|
||||
elog(LOG, "ls_monitor: slot %s dropped", slot_name);
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
/* log ERROR and reset elog stack */
|
||||
EmitErrorReport();
|
||||
FlushErrorState();
|
||||
elog(LOG, "ls_monitor: failed to drop slot %s", slot_name);
|
||||
}
|
||||
PG_END_TRY();
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* kill the owner and wait for release */
|
||||
elog(LOG, "ls_monitor: killing slot %s owner %d", slot_name, active_pid);
|
||||
(void) kill(active_pid, SIGTERM);
|
||||
/* We shouldn't get stuck, but to be safe add timeout. */
|
||||
ConditionVariableTimedSleep(&s->active_cv, 1000, WAIT_EVENT_REPLICATION_SLOT_DROP);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(void) WaitLatch(MyLatch,
|
||||
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
|
||||
LS_MONITOR_CHECK_INTERVAL,
|
||||
PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import time
|
||||
from functools import partial
|
||||
from random import choice
|
||||
from string import ascii_lowercase
|
||||
|
||||
@@ -10,7 +11,7 @@ from fixtures.neon_fixtures import (
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.types import Lsn
|
||||
from fixtures.utils import query_scalar
|
||||
from fixtures.utils import query_scalar, wait_until
|
||||
|
||||
|
||||
def random_string(n: int):
|
||||
@@ -157,6 +158,51 @@ COMMIT;
|
||||
assert endpoint.safe_psql("select count(*) from pg_replication_slots")[0][0] == 1
|
||||
|
||||
|
||||
# Test that neon.logical_replication_max_snap_files works
|
||||
def test_obsolete_slot_drop(neon_simple_env: NeonEnv, vanilla_pg):
|
||||
def slot_removed(ep):
|
||||
assert (
|
||||
endpoint.safe_psql(
|
||||
"select count(*) from pg_replication_slots where slot_name = 'stale_slot'"
|
||||
)[0][0]
|
||||
== 0
|
||||
)
|
||||
|
||||
env = neon_simple_env
|
||||
|
||||
env.neon_cli.create_branch("test_logical_replication", "empty")
|
||||
# set low neon.logical_replication_max_snap_files
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_logical_replication",
|
||||
config_lines=["log_statement=all", "neon.logical_replication_max_snap_files=1"],
|
||||
)
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
# create obsolete slot
|
||||
cur.execute("select pg_create_logical_replication_slot('stale_slot', 'pgoutput');")
|
||||
assert (
|
||||
endpoint.safe_psql(
|
||||
"select count(*) from pg_replication_slots where slot_name = 'stale_slot'"
|
||||
)[0][0]
|
||||
== 1
|
||||
)
|
||||
|
||||
# now insert some data and create and start live subscriber to create more .snap files
|
||||
# (in most cases this is not needed as stale_slot snap will have higher LSN than restart_lsn anyway)
|
||||
cur.execute("create table t(pk integer primary key, payload integer)")
|
||||
cur.execute("create publication pub1 for table t")
|
||||
|
||||
vanilla_pg.start()
|
||||
vanilla_pg.safe_psql("create table t(pk integer primary key, payload integer)")
|
||||
connstr = endpoint.connstr().replace("'", "''")
|
||||
log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
|
||||
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
|
||||
|
||||
wait_until(number_of_iterations=10, interval=2, func=partial(slot_removed, endpoint))
|
||||
|
||||
|
||||
# Test compute start at LSN page of which starts with contrecord
|
||||
# https://github.com/neondatabase/neon/issues/5749
|
||||
def test_wal_page_boundary_start(neon_simple_env: NeonEnv, vanilla_pg):
|
||||
|
||||
Reference in New Issue
Block a user