Make it possible to specify LFC chunk size in postgresql.conf

This commit is contained in:
Konstantin Knizhnik
2025-04-02 16:14:34 +03:00
parent 680ef72954
commit ac824227a6
4 changed files with 252 additions and 121 deletions

View File

@@ -87,15 +87,16 @@
* 1Mb chunks can reduce hash map size to 320Mb.
* 2. Improve access locality, subsequent pages will be allocated together improving seqscan speed
*/
#define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */
/*
* Smaller chunk seems to be better for OLTP workload
*/
// #define BLOCKS_PER_CHUNK 8 /* 64kb chunk */
#define MAX_BLOCKS_PER_CHUNK_LOG 7 /* 1Mb chunk */
#define MAX_BLOCKS_PER_CHUNK (1 << MAX_BLOCKS_PER_CHUNK_LOG)
#define MB ((uint64)1024*1024)
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
#define CHUNK_BITMAP_SIZE ((BLOCKS_PER_CHUNK + 31) / 32)
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ >> lfc_chunk_size_log))
#define CHUNK_BITMAP_SIZE ((MAX_BLOCKS_PER_CHUNK + 31) / 32)
#define BLOCK_TO_CHUNK_NUN(blkno) (blkno >> lfc_chunk_size_log)
#define BLOCK_TO_CHUNK_OFF(blkno) (blkno & (lfc_blocks_per_chunk-1))
/*
* Blocks are read or written to LFC file outside LFC critical section.
@@ -130,6 +131,16 @@ typedef struct FileCacheEntry
#define N_COND_VARS 64
#define CV_WAIT_TIMEOUT 10
#define MAX_PREWARM_WORKERS 8
typedef struct PrewarmWorkerState
{
uint32 total_chunks;
uint32 curr_chunk;
uint32 prewarmed_pages;
uint32 skipped_pages;
} PrewarmWorkerState;
typedef struct FileCacheControl
{
uint64 generation; /* generation is needed to handle correct hash
@@ -137,6 +148,7 @@ typedef struct FileCacheControl
uint32 size; /* size of cache file in chunks */
uint32 used; /* number of used chunks */
uint32 used_pages; /* number of used pages */
uint32 pinned; /* number of pinned chunks */
uint32 limit; /* shared copy of lfc_size_limit */
uint64 hits;
uint64 misses;
@@ -145,32 +157,37 @@ typedef struct FileCacheControl
uint64 time_write; /* time spent writing (us) */
uint64 resizes; /* number of LFC resizes */
uint64 evicted_pages; /* number of evicted pages */
uint32 prewarm_total_chunks;
uint32 prewarm_curr_chunk;
uint32 prewarmed_pages;
uint32 skipped_pages;
dlist_head lru; /* double linked list for LRU replacement
* algorithm */
dlist_head holes; /* double linked list of punched holes */
HyperLogLogState wss_estimation; /* estimation of working set size */
ConditionVariable cv[N_COND_VARS]; /* turnstile of condition variables */
PrewarmWorkerState prewarm_workers[MAX_PREWARM_WORKERS];
} FileCacheControl;
bool lfc_store_prefetch_result;
#define FILE_CACHE_STATE_MAGIC 0xfcfcfcfc
typedef struct FileCacheStateEntry
typedef struct FileCacheState
{
BufferTag key;
uint32 bitmap[CHUNK_BITMAP_SIZE];
} FileCacheStateEntry;
uint32 magic;
uint32 chunk_size_log;
size_t n_chunks;
BufferTag chunks[FLEXIBLE_ARRAY_MEMBER];
/* followed by bitmap */
} FileCacheState;
static HTAB *lfc_hash;
#define FILE_CACHE_STATE_BITMAP(fcs) ((uint8*)&(fcs)->chunks[(fcs)->n_chunks])
#define FILE_CACHE_STATE_SIZE(n_chunks) (sizeof(FileCacheState) + (n_chunks)*sizeof(BufferTag) + ((n_chunks)+7)/8)
static HTAB *lfc_hash;
static int lfc_desc = -1;
static LWLockId lfc_lock;
static int lfc_max_size;
static int lfc_size_limit;
static int lfc_prewarm_limit;
static int lfc_prewarm_batch;
static int lfc_chunk_size_log = MAX_BLOCKS_PER_CHUNK_LOG;
static int lfc_blocks_per_chunk = MAX_BLOCKS_PER_CHUNK;
static char *lfc_path;
static uint64 lfc_generation;
static FileCacheControl *lfc_ctl;
@@ -179,6 +196,8 @@ static shmem_startup_hook_type prev_shmem_startup_hook;
static shmem_request_hook_type prev_shmem_request_hook;
#endif
bool lfc_store_prefetch_result;
#define LFC_ENABLED() (lfc_ctl->limit != 0)
/*
@@ -219,7 +238,9 @@ lfc_switch_off(void)
}
lfc_ctl->generation += 1;
lfc_ctl->size = 0;
lfc_ctl->pinned = 0;
lfc_ctl->used = 0;
lfc_ctl->used_pages = 0;
lfc_ctl->limit = 0;
dlist_init(&lfc_ctl->lru);
dlist_init(&lfc_ctl->holes);
@@ -383,6 +404,12 @@ lfc_check_limit_hook(int *newval, void **extra, GucSource source)
return true;
}
static void
lfc_change_chunk_size(int newval, void* extra)
{
lfc_blocks_per_chunk = 1 << newval;
}
static void
lfc_change_limit_hook(int newval, void *extra)
{
@@ -428,11 +455,11 @@ lfc_change_limit_hook(int newval, void *extra)
CriticalAssert(victim->access_count == 0);
#ifdef FALLOC_FL_PUNCH_HOLE
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, (off_t) victim->offset * BLOCKS_PER_CHUNK * BLCKSZ, BLOCKS_PER_CHUNK * BLCKSZ) < 0)
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, (off_t) victim->offset * lfc_blocks_per_chunk * BLCKSZ, lfc_blocks_per_chunk * BLCKSZ) < 0)
neon_log(LOG, "Failed to punch hole in file: %m");
#endif
/* We remove the old entry, and re-enter a hole to the hash table */
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
for (int i = 0; i < lfc_blocks_per_chunk; i++)
{
bool is_page_cached = GET_STATE(victim, i) == AVAILABLE;
lfc_ctl->used_pages -= is_page_cached;
@@ -525,7 +552,7 @@ lfc_init(void)
"Maximal number of prewarmed pages",
NULL,
&lfc_prewarm_limit,
0, /* disabled by default */
INT_MAX, /* no limit by default */
0,
INT_MAX,
PGC_SIGHUP,
@@ -547,6 +574,19 @@ lfc_init(void)
NULL,
NULL);
DefineCustomIntVariable("neon.chunk_size_log",
"Logarithm of LFC chunk size in blocks",
NULL,
&lfc_chunk_size_log,
MAX_BLOCKS_PER_CHUNK_LOG,
0,
MAX_BLOCKS_PER_CHUNK_LOG,
PGC_POSTMASTER,
0,
NULL,
lfc_change_chunk_size,
NULL);
if (lfc_max_size == 0)
return;
@@ -560,42 +600,48 @@ lfc_init(void)
#endif
}
static FileCacheStateEntry*
lfc_get_state(size_t* n_entries)
static FileCacheState*
lfc_get_state(size_t max_entries)
{
size_t max_entries = *n_entries;
size_t i = 0;
FileCacheStateEntry* fs;
FileCacheState* fcs = NULL;
if (lfc_maybe_disabled() || max_entries == 0) /* fast exit if file cache is disabled */
return NULL;
fs = (FileCacheStateEntry*)palloc0(sizeof(FileCacheStateEntry) * max_entries);
LWLockAcquire(lfc_lock, LW_SHARED);
if (LFC_ENABLED())
{
dlist_iter iter;
dlist_iter iter;
size_t i = 0;
uint8* bitmap;
size_t n_entries = Min(max_entries, lfc_ctl->used - lfc_ctl->pinned);
fcs = (FileCacheState*)palloc0(FILE_CACHE_STATE_SIZE(n_entries));
fcs->magic = FILE_CACHE_STATE_MAGIC;
fcs->chunk_size_log = lfc_chunk_size_log;
fcs->n_chunks = n_entries;
bitmap = FILE_CACHE_STATE_BITMAP(fcs);
dlist_reverse_foreach(iter, &lfc_ctl->lru)
{
FileCacheEntry *entry = dlist_container(FileCacheEntry, list_node, iter.cur);
memcpy(&fs[i].key, &entry->key, sizeof entry->key);
for (int j = 0; j < BLOCKS_PER_CHUNK; j++)
fcs->chunks[i] = entry->key;
for (int j = 0; j < lfc_blocks_per_chunk; j++)
{
if (GET_STATE(entry, j) != UNAVAILABLE)
fs[i].bitmap[j >> 5] |= (uint32)1 << (j & 31);
BITMAP_SET(bitmap, i*lfc_blocks_per_chunk + j);
}
if (++i == max_entries)
if (++i == n_entries)
break;
}
elog(LOG, "LFC: save state of %ld chunks", (long)i);
Assert(i == n_entries);
elog(LOG, "LFC: save state of %ld chunks", (long)n_entries);
}
LWLockRelease(lfc_lock);
*n_entries = i;
return fs;
return fcs;
}
/*
@@ -603,46 +649,80 @@ lfc_get_state(size_t* n_entries)
* and avoid race conditions with other backends.
*/
static void
lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries)
lfc_prewarm(FileCacheState* fcs, uint32 worker_id, uint32 n_workers)
{
size_t snd_idx = 0, rcv_idx = 0;
size_t n_sent = 0, n_received = 0;
size_t fcs_chunk_size_log;
size_t n_entries;
size_t max_prefetch_pages;
size_t prewarm_batch = Min(lfc_prewarm_batch, readahead_buffer_size);
bool save_lfc_store_prefetch_result;
size_t vacant_blocks;
PrewarmWorkerState* ws;
uint8* bitmap;
BufferTag tag;
if (!lfc_ensure_opened())
return;
if (n_entries == 0 || fs == NULL)
if (prewarm_batch == 0 || n_workers == 0)
{
elog(LOG, "LFC: prewarm is disabled");
return;
}
if (worker_id >= MAX_PREWARM_WORKERS || worker_id >= n_workers)
{
elog(ERROR, "LFC: Invalid prewarm worker id: %d", worker_id);
}
ws = &lfc_ctl->prewarm_workers[worker_id];
if (fcs == NULL || fcs->n_chunks == 0)
{
elog(LOG, "LFC: nothing to prewarm");
return;
}
if (fcs->magic != FILE_CACHE_STATE_MAGIC)
{
elog(ERROR, "LFC: Invalid file cache state magic: %X", fcs->magic);
}
fcs_chunk_size_log = fcs->chunk_size_log;
if (fcs_chunk_size_log > MAX_BLOCKS_PER_CHUNK)
{
elog(ERROR, "LFC: Invalid chunk size log: %u", fcs->chunk_size_log);
}
n_entries = Min(fcs->n_chunks, lfc_prewarm_limit);
bitmap = FILE_CACHE_STATE_BITMAP(fcs);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
/* Do not prewarm more entries than LFC limit */
if (lfc_ctl->limit <= lfc_ctl->size)
{
elog(LOG, "LFC: skip prewarm because LFC limit is smaller than prewarm size");
elog(LOG, "LFC: skip prewarm because LFC is already filled");
LWLockRelease(lfc_lock);
return;
}
if (n_entries > lfc_ctl->limit - lfc_ctl->size)
vacant_blocks = (size_t)(lfc_ctl->limit - lfc_ctl->size) << lfc_chunk_size_log;
if (n_entries > (vacant_blocks >> fcs_chunk_size_log))
{
n_entries = lfc_ctl->limit - lfc_ctl->size;
n_entries = vacant_blocks >> fcs_chunk_size_log;
}
max_prefetch_pages = n_entries << fcs_chunk_size_log;
if (lfc_ctl->prewarm_total_chunks != lfc_ctl->prewarm_curr_chunk)
if (ws->total_chunks != ws->curr_chunk)
{
elog(LOG, "LFC: skip prewarm because concurrent prewarm is detected");
LWLockRelease(lfc_lock);
return;
elog(ERROR, "LFC: skip prewarm because prewarm worker %d is still active", worker_id);
}
/* Initialize fields used to track prewarming progress */
lfc_ctl->prewarm_total_chunks = n_entries;
lfc_ctl->prewarm_curr_chunk = 0;
ws->total_chunks = n_entries;
ws->curr_chunk = 0;
ws->prewarmed_pages = 0;
ws->skipped_pages = 0;
LWLockRelease(lfc_lock);
@@ -654,43 +734,54 @@ lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries)
while (true)
{
BufferTag tag;
size_t chunk_no = snd_idx / BLOCKS_PER_CHUNK;
BlockNumber offs_in_chunk = snd_idx % BLOCKS_PER_CHUNK;
if (chunk_no < n_entries)
if (snd_idx < max_prefetch_pages)
{
if (fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31)))
if ((snd_idx >> fcs_chunk_size_log) % n_workers != worker_id)
{
tag = fs[chunk_no].key;
tag.blockNum += offs_in_chunk;
(void)prefetch_register_bufferv(tag, NULL, 1, NULL, true);
n_sent += 1;
}
snd_idx += 1;
}
if (n_sent >= n_received + prewarm_batch || chunk_no == n_entries)
{
do
{
chunk_no = rcv_idx / BLOCKS_PER_CHUNK;
offs_in_chunk = rcv_idx % BLOCKS_PER_CHUNK;
rcv_idx += 1;
} while (!(fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31))));
lfc_ctl->prewarm_curr_chunk = chunk_no;
tag = fs[chunk_no].key;
tag.blockNum += offs_in_chunk;
if (prefetch_receive(tag))
{
lfc_ctl->prewarmed_pages += 1;
/* If there are multiple workers, split chunks between them */
snd_idx += 1 << fcs_chunk_size_log;
}
else
{
lfc_ctl->skipped_pages += 1;
if (BITMAP_ISSET(bitmap, snd_idx))
{
tag = fcs->chunks[snd_idx >> fcs_chunk_size_log];
tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1);
(void)prefetch_register_bufferv(tag, NULL, 1, NULL, true);
n_sent += 1;
}
snd_idx += 1;
}
}
if (n_sent >= n_received + prewarm_batch || snd_idx == max_prefetch_pages)
{
if ((rcv_idx >> fcs_chunk_size_log) % n_workers != worker_id)
{
/* Skip chunks processed by other workers */
rcv_idx += 1 << fcs_chunk_size_log;
continue;
}
if (++n_received == n_sent && snd_idx >= n_entries * BLOCKS_PER_CHUNK)
/* Locate next block to prefetch */
while (!BITMAP_ISSET(bitmap, rcv_idx))
{
rcv_idx += 1;
}
/* Update progress indicator */
ws->curr_chunk = rcv_idx >> fcs_chunk_size_log;
tag = fcs->chunks[rcv_idx >> fcs_chunk_size_log];
tag.blockNum += rcv_idx & ((1 << fcs_chunk_size_log) - 1);
if (prefetch_receive(tag))
{
ws->prewarmed_pages += 1;
}
else
{
ws->skipped_pages += 1;
}
rcv_idx += 1;
if (++n_received == n_sent && snd_idx == max_prefetch_pages)
{
break;
}
@@ -698,8 +789,8 @@ lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries)
}
Assert(n_sent == n_received);
lfc_store_prefetch_result = save_lfc_store_prefetch_result;
lfc_ctl->prewarm_curr_chunk = n_entries;
elog(LOG, "LFC: complete prewarming: loaded %ld pages", (long)n_received);
ws->curr_chunk = n_entries;
elog(LOG, "LFC: worker %d complete prewarming: loaded %ld pages", worker_id, (long)n_received);
}
@@ -712,7 +803,7 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
{
BufferTag tag;
FileCacheEntry *entry;
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
bool found = false;
uint32 hash;
@@ -721,7 +812,7 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forkNum;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
tag.blockNum = blkno - chunk_offs;
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
hash = get_hash_value(lfc_hash, &tag);
@@ -759,9 +850,9 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
tag.blockNum = blkno - chunk_offs;
hash = get_hash_value(lfc_hash, &tag);
chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
LWLockAcquire(lfc_lock, LW_SHARED);
@@ -772,12 +863,12 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
while (true)
{
int this_chunk = Min(nblocks - i, BLOCKS_PER_CHUNK - chunk_offs);
int this_chunk = Min(nblocks - i, lfc_blocks_per_chunk - chunk_offs);
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
if (entry != NULL)
{
for (; chunk_offs < BLOCKS_PER_CHUNK && i < nblocks; chunk_offs++, i++)
for (; chunk_offs < lfc_blocks_per_chunk && i < nblocks; chunk_offs++, i++)
{
if (GET_STATE(entry, chunk_offs) != UNAVAILABLE)
{
@@ -801,9 +892,9 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
* Prepare for the next iteration. We don't unlock here, as that'd
* probably be more expensive than the gains it'd get us.
*/
tag.blockNum = (blkno + i) & ~(BLOCKS_PER_CHUNK - 1);
chunk_offs = BLOCK_TO_CHUNK_OFF(blkno + i);
tag.blockNum = (blkno + i) - chunk_offs;
hash = get_hash_value(lfc_hash, &tag);
chunk_offs = (blkno + i) & (BLOCKS_PER_CHUNK - 1);
}
LWLockRelease(lfc_lock);
@@ -878,9 +969,9 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
while (nblocks > 0)
{
struct iovec iov[PG_IOV_MAX];
int8 chunk_mask[BLOCKS_PER_CHUNK / 8] = {0};
int chunk_offs = (blkno & (BLOCKS_PER_CHUNK - 1));
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
uint8 chunk_mask[MAX_BLOCKS_PER_CHUNK / 8] = {0};
int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
int blocks_in_chunk = Min(nblocks, lfc_blocks_per_chunk - chunk_offs);
int iteration_hits = 0;
int iteration_misses = 0;
uint64 io_time_us = 0;
@@ -968,8 +1059,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
/* Unlink entry from LRU list to pin it for the duration of IO operation */
if (entry->access_count++ == 0)
{
lfc_ctl->pinned += 1;
dlist_delete(&entry->list_node);
}
generation = lfc_ctl->generation;
entry_offset = entry->offset;
@@ -1018,7 +1111,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (iteration_hits != 0)
{
/* chunk offset (# of pages) into the LFC file */
off_t first_read_offset = (off_t) entry_offset * BLOCKS_PER_CHUNK;
off_t first_read_offset = (off_t) entry_offset * lfc_blocks_per_chunk;
int nwrite = iov_last_used - first_block_in_chunk_read;
/* offset of first IOV */
first_read_offset += chunk_offs + first_block_in_chunk_read;
@@ -1066,7 +1159,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
CriticalAssert(entry->access_count > 0);
if (--entry->access_count == 0)
{
lfc_ctl->pinned -= 1;
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
}
}
else
{
@@ -1143,7 +1239,7 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash)
FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node,
dlist_pop_head_node(&lfc_ctl->lru));
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
for (int i = 0; i < lfc_blocks_per_chunk; i++)
{
bool is_page_cached = GET_STATE(victim, i) == AVAILABLE;
lfc_ctl->used_pages -= is_page_cached;
@@ -1167,8 +1263,9 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash)
entry->access_count = 1;
entry->hash = hash;
lfc_ctl->pinned += 1;
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
for (int i = 0; i < lfc_blocks_per_chunk; i++)
SET_STATE(entry, i, UNAVAILABLE);
return true;
@@ -1213,7 +1310,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
FileCacheBlockState state;
XLogRecPtr lwlsn;
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return false;
@@ -1223,7 +1320,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
tag.blockNum = blkno - chunk_offs;
hash = get_hash_value(lfc_hash, &tag);
cv = &lfc_ctl->cv[hash % N_COND_VARS];
@@ -1263,7 +1360,10 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
* operation
*/
if (entry->access_count++ == 0)
{
lfc_ctl->pinned += 1;
dlist_delete(&entry->list_node);
}
}
else
{
@@ -1288,7 +1388,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
INSTR_TIME_SET_CURRENT(io_start);
rc = pwrite(lfc_desc, buffer, BLCKSZ,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
((off_t) entry_offset * lfc_blocks_per_chunk + chunk_offs) * BLCKSZ);
INSTR_TIME_SET_CURRENT(io_end);
pgstat_report_wait_end();
@@ -1314,7 +1414,10 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
inc_page_cache_write_wait(time_spent_us);
if (--entry->access_count == 0)
{
lfc_ctl->pinned -= 1;
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
}
state = GET_STATE(entry, chunk_offs);
if (state == REQUESTED) {
@@ -1381,8 +1484,8 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
while (nblocks > 0)
{
struct iovec iov[PG_IOV_MAX];
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
int blocks_in_chunk = Min(nblocks, lfc_blocks_per_chunk - chunk_offs);
instr_time io_start, io_end;
ConditionVariable* cv;
@@ -1394,7 +1497,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
iov[i].iov_len = BLCKSZ;
}
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
tag.blockNum = blkno - chunk_offs;
hash = get_hash_value(lfc_hash, &tag);
cv = &lfc_ctl->cv[hash % N_COND_VARS];
@@ -1414,7 +1517,10 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
* operation
*/
if (entry->access_count++ == 0)
{
lfc_ctl->pinned += 1;
dlist_delete(&entry->list_node);
}
}
else
{
@@ -1467,7 +1573,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
INSTR_TIME_SET_CURRENT(io_start);
rc = pwritev(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
((off_t) entry_offset * lfc_blocks_per_chunk + chunk_offs) * BLCKSZ);
INSTR_TIME_SET_CURRENT(io_end);
pgstat_report_wait_end();
@@ -1494,7 +1600,10 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
inc_page_cache_write_wait(time_spent_us);
if (--entry->access_count == 0)
{
lfc_ctl->pinned -= 1;
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
}
for (int i = 0; i < blocks_in_chunk; i++)
{
@@ -1620,7 +1729,12 @@ neon_get_lfc_stats(PG_FUNCTION_ARGS)
break;
case 8:
key = "file_cache_chunk_size_pages";
value = BLOCKS_PER_CHUNK;
value = lfc_blocks_per_chunk;
break;
case 9:
key = "file_cache_pinned";
if (lfc_ctl)
value = lfc_ctl->pinned;
break;
default:
SRF_RETURN_DONE(funcctx);
@@ -1748,7 +1862,7 @@ local_cache_pages(PG_FUNCTION_ARGS)
/* Skip hole tags */
if (NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)) != 0)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
for (int i = 0; i < lfc_blocks_per_chunk; i++)
n_pages += GET_STATE(entry, i) == AVAILABLE;
}
}
@@ -1776,13 +1890,13 @@ local_cache_pages(PG_FUNCTION_ARGS)
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
for (int i = 0; i < lfc_blocks_per_chunk; i++)
{
if (NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)) != 0)
{
if (GET_STATE(entry, i) == AVAILABLE)
{
fctx->record[n].pageoffs = entry->offset * BLOCKS_PER_CHUNK + i;
fctx->record[n].pageoffs = entry->offset * lfc_blocks_per_chunk + i;
fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
@@ -1872,16 +1986,16 @@ PG_FUNCTION_INFO_V1(get_local_cache_state);
Datum
get_local_cache_state(PG_FUNCTION_ARGS)
{
size_t n_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0);
FileCacheStateEntry* fs = lfc_get_state(&n_entries);
if (fs != NULL)
size_t max_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0);
FileCacheState* fcs = lfc_get_state(max_entries);
if (fcs != NULL)
{
size_t size_in_bytes = sizeof(FileCacheStateEntry) * n_entries;
size_t size_in_bytes = FILE_CACHE_STATE_SIZE(fcs->n_chunks);
bytea* res = (bytea*)palloc(VARHDRSZ + size_in_bytes);
SET_VARSIZE(res, VARHDRSZ + size_in_bytes);
memcpy(VARDATA(res), fs, size_in_bytes);
pfree(fs);
memcpy(VARDATA(res), fcs, size_in_bytes);
pfree(fcs);
PG_RETURN_BYTEA_P(res);
}
@@ -1894,10 +2008,14 @@ Datum
prewarm_local_cache(PG_FUNCTION_ARGS)
{
bytea* state = PG_GETARG_BYTEA_PP(0);
uint32 n_entries = VARSIZE_ANY_EXHDR(state)/sizeof(FileCacheStateEntry);
FileCacheStateEntry* fs = (FileCacheStateEntry*)VARDATA_ANY(state);
uint32 worker_id = PG_GETARG_INT32(1);
uint32 n_workers = PG_GETARG_INT32(2);
FileCacheState* fcs = (FileCacheState*)VARDATA_ANY(state);
lfc_prewarm(fs, n_entries);
if (FILE_CACHE_STATE_SIZE(fcs->n_chunks) != VARSIZE_ANY_EXHDR(state))
elog(ERROR, "LFC: Invalid file cache state size");
lfc_prewarm(fcs, worker_id, n_workers);
PG_RETURN_NULL();
}
@@ -1910,10 +2028,17 @@ get_prewarm_info(PG_FUNCTION_ARGS)
Datum values[4];
bool nulls[4];
TupleDesc tupdesc;
uint32 worker_id = PG_GETARG_INT32(0);
PrewarmWorkerState* ws;
if (lfc_size_limit == 0)
PG_RETURN_NULL();
if (worker_id >= MAX_PREWARM_WORKERS)
{
elog(ERROR, "LFC: Invalid prewarm worker id: %d", worker_id);
}
tupdesc = CreateTemplateTupleDesc(4);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_chunks", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "curr_chunk", INT4OID, -1, 0);
@@ -1923,10 +2048,13 @@ get_prewarm_info(PG_FUNCTION_ARGS)
MemSet(nulls, 0, sizeof(nulls));
LWLockAcquire(lfc_lock, LW_SHARED);
values[0] = Int32GetDatum(lfc_ctl->prewarm_total_chunks);
values[1] = Int32GetDatum(lfc_ctl->prewarm_curr_chunk);
values[2] = Int32GetDatum(lfc_ctl->prewarmed_pages);
values[3] = Int32GetDatum(lfc_ctl->skipped_pages);
ws = &lfc_ctl->prewarm_workers[worker_id];
values[0] = Int32GetDatum(ws->total_chunks);
values[1] = Int32GetDatum(ws->curr_chunk);
values[2] = Int32GetDatum(ws->prewarmed_pages);
values[3] = Int32GetDatum(ws->skipped_pages);
LWLockRelease(lfc_lock);
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));

