Implement working set size estimation

This commit is contained in:
Heikki Linnakangas
2025-07-08 23:41:05 +03:00
parent 732bd26e70
commit 1ee24602d5
5 changed files with 95 additions and 37 deletions

View File

@@ -22,6 +22,7 @@
#endif
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "common/hashfn.h"
#include "executor/instrument.h"
#include "miscadmin.h"
#include "postmaster/bgworker.h"
@@ -40,6 +41,7 @@
#include "tcop/tcopprot.h"
#include "communicator_new.h"
#include "hll.h"
#include "neon.h"
#include "neon_perf_counters.h"
#include "pagestore_client.h"
@@ -98,7 +100,19 @@ typedef struct CommunicatorShmemPerBackendData
typedef struct CommunicatorShmemData
{
int dummy;
/*
* Estimation of working set size.
*
* Note that this is not protected by any locks. That's sloppy, but works
* fine in practice. To "add" a value to the HLL state, we just overwrite
* one of the timestamps. Calculating the estimate reads all the values, but
* it also doesn't depend on seeing a consistent snapshot of the values. We
* could get bogus results if accessing the TimestampTz was not atomic, but
* it on any 64-bit platforms we care about it is, and even if we observed a
* torn read every now and then, it wouldn't affect the overall estimate
* much.
*/
HyperLogLogState wss_estimation;
CommunicatorShmemPerBackendData backends[]; /* MaxProcs */
@@ -250,6 +264,9 @@ communicator_new_shmem_startup(void)
shmem_ptr = (char *) shmem_ptr + communicator_size;
shmem_size -= communicator_size;
/* Initialize hyper-log-log structure for estimating working set size */
initSHLL(&communicator_shmem_ptr->wss_estimation);
for (int i = 0; i < MaxProcs; i++)
{
InitSharedLatch(&communicator_shmem_ptr->backends[i].io_completion_latch);
@@ -743,6 +760,19 @@ communicator_new_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumbe
}
};
{
BufferTag tag;
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forkNum;
for (int i = 0; i < nblocks; i++)
{
tag.blockNum = blockno;
addSHLL(&communicator_shmem_ptr->wss_estimation,
hash_bytes((uint8_t *) &tag, sizeof(tag)));
}
}
elog(DEBUG5, "getpagev called for rel %u/%u/%u.%u block %u (%u blocks)",
RelFileInfoFmt(rinfo), forkNum, blockno, nblocks);
@@ -1357,3 +1387,14 @@ bounce_write_if_needed(void *buffer)
memcpy(p, buffer, BLCKSZ);
return p;
}
int32
communicator_new_approximate_working_set_size_seconds(time_t duration, bool reset)
{
int32 dc;
dc = (int32) estimateSHLL(&communicator_shmem_ptr->wss_estimation, duration);
if (reset)
memset(communicator_shmem_ptr->wss_estimation.regs, 0, sizeof(communicator_shmem_ptr->wss_estimation.regs));
return dc;
}

View File

@@ -54,4 +54,7 @@ extern void communicator_new_rel_truncate(NRelFileInfo rinfo, ForkNumber forkNum
extern void communicator_new_rel_unlink(NRelFileInfo rinfo, ForkNumber forkNum, XLogRecPtr lsn);
extern void communicator_new_forget_cache(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks, XLogRecPtr lsn);
/* other functions */
extern int32 communicator_new_approximate_working_set_size_seconds(time_t duration, bool reset);
#endif /* COMMUNICATOR_NEW_H */

View File

@@ -2159,46 +2159,21 @@ local_cache_pages(PG_FUNCTION_ARGS)
SRF_RETURN_DONE(funcctx);
}
PG_FUNCTION_INFO_V1(approximate_working_set_size_seconds);
Datum
approximate_working_set_size_seconds(PG_FUNCTION_ARGS)
int32
lfc_approximate_working_set_size_seconds(time_t duration, bool reset)
{
if (neon_enable_new_communicator)
elog(ERROR, "TODO: not implemented");
int32 dc;
if (lfc_size_limit != 0)
{
int32 dc;
time_t duration = PG_ARGISNULL(0) ? (time_t)-1 : PG_GETARG_INT32(0);
LWLockAcquire(lfc_lock, LW_SHARED);
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration);
LWLockRelease(lfc_lock);
PG_RETURN_INT32(dc);
}
PG_RETURN_NULL();
}
if (lfc_size_limit == 0)
return -1;
PG_FUNCTION_INFO_V1(approximate_working_set_size);
Datum
approximate_working_set_size(PG_FUNCTION_ARGS)
{
if (neon_enable_new_communicator)
elog(ERROR, "TODO: not implemented");
if (lfc_size_limit != 0)
{
int32 dc;
bool reset = PG_GETARG_BOOL(0);
LWLockAcquire(lfc_lock, reset ? LW_EXCLUSIVE : LW_SHARED);
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, (time_t)-1);
if (reset)
memset(lfc_ctl->wss_estimation.regs, 0, sizeof lfc_ctl->wss_estimation.regs);
LWLockRelease(lfc_lock);
PG_RETURN_INT32(dc);
}
PG_RETURN_NULL();
LWLockAcquire(lfc_lock, LW_SHARED);
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration);
if (reset)
memset(lfc_ctl->wss_estimation.regs, 0, sizeof lfc_ctl->wss_estimation.regs);
LWLockRelease(lfc_lock);
return dc;
}
PG_FUNCTION_INFO_V1(get_local_cache_state);

View File

@@ -52,6 +52,9 @@ extern void lfc_prewarm(FileCacheState* fcs, uint32 n_workers);
PGDLLEXPORT void lfc_prewarm_main(Datum main_arg);
extern int32 lfc_approximate_working_set_size_seconds(time_t duration, bool reset);
static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
void *buffer)

View File

@@ -583,6 +583,8 @@ _PG_init(void)
PG_FUNCTION_INFO_V1(pg_cluster_size);
PG_FUNCTION_INFO_V1(backpressure_lsns);
PG_FUNCTION_INFO_V1(backpressure_throttling_time);
PG_FUNCTION_INFO_V1(approximate_working_set_size_seconds);
PG_FUNCTION_INFO_V1(approximate_working_set_size);
Datum
pg_cluster_size(PG_FUNCTION_ARGS)
@@ -629,6 +631,40 @@ backpressure_throttling_time(PG_FUNCTION_ARGS)
PG_RETURN_UINT64(BackpressureThrottlingTime());
}
Datum
approximate_working_set_size_seconds(PG_FUNCTION_ARGS)
{
time_t duration;
int32 dc;
duration = PG_ARGISNULL(0) ? (time_t) -1 : PG_GETARG_INT32(0);
if (neon_enable_new_communicator)
dc = communicator_new_approximate_working_set_size_seconds(duration, false);
else
dc = lfc_approximate_working_set_size_seconds(duration, false);
if (dc < 0)
PG_RETURN_NULL();
else
PG_RETURN_INT32(dc);
}
Datum
approximate_working_set_size(PG_FUNCTION_ARGS)
{
int32 dc;
bool reset = PG_GETARG_BOOL(0);
if (neon_enable_new_communicator)
dc = communicator_new_approximate_working_set_size_seconds(-1, reset);
else
dc = lfc_approximate_working_set_size_seconds(-1, reset);
if (dc < 0)
PG_RETURN_NULL();
else
PG_RETURN_INT32(dc);
}
static void
neon_shmem_request(void)
{