diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 4ad29a1eb9..9d04577b1c 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -87,15 +87,16 @@ * 1Mb chunks can reduce hash map size to 320Mb. * 2. Improve access locality, subsequent pages will be allocated together improving seqscan speed */ -#define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */ -/* - * Smaller chunk seems to be better for OLTP workload - */ -// #define BLOCKS_PER_CHUNK 8 /* 64kb chunk */ +#define MAX_BLOCKS_PER_CHUNK_LOG 7 /* 1Mb chunk */ +#define MAX_BLOCKS_PER_CHUNK (1 << MAX_BLOCKS_PER_CHUNK_LOG) + #define MB ((uint64)1024*1024) -#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK)) -#define CHUNK_BITMAP_SIZE ((BLOCKS_PER_CHUNK + 31) / 32) +#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ >> lfc_chunk_size_log)) +#define CHUNK_BITMAP_SIZE ((MAX_BLOCKS_PER_CHUNK + 31) / 32) + +#define BLOCK_TO_CHUNK_NUN(blkno) ((blkno) >> lfc_chunk_size_log) /* index of the chunk containing blkno */ +#define BLOCK_TO_CHUNK_OFF(blkno) ((blkno) & (lfc_blocks_per_chunk-1)) /* offset of blkno within its chunk */ /* * Blocks are read or written to LFC file outside LFC critical section. 
@@ -130,6 +131,16 @@ typedef struct FileCacheEntry #define N_COND_VARS 64 #define CV_WAIT_TIMEOUT 10 +#define MAX_PREWARM_WORKERS 8 + +typedef struct PrewarmWorkerState +{ + uint32 total_chunks; + uint32 curr_chunk; + uint32 prewarmed_pages; + uint32 skipped_pages; +} PrewarmWorkerState; + typedef struct FileCacheControl { uint64 generation; /* generation is needed to handle correct hash @@ -137,6 +148,7 @@ typedef struct FileCacheControl uint32 size; /* size of cache file in chunks */ uint32 used; /* number of used chunks */ uint32 used_pages; /* number of used pages */ + uint32 pinned; /* number of pinned chunks */ uint32 limit; /* shared copy of lfc_size_limit */ uint64 hits; uint64 misses; @@ -145,32 +157,37 @@ typedef struct FileCacheControl uint64 time_write; /* time spent writing (us) */ uint64 resizes; /* number of LFC resizes */ uint64 evicted_pages; /* number of evicted pages */ - uint32 prewarm_total_chunks; - uint32 prewarm_curr_chunk; - uint32 prewarmed_pages; - uint32 skipped_pages; dlist_head lru; /* double linked list for LRU replacement * algorithm */ dlist_head holes; /* double linked list of punched holes */ HyperLogLogState wss_estimation; /* estimation of working set size */ ConditionVariable cv[N_COND_VARS]; /* turnstile of condition variables */ + PrewarmWorkerState prewarm_workers[MAX_PREWARM_WORKERS]; } FileCacheControl; -bool lfc_store_prefetch_result; +#define FILE_CACHE_STATE_MAGIC 0xfcfcfcfc -typedef struct FileCacheStateEntry +typedef struct FileCacheState { - BufferTag key; - uint32 bitmap[CHUNK_BITMAP_SIZE]; -} FileCacheStateEntry; + uint32 magic; + uint32 chunk_size_log; + size_t n_chunks; + BufferTag chunks[FLEXIBLE_ARRAY_MEMBER]; + /* followed by bitmap: one bit per block of every chunk */ +} FileCacheState; -static HTAB *lfc_hash; +#define FILE_CACHE_STATE_BITMAP(fcs) ((uint8*)&(fcs)->chunks[(fcs)->n_chunks]) +#define FILE_CACHE_STATE_SIZE(n_chunks) (sizeof(FileCacheState) + (n_chunks)*sizeof(BufferTag) + ((size_t)(n_chunks)*lfc_blocks_per_chunk+7)/8) /* header + chunk tags + bitmap holding lfc_blocks_per_chunk bits per chunk */ + +static HTAB *lfc_hash; static int 
lfc_desc = -1; static LWLockId lfc_lock; static int lfc_max_size; static int lfc_size_limit; static int lfc_prewarm_limit; static int lfc_prewarm_batch; +static int lfc_chunk_size_log = MAX_BLOCKS_PER_CHUNK_LOG; +static int lfc_blocks_per_chunk = MAX_BLOCKS_PER_CHUNK; static char *lfc_path; static uint64 lfc_generation; static FileCacheControl *lfc_ctl; @@ -179,6 +196,8 @@ static shmem_startup_hook_type prev_shmem_startup_hook; static shmem_request_hook_type prev_shmem_request_hook; #endif +bool lfc_store_prefetch_result; + #define LFC_ENABLED() (lfc_ctl->limit != 0) /* @@ -219,7 +238,9 @@ lfc_switch_off(void) } lfc_ctl->generation += 1; lfc_ctl->size = 0; + lfc_ctl->pinned = 0; lfc_ctl->used = 0; + lfc_ctl->used_pages = 0; lfc_ctl->limit = 0; dlist_init(&lfc_ctl->lru); dlist_init(&lfc_ctl->holes); @@ -383,6 +404,12 @@ lfc_check_limit_hook(int *newval, void **extra, GucSource source) return true; } +static void +lfc_change_chunk_size(int newval, void* extra) +{ + lfc_blocks_per_chunk = 1 << newval; +} + static void lfc_change_limit_hook(int newval, void *extra) { @@ -428,11 +455,11 @@ lfc_change_limit_hook(int newval, void *extra) CriticalAssert(victim->access_count == 0); #ifdef FALLOC_FL_PUNCH_HOLE - if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, (off_t) victim->offset * BLOCKS_PER_CHUNK * BLCKSZ, BLOCKS_PER_CHUNK * BLCKSZ) < 0) + if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, (off_t) victim->offset * lfc_blocks_per_chunk * BLCKSZ, lfc_blocks_per_chunk * BLCKSZ) < 0) neon_log(LOG, "Failed to punch hole in file: %m"); #endif /* We remove the old entry, and re-enter a hole to the hash table */ - for (int i = 0; i < BLOCKS_PER_CHUNK; i++) + for (int i = 0; i < lfc_blocks_per_chunk; i++) { bool is_page_cached = GET_STATE(victim, i) == AVAILABLE; lfc_ctl->used_pages -= is_page_cached; @@ -525,7 +552,7 @@ lfc_init(void) "Maximal number of prewarmed pages", NULL, &lfc_prewarm_limit, - 0, /* disabled by default */ + INT_MAX, /* 
no limit by default */ 0, INT_MAX, PGC_SIGHUP, @@ -547,6 +574,19 @@ lfc_init(void) NULL, NULL); + DefineCustomIntVariable("neon.chunk_size_log", + "Logarithm of LFC chunk size in blocks", + NULL, + &lfc_chunk_size_log, + MAX_BLOCKS_PER_CHUNK_LOG, + 0, + MAX_BLOCKS_PER_CHUNK_LOG, + PGC_POSTMASTER, + 0, + NULL, + lfc_change_chunk_size, + NULL); + if (lfc_max_size == 0) return; @@ -560,42 +600,48 @@ lfc_init(void) #endif } -static FileCacheStateEntry* -lfc_get_state(size_t* n_entries) +static FileCacheState* +lfc_get_state(size_t max_entries) { - size_t max_entries = *n_entries; - size_t i = 0; - FileCacheStateEntry* fs; + FileCacheState* fcs = NULL; if (lfc_maybe_disabled() || max_entries == 0) /* fast exit if file cache is disabled */ return NULL; - fs = (FileCacheStateEntry*)palloc0(sizeof(FileCacheStateEntry) * max_entries); - LWLockAcquire(lfc_lock, LW_SHARED); if (LFC_ENABLED()) { - dlist_iter iter; + dlist_iter iter; + size_t i = 0; + uint8* bitmap; + size_t n_entries = Min(max_entries, lfc_ctl->used - lfc_ctl->pinned); + + fcs = (FileCacheState*)palloc0(FILE_CACHE_STATE_SIZE(n_entries)); + fcs->magic = FILE_CACHE_STATE_MAGIC; + fcs->chunk_size_log = lfc_chunk_size_log; + fcs->n_chunks = n_entries; + bitmap = FILE_CACHE_STATE_BITMAP(fcs); + dlist_reverse_foreach(iter, &lfc_ctl->lru) { FileCacheEntry *entry = dlist_container(FileCacheEntry, list_node, iter.cur); - memcpy(&fs[i].key, &entry->key, sizeof entry->key); - for (int j = 0; j < BLOCKS_PER_CHUNK; j++) + fcs->chunks[i] = entry->key; + for (int j = 0; j < lfc_blocks_per_chunk; j++) { if (GET_STATE(entry, j) != UNAVAILABLE) - fs[i].bitmap[j >> 5] |= (uint32)1 << (j & 31); + BITMAP_SET(bitmap, i*lfc_blocks_per_chunk + j); } - if (++i == max_entries) + if (++i == n_entries) break; } - elog(LOG, "LFC: save state of %ld chunks", (long)i); + Assert(i == n_entries); + elog(LOG, "LFC: save state of %ld chunks", (long)n_entries); } LWLockRelease(lfc_lock); - *n_entries = i; - return fs; + return fcs; } /* @@ 
-603,46 +649,80 @@ lfc_get_state(size_t* n_entries) * and avoid race conditions with other backends. */ static void -lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries) +lfc_prewarm(FileCacheState* fcs, uint32 worker_id, uint32 n_workers) { size_t snd_idx = 0, rcv_idx = 0; size_t n_sent = 0, n_received = 0; + size_t fcs_chunk_size_log; + size_t n_entries; + size_t max_prefetch_pages; size_t prewarm_batch = Min(lfc_prewarm_batch, readahead_buffer_size); bool save_lfc_store_prefetch_result; + size_t vacant_blocks; + PrewarmWorkerState* ws; + uint8* bitmap; + BufferTag tag; if (!lfc_ensure_opened()) return; - if (n_entries == 0 || fs == NULL) + if (prewarm_batch == 0 || n_workers == 0) { elog(LOG, "LFC: prewarm is disabled"); return; } + if (worker_id >= MAX_PREWARM_WORKERS || worker_id >= n_workers) + { + elog(ERROR, "LFC: Invalid prewarm worker id: %d", worker_id); + } + ws = &lfc_ctl->prewarm_workers[worker_id]; + + if (fcs == NULL || fcs->n_chunks == 0) + { + elog(LOG, "LFC: nothing to prewarm"); + return; + } + + if (fcs->magic != FILE_CACHE_STATE_MAGIC) + { + elog(ERROR, "LFC: Invalid file cache state magic: %X", fcs->magic); + } + + fcs_chunk_size_log = fcs->chunk_size_log; + if (fcs_chunk_size_log > MAX_BLOCKS_PER_CHUNK_LOG) + { + elog(ERROR, "LFC: Invalid chunk size log: %u", fcs->chunk_size_log); + } + n_entries = Min(fcs->n_chunks, lfc_prewarm_limit); + bitmap = FILE_CACHE_STATE_BITMAP(fcs); + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); /* Do not prewarm more entries than LFC limit */ if (lfc_ctl->limit <= lfc_ctl->size) { - elog(LOG, "LFC: skip prewarm because LFC limit is smaller than prewarm size"); + elog(LOG, "LFC: skip prewarm because LFC is already filled"); LWLockRelease(lfc_lock); return; } - - if (n_entries > lfc_ctl->limit - lfc_ctl->size) + vacant_blocks = (size_t)(lfc_ctl->limit - lfc_ctl->size) << lfc_chunk_size_log; + if (n_entries > (vacant_blocks >> fcs_chunk_size_log)) { - n_entries = lfc_ctl->limit - lfc_ctl->size; + n_entries = vacant_blocks 
>> fcs_chunk_size_log; + } + max_prefetch_pages = n_entries << fcs_chunk_size_log; - if (lfc_ctl->prewarm_total_chunks != lfc_ctl->prewarm_curr_chunk) + if (ws->total_chunks != ws->curr_chunk) { - elog(LOG, "LFC: skip prewarm because concurrent prewarm is detected"); LWLockRelease(lfc_lock); - return; + elog(ERROR, "LFC: skip prewarm because prewarm worker %d is still active", worker_id); } /* Initialize fields used to track prewarming progress */ - lfc_ctl->prewarm_total_chunks = n_entries; - lfc_ctl->prewarm_curr_chunk = 0; + ws->total_chunks = n_entries; + ws->curr_chunk = 0; + ws->prewarmed_pages = 0; + ws->skipped_pages = 0; LWLockRelease(lfc_lock); @@ -654,43 +734,62 @@ lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries) while (true) { - BufferTag tag; - size_t chunk_no = snd_idx / BLOCKS_PER_CHUNK; - BlockNumber offs_in_chunk = snd_idx % BLOCKS_PER_CHUNK; - if (chunk_no < n_entries) + if (snd_idx < max_prefetch_pages) { - if (fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31))) + if ((snd_idx >> fcs_chunk_size_log) % n_workers != worker_id) { - tag = fs[chunk_no].key; - tag.blockNum += offs_in_chunk; - (void)prefetch_register_bufferv(tag, NULL, 1, NULL, true); - n_sent += 1; - } - snd_idx += 1; - } - if (n_sent >= n_received + prewarm_batch || chunk_no == n_entries) - { - do - { - chunk_no = rcv_idx / BLOCKS_PER_CHUNK; - offs_in_chunk = rcv_idx % BLOCKS_PER_CHUNK; - rcv_idx += 1; - } while (!(fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31)))); - - lfc_ctl->prewarm_curr_chunk = chunk_no; - - tag = fs[chunk_no].key; - tag.blockNum += offs_in_chunk; - if (prefetch_receive(tag)) - { - lfc_ctl->prewarmed_pages += 1; + /* If there are multiple workers, split chunks between them */ + snd_idx += 1 << fcs_chunk_size_log; } else { - lfc_ctl->skipped_pages += 1; + if (BITMAP_ISSET(bitmap, snd_idx)) + { + tag = fcs->chunks[snd_idx >> fcs_chunk_size_log]; + tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1); + 
(void)prefetch_register_bufferv(tag, NULL, 1, NULL, true); + n_sent += 1; + } + snd_idx += 1; + } + } + if (n_sent >= n_received + prewarm_batch || snd_idx == max_prefetch_pages) + { + /* Everything is sent and nothing is in flight: there is no block to wait for */ + if (n_received == n_sent && snd_idx == max_prefetch_pages) + break; + if ((rcv_idx >> fcs_chunk_size_log) % n_workers != worker_id) + { + /* Skip chunks processed by other workers */ + rcv_idx += 1 << fcs_chunk_size_log; + continue; } - if (++n_received == n_sent && snd_idx >= n_entries * BLOCKS_PER_CHUNK) + /* Locate next block to prefetch without scanning past the end of this chunk */ + while (!BITMAP_ISSET(bitmap, rcv_idx)) + { + rcv_idx += 1; + if ((rcv_idx & ((1 << fcs_chunk_size_log) - 1)) == 0) + break; + } + /* Next chunk may belong to another worker: recheck ownership */ + if ((rcv_idx >> fcs_chunk_size_log) % n_workers != worker_id || !BITMAP_ISSET(bitmap, rcv_idx)) + continue; + /* Update progress indicator */ + ws->curr_chunk = rcv_idx >> fcs_chunk_size_log; + + tag = fcs->chunks[rcv_idx >> fcs_chunk_size_log]; + tag.blockNum += rcv_idx & ((1 << fcs_chunk_size_log) - 1); + if (prefetch_receive(tag)) + { + ws->prewarmed_pages += 1; + } + else + { + ws->skipped_pages += 1; + } + rcv_idx += 1; + if (++n_received == n_sent && snd_idx == max_prefetch_pages) { break; } @@ -698,8 +789,8 @@ lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries) } Assert(n_sent == n_received); lfc_store_prefetch_result = save_lfc_store_prefetch_result; - lfc_ctl->prewarm_curr_chunk = n_entries; - elog(LOG, "LFC: complete prewarming: loaded %ld pages", (long)n_received); + ws->curr_chunk = n_entries; + elog(LOG, "LFC: worker %d complete prewarming: loaded %ld pages", worker_id, (long)n_received); } @@ -712,7 +803,7 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) { BufferTag tag; FileCacheEntry *entry; - int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1); + int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno); bool found = false; uint32 hash; @@ -721,7 +812,7 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) CopyNRelFileInfoToBufTag(tag, rinfo); tag.forkNum = forkNum; - tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1); + tag.blockNum = blkno - chunk_offs; CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); hash = get_hash_value(lfc_hash, &tag); @@ -759,9 +850,9 @@ 
lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); - tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1); + chunk_offs = BLOCK_TO_CHUNK_OFF(blkno); + tag.blockNum = blkno - chunk_offs; hash = get_hash_value(lfc_hash, &tag); - chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1); LWLockAcquire(lfc_lock, LW_SHARED); @@ -772,12 +863,12 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, } while (true) { - int this_chunk = Min(nblocks - i, BLOCKS_PER_CHUNK - chunk_offs); + int this_chunk = Min(nblocks - i, lfc_blocks_per_chunk - chunk_offs); entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); if (entry != NULL) { - for (; chunk_offs < BLOCKS_PER_CHUNK && i < nblocks; chunk_offs++, i++) + for (; chunk_offs < lfc_blocks_per_chunk && i < nblocks; chunk_offs++, i++) { if (GET_STATE(entry, chunk_offs) != UNAVAILABLE) { @@ -801,9 +892,9 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, * Prepare for the next iteration. We don't unlock here, as that'd * probably be more expensive than the gains it'd get us. 
*/ - tag.blockNum = (blkno + i) & ~(BLOCKS_PER_CHUNK - 1); + chunk_offs = BLOCK_TO_CHUNK_OFF(blkno + i); + tag.blockNum = (blkno + i) - chunk_offs; hash = get_hash_value(lfc_hash, &tag); - chunk_offs = (blkno + i) & (BLOCKS_PER_CHUNK - 1); } LWLockRelease(lfc_lock); @@ -878,9 +969,9 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, while (nblocks > 0) { struct iovec iov[PG_IOV_MAX]; - int8 chunk_mask[BLOCKS_PER_CHUNK / 8] = {0}; - int chunk_offs = (blkno & (BLOCKS_PER_CHUNK - 1)); - int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK)); + uint8 chunk_mask[MAX_BLOCKS_PER_CHUNK / 8] = {0}; + int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno); + int blocks_in_chunk = Min(nblocks, lfc_blocks_per_chunk - chunk_offs); int iteration_hits = 0; int iteration_misses = 0; uint64 io_time_us = 0; @@ -968,8 +1059,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, /* Unlink entry from LRU list to pin it for the duration of IO operation */ if (entry->access_count++ == 0) + { + lfc_ctl->pinned += 1; dlist_delete(&entry->list_node); - + } generation = lfc_ctl->generation; entry_offset = entry->offset; @@ -1018,7 +1111,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, if (iteration_hits != 0) { /* chunk offset (# of pages) into the LFC file */ - off_t first_read_offset = (off_t) entry_offset * BLOCKS_PER_CHUNK; + off_t first_read_offset = (off_t) entry_offset * lfc_blocks_per_chunk; int nwrite = iov_last_used - first_block_in_chunk_read; /* offset of first IOV */ first_read_offset += chunk_offs + first_block_in_chunk_read; @@ -1066,7 +1159,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, CriticalAssert(entry->access_count > 0); if (--entry->access_count == 0) + { + lfc_ctl->pinned -= 1; dlist_push_tail(&lfc_ctl->lru, &entry->list_node); + } } else { @@ -1143,7 +1239,7 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash) FileCacheEntry 
*victim = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->lru)); - for (int i = 0; i < BLOCKS_PER_CHUNK; i++) + for (int i = 0; i < lfc_blocks_per_chunk; i++) { bool is_page_cached = GET_STATE(victim, i) == AVAILABLE; lfc_ctl->used_pages -= is_page_cached; @@ -1167,8 +1263,9 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash) entry->access_count = 1; entry->hash = hash; + lfc_ctl->pinned += 1; - for (int i = 0; i < BLOCKS_PER_CHUNK; i++) + for (int i = 0; i < lfc_blocks_per_chunk; i++) SET_STATE(entry, i, UNAVAILABLE); return true; @@ -1213,7 +1310,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, FileCacheBlockState state; XLogRecPtr lwlsn; - int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1); + int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno); if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return false; @@ -1223,7 +1320,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); - tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1); + tag.blockNum = blkno - chunk_offs; hash = get_hash_value(lfc_hash, &tag); cv = &lfc_ctl->cv[hash % N_COND_VARS]; @@ -1263,7 +1360,10 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, * operation */ if (entry->access_count++ == 0) + { + lfc_ctl->pinned += 1; dlist_delete(&entry->list_node); + } } else { @@ -1288,7 +1388,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE); INSTR_TIME_SET_CURRENT(io_start); rc = pwrite(lfc_desc, buffer, BLCKSZ, - ((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ); + ((off_t) entry_offset * lfc_blocks_per_chunk + chunk_offs) * BLCKSZ); INSTR_TIME_SET_CURRENT(io_end); pgstat_report_wait_end(); @@ -1314,7 +1414,10 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, inc_page_cache_write_wait(time_spent_us); if 
(--entry->access_count == 0) + { + lfc_ctl->pinned -= 1; dlist_push_tail(&lfc_ctl->lru, &entry->list_node); + } state = GET_STATE(entry, chunk_offs); if (state == REQUESTED) { @@ -1381,8 +1484,8 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, while (nblocks > 0) { struct iovec iov[PG_IOV_MAX]; - int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1); - int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK)); + int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno); + int blocks_in_chunk = Min(nblocks, lfc_blocks_per_chunk - chunk_offs); instr_time io_start, io_end; ConditionVariable* cv; @@ -1394,7 +1497,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, iov[i].iov_len = BLCKSZ; } - tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1); + tag.blockNum = blkno - chunk_offs; hash = get_hash_value(lfc_hash, &tag); cv = &lfc_ctl->cv[hash % N_COND_VARS]; @@ -1414,7 +1517,10 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, * operation */ if (entry->access_count++ == 0) + { + lfc_ctl->pinned += 1; dlist_delete(&entry->list_node); + } } else { @@ -1467,7 +1573,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE); INSTR_TIME_SET_CURRENT(io_start); rc = pwritev(lfc_desc, iov, blocks_in_chunk, - ((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ); + ((off_t) entry_offset * lfc_blocks_per_chunk + chunk_offs) * BLCKSZ); INSTR_TIME_SET_CURRENT(io_end); pgstat_report_wait_end(); @@ -1494,7 +1600,10 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, inc_page_cache_write_wait(time_spent_us); if (--entry->access_count == 0) + { + lfc_ctl->pinned -= 1; dlist_push_tail(&lfc_ctl->lru, &entry->list_node); + } for (int i = 0; i < blocks_in_chunk; i++) { @@ -1620,7 +1729,12 @@ neon_get_lfc_stats(PG_FUNCTION_ARGS) break; case 8: key = "file_cache_chunk_size_pages"; - value = BLOCKS_PER_CHUNK; + value = 
lfc_blocks_per_chunk; + break; + case 9: + key = "file_cache_pinned"; + if (lfc_ctl) + value = lfc_ctl->pinned; break; default: SRF_RETURN_DONE(funcctx); @@ -1748,7 +1862,7 @@ local_cache_pages(PG_FUNCTION_ARGS) /* Skip hole tags */ if (NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)) != 0) { - for (int i = 0; i < BLOCKS_PER_CHUNK; i++) + for (int i = 0; i < lfc_blocks_per_chunk; i++) n_pages += GET_STATE(entry, i) == AVAILABLE; } } @@ -1776,13 +1890,13 @@ local_cache_pages(PG_FUNCTION_ARGS) hash_seq_init(&status, lfc_hash); while ((entry = hash_seq_search(&status)) != NULL) { - for (int i = 0; i < BLOCKS_PER_CHUNK; i++) + for (int i = 0; i < lfc_blocks_per_chunk; i++) { if (NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)) != 0) { if (GET_STATE(entry, i) == AVAILABLE) { - fctx->record[n].pageoffs = entry->offset * BLOCKS_PER_CHUNK + i; + fctx->record[n].pageoffs = entry->offset * lfc_blocks_per_chunk + i; fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)); fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key)); fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key)); @@ -1872,16 +1986,16 @@ PG_FUNCTION_INFO_V1(get_local_cache_state); Datum get_local_cache_state(PG_FUNCTION_ARGS) { - size_t n_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0); - FileCacheStateEntry* fs = lfc_get_state(&n_entries); - if (fs != NULL) + size_t max_entries = PG_ARGISNULL(0) ? 
lfc_prewarm_limit : PG_GETARG_INT32(0); + FileCacheState* fcs = lfc_get_state(max_entries); + if (fcs != NULL) { - size_t size_in_bytes = sizeof(FileCacheStateEntry) * n_entries; + size_t size_in_bytes = FILE_CACHE_STATE_SIZE(fcs->n_chunks); bytea* res = (bytea*)palloc(VARHDRSZ + size_in_bytes); SET_VARSIZE(res, VARHDRSZ + size_in_bytes); - memcpy(VARDATA(res), fs, size_in_bytes); - pfree(fs); + memcpy(VARDATA(res), fcs, size_in_bytes); + pfree(fcs); PG_RETURN_BYTEA_P(res); } @@ -1894,10 +2008,14 @@ Datum prewarm_local_cache(PG_FUNCTION_ARGS) { bytea* state = PG_GETARG_BYTEA_PP(0); - uint32 n_entries = VARSIZE_ANY_EXHDR(state)/sizeof(FileCacheStateEntry); - FileCacheStateEntry* fs = (FileCacheStateEntry*)VARDATA_ANY(state); + uint32 worker_id = PG_GETARG_INT32(1); + uint32 n_workers = PG_GETARG_INT32(2); + FileCacheState* fcs = (FileCacheState*)VARDATA_ANY(state); - lfc_prewarm(fs, n_entries); + if (VARSIZE_ANY_EXHDR(state) < sizeof(FileCacheState) || FILE_CACHE_STATE_SIZE(fcs->n_chunks) != VARSIZE_ANY_EXHDR(state)) /* header must be complete before n_chunks is read */ + elog(ERROR, "LFC: Invalid file cache state size"); + + lfc_prewarm(fcs, worker_id, n_workers); PG_RETURN_NULL(); } @@ -1910,10 +2028,17 @@ get_prewarm_info(PG_FUNCTION_ARGS) Datum values[4]; bool nulls[4]; TupleDesc tupdesc; + uint32 worker_id = PG_GETARG_INT32(0); + PrewarmWorkerState* ws; if (lfc_size_limit == 0) PG_RETURN_NULL(); + if (worker_id >= MAX_PREWARM_WORKERS) + { + elog(ERROR, "LFC: Invalid prewarm worker id: %d", worker_id); + } + tupdesc = CreateTemplateTupleDesc(4); TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_chunks", INT4OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 2, "curr_chunk", INT4OID, -1, 0); @@ -1923,10 +2048,13 @@ get_prewarm_info(PG_FUNCTION_ARGS) MemSet(nulls, 0, sizeof(nulls)); LWLockAcquire(lfc_lock, LW_SHARED); - values[0] = Int32GetDatum(lfc_ctl->prewarm_total_chunks); - values[1] = Int32GetDatum(lfc_ctl->prewarm_curr_chunk); - values[2] = Int32GetDatum(lfc_ctl->prewarmed_pages); - values[3] = Int32GetDatum(lfc_ctl->skipped_pages); + + ws = 
&lfc_ctl->prewarm_workers[worker_id]; + + values[0] = Int32GetDatum(ws->total_chunks); + values[1] = Int32GetDatum(ws->curr_chunk); + values[2] = Int32GetDatum(ws->prewarmed_pages); + values[3] = Int32GetDatum(ws->skipped_pages); LWLockRelease(lfc_lock); PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 42f3ef673a..ccb072d6f9 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -166,6 +166,9 @@ typedef struct WaitEventSet *wes_read; } PageServer; +static uint32 local_request_counter; +#define GENERATE_REQUEST_ID() (((NeonRequestId)MyProcPid << 32) | ++local_request_counter) + static PageServer page_servers[MAX_SHARDS]; static bool pageserver_flush(shardno_t shard_no); diff --git a/pgxn/neon/neon--1.5--1.6.sql b/pgxn/neon/neon--1.5--1.6.sql index c2f3895883..f44b1ef76b 100644 --- a/pgxn/neon/neon--1.5--1.6.sql +++ b/pgxn/neon/neon--1.5--1.6.sql @@ -1,6 +1,6 @@ \echo Use "ALTER EXTENSION neon UPDATE TO '1.6'" to load this file. 
\quit -CREATE FUNCTION get_prewarm_info(out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer) +CREATE FUNCTION get_prewarm_info(worker_id integer default 0, out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer) RETURNS record AS 'MODULE_PATHNAME', 'get_prewarm_info' LANGUAGE C STRICT @@ -12,7 +12,7 @@ AS 'MODULE_PATHNAME', 'get_local_cache_state' LANGUAGE C PARALLEL UNSAFE; -CREATE FUNCTION prewarm_local_cache(state bytea) +CREATE FUNCTION prewarm_local_cache(state bytea, worker_id integer default 0, n_workers integer default 1) RETURNS void AS 'MODULE_PATHNAME', 'prewarm_local_cache' LANGUAGE C STRICT diff --git a/pgxn/neon/neon--1.6--1.5.sql b/pgxn/neon/neon--1.6--1.5.sql index 0ff29933b8..db7b524c57 100644 --- a/pgxn/neon/neon--1.6--1.5.sql +++ b/pgxn/neon/neon--1.6--1.5.sql @@ -1,7 +1,7 @@ -DROP FUNCTION IF EXISTS get_prewarm_info(out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer); +DROP FUNCTION IF EXISTS get_prewarm_info(worker_id integer, out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer); DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer); -DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea); +DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, worker_id integer, n_workers integer);