Fix LFC prewarm cancellation

This commit is contained in:
Heikki Linnakangas
2025-08-01 00:11:50 +03:00
parent 26bd994852
commit 4a031b9467
2 changed files with 118 additions and 97 deletions

View File

@@ -423,11 +423,20 @@ communicator_new_get_lfc_state(size_t max_entries)
struct FileCacheIterator iter; struct FileCacheIterator iter;
FileCacheState *fcs; FileCacheState *fcs;
uint8 *bitmap; uint8 *bitmap;
uint64_t num_pages_used;
size_t n_entries;
size_t state_size;
size_t n_pages;
/* TODO: Max(max_entries, <current # of entries in cache>) */ /*
size_t n_entries = max_entries; * Calculate the size of the returned state.
size_t state_size = FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_entries, 1); *
size_t n_pages = 0; * FIXME: if the LFC is very large, this can exceed 1 GB, resulting in a
* palloc error.
*/
num_pages_used = bcomm_cache_get_num_pages_used(my_bs);
n_entries = Min(num_pages_used, max_entries);
state_size = FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_entries, 1);
fcs = (FileCacheState *) palloc0(state_size); fcs = (FileCacheState *) palloc0(state_size);
SET_VARSIZE(fcs, state_size); SET_VARSIZE(fcs, state_size);
@@ -437,6 +446,7 @@ communicator_new_get_lfc_state(size_t max_entries)
bitmap = FILE_CACHE_STATE_BITMAP(fcs); bitmap = FILE_CACHE_STATE_BITMAP(fcs);
bcomm_cache_iterate_begin(my_bs, &iter); bcomm_cache_iterate_begin(my_bs, &iter);
n_pages = 0;
while (n_pages < max_entries && bcomm_cache_iterate_next(my_bs, &iter)) while (n_pages < max_entries && bcomm_cache_iterate_next(my_bs, &iter))
{ {
BufferTag tag; BufferTag tag;
@@ -455,6 +465,7 @@ communicator_new_get_lfc_state(size_t max_entries)
BITMAP_SET(bitmap, i); BITMAP_SET(bitmap, i);
} }
fcs->n_pages = n_pages; fcs->n_pages = n_pages;
fcs->n_chunks = n_pages;
return fcs; return fcs;
} }

View File

