Review notes (text before the first "diff --git" line is ignored by patch tools):
  * FILE_CACHE_ENRTY_SIZE now reserves bytes for two state bits per block
    (16 blocks per uint32); the previous formula added a word count for one
    bit per block as if it were a byte count, under-allocating hash entries.
  * FILE_CACHE_STATE_SIZE(fcs) parenthesizes its argument and computes in size_t.
  * The size-mismatch elog casts both arguments to match its %u conversions.
  * lfc_prewarm stops when every page was skipped as already cached, instead
    of scanning the bitmap past its end looking for a page to receive.
  * prewarm_local_cache fetches the argument with a 4-byte varlena header
    (PG_GETARG_BYTEA_P) and rejects inputs shorter than the fixed header.
  * Indentation was reconstructed from a whitespace-collapsed copy; apply with
    --ignore-whitespace if context lines do not match exactly.

diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c
index 838afdc804..ead2cc42f5 100644
--- a/pgxn/neon/file_cache.c
+++ b/pgxn/neon/file_cache.c
@@ -123,10 +123,11 @@ typedef struct FileCacheEntry
 	uint32		hash;
 	uint32		offset;
 	uint32		access_count;
-	uint32		state[CHUNK_BITMAP_SIZE * 2]; /* two bits per block */
 	dlist_node	list_node;		/* LRU/holes list node */
+	uint32		state[FLEXIBLE_ARRAY_MEMBER]; /* two bits per block */
 } FileCacheEntry;
 
+#define FILE_CACHE_ENRTY_SIZE MAXALIGN(offsetof(FileCacheEntry, state) + ((lfc_blocks_per_chunk + 15) / 16) * sizeof(uint32)) /* 16 blocks per uint32 word */
 #define GET_STATE(entry, i) (((entry)->state[(i) / 16] >> ((i) % 16 * 2)) & 3)
 #define SET_STATE(entry, i, new_state) (entry)->state[(i) / 16] = ((entry)->state[(i) / 16] & ~(3 << ((i) % 16 * 2))) | ((new_state) << ((i) % 16 * 2))
 
@@ -171,6 +172,7 @@ typedef struct FileCacheControl
 
 typedef struct FileCacheState
 {
+	int32		vl_len_;		/* varlena header (do not touch directly!) */
 	uint32		magic;
 	uint32		n_chunks;
 	uint16		chunk_size_log;
@@ -179,7 +181,8 @@ typedef struct FileCacheState
 } FileCacheState;
 
 #define FILE_CACHE_STATE_BITMAP(fcs) ((uint8*)&(fcs)->chunks[(fcs)->n_chunks])
-#define FILE_CACHE_STATE_SIZE(n_chunks) (sizeof(FileCacheState) + (n_chunks)*sizeof(BufferTag) + (((n_chunks) * lfc_blocks_per_chunk)+7)/8)
+#define FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_chunks) (sizeof(FileCacheState) + (n_chunks)*sizeof(BufferTag) + (((n_chunks) * lfc_blocks_per_chunk)+7)/8)
+#define FILE_CACHE_STATE_SIZE(fcs) (sizeof(FileCacheState) + ((fcs)->n_chunks)*sizeof(BufferTag) + ((((size_t)(fcs)->n_chunks) << (fcs)->chunk_size_log)+7)/8)
 
 static HTAB *lfc_hash;
 static int lfc_desc = -1;
@@ -332,7 +335,7 @@ lfc_shmem_startup(void)
 		lfc_lock = (LWLockId) GetNamedLWLockTranche("lfc_lock");
 
 		info.keysize = sizeof(BufferTag);