View File

@@ -166,6 +166,9 @@ typedef struct
WaitEventSet *wes_read;
} PageServer;
static uint32 local_request_counter;
#define GENERATE_REQUEST_ID() (((NeonRequestId)MyProcPid << 32) | ++local_request_counter)
static PageServer page_servers[MAX_SHARDS];
static bool pageserver_flush(shardno_t shard_no);

View File

@@ -1,6 +1,6 @@
\echo Use "ALTER EXTENSION neon UPDATE TO '1.6'" to load this file. \quit
CREATE FUNCTION get_prewarm_info(out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer)
CREATE FUNCTION get_prewarm_info(worker_id integer default 0, out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer)
RETURNS record
AS 'MODULE_PATHNAME', 'get_prewarm_info'
LANGUAGE C STRICT
@@ -12,7 +12,7 @@ AS 'MODULE_PATHNAME', 'get_local_cache_state'
LANGUAGE C
PARALLEL UNSAFE;
CREATE FUNCTION prewarm_local_cache(state bytea)
CREATE FUNCTION prewarm_local_cache(state bytea, worker_id integer default 0, n_workers integer default 1)
RETURNS void
AS 'MODULE_PATHNAME', 'prewarm_local_cache'
LANGUAGE C STRICT

View File

@@ -1,7 +1,7 @@
DROP FUNCTION IF EXISTS get_prewarm_info(out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer);
DROP FUNCTION IF EXISTS get_prewarm_info(worker_id integer, out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer);
DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer);
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea);
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, worker_id integer, n_workers integer);