mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 05:00:38 +00:00
Address review comments
This commit is contained in:
@@ -123,10 +123,11 @@ typedef struct FileCacheEntry
|
||||
uint32 hash;
|
||||
uint32 offset;
|
||||
uint32 access_count;
|
||||
uint32 state[CHUNK_BITMAP_SIZE * 2]; /* two bits per block */
|
||||
dlist_node list_node; /* LRU/holes list node */
|
||||
uint32 state[FLEXIBLE_ARRAY_MEMBER]; /* two bits per block */
|
||||
} FileCacheEntry;
|
||||
|
||||
#define FILE_CACHE_ENRTY_SIZE MAXALIGN(offsetof(FileCacheEntry, state) + (lfc_blocks_per_chunk+31)/32)
|
||||
#define GET_STATE(entry, i) (((entry)->state[(i) / 16] >> ((i) % 16 * 2)) & 3)
|
||||
#define SET_STATE(entry, i, new_state) (entry)->state[(i) / 16] = ((entry)->state[(i) / 16] & ~(3 << ((i) % 16 * 2))) | ((new_state) << ((i) % 16 * 2))
|
||||
|
||||
@@ -171,6 +172,7 @@ typedef struct FileCacheControl
|
||||
|
||||
typedef struct FileCacheState
|
||||
{
|
||||
int32 vl_len_; /* varlena header (do not touch directly!) */
|
||||
uint32 magic;
|
||||
uint32 n_chunks;
|
||||
uint16 chunk_size_log;
|
||||
@@ -179,7 +181,8 @@ typedef struct FileCacheState
|
||||
} FileCacheState;
|
||||
|
||||
#define FILE_CACHE_STATE_BITMAP(fcs) ((uint8*)&(fcs)->chunks[(fcs)->n_chunks])
|
||||
#define FILE_CACHE_STATE_SIZE(n_chunks) (sizeof(FileCacheState) + (n_chunks)*sizeof(BufferTag) + (((n_chunks) * lfc_blocks_per_chunk)+7)/8)
|
||||
#define FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_chunks) (sizeof(FileCacheState) + (n_chunks)*sizeof(BufferTag) + (((n_chunks) * lfc_blocks_per_chunk)+7)/8)
|
||||
#define FILE_CACHE_STATE_SIZE(fcs) (sizeof(FileCacheState) + (fcs->n_chunks)*sizeof(BufferTag) + (((fcs->n_chunks) << fcs->chunk_size_log)+7)/8)
|
||||
|
||||
static HTAB *lfc_hash;
|
||||
static int lfc_desc = -1;
|
||||
@@ -332,7 +335,7 @@ lfc_shmem_startup(void)
|
||||
|
||||
lfc_lock = (LWLockId) GetNamedLWLockTranche("lfc_lock");
|
||||
info.keysize = sizeof(BufferTag);
|
||||
info.entrysize = sizeof(FileCacheEntry);
|
||||
info.entrysize = FILE_CACHE_ENRTY_SIZE;
|
||||
|
||||
/*
|
||||
* n_chunks+1 because we add new element to hash table before eviction
|
||||
@@ -378,7 +381,7 @@ lfc_shmem_request(void)
|
||||
prev_shmem_request_hook();
|
||||
#endif
|
||||
|
||||
RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, sizeof(FileCacheEntry)));
|
||||
RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, FILE_CACHE_ENRTY_SIZE));
|
||||
RequestNamedLWLockTranche("lfc_lock", 1);
|
||||
}
|
||||
|
||||
@@ -618,8 +621,9 @@ lfc_get_state(size_t max_entries)
|
||||
size_t i = 0;
|
||||
uint8* bitmap;
|
||||
size_t n_entries = Min(max_entries, lfc_ctl->used - lfc_ctl->pinned);
|
||||
|
||||
fcs = (FileCacheState*)palloc0(FILE_CACHE_STATE_SIZE(n_entries));
|
||||
size_t state_size = FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_entries);
|
||||
fcs = (FileCacheState*)palloc0(state_size);
|
||||
SET_VARSIZE(fcs, state_size);
|
||||
fcs->magic = FILE_CACHE_STATE_MAGIC;
|
||||
fcs->chunk_size_log = lfc_chunk_size_log;
|
||||
fcs->n_chunks = n_entries;
|
||||
@@ -691,6 +695,11 @@ lfc_prewarm(FileCacheState* fcs, uint32 worker_id, uint32 n_workers)
|
||||
elog(ERROR, "LFC: Invalid file cache state magic: %X", fcs->magic);
|
||||
}
|
||||
|
||||
if (FILE_CACHE_STATE_SIZE(fcs) != VARSIZE(fcs))
|
||||
{
|
||||
elog(ERROR, "LFC: Invalid file cache state size: %u vs. %u", (unsigned)FILE_CACHE_STATE_SIZE(fcs), VARSIZE(fcs));
|
||||
}
|
||||
|
||||
fcs_chunk_size_log = fcs->chunk_size_log;
|
||||
if (fcs_chunk_size_log > MAX_BLOCKS_PER_CHUNK)
|
||||
{
|
||||
@@ -733,66 +742,79 @@ lfc_prewarm(FileCacheState* fcs, uint32 worker_id, uint32 n_workers)
|
||||
lfc_store_prefetch_result = true;
|
||||
|
||||
elog(LOG, "LFC: start loading %ld chunks", (long)n_entries);
|
||||
|
||||
while (true)
|
||||
PG_TRY();
|
||||
{
|
||||
if (snd_idx < max_prefetch_pages)
|
||||
while (true)
|
||||
{
|
||||
if ((snd_idx >> fcs_chunk_size_log) % n_workers != worker_id)
|
||||
if (snd_idx < max_prefetch_pages)
|
||||
{
|
||||
/* If there are multiple workers, split chunks between them */
|
||||
snd_idx += 1 << fcs_chunk_size_log;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (BITMAP_ISSET(bitmap, snd_idx))
|
||||
if ((snd_idx >> fcs_chunk_size_log) % n_workers != worker_id)
|
||||
{
|
||||
tag = fcs->chunks[snd_idx >> fcs_chunk_size_log];
|
||||
tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1);
|
||||
(void)communicator_prefetch_register_bufferv(tag, NULL, 1, NULL);
|
||||
n_sent += 1;
|
||||
/* If there are multiple workers, split chunks between them */
|
||||
snd_idx += 1 << fcs_chunk_size_log;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (BITMAP_ISSET(bitmap, snd_idx))
|
||||
{
|
||||
tag = fcs->chunks[snd_idx >> fcs_chunk_size_log];
|
||||
tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1);
|
||||
if (!lfc_cache_contains(BufTagGetNRelFileInfo(tag), tag.forkNum, tag.blockNum))
|
||||
{
|
||||
(void)communicator_prefetch_register_bufferv(tag, NULL, 1, NULL);
|
||||
n_sent += 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
BITMAP_CLR(bitmap, snd_idx);
|
||||
}
|
||||
}
|
||||
snd_idx += 1;
|
||||
}
|
||||
snd_idx += 1;
|
||||
}
|
||||
}
|
||||
if (n_sent >= n_received + prewarm_batch || snd_idx == max_prefetch_pages)
|
||||
{
|
||||
if ((rcv_idx >> fcs_chunk_size_log) % n_workers != worker_id)
|
||||
if (n_sent >= n_received + prewarm_batch || snd_idx == max_prefetch_pages)
|
||||
{
|
||||
/* Skip chunks processed by other workers */
|
||||
rcv_idx += 1 << fcs_chunk_size_log;
|
||||
continue;
|
||||
}
|
||||
if ((rcv_idx >> fcs_chunk_size_log) % n_workers != worker_id)
|
||||
{
|
||||
/* Skip chunks processed by other workers */
|
||||
rcv_idx += 1 << fcs_chunk_size_log;
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Locate next block to prefetch */
|
||||
while (!BITMAP_ISSET(bitmap, rcv_idx))
|
||||
{
|
||||
/* Locate next block to prefetch */
|
||||
while (!BITMAP_ISSET(bitmap, rcv_idx))
|
||||
{
|
||||
rcv_idx += 1;
|
||||
}
|
||||
/* Update progress indicator */
|
||||
ws->curr_chunk = rcv_idx >> fcs_chunk_size_log;
|
||||
|
||||
tag = fcs->chunks[rcv_idx >> fcs_chunk_size_log];
|
||||
tag.blockNum += rcv_idx & ((1 << fcs_chunk_size_log) - 1);
|
||||
if (communicator_prefetch_receive(tag))
|
||||
{
|
||||
ws->prewarmed_pages += 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
ws->skipped_pages += 1;
|
||||
}
|
||||
rcv_idx += 1;
|
||||
}
|
||||
/* Update progress indicator */
|
||||
ws->curr_chunk = rcv_idx >> fcs_chunk_size_log;
|
||||
|
||||
tag = fcs->chunks[rcv_idx >> fcs_chunk_size_log];
|
||||
tag.blockNum += rcv_idx & ((1 << fcs_chunk_size_log) - 1);
|
||||
if (communicator_prefetch_receive(tag))
|
||||
{
|
||||
ws->prewarmed_pages += 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
ws->skipped_pages += 1;
|
||||
}
|
||||
rcv_idx += 1;
|
||||
if (++n_received == n_sent && snd_idx == max_prefetch_pages)
|
||||
{
|
||||
break;
|
||||
if (++n_received == n_sent && snd_idx == max_prefetch_pages)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Assert(n_sent == n_received);
|
||||
elog(LOG, "LFC: worker %d complete prewarming: loaded %ld pages", worker_id, (long)n_received);
|
||||
}
|
||||
Assert(n_sent == n_received);
|
||||
lfc_store_prefetch_result = save_lfc_store_prefetch_result;
|
||||
ws->curr_chunk = n_entries;
|
||||
elog(LOG, "LFC: worker %d complete prewarming: loaded %ld pages", worker_id, (long)n_received);
|
||||
PG_FINALLY();
|
||||
{
|
||||
lfc_store_prefetch_result = save_lfc_store_prefetch_result;
|
||||
ws->curr_chunk = n_entries;
|
||||
}
|
||||
PG_END_TRY();
|
||||
}
|
||||
|
||||
|
||||
@@ -1991,17 +2013,9 @@ get_local_cache_state(PG_FUNCTION_ARGS)
|
||||
size_t max_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0);
|
||||
FileCacheState* fcs = lfc_get_state(max_entries);
|
||||
if (fcs != NULL)
|
||||
{
|
||||
size_t size_in_bytes = FILE_CACHE_STATE_SIZE(fcs->n_chunks);
|
||||
bytea* res = (bytea*)palloc(VARHDRSZ + size_in_bytes);
|
||||
|
||||
SET_VARSIZE(res, VARHDRSZ + size_in_bytes);
|
||||
memcpy(VARDATA(res), fcs, size_in_bytes);
|
||||
pfree(fcs);
|
||||
|
||||
PG_RETURN_BYTEA_P(res);
|
||||
}
|
||||
PG_RETURN_NULL();
|
||||
PG_RETURN_BYTEA_P((bytea*)fcs);
|
||||
else
|
||||
PG_RETURN_NULL();
|
||||
}
|
||||
|
||||
PG_FUNCTION_INFO_V1(prewarm_local_cache);
|
||||
@@ -2012,10 +2026,7 @@ prewarm_local_cache(PG_FUNCTION_ARGS)
|
||||
bytea* state = PG_GETARG_BYTEA_PP(0);
|
||||
uint32 worker_id = PG_GETARG_INT32(1);
|
||||
uint32 n_workers = PG_GETARG_INT32(2);
|
||||
FileCacheState* fcs = (FileCacheState*)VARDATA_ANY(state);
|
||||
|
||||
if (FILE_CACHE_STATE_SIZE(fcs->n_chunks) != VARSIZE_ANY_EXHDR(state))
|
||||
elog(ERROR, "LFC: Invalid file cache state size");
|
||||
FileCacheState* fcs = (FileCacheState*)state;
|
||||
|
||||
lfc_prewarm(fcs, worker_id, n_workers);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user