diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 8dff259f02..8670650ff4 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -17,6 +17,7 @@ #include #include #include +#include #include "postgres.h" #include "funcapi.h" @@ -66,6 +67,126 @@ #define MAX_MONITOR_INTERVAL_USEC 1000000 /* 1 second */ #define MAX_DISK_WRITE_RATE 1000 /* MB/sec */ +/* Hyper log-log implementation for counting size of working set */ + +#define ROTL32(x, r) ((x) << (r)) | ((x) >> (32 - (r))) +#define HASH_BITS 25 +#define N_HASHES (1 << (32 - HASH_BITS)) +#define MURMUR_SEED 0x5C1DB + +static uint32 +murmur_hash3_32(const void* key, const int len, const uint32 seed) +{ + const uint8 * data = (const uint8*)key; + const int nblocks = len / 4; + + uint32 h1 = seed; + + uint32 c1 = 0xcc9e2d51; + uint32 c2 = 0x1b873593; + int i; + uint32 k1; + const uint8* tail; + const uint32* blocks = (const uint32 *)(data + nblocks*4); + + for(i = -nblocks; i; i++) + { + k1 = blocks[i]; + + k1 *= c1; + k1 = ROTL32(k1,15); + k1 *= c2; + + h1 ^= k1; + h1 = ROTL32(h1,13); + h1 = h1*5+0xe6546b64; + } + + tail = (const uint8*)(data + nblocks*4); + + k1 = 0; + + switch(len & 3) + { + case 3: + k1 ^= tail[2] << 16; + /* no break */ + case 2: + k1 ^= tail[1] << 8; + /* no break */ + case 1: + k1 ^= tail[0]; + k1 *= c1; + k1 = ROTL32(k1,15); + k1 *= c2; + h1 ^= k1; + } + + h1 ^= len; + h1 ^= h1 >> 16; + h1 *= 0x85ebca6b; + h1 ^= h1 >> 13; + h1 *= 0xc2b2ae35; + h1 ^= h1 >> 16; + + return h1; +} + +static void +calculate_zero_bits(uint32 h, uint8* max_zero_bits) +{ + int j = h >> HASH_BITS; + int zero_bits = 1; + while ((h & 1) == 0 && zero_bits <= HASH_BITS) { + h >>= 1; + zero_bits += 1; + } + if (max_zero_bits[j] < zero_bits) { + max_zero_bits[j] = zero_bits; + } +} + +static void +calculate_hash_functions(void const* val, int size, uint8* max_zero_bits) +{ + uint32 h = murmur_hash3_32(val, size, MURMUR_SEED); + calculate_zero_bits(h, max_zero_bits); +} + +static uint32 +approximate_distinct_count(uint8* max_zero_bits) +{ + const int m = N_HASHES; + const double alpha_m = 0.7213 / (1 + 1.079 / (double)m); + const double pow_2_32 = 0xffffffff; + double E, c = 0; + int i; + for (i = 0; i < m; i++) + { + c += 1 / pow(2., (double)max_zero_bits[i]); + } + E = alpha_m * m * m / c; + + if (E <= (5 / 2. * m)) + { + double V = 0; + for (i = 0; i < m; i++) + { + if (max_zero_bits[i] == 0) V++; + } + + if (V > 0) + { + E = m * log(m / V); + } + } + else if (E > (1 / 30. * pow_2_32)) + { + E = -pow_2_32 * log(1 - E / pow_2_32); + } + return (uint32)E; +} + typedef struct FileCacheEntry { BufferTag key; @@ -77,9 +198,10 @@ typedef struct FileCacheEntry typedef struct FileCacheControl { - uint32 size; /* size of cache file in chunks */ - uint32 used; /* number of used chunks */ - dlist_head lru; /* double linked list for LRU replacement algorithm */ + uint32 size; /* size of cache file in chunks */ + uint32 used; /* number of used chunks */ + dlist_head lru; /* double linked list for LRU replacement algorithm */ + uint8 max_zero_bits[N_HASHES]; } FileCacheControl; static HTAB* lfc_hash; @@ -124,6 +246,7 @@ lfc_shmem_startup(void) lfc_ctl->size = 0; lfc_ctl->used = 0; dlist_init(&lfc_ctl->lru); + memset(lfc_ctl->max_zero_bits, 0, sizeof lfc_ctl->max_zero_bits); /* Remove file cache on restart */ (void)unlink(lfc_path); @@ -395,7 +518,13 @@ lfc_read(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, hash = get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + calculate_hash_functions(&tag, sizeof(tag), lfc_ctl->max_zero_bits); entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); + + /* Approximate working set */ + tag.blockNum = blkno; + calculate_hash_functions(&tag, sizeof(tag), lfc_ctl->max_zero_bits); + if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0) { /* Page is not cached */ @@ -698,3 +827,21 @@ local_cache_pages(PG_FUNCTION_ARGS) else SRF_RETURN_DONE(funcctx); } + +PG_FUNCTION_INFO_V1(approximate_working_set_size); + +Datum +approximate_working_set_size(PG_FUNCTION_ARGS) +{ + int32 dc = -1; + if (lfc_size_limit != 0) + { + bool reset = PG_GETARG_BOOL(0); + LWLockAcquire(lfc_lock, reset ? LW_EXCLUSIVE : LW_SHARED); + dc = (int32)approximate_distinct_count(lfc_ctl->max_zero_bits); + if (reset) + memset(lfc_ctl->max_zero_bits, 0, sizeof lfc_ctl->max_zero_bits); + LWLockRelease(lfc_lock); + } + PG_RETURN_INT32(dc); +} diff --git a/pgxn/neon/neon--1.0.sql b/pgxn/neon/neon--1.0.sql index 6cf111ea6a..78982b46cd 100644 --- a/pgxn/neon/neon--1.0.sql +++ b/pgxn/neon/neon--1.0.sql @@ -27,6 +27,11 @@ RETURNS SETOF RECORD AS 'MODULE_PATHNAME', 'local_cache_pages' LANGUAGE C PARALLEL SAFE; +CREATE FUNCTION approximate_working_set_size(reset bool) +RETURNS integer +AS 'MODULE_PATHNAME', 'approximate_working_set_size' +LANGUAGE C PARALLEL SAFE; + -- Create a view for convenient access. CREATE VIEW local_cache AS SELECT P.* FROM local_cache_pages() AS P