mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-03 05:20:38 +00:00
Use stadnard Postgres HyperLogLog algorithm for estimation of working set size
This commit is contained in:
@@ -26,6 +26,7 @@
|
||||
#include "pagestore_client.h"
|
||||
#include "access/parallel.h"
|
||||
#include "common/hashfn.h"
|
||||
#include "lib/hyperloglog.h"
|
||||
#include "postmaster/bgworker.h"
|
||||
#include "storage/relfilenode.h"
|
||||
#include "storage/buf_internals.h"
|
||||
@@ -67,66 +68,7 @@
|
||||
|
||||
#define MAX_MONITOR_INTERVAL_USEC 1000000 /* 1 second */
|
||||
#define MAX_DISK_WRITE_RATE 1000 /* MB/sec */
|
||||
|
||||
/* Hyper log-log implementation for counting size of working set */
|
||||
|
||||
#define HASH_BITS 25
|
||||
#define N_HASHES (1 << (32 - HASH_BITS))
|
||||
|
||||
static void
|
||||
calculate_zero_bits(uint32 h, uint8* max_zero_bits)
|
||||
{
|
||||
int j = h >> HASH_BITS;
|
||||
int zero_bits = 1;
|
||||
while ((h & 1) == 0 && zero_bits <= HASH_BITS) {
|
||||
h >>= 1;
|
||||
zero_bits += 1;
|
||||
}
|
||||
if (max_zero_bits[j] < zero_bits) {
|
||||
max_zero_bits[j] = zero_bits;
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
calculate_hash_functions(void const* val, int size, uint8* max_zero_bits)
|
||||
{
|
||||
uint32 h = hash_bytes((char const*)val, size);
|
||||
calculate_zero_bits(h, max_zero_bits);
|
||||
}
|
||||
|
||||
static uint32
|
||||
approximate_distinct_count(uint8* max_zero_bits)
|
||||
{
|
||||
const int m = N_HASHES;
|
||||
const double alpha_m = 0.7213 / (1 + 1.079 / (double)m);
|
||||
const double pow_2_32 = 4294967296.0;
|
||||
double E, c = 0;
|
||||
int i;
|
||||
for (i = 0; i < m; i++)
|
||||
{
|
||||
c += 1 / pow(2., (double)max_zero_bits[i]);
|
||||
}
|
||||
E = alpha_m * m * m / c;
|
||||
|
||||
if (E <= (5 / 2. * m))
|
||||
{
|
||||
double V = 0;
|
||||
for (i = 0; i < m; i++)
|
||||
{
|
||||
if (max_zero_bits[i] == 0) V++;
|
||||
}
|
||||
|
||||
if (V > 0)
|
||||
{
|
||||
E = m * log(m / V);
|
||||
}
|
||||
}
|
||||
else if (E > (1 / 30. * pow_2_32))
|
||||
{
|
||||
E = -pow_2_32 * log(1 - E / pow_2_32);
|
||||
}
|
||||
return (uint32)E;
|
||||
}
|
||||
#define HYPER_LOG_LOG_BIT_WIDTH 10
|
||||
|
||||
typedef struct FileCacheEntry
|
||||
{
|
||||
@@ -142,7 +84,8 @@ typedef struct FileCacheControl
|
||||
uint32 size; /* size of cache file in chunks */
|
||||
uint32 used; /* number of used chunks */
|
||||
dlist_head lru; /* double linked list for LRU replacement algorithm */
|
||||
uint8 max_zero_bits[N_HASHES];
|
||||
hyperLogLogState wss_estimation; /* estimation of wroking set size */
|
||||
char hyperloglog_hashes[(1 << HYPER_LOG_LOG_BIT_WIDTH) + 1];
|
||||
} FileCacheControl;
|
||||
|
||||
static HTAB* lfc_hash;
|
||||
@@ -187,7 +130,12 @@ lfc_shmem_startup(void)
|
||||
lfc_ctl->size = 0;
|
||||
lfc_ctl->used = 0;
|
||||
dlist_init(&lfc_ctl->lru);
|
||||
memset(lfc_ctl->max_zero_bits, 0, sizeof lfc_ctl->max_zero_bits);
|
||||
initHyperLogLog(&lfc_ctl->wss_estimation, HYPER_LOG_LOG_BIT_WIDTH);
|
||||
|
||||
/* We need hashes in shared memory */
|
||||
pfree(lfc_ctl->wss_estimation.hashesArr);
|
||||
memset(lfc_ctl->hyperloglog_hashes, 0, sizeof lfc_ctl->hyperloglog_hashes);
|
||||
lfc_ctl->wss_estimation.hashesArr = lfc_ctl->hyperloglog_hashes;
|
||||
|
||||
/* Remove file cache on restart */
|
||||
(void)unlink(lfc_path);
|
||||
@@ -459,12 +407,11 @@ lfc_read(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
hash = get_hash_value(lfc_hash, &tag);
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
calculate_hash_functions(&tag, sizeof(tag), lfc_ctl->max_zero_bits);
|
||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
|
||||
|
||||
/* Approximate working set */
|
||||
tag.blockNum = blkno;
|
||||
calculate_hash_functions(&tag, sizeof(tag), lfc_ctl->max_zero_bits);
|
||||
addHyperLogLog(&lfc_ctl->wss_estimation, hash_bytes((char const*)&tag, sizeof(tag)));
|
||||
|
||||
if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0)
|
||||
{
|
||||
@@ -779,9 +726,9 @@ approximate_working_set_size(PG_FUNCTION_ARGS)
|
||||
{
|
||||
bool reset = PG_GETARG_BOOL(0);
|
||||
LWLockAcquire(lfc_lock, reset ? LW_EXCLUSIVE : LW_SHARED);
|
||||
dc = (int32)approximate_distinct_count(lfc_ctl->max_zero_bits);
|
||||
dc = (int32) estimateHyperLogLog(&lfc_ctl->wss_estimation);
|
||||
if (reset)
|
||||
memset(lfc_ctl->max_zero_bits, 0, sizeof lfc_ctl->max_zero_bits);
|
||||
memset(lfc_ctl->hyperloglog_hashes, 0, sizeof lfc_ctl->hyperloglog_hashes);
|
||||
LWLockRelease(lfc_lock);
|
||||
}
|
||||
PG_RETURN_INT32(dc);
|
||||
|
||||
Reference in New Issue
Block a user