diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c index f4088ab264..22a454e422 100644 --- a/pgxn/neon/communicator_new.c +++ b/pgxn/neon/communicator_new.c @@ -423,11 +423,20 @@ communicator_new_get_lfc_state(size_t max_entries) struct FileCacheIterator iter; FileCacheState *fcs; uint8 *bitmap; + uint64_t num_pages_used; + size_t n_entries; + size_t state_size; + size_t n_pages; - /* TODO: Max(max_entries, ) */ - size_t n_entries = max_entries; - size_t state_size = FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_entries, 1); - size_t n_pages = 0; + /* + * Calculate the size of the returned state. + * + * FIXME: if the LFC is very large, this can exceed 1 GB, resulting in a + * palloc error. + */ + num_pages_used = bcomm_cache_get_num_pages_used(my_bs); + n_entries = Min(num_pages_used, max_entries); + state_size = FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_entries, 1); fcs = (FileCacheState *) palloc0(state_size); SET_VARSIZE(fcs, state_size); @@ -437,6 +446,7 @@ communicator_new_get_lfc_state(size_t max_entries) bitmap = FILE_CACHE_STATE_BITMAP(fcs); bcomm_cache_iterate_begin(my_bs, &iter); + n_pages = 0; while (n_pages < max_entries && bcomm_cache_iterate_next(my_bs, &iter)) { BufferTag tag; @@ -455,6 +465,7 @@ communicator_new_get_lfc_state(size_t max_entries) BITMAP_SET(bitmap, i); } fcs->n_pages = n_pages; + fcs->n_chunks = n_pages; return fcs; } diff --git a/pgxn/neon/lfc_prewarm.c b/pgxn/neon/lfc_prewarm.c index 7a2ddb891d..5e3ce1e64e 100644 --- a/pgxn/neon/lfc_prewarm.c +++ b/pgxn/neon/lfc_prewarm.c @@ -440,121 +440,131 @@ lfc_prewarm_with_async_requests(FileCacheState *fcs) n_entries = Min(fcs->n_chunks, lfc_prewarm_limit); Assert(n_entries != 0); - LWLockAcquire(prewarm_lock, LW_EXCLUSIVE); - - /* Do not prewarm more entries than LFC limit */ - /* FIXME */ -#if 0 - if (prewarm_ctl->limit <= prewarm_ctl->size) + PG_TRY(); { - elog(LOG, "LFC: skip prewarm because LFC is already filled"); - LWLockRelease(prewarm_lock); - return; - } + LWLockAcquire(prewarm_lock, LW_EXCLUSIVE); + + /* Do not prewarm more entries than LFC limit */ + /* FIXME */ +#if 0 + if (prewarm_ctl->limit <= prewarm_ctl->size) + { + elog(LOG, "LFC: skip prewarm because LFC is already filled"); + LWLockRelease(prewarm_lock); + return; + } #endif - if (prewarm_ctl->prewarm_active) - { - LWLockRelease(prewarm_lock); - elog(ERROR, "LFC: skip prewarm because another prewarm is still active"); - } - prewarm_ctl->n_prewarm_entries = n_entries; - prewarm_ctl->n_prewarm_workers = -1; - prewarm_ctl->prewarm_active = true; - prewarm_ctl->prewarm_canceled = false; - - /* Calculate total number of pages to be prewarmed */ - prewarm_ctl->total_prewarm_pages = fcs->n_pages; - - LWLockRelease(prewarm_lock); - - elog(LOG, "LFC: start prewarming"); - lfc_do_prewarm = true; - lfc_prewarm_cancel = false; - - bitmap = FILE_CACHE_STATE_BITMAP(fcs); - - blocks_per_chunk = 1 << fcs->chunk_size_log; - - bitno = 0; - for (uint32 chunkno = 0; chunkno < fcs->n_chunks; chunkno++) - { - BufferTag *chunk_tag = &fcs->chunks[chunkno]; - BlockNumber request_startblkno = InvalidBlockNumber; - BlockNumber request_endblkno; - - if (!BufferTagIsValid(chunk_tag)) - elog(ERROR, "LFC: Invalid buffer tag: %u", chunk_tag->blockNum); - - if (lfc_prewarm_cancel) + if (prewarm_ctl->prewarm_active) { - prewarm_ctl->prewarm_canceled = true; - break; + LWLockRelease(prewarm_lock); + elog(ERROR, "LFC: skip prewarm because another prewarm is still active"); } + prewarm_ctl->n_prewarm_entries = n_entries; + prewarm_ctl->n_prewarm_workers = -1; + prewarm_ctl->prewarm_active = true; + prewarm_ctl->prewarm_canceled = false; - /* take next chunk */ - for (int j = 0; j < blocks_per_chunk; j++) + /* Calculate total number of pages to be prewarmed */ + prewarm_ctl->total_prewarm_pages = fcs->n_pages; + + LWLockRelease(prewarm_lock); + + elog(LOG, "LFC: start prewarming"); + lfc_do_prewarm = true; + lfc_prewarm_cancel = false; + + bitmap = FILE_CACHE_STATE_BITMAP(fcs); + + blocks_per_chunk = 1 << fcs->chunk_size_log; + + bitno = 0; + for (uint32 chunkno = 0; chunkno < fcs->n_chunks; chunkno++) { - BlockNumber blkno = chunk_tag->blockNum + j; + BufferTag *chunk_tag = &fcs->chunks[chunkno]; + BlockNumber request_startblkno = InvalidBlockNumber; + BlockNumber request_endblkno; - if (BITMAP_ISSET(bitmap, bitno)) + if (!BufferTagIsValid(chunk_tag)) + elog(ERROR, "LFC: Invalid buffer tag: %u", chunk_tag->blockNum); + + if (lfc_prewarm_cancel) { - if (request_startblkno != InvalidBlockNumber) + prewarm_ctl->prewarm_canceled = true; + break; + } + + /* take next chunk */ + for (int j = 0; j < blocks_per_chunk; j++) + { + BlockNumber blkno = chunk_tag->blockNum + j; + + if (BITMAP_ISSET(bitmap, bitno)) { - if (request_endblkno == blkno) + if (request_startblkno != InvalidBlockNumber) { - /* append this block to the request */ - request_endblkno++; + if (request_endblkno == blkno) + { + /* append this block to the request */ + request_endblkno++; + } + else + { + /* flush this request, and start new one */ + communicator_new_prefetch_register_bufferv( + BufTagGetNRelFileInfo(*chunk_tag), + chunk_tag->forkNum, + request_startblkno, + request_endblkno - request_startblkno + ); + request_startblkno = blkno; + request_endblkno = blkno + 1; + } } else { - /* flush this request, and start new one */ - communicator_new_prefetch_register_bufferv( - BufTagGetNRelFileInfo(*chunk_tag), - chunk_tag->forkNum, - request_startblkno, - request_endblkno - request_startblkno - ); + /* flush this request, if any, and start new one */ + if (request_startblkno != InvalidBlockNumber) + { + communicator_new_prefetch_register_bufferv( + BufTagGetNRelFileInfo(*chunk_tag), + chunk_tag->forkNum, + request_startblkno, + request_endblkno - request_startblkno + ); + } request_startblkno = blkno; request_endblkno = blkno + 1; } + prewarm_ctl->prewarmed_pages += 1; } - else - { - /* flush this request, if any, and start new one */ - if (request_startblkno != InvalidBlockNumber) - { - communicator_new_prefetch_register_bufferv( - BufTagGetNRelFileInfo(*chunk_tag), - chunk_tag->forkNum, - request_startblkno, - request_endblkno - request_startblkno - ); - } - request_startblkno = blkno; - request_endblkno = blkno + 1; - } - prewarm_ctl->prewarmed_pages += 1; + bitno++; } - bitno++; + + /* flush this request */ + communicator_new_prefetch_register_bufferv( + BufTagGetNRelFileInfo(*chunk_tag), + chunk_tag->forkNum, + request_startblkno, + request_endblkno - request_startblkno + ); + request_startblkno = request_endblkno = InvalidBlockNumber; } - /* flush this request */ - communicator_new_prefetch_register_bufferv( - BufTagGetNRelFileInfo(*chunk_tag), - chunk_tag->forkNum, - request_startblkno, - request_endblkno - request_startblkno - ); - request_startblkno = request_endblkno = InvalidBlockNumber; + elog(LOG, "LFC: complete prewarming: loaded %lu pages", (unsigned long) prewarm_ctl->prewarmed_pages); + prewarm_ctl->completed = GetCurrentTimestamp(); + + LWLockAcquire(prewarm_lock, LW_EXCLUSIVE); + prewarm_ctl->prewarm_active = false; + LWLockRelease(prewarm_lock); } - - elog(LOG, "LFC: complete prewarming: loaded %lu pages", (unsigned long) prewarm_ctl->prewarmed_pages); - prewarm_ctl->completed = GetCurrentTimestamp(); - - LWLockAcquire(prewarm_lock, LW_EXCLUSIVE); - prewarm_ctl->prewarm_active = false; - LWLockRelease(prewarm_lock); + PG_CATCH(); + { + elog(LOG, "LFC: cancel prewarm"); + prewarm_ctl->prewarm_canceled = true; + prewarm_ctl->prewarm_active = false; + } + PG_END_TRY(); } PG_FUNCTION_INFO_V1(get_local_cache_state);