-		info.entrysize = sizeof(FileCacheEntry);
+		info.entrysize = FILE_CACHE_ENRTY_SIZE;
 
 		/*
 		 * n_chunks+1 because we add new element to hash table before eviction
@@ -378,7 +381,7 @@ lfc_shmem_request(void)
 		prev_shmem_request_hook();
 #endif
 
-	RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, sizeof(FileCacheEntry)));
+	RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, FILE_CACHE_ENRTY_SIZE));
 	RequestNamedLWLockTranche("lfc_lock", 1);
 }
 
@@ -618,8 +621,9 @@ lfc_get_state(size_t max_entries)
 		size_t i = 0;
 		uint8* bitmap;
 		size_t n_entries = Min(max_entries, lfc_ctl->used - lfc_ctl->pinned);
-
-		fcs = (FileCacheState*)palloc0(FILE_CACHE_STATE_SIZE(n_entries));
+		size_t state_size = FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_entries);
+		fcs = (FileCacheState*)palloc0(state_size);
+		SET_VARSIZE(fcs, state_size);
 		fcs->magic = FILE_CACHE_STATE_MAGIC;
 		fcs->chunk_size_log = lfc_chunk_size_log;
 		fcs->n_chunks = n_entries;
@@ -691,6 +695,11 @@ lfc_prewarm(FileCacheState* fcs, uint32 worker_id, uint32 n_workers)
 		elog(ERROR, "LFC: Invalid file cache state magic: %X", fcs->magic);
 	}
 
+	if (FILE_CACHE_STATE_SIZE(fcs) != VARSIZE(fcs))
+	{
+		elog(ERROR, "LFC: Invalid file cache state size: %u vs. %u", (unsigned)FILE_CACHE_STATE_SIZE(fcs), (unsigned)VARSIZE(fcs));
+	}
+
 	fcs_chunk_size_log = fcs->chunk_size_log;
 	if (fcs_chunk_size_log > MAX_BLOCKS_PER_CHUNK)
 	{
@@ -733,66 +742,84 @@ lfc_prewarm(FileCacheState* fcs, uint32 worker_id, uint32 n_workers)
 
 	lfc_store_prefetch_result = true;
 	elog(LOG, "LFC: start loading %ld chunks", (long)n_entries);
-
-	while (true)
+	PG_TRY();
 	{
-		if (snd_idx < max_prefetch_pages)
+		while (true)
 		{
-			if ((snd_idx >> fcs_chunk_size_log) % n_workers != worker_id)
+			if (snd_idx < max_prefetch_pages)
 			{
-				/* If there are multiple workers, split chunks between them */
-				snd_idx += 1 << fcs_chunk_size_log;
-			}
-			else
-			{
-				if (BITMAP_ISSET(bitmap, snd_idx))
+				if ((snd_idx >> fcs_chunk_size_log) % n_workers != worker_id)
 				{
-					tag = fcs->chunks[snd_idx >> fcs_chunk_size_log];
-					tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1);
-					(void)communicator_prefetch_register_bufferv(tag, NULL, 1, NULL);
-					n_sent += 1;
+					/* If there are multiple workers, split chunks between them */
+					snd_idx += 1 << fcs_chunk_size_log;
+				}
+				else
+				{
+					if (BITMAP_ISSET(bitmap, snd_idx))
+					{
+						tag = fcs->chunks[snd_idx >> fcs_chunk_size_log];
+						tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1);
+						if (!lfc_cache_contains(BufTagGetNRelFileInfo(tag), tag.forkNum, tag.blockNum))
+						{
+							(void)communicator_prefetch_register_bufferv(tag, NULL, 1, NULL);
+							n_sent += 1;
+						}
+						else
+						{
+							BITMAP_CLR(bitmap, snd_idx);
+						}
+					}
+					snd_idx += 1;
 				}
-				snd_idx += 1;
 			}
-		}
-		if (n_sent >= n_received + prewarm_batch || snd_idx == max_prefetch_pages)
-		{
-			if ((rcv_idx >> fcs_chunk_size_log) % n_workers != worker_id)
+			if (n_sent >= n_received + prewarm_batch || snd_idx == max_prefetch_pages)
 			{
-				/* Skip chunks processed by other workers */
-				rcv_idx += 1 << fcs_chunk_size_log;
-				continue;
-			}
+				/* Nothing in flight and nothing left to send: do not scan the bitmap for a page to receive */
+				if (n_received == n_sent && snd_idx == max_prefetch_pages)
+				{
+					break;
+				}
+				if ((rcv_idx >> fcs_chunk_size_log) % n_workers != worker_id)
+				{
+					/* Skip chunks processed by other workers */
+					rcv_idx += 1 << fcs_chunk_size_log;
+					continue;
+				}
 
-			/* Locate next block to prefetch */
-			while (!BITMAP_ISSET(bitmap, rcv_idx))
-			{
+				/* Locate next block to prefetch */
+				while (!BITMAP_ISSET(bitmap, rcv_idx))
+				{
+					rcv_idx += 1;
+				}
+				/* Update progress indicator */
+				ws->curr_chunk = rcv_idx >> fcs_chunk_size_log;
+
+				tag = fcs->chunks[rcv_idx >> fcs_chunk_size_log];
+				tag.blockNum += rcv_idx & ((1 << fcs_chunk_size_log) - 1);
+				if (communicator_prefetch_receive(tag))
+				{
+					ws->prewarmed_pages += 1;
+				}
+				else
+				{
+					ws->skipped_pages += 1;
+				}
 				rcv_idx += 1;
-			}
-			/* Update progress indicator */
-			ws->curr_chunk = rcv_idx >> fcs_chunk_size_log;
-
-			tag = fcs->chunks[rcv_idx >> fcs_chunk_size_log];
-			tag.blockNum += rcv_idx & ((1 << fcs_chunk_size_log) - 1);
-			if (communicator_prefetch_receive(tag))
-			{
-				ws->prewarmed_pages += 1;
-			}
-			else
-			{
-				ws->skipped_pages += 1;
-			}
-			rcv_idx += 1;
-			if (++n_received == n_sent && snd_idx == max_prefetch_pages)
-			{
-				break;
+				if (++n_received == n_sent && snd_idx == max_prefetch_pages)
+				{
+					break;
+				}
 			}
 		}
+		Assert(n_sent == n_received);
+		elog(LOG, "LFC: worker %d complete prewarming: loaded %ld pages", worker_id, (long)n_received);
 	}
-	Assert(n_sent == n_received);
-	lfc_store_prefetch_result = save_lfc_store_prefetch_result;
-	ws->curr_chunk = n_entries;
-	elog(LOG, "LFC: worker %d complete prewarming: loaded %ld pages", worker_id, (long)n_received);
+	PG_FINALLY();
+	{
+		lfc_store_prefetch_result = save_lfc_store_prefetch_result;
+		ws->curr_chunk = n_entries;
+	}
+	PG_END_TRY();
 }
 
 
@@ -1991,17 +2018,9 @@ get_local_cache_state(PG_FUNCTION_ARGS)
 	size_t max_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0);
 	FileCacheState* fcs = lfc_get_state(max_entries);
 	if (fcs != NULL)
-	{
-		size_t size_in_bytes = FILE_CACHE_STATE_SIZE(fcs->n_chunks);
-		bytea* res = (bytea*)palloc(VARHDRSZ + size_in_bytes);
-
-		SET_VARSIZE(res, VARHDRSZ + size_in_bytes);
-		memcpy(VARDATA(res), fcs, size_in_bytes);
-		pfree(fcs);
-
-		PG_RETURN_BYTEA_P(res);
-	}
-	PG_RETURN_NULL();
+		PG_RETURN_BYTEA_P((bytea*)fcs);
+	else
+		PG_RETURN_NULL();
 }
 
 PG_FUNCTION_INFO_V1(prewarm_local_cache);
@@ -2012,10 +2031,10 @@ prewarm_local_cache(PG_FUNCTION_ARGS)
-	bytea* state = PG_GETARG_BYTEA_PP(0);
+	bytea* state = PG_GETARG_BYTEA_P(0);
 	uint32 worker_id = PG_GETARG_INT32(1);
 	uint32 n_workers = PG_GETARG_INT32(2);
-	FileCacheState* fcs = (FileCacheState*)VARDATA_ANY(state);
-
-	if (FILE_CACHE_STATE_SIZE(fcs->n_chunks) != VARSIZE_ANY_EXHDR(state))
-		elog(ERROR, "LFC: Invalid file cache state size");
+	FileCacheState* fcs = (FileCacheState*)state;
+
+	if (VARSIZE(state) < sizeof(FileCacheState))
+		elog(ERROR, "LFC: Invalid file cache state size");
 
 	lfc_prewarm(fcs, worker_id, n_workers);
 