diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c index bc27942bb1..d361ff7274 100644 --- a/pgxn/neon/communicator_new.c +++ b/pgxn/neon/communicator_new.c @@ -22,6 +22,7 @@ #endif #include "access/xlog_internal.h" #include "access/xlogutils.h" +#include "common/hashfn.h" #include "executor/instrument.h" #include "miscadmin.h" #include "postmaster/bgworker.h" @@ -40,6 +41,7 @@ #include "tcop/tcopprot.h" #include "communicator_new.h" +#include "hll.h" #include "neon.h" #include "neon_perf_counters.h" #include "pagestore_client.h" @@ -98,7 +100,19 @@ typedef struct CommunicatorShmemPerBackendData typedef struct CommunicatorShmemData { - int dummy; + /* + * Estimation of working set size. + * + * Note that this is not protected by any locks. That's sloppy, but works + * fine in practice. To "add" a value to the HLL state, we just overwrite + * one of the timestamps. Calculating the estimate reads all the values, but + * it also doesn't depend on seeing a consistent snapshot of the values. We + * could get bogus results if accessing the TimestampTz was not atomic, but + * it on any 64-bit platforms we care about it is, and even if we observed a + * torn read every now and then, it wouldn't affect the overall estimate + * much. + */ + HyperLogLogState wss_estimation; CommunicatorShmemPerBackendData backends[]; /* MaxProcs */ @@ -250,6 +264,9 @@ communicator_new_shmem_startup(void) shmem_ptr = (char *) shmem_ptr + communicator_size; shmem_size -= communicator_size; + /* Initialize hyper-log-log structure for estimating working set size */ + initSHLL(&communicator_shmem_ptr->wss_estimation); + for (int i = 0; i < MaxProcs; i++) { InitSharedLatch(&communicator_shmem_ptr->backends[i].io_completion_latch); @@ -743,6 +760,19 @@ communicator_new_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumbe } }; + { + BufferTag tag; + + CopyNRelFileInfoToBufTag(tag, rinfo); + tag.forkNum = forkNum; + for (int i = 0; i < nblocks; i++) + { + tag.blockNum = blockno; + addSHLL(&communicator_shmem_ptr->wss_estimation, + hash_bytes((uint8_t *) &tag, sizeof(tag))); + } + } + elog(DEBUG5, "getpagev called for rel %u/%u/%u.%u block %u (%u blocks)", RelFileInfoFmt(rinfo), forkNum, blockno, nblocks); @@ -1357,3 +1387,14 @@ bounce_write_if_needed(void *buffer) memcpy(p, buffer, BLCKSZ); return p; } + +int32 +communicator_new_approximate_working_set_size_seconds(time_t duration, bool reset) +{ + int32 dc; + + dc = (int32) estimateSHLL(&communicator_shmem_ptr->wss_estimation, duration); + if (reset) + memset(communicator_shmem_ptr->wss_estimation.regs, 0, sizeof(communicator_shmem_ptr->wss_estimation.regs)); + return dc; +} diff --git a/pgxn/neon/communicator_new.h b/pgxn/neon/communicator_new.h index 5b636b687a..dc38b3059e 100644 --- a/pgxn/neon/communicator_new.h +++ b/pgxn/neon/communicator_new.h @@ -54,4 +54,7 @@ extern void communicator_new_rel_truncate(NRelFileInfo rinfo, ForkNumber forkNum extern void communicator_new_rel_unlink(NRelFileInfo rinfo, ForkNumber forkNum, XLogRecPtr lsn); extern void communicator_new_forget_cache(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks, XLogRecPtr lsn); +/* other functions */ +extern int32 communicator_new_approximate_working_set_size_seconds(time_t duration, bool reset); + #endif /* COMMUNICATOR_NEW_H */ diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index e5e2bb9183..847e2ba9f6 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -2159,46 +2159,21 @@ local_cache_pages(PG_FUNCTION_ARGS) SRF_RETURN_DONE(funcctx); } -PG_FUNCTION_INFO_V1(approximate_working_set_size_seconds); -Datum -approximate_working_set_size_seconds(PG_FUNCTION_ARGS) +int32 +lfc_approximate_working_set_size_seconds(time_t duration, bool reset) { - if (neon_enable_new_communicator) - elog(ERROR, "TODO: not implemented"); + int32 dc; - if (lfc_size_limit != 0) - { - int32 dc; - time_t duration = PG_ARGISNULL(0) ? (time_t)-1 : PG_GETARG_INT32(0); - LWLockAcquire(lfc_lock, LW_SHARED); - dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration); - LWLockRelease(lfc_lock); - PG_RETURN_INT32(dc); - } - PG_RETURN_NULL(); -} + if (lfc_size_limit == 0) + return -1; -PG_FUNCTION_INFO_V1(approximate_working_set_size); - -Datum -approximate_working_set_size(PG_FUNCTION_ARGS) -{ - if (neon_enable_new_communicator) - elog(ERROR, "TODO: not implemented"); - - if (lfc_size_limit != 0) - { - int32 dc; - bool reset = PG_GETARG_BOOL(0); - LWLockAcquire(lfc_lock, reset ? LW_EXCLUSIVE : LW_SHARED); - dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, (time_t)-1); - if (reset) - memset(lfc_ctl->wss_estimation.regs, 0, sizeof lfc_ctl->wss_estimation.regs); - LWLockRelease(lfc_lock); - PG_RETURN_INT32(dc); - } - PG_RETURN_NULL(); + LWLockAcquire(lfc_lock, LW_SHARED); + dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration); + if (reset) + memset(lfc_ctl->wss_estimation.regs, 0, sizeof lfc_ctl->wss_estimation.regs); + LWLockRelease(lfc_lock); + return dc; } PG_FUNCTION_INFO_V1(get_local_cache_state); diff --git a/pgxn/neon/file_cache.h b/pgxn/neon/file_cache.h index 1b6ff36164..a5ffa6ea92 100644 --- a/pgxn/neon/file_cache.h +++ b/pgxn/neon/file_cache.h @@ -52,6 +52,9 @@ extern void lfc_prewarm(FileCacheState* fcs, uint32 n_workers); PGDLLEXPORT void lfc_prewarm_main(Datum main_arg); +extern int32 lfc_approximate_working_set_size_seconds(time_t duration, bool reset); + + static inline bool lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, void *buffer) diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index ab51abc1de..548bdd9bb8 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -583,6 +583,8 @@ _PG_init(void) PG_FUNCTION_INFO_V1(pg_cluster_size); PG_FUNCTION_INFO_V1(backpressure_lsns); PG_FUNCTION_INFO_V1(backpressure_throttling_time); +PG_FUNCTION_INFO_V1(approximate_working_set_size_seconds); +PG_FUNCTION_INFO_V1(approximate_working_set_size); Datum pg_cluster_size(PG_FUNCTION_ARGS) @@ -629,6 +631,40 @@ backpressure_throttling_time(PG_FUNCTION_ARGS) PG_RETURN_UINT64(BackpressureThrottlingTime()); } +Datum +approximate_working_set_size_seconds(PG_FUNCTION_ARGS) +{ + time_t duration; + int32 dc; + + duration = PG_ARGISNULL(0) ? (time_t) -1 : PG_GETARG_INT32(0); + + if (neon_enable_new_communicator) + dc = communicator_new_approximate_working_set_size_seconds(duration, false); + else + dc = lfc_approximate_working_set_size_seconds(duration, false); + if (dc < 0) + PG_RETURN_NULL(); + else + PG_RETURN_INT32(dc); +} + +Datum +approximate_working_set_size(PG_FUNCTION_ARGS) +{ + int32 dc; + bool reset = PG_GETARG_BOOL(0); + + if (neon_enable_new_communicator) + dc = communicator_new_approximate_working_set_size_seconds(-1, reset); + else + dc = lfc_approximate_working_set_size_seconds(-1, reset); + if (dc < 0) + PG_RETURN_NULL(); + else + PG_RETURN_INT32(dc); +} + static void neon_shmem_request(void) {