mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-19 14:10:37 +00:00
Add neon.logical_replication_max_logicalsnapdir_size
This GUC will drop replication slots if the size of the pg_logical/snapshots directory (not including temp snapshot files) becomes larger than the specified size. Keeping the size of this directory smaller will help with basebackup size from the pageserver. Part-of: https://github.com/neondatabase/neon/issues/8619 Signed-off-by: Tristan Partin <tristan@neon.tech>
This commit is contained in:
@@ -1,7 +1,8 @@
|
||||
#include <dirent.h>
|
||||
#include <limits.h>
|
||||
#include <string.h>
|
||||
#include <dirent.h>
|
||||
#include <signal.h>
|
||||
#include <sys/stat.h>
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
@@ -21,17 +22,35 @@
|
||||
|
||||
static int logical_replication_max_snap_files = 300;
|
||||
|
||||
/*
|
||||
* According to Chi (shyzh), the pageserver _should_ be good with 10 MB worth of
|
||||
* snapshot files. Let's use 8 MB since 8 is a power of 2.
|
||||
*/
|
||||
static int logical_replication_max_logicalsnapdir_size = 8000;
|
||||
|
||||
/*
|
||||
* A primitive description of a logical snapshot file including the LSN of the
|
||||
* file and its size.
|
||||
*/
|
||||
typedef struct SnapDesc {
|
||||
XLogRecPtr lsn;
|
||||
off_t sz;
|
||||
} SnapDesc;
|
||||
|
||||
PGDLLEXPORT void LogicalSlotsMonitorMain(Datum main_arg);
|
||||
|
||||
/*
|
||||
* Sorts an array of snapshot descriptors by their LSN.
|
||||
*/
|
||||
static int
|
||||
LsnDescComparator(const void *a, const void *b)
|
||||
SnapDescComparator(const void *a, const void *b)
|
||||
{
|
||||
XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
|
||||
XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
|
||||
const SnapDesc *desc1 = a;
|
||||
const SnapDesc *desc2 = b;
|
||||
|
||||
if (lsn1 < lsn2)
|
||||
if (desc1->lsn < desc2->lsn)
|
||||
return 1;
|
||||
else if (lsn1 == lsn2)
|
||||
else if (desc1->lsn == desc2->lsn)
|
||||
return 0;
|
||||
else
|
||||
return -1;
|
||||
@@ -43,28 +62,39 @@ LsnDescComparator(const void *a, const void *b)
|
||||
* slots having lower restart_lsn should be dropped.
|
||||
*/
|
||||
static XLogRecPtr
|
||||
get_num_snap_files_lsn_threshold(void)
|
||||
get_snapshots_cutoff_lsn(void)
|
||||
{
|
||||
DIR *dirdesc;
|
||||
struct dirent *de;
|
||||
char *snap_path = "pg_logical/snapshots/";
|
||||
int lsns_allocated = 1024;
|
||||
int lsns_num = 0;
|
||||
XLogRecPtr *lsns;
|
||||
XLogRecPtr cutoff;
|
||||
/* PG 18 has a constant defined for this, PG_LOGICAL_SNAPSHOTS_DIR */
|
||||
#define SNAPDIR "pg_logical/snapshots"
|
||||
|
||||
if (logical_replication_max_snap_files < 0)
|
||||
DIR *dirdesc;
|
||||
int dirdesc_fd;
|
||||
struct dirent *de;
|
||||
size_t snapshot_index = 0;
|
||||
SnapDesc *snapshot_descriptors;
|
||||
size_t descriptors_allocated = 1024;
|
||||
XLogRecPtr cutoff = 0;
|
||||
off_t logicalsnapdir_size = 0;
|
||||
const int logical_replication_max_logicalsnapdir_size_bytes = logical_replication_max_logicalsnapdir_size * 1000;
|
||||
|
||||
if (logical_replication_max_snap_files < 0 && logical_replication_max_logicalsnapdir_size < 0)
|
||||
return 0;
|
||||
|
||||
lsns = palloc(sizeof(XLogRecPtr) * lsns_allocated);
|
||||
snapshot_descriptors = palloc(sizeof(*snapshot_descriptors) * descriptors_allocated);
|
||||
|
||||
dirdesc = AllocateDir(SNAPDIR);
|
||||
dirdesc_fd = dirfd(dirdesc);
|
||||
if (dirdesc_fd == -1)
|
||||
ereport(ERROR, errmsg("failed to get a file descriptor for " SNAPDIR ": %m"));
|
||||
|
||||
/* find all .snap files and get their lsns */
|
||||
dirdesc = AllocateDir(snap_path);
|
||||
while ((de = ReadDir(dirdesc, snap_path)) != NULL)
|
||||
while ((de = ReadDir(dirdesc, SNAPDIR)) != NULL)
|
||||
{
|
||||
XLogRecPtr lsn;
|
||||
uint32 hi;
|
||||
uint32 lo;
|
||||
struct stat st;
|
||||
XLogRecPtr lsn;
|
||||
SnapDesc *desc;
|
||||
|
||||
if (strcmp(de->d_name, ".") == 0 ||
|
||||
strcmp(de->d_name, "..") == 0)
|
||||
@@ -79,28 +109,69 @@ get_num_snap_files_lsn_threshold(void)
|
||||
|
||||
lsn = ((uint64) hi) << 32 | lo;
|
||||
elog(DEBUG5, "found snap file %X/%X", LSN_FORMAT_ARGS(lsn));
|
||||
if (lsns_allocated == lsns_num)
|
||||
|
||||
if (fstatat(dirdesc_fd, de->d_name, &st, 0) == -1)
|
||||
ereport(ERROR, errmsg("failed to get the size of " SNAPDIR "/%s: %m", de->d_name));
|
||||
|
||||
if (descriptors_allocated == snapshot_index)
|
||||
{
|
||||
lsns_allocated *= 2;
|
||||
lsns = repalloc(lsns, sizeof(XLogRecPtr) * lsns_allocated);
|
||||
descriptors_allocated *= 2;
|
||||
snapshot_descriptors = repalloc(snapshot_descriptors, sizeof(*snapshot_descriptors) * descriptors_allocated);
|
||||
}
|
||||
lsns[lsns_num++] = lsn;
|
||||
|
||||
desc = &snapshot_descriptors[snapshot_index++];
|
||||
desc->lsn = lsn;
|
||||
desc->sz = st.st_size;
|
||||
}
|
||||
/* sort by lsn desc */
|
||||
qsort(lsns, lsns_num, sizeof(XLogRecPtr), LsnDescComparator);
|
||||
/* and take cutoff at logical_replication_max_snap_files */
|
||||
if (logical_replication_max_snap_files > lsns_num)
|
||||
cutoff = 0;
|
||||
/* have less files than cutoff */
|
||||
else
|
||||
|
||||
qsort(snapshot_descriptors, snapshot_index, sizeof(*snapshot_descriptors), SnapDescComparator);
|
||||
|
||||
/* Are there more snapshot files than specified? */
|
||||
if (logical_replication_max_snap_files <= snapshot_index)
|
||||
{
|
||||
cutoff = lsns[logical_replication_max_snap_files - 1];
|
||||
elog(LOG, "ls_monitor: dropping logical slots with restart_lsn lower %X/%X, found %d .snap files, limit is %d",
|
||||
LSN_FORMAT_ARGS(cutoff), lsns_num, logical_replication_max_snap_files);
|
||||
cutoff = snapshot_descriptors[logical_replication_max_snap_files - 1].lsn;
|
||||
elog(LOG,
|
||||
"ls_monitor: dropping logical slots with restart_lsn lower %X/%X, found %zu snapshot files, limit is %d",
|
||||
LSN_FORMAT_ARGS(cutoff), snapshot_index, logical_replication_max_snap_files);
|
||||
}
|
||||
pfree(lsns);
|
||||
|
||||
/* Is the size of the logical snapshots directory larger than specified?
|
||||
*
|
||||
* It's possible we could hit both thresholds, so remove any extra files
|
||||
* first, and then truncate based on size of the remaining files.
|
||||
*/
|
||||
if (logicalsnapdir_size > logical_replication_max_logicalsnapdir_size_bytes)
|
||||
{
|
||||
/* Unfortunately, iterating the directory does not guarantee any order
|
||||
* so we can't cache an index in the preceding loop.
|
||||
*/
|
||||
|
||||
off_t sz;
|
||||
const XLogRecPtr original = cutoff;
|
||||
|
||||
sz = snapshot_descriptors[0].sz;
|
||||
for (size_t i = 1; i < logical_replication_max_snap_files; ++i)
|
||||
{
|
||||
if (sz > logical_replication_max_logicalsnapdir_size_bytes)
|
||||
{
|
||||
cutoff = snapshot_descriptors[i - 1].lsn;
|
||||
break;
|
||||
}
|
||||
|
||||
sz += snapshot_descriptors[i].sz;
|
||||
}
|
||||
|
||||
if (cutoff != original)
|
||||
elog(LOG, "ls_monitor: dropping logical slots with restart_lsn lower than %X/%X, " SNAPDIR " is larger than %d KB",
|
||||
LSN_FORMAT_ARGS(cutoff), logical_replication_max_logicalsnapdir_size);
|
||||
}
|
||||
|
||||
pfree(snapshot_descriptors);
|
||||
FreeDir(dirdesc);
|
||||
|
||||
return cutoff;
|
||||
|
||||
#undef SNAPDIR
|
||||
}
|
||||
|
||||
void
|
||||
@@ -118,6 +189,16 @@ InitLogicalReplicationMonitor(void)
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"neon.logical_replication_max_logicalsnapdir_size",
|
||||
"Maximum allowed size of the pg_logical/snapshots directory (KB). When exceeded, slots are dropped until the limit is met. -1 disables the limit.",
|
||||
NULL,
|
||||
&logical_replication_max_logicalsnapdir_size,
|
||||
8000, -1, INT_MAX,
|
||||
PGC_SIGHUP,
|
||||
GUC_UNIT_KB,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
memset(&bgw, 0, sizeof(bgw));
|
||||
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
|
||||
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
|
||||
@@ -162,7 +243,7 @@ LogicalSlotsMonitorMain(Datum main_arg)
|
||||
* If there are too many .snap files, just drop all logical slots to
|
||||
* prevent aux files bloat.
|
||||
*/
|
||||
cutoff_lsn = get_num_snap_files_lsn_threshold();
|
||||
cutoff_lsn = get_snapshots_cutoff_lsn();
|
||||
if (cutoff_lsn > 0)
|
||||
{
|
||||
for (int i = 0; i < max_replication_slots; i++)
|
||||
|
||||
Reference in New Issue
Block a user