@@ -440,121 +440,131 @@ lfc_prewarm_with_async_requests(FileCacheState *fcs)
n_entries = Min(fcs->n_chunks, lfc_prewarm_limit); n_entries = Min(fcs->n_chunks, lfc_prewarm_limit);
Assert(n_entries != 0); Assert(n_entries != 0);
LWLockAcquire(prewarm_lock, LW_EXCLUSIVE); PG_TRY();
/* Do not prewarm more entries than LFC limit */
/* FIXME */
#if 0
if (prewarm_ctl->limit <= prewarm_ctl->size)
{ {
elog(LOG, "LFC: skip prewarm because LFC is already filled"); LWLockAcquire(prewarm_lock, LW_EXCLUSIVE);
LWLockRelease(prewarm_lock);
return; /* Do not prewarm more entries than LFC limit */
} /* FIXME */
#if 0
if (prewarm_ctl->limit <= prewarm_ctl->size)
{
elog(LOG, "LFC: skip prewarm because LFC is already filled");
LWLockRelease(prewarm_lock);
return;
}
#endif #endif
if (prewarm_ctl->prewarm_active) if (prewarm_ctl->prewarm_active)
{
LWLockRelease(prewarm_lock);
elog(ERROR, "LFC: skip prewarm because another prewarm is still active");
}
prewarm_ctl->n_prewarm_entries = n_entries;
prewarm_ctl->n_prewarm_workers = -1;
prewarm_ctl->prewarm_active = true;
prewarm_ctl->prewarm_canceled = false;
/* Calculate total number of pages to be prewarmed */
prewarm_ctl->total_prewarm_pages = fcs->n_pages;
LWLockRelease(prewarm_lock);
elog(LOG, "LFC: start prewarming");
lfc_do_prewarm = true;
lfc_prewarm_cancel = false;
bitmap = FILE_CACHE_STATE_BITMAP(fcs);
blocks_per_chunk = 1 << fcs->chunk_size_log;
bitno = 0;
for (uint32 chunkno = 0; chunkno < fcs->n_chunks; chunkno++)
{
BufferTag *chunk_tag = &fcs->chunks[chunkno];
BlockNumber request_startblkno = InvalidBlockNumber;
BlockNumber request_endblkno;
if (!BufferTagIsValid(chunk_tag))
elog(ERROR, "LFC: Invalid buffer tag: %u", chunk_tag->blockNum);
if (lfc_prewarm_cancel)
{ {
prewarm_ctl->prewarm_canceled = true; LWLockRelease(prewarm_lock);
break; elog(ERROR, "LFC: skip prewarm because another prewarm is still active");
} }
prewarm_ctl->n_prewarm_entries = n_entries;
prewarm_ctl->n_prewarm_workers = -1;
prewarm_ctl->prewarm_active = true;
prewarm_ctl->prewarm_canceled = false;
/* take next chunk */ /* Calculate total number of pages to be prewarmed */
for (int j = 0; j < blocks_per_chunk; j++) prewarm_ctl->total_prewarm_pages = fcs->n_pages;
LWLockRelease(prewarm_lock);
elog(LOG, "LFC: start prewarming");
lfc_do_prewarm = true;
lfc_prewarm_cancel = false;
bitmap = FILE_CACHE_STATE_BITMAP(fcs);
blocks_per_chunk = 1 << fcs->chunk_size_log;
bitno = 0;
for (uint32 chunkno = 0; chunkno < fcs->n_chunks; chunkno++)
{ {
BlockNumber blkno = chunk_tag->blockNum + j; BufferTag *chunk_tag = &fcs->chunks[chunkno];
BlockNumber request_startblkno = InvalidBlockNumber;
BlockNumber request_endblkno;
if (BITMAP_ISSET(bitmap, bitno)) if (!BufferTagIsValid(chunk_tag))
elog(ERROR, "LFC: Invalid buffer tag: %u", chunk_tag->blockNum);
if (lfc_prewarm_cancel)
{ {
if (request_startblkno != InvalidBlockNumber) prewarm_ctl->prewarm_canceled = true;
break;
}
/* take next chunk */
for (int j = 0; j < blocks_per_chunk; j++)
{
BlockNumber blkno = chunk_tag->blockNum + j;
if (BITMAP_ISSET(bitmap, bitno))
{ {
if (request_endblkno == blkno) if (request_startblkno != InvalidBlockNumber)
{ {
/* append this block to the request */ if (request_endblkno == blkno)
request_endblkno++; {
/* append this block to the request */
request_endblkno++;
}
else
{
/* flush this request, and start new one */
communicator_new_prefetch_register_bufferv(
BufTagGetNRelFileInfo(*chunk_tag),
chunk_tag->forkNum,
request_startblkno,
request_endblkno - request_startblkno
);
request_startblkno = blkno;
request_endblkno = blkno + 1;
}
} }
else else
{ {
/* flush this request, and start new one */ /* flush this request, if any, and start new one */
communicator_new_prefetch_register_bufferv( if (request_startblkno != InvalidBlockNumber)
BufTagGetNRelFileInfo(*chunk_tag), {
chunk_tag->forkNum, communicator_new_prefetch_register_bufferv(
request_startblkno, BufTagGetNRelFileInfo(*chunk_tag),
request_endblkno - request_startblkno chunk_tag->forkNum,
); request_startblkno,
request_endblkno - request_startblkno
);
}
request_startblkno = blkno; request_startblkno = blkno;
request_endblkno = blkno + 1; request_endblkno = blkno + 1;
} }
prewarm_ctl->prewarmed_pages += 1;
} }
else bitno++;
{
/* flush this request, if any, and start new one */
if (request_startblkno != InvalidBlockNumber)
{
communicator_new_prefetch_register_bufferv(
BufTagGetNRelFileInfo(*chunk_tag),
chunk_tag->forkNum,
request_startblkno,
request_endblkno - request_startblkno
);
}
request_startblkno = blkno;
request_endblkno = blkno + 1;
}
prewarm_ctl->prewarmed_pages += 1;
} }
bitno++;
/* flush this request */
communicator_new_prefetch_register_bufferv(
BufTagGetNRelFileInfo(*chunk_tag),
chunk_tag->forkNum,
request_startblkno,
request_endblkno - request_startblkno
);
request_startblkno = request_endblkno = InvalidBlockNumber;
} }
/* flush this request */ elog(LOG, "LFC: complete prewarming: loaded %lu pages", (unsigned long) prewarm_ctl->prewarmed_pages);
communicator_new_prefetch_register_bufferv( prewarm_ctl->completed = GetCurrentTimestamp();
BufTagGetNRelFileInfo(*chunk_tag),
chunk_tag->forkNum, LWLockAcquire(prewarm_lock, LW_EXCLUSIVE);
request_startblkno, prewarm_ctl->prewarm_active = false;
request_endblkno - request_startblkno LWLockRelease(prewarm_lock);
);
request_startblkno = request_endblkno = InvalidBlockNumber;
} }
PG_CATCH();
elog(LOG, "LFC: complete prewarming: loaded %lu pages", (unsigned long) prewarm_ctl->prewarmed_pages); {
prewarm_ctl->completed = GetCurrentTimestamp(); elog(LOG, "LFC: cancel prewarm");
prewarm_ctl->prewarm_canceled = true;
LWLockAcquire(prewarm_lock, LW_EXCLUSIVE); prewarm_ctl->prewarm_active = false;
prewarm_ctl->prewarm_active = false; }
LWLockRelease(prewarm_lock); PG_END_TRY();
} }
PG_FUNCTION_INFO_V1(get_local_cache_state); PG_FUNCTION_INFO_V1(get_local_cache_state);