Compare commits

...

1 Commit

Author SHA1 Message Date
Konstantin Knizhnik
9f1e2aa0da Implement replica prewarm 2024-10-21 17:10:48 +03:00
12 changed files with 266 additions and 45 deletions

View File

@@ -184,6 +184,7 @@ pub const XLOG_NEON_HEAP_UPDATE: u8 = 0x20;
pub const XLOG_NEON_HEAP_HOT_UPDATE: u8 = 0x30;
pub const XLOG_NEON_HEAP_LOCK: u8 = 0x40;
pub const XLOG_NEON_HEAP_MULTI_INSERT: u8 = 0x50;
pub const XLOG_NEON_LFC_PREWARM: u8 = 0x60;
pub const XLOG_NEON_HEAP_VISIBLE: u8 = 0x40;

View File

@@ -1159,6 +1159,7 @@ impl WalIngest {
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
pg_constants::XLOG_NEON_LFC_PREWARM => {} // LFC prewarm records only carry cache hints for compute replicas; nothing to ingest
info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
}
}

View File

@@ -22,13 +22,17 @@
#include "neon_pgversioncompat.h"
#include "access/parallel.h"
#include "access/xlog.h"
#include "access/xloginsert.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pagestore_client.h"
#include "file_cache.h"
#include "common/hashfn.h"
#include "pgstat.h"
#include "port/pg_iovec.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include RELFILEINFO_HDR
#include "storage/buf_internals.h"
#include "storage/fd.h"
@@ -39,12 +43,18 @@
#include "utils/builtins.h"
#include "utils/dynahash.h"
#include "utils/guc.h"
#include "../neon_rmgr/neon_rmgr.h"
#include "hll.h"
#include "bitmap.h"
#include "neon.h"
#include "neon_perf_counters.h"
#if PG_VERSION_NUM>=160000
#include "access/neon_xlog.h"
#endif
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "Assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
/*
@@ -78,29 +88,17 @@
* before extending the nominal size of the file.
*/
/* Local file storage allocation chunk.
* Should be power of two. Using larger than page chunks can
* 1. Reduce hash-map memory footprint: 8TB database contains billion pages
* and size of hash entry is 40 bytes, so we need 40Gb just for hash map.
* 1Mb chunks can reduce hash map size to 320Mb.
* 2. Improve access locality, subsequent pages will be allocated together improving seqscan speed
*/
#define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */
/*
* Smaller chunk seems to be better for OLTP workload
*/
// #define BLOCKS_PER_CHUNK 8 /* 64kb chunk */
#define MB ((uint64)1024*1024)
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
#define CHUNK_BITMAP_SIZE ((BLOCKS_PER_CHUNK + 31) / 32)
typedef struct FileCacheEntry
{
BufferTag key;
uint32 hash;
uint32 offset;
uint32 access_count;
uint32 access_count : 31;
uint32 synced : 1;
uint32 bitmap[CHUNK_BITMAP_SIZE];
dlist_node list_node; /* LRU/holes list node */
} FileCacheEntry;
@@ -124,11 +122,15 @@ typedef struct FileCacheControl
HyperLogLogState wss_estimation; /* estimation of working set size */
} FileCacheControl;
#define LFC_MAX_PREWARM_SIZE 1024
#define LFC_PREWARM_POLL_INTERVAL 1000000 /* 1 second */
static HTAB *lfc_hash;
static int lfc_desc = 0;
static LWLockId lfc_lock;
static int lfc_max_size;
static int lfc_size_limit;
static int lfc_prewarm_rate;
static char *lfc_path;
static FileCacheControl *lfc_ctl;
static shmem_startup_hook_type prev_shmem_startup_hook;
@@ -374,6 +376,7 @@ lfc_change_limit_hook(int newval, void *extra)
hole->hash = hash;
hole->offset = offset;
hole->access_count = 0;
hole->synced = 0;
CriticalAssert(!found);
dlist_push_tail(&lfc_ctl->holes, &hole->list_node);
@@ -388,6 +391,26 @@ lfc_change_limit_hook(int newval, void *extra)
LWLockRelease(lfc_lock);
}
static void
lfc_register_prewarm_worker(void)
{
#if PG_MAJORVERSION_NUM >= 16
BackgroundWorker bgw;
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "FileCachePrewarmMain");
snprintf(bgw.bgw_name, BGW_MAXLEN, "LFC prewarm");
snprintf(bgw.bgw_type, BGW_MAXLEN, "LFC prewarm");
bgw.bgw_restart_time = 5;
bgw.bgw_notify_pid = 0;
bgw.bgw_main_arg = (Datum) 0;
RegisterBackgroundWorker(&bgw);
#endif
}
void
lfc_init(void)
{
@@ -436,6 +459,19 @@ lfc_init(void)
NULL,
NULL);
DefineCustomIntVariable("neon.file_cache_prewarm_rate",
"Interval of generating prewarm WAL records",
NULL,
&lfc_prewarm_rate,
0, /* disabled by default */
0,
INT_MAX,
PGC_SIGHUP,
GUC_UNIT_MS,
NULL,
NULL,
NULL);
if (lfc_max_size == 0)
return;
@@ -447,6 +483,8 @@ lfc_init(void)
#else
lfc_shmem_request();
#endif
lfc_register_prewarm_worker();
}
/*
@@ -693,7 +731,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
/*
* For every chunk that has blocks we're interested in, we
* 1. get the chunk header
* 2. Check if the chunk actually has the blocks we're interested in
@@ -961,6 +999,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
entry->hash = hash;
memset(entry->bitmap, 0, sizeof entry->bitmap);
}
entry->synced = false;
generation = lfc_ctl->generation;
entry_offset = entry->offset;
@@ -1013,6 +1052,57 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
}
#if PG_MAJORVERSION_NUM >= 16
PGDLLEXPORT void
FileCachePrewarmMain(Datum main_arg)
{
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
BackgroundWorkerUnblockSignals();
while (!ShutdownRequestPending)
{
FileCacheEntryDesc prewarm[LFC_MAX_PREWARM_SIZE];
size_t n_prewarm = 0;
dlist_iter iter;
pg_usleep(lfc_prewarm_rate ? lfc_prewarm_rate*1000 : LFC_PREWARM_POLL_INTERVAL);
CHECK_FOR_INTERRUPTS();
/* React to SIGHUP so changes to neon.file_cache_prewarm_rate take effect */
if (ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
}
if (lfc_prewarm_rate == 0)
continue;
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
/* Send the most recently used entries first */
dlist_reverse_foreach(iter, &lfc_ctl->lru)
{
FileCacheEntry* entry = dlist_container(FileCacheEntry, list_node, iter.cur);
if (!entry->synced)
{
prewarm[n_prewarm].key = entry->key;
memcpy(prewarm[n_prewarm].bitmap, entry->bitmap, sizeof(entry->bitmap));
entry->synced = true;
if (++n_prewarm == LFC_MAX_PREWARM_SIZE)
break;
}
}
LWLockRelease(lfc_lock);
if (n_prewarm > 0)
{
XLogBeginInsert();
XLogRegisterData((char *) prewarm, n_prewarm * sizeof(FileCacheEntryDesc));
XLogFlush(XLogInsert(RM_NEON_ID, XLOG_NEON_LFC_PREWARM));
}
}
}
#endif
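For a rough sense of how much WAL one prewarm round generates: assuming the PostgreSQL 16 layout of BufferTag (a 12-byte RelFileLocator plus 4-byte fork and block numbers), each FileCacheEntryDesc is about 36 bytes, so a full batch of LFC_MAX_PREWARM_SIZE descriptors is on the order of 36 KB of record data. The struct below is a standalone mock used only to illustrate that arithmetic; it is not the real definition from file_cache.h.

#include <assert.h>
#include <stdint.h>

/* Illustrative stand-in for FileCacheEntryDesc, assuming the PG16 BufferTag
 * layout: RelFileLocator (3 x 4 bytes) + ForkNumber + BlockNumber. */
typedef struct
{
    uint32_t spcOid, dbOid, relNumber;   /* RelFileLocator */
    int32_t  forkNum;                    /* ForkNumber */
    uint32_t blockNum;                   /* BlockNumber */
    uint32_t bitmap[4];                  /* CHUNK_BITMAP_SIZE words for 128-block chunks */
} MockFileCacheEntryDesc;

enum { MOCK_LFC_MAX_PREWARM_SIZE = 1024 };

static_assert(sizeof(MockFileCacheEntryDesc) == 36, "36 bytes per descriptor");
static_assert(MOCK_LFC_MAX_PREWARM_SIZE * sizeof(MockFileCacheEntryDesc) == 36864,
              "a full prewarm batch carries roughly 36 KB of record data");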
/*
* Admin functions
*/
typedef struct
{
TupleDesc tupdesc;

pgxn/neon/file_cache.h Normal file
View File

@@ -0,0 +1,64 @@
/*-------------------------------------------------------------------------
*
* file_cache.h
*
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*-------------------------------------------------------------------------
*/
#ifndef file_cache_h
#define file_cache_h
#include "neon_pgversioncompat.h"
/* Local file storage allocation chunk.
* Should be power of two. Using larger than page chunks can
* 1. Reduce hash-map memory footprint: 8TB database contains billion pages
* and size of hash entry is 40 bytes, so we need 40Gb just for hash map.
* 1Mb chunks can reduce hash map size to 320Mb.
* 2. Improve access locality, subsequent pages will be allocated together improving seqscan speed
*/
#define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */
#define CHUNK_BITMAP_SIZE ((BLOCKS_PER_CHUNK + 31) / 32)
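The footprint numbers quoted in the comment above can be sanity-checked with a little arithmetic. The program below is a standalone back-of-envelope sketch, not part of the patch; the 40-byte entry size is the figure from the comment and BLCKSZ is assumed to be the default 8 KB.

#include <stdio.h>
#include <stdint.h>

int main(void)
{
    const uint64_t db_size     = (uint64_t) 8 << 40;   /* 8TB database */
    const uint64_t blcksz      = 8192;                 /* default BLCKSZ */
    const uint64_t entry_bytes = 40;                   /* hash entry size from the comment */
    const uint64_t pages       = db_size / blcksz;     /* ~1 billion pages */

    /* one hash entry per page vs. one per 128-page (1MB) chunk */
    printf("per-page map:  %llu MB\n",
           (unsigned long long) (pages * entry_bytes >> 20));        /* ~40960 MB = 40 GB */
    printf("per-chunk map: %llu MB\n",
           (unsigned long long) (pages / 128 * entry_bytes >> 20));  /* ~320 MB */
    return 0;
}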
typedef struct
{
BufferTag key;
uint32 bitmap[CHUNK_BITMAP_SIZE];
} FileCacheEntryDesc;
PGDLLEXPORT void FileCachePrewarmMain(Datum main_arg);
extern void lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, const void *const *buffers,
BlockNumber nblocks);
/* returns number of blocks read, with one bit set in *read for each */
extern int lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, void **buffers,
BlockNumber nblocks, bits8 *mask);
extern bool lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno);
extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, int nblocks, bits8 *bitmap);
extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_init(void);
static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
void *buffer)
{
bits8 rv = 0;
return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1;
}
static inline void
lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
const void *buffer)
{
return lfc_writev(rinfo, forkNum, blkno, &buffer, 1);
}
#endif

View File

@@ -33,6 +33,7 @@
#include "neon_perf_counters.h"
#include "neon_utils.h"
#include "pagestore_client.h"
#include "file_cache.h"
#include "walproposer.h"
#define PageStoreTrace DEBUG5

View File

@@ -261,35 +261,5 @@ extern void set_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumb
extern void update_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size);
extern void forget_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum);
/* functions for local file cache */
extern void lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, const void *const *buffers,
BlockNumber nblocks);
/* returns number of blocks read, with one bit set in *read for each */
extern int lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, void **buffers,
BlockNumber nblocks, bits8 *mask);
extern bool lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno);
extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, int nblocks, bits8 *bitmap);
extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_init(void);
static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
void *buffer)
{
bits8 rv = 0;
return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1;
}
static inline void
lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
const void *buffer)
{
return lfc_writev(rinfo, forkNum, blkno, &buffer, 1);
}
#endif

View File

@@ -68,6 +68,7 @@
#include "neon_perf_counters.h"
#include "pagestore_client.h"
#include "file_cache.h"
#include "bitmap.h"
#if PG_VERSION_NUM >= 150000

View File

@@ -13,9 +13,12 @@
#include "miscadmin.h"
#include "storage/buf.h"
#include "storage/bufmgr.h"
#include "storage/buf_internals.h"
#include "storage/bufpage.h"
#include "storage/freespace.h"
#include "neon_rmgr.h"
#include "../neon/file_cache.h"
#include "../neon/neon_pgversioncompat.h"
PG_MODULE_MAGIC;
void _PG_init(void);
@@ -30,6 +33,7 @@ static void redo_neon_heap_delete(XLogReaderState *record);
static void redo_neon_heap_update(XLogReaderState *record, bool hot_update);
static void redo_neon_heap_lock(XLogReaderState *record);
static void redo_neon_heap_multi_insert(XLogReaderState *record);
static void redo_neon_lfc_prewarm(XLogReaderState *record);
const static RmgrData NeonRmgr = {
.rm_name = "neon",
@@ -76,6 +80,9 @@ neon_rm_redo(XLogReaderState *record)
case XLOG_NEON_HEAP_MULTI_INSERT:
redo_neon_heap_multi_insert(record);
break;
case XLOG_NEON_LFC_PREWARM:
redo_neon_lfc_prewarm(record);
break;
default:
elog(PANIC, "neon_rm_redo: unknown op code %u", info);
}
@@ -882,6 +889,28 @@ redo_neon_heap_multi_insert(XLogReaderState *record)
XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
}
static void
redo_neon_lfc_prewarm(XLogReaderState *record)
{
FileCacheEntryDesc* entries = (FileCacheEntryDesc*)XLogRecGetData(record);
size_t n_entries = XLogRecGetDataLen(record)/sizeof(FileCacheEntryDesc);
char buf[BLCKSZ];
for (size_t i = 0; i < n_entries; i++)
{
FileCacheEntryDesc* entry = &entries[i];
NRelFileInfo rinfo = BufTagGetNRelFileInfo(entry->key);
SMgrRelation reln = smgropen(rinfo, INVALID_PROC_NUMBER, RELPERSISTENCE_PERMANENT);
/* Read every block recorded in the chunk bitmap so the replica pulls it into its local file cache */
for (size_t j = 0; j < BLOCKS_PER_CHUNK; j++)
{
if (entry->bitmap[j >> 5] & (1 << (j & 31)))
{
smgrread(reln, entry->key.forkNum, entry->key.blockNum + j, buf);
}
}
}
}
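The redo loop relies on the chunk-bitmap addressing implied by CHUNK_BITMAP_SIZE: block j within a chunk (0 <= j < BLOCKS_PER_CHUNK) is tracked by bit j & 31 of word j >> 5. A minimal standalone sketch of that addressing, with illustrative helper names that are not part of the patch:

#include <stdbool.h>
#include <stdint.h>

#define BLOCKS_PER_CHUNK  128
#define CHUNK_BITMAP_SIZE ((BLOCKS_PER_CHUNK + 31) / 32)

/* Mark block j of a chunk as cached. */
static inline void
chunk_bitmap_set(uint32_t *bitmap, unsigned j)
{
    bitmap[j >> 5] |= (uint32_t) 1 << (j & 31);
}

/* Check whether block j of a chunk is cached. */
static inline bool
chunk_bitmap_test(const uint32_t *bitmap, unsigned j)
{
    return (bitmap[j >> 5] & ((uint32_t) 1 << (j & 31))) != 0;
}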
#else
/* safeguard for older PostgreSQL versions */
PG_MODULE_MAGIC;

View File

@@ -5,6 +5,8 @@
#include "replication/decode.h"
#include "replication/logical.h"
#define XLOG_NEON_LFC_PREWARM 0x60
extern void neon_rm_desc(StringInfo buf, XLogReaderState *record);
extern void neon_rm_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern const char *neon_rm_identify(uint8 info);

View File

@@ -456,6 +456,8 @@ neon_rm_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (SnapBuildProcessChange(builder, xid, buf->origptr))
DecodeNeonMultiInsert(ctx, buf);
break;
case XLOG_NEON_LFC_PREWARM:
break;
default:
elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
break;

View File

@@ -113,6 +113,10 @@ neon_rm_desc(StringInfo buf, XLogReaderState *record)
xlrec->ntuples, &offset_elem_desc, NULL);
}
}
else if (info == XLOG_NEON_LFC_PREWARM)
{
appendStringInfo(buf, "%d chunks", XLogRecGetDataLen(record));
}
}
const char *
@@ -152,6 +156,9 @@ neon_rm_identify(uint8 info)
case XLOG_NEON_HEAP_MULTI_INSERT | XLOG_NEON_INIT_PAGE:
id = "MULTI_INSERT+INIT";
break;
case XLOG_NEON_LFC_PREWARM:
id = "LFC_PREWARM";
break;
}
return id;

View File

@@ -0,0 +1,53 @@
from __future__ import annotations
import pytest
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
from fixtures.pg_version import PgVersion
def test_replica_prewarm(neon_simple_env: NeonEnv):
env = neon_simple_env
n_records = 1000000
if env.pg_version < PgVersion.V16:
pytest.skip("NEON_RM is available only in PG16")
primary = env.endpoints.create_start(
branch_name="main",
endpoint_id="primary",
config_lines=[
"autovacuum = off",
"shared_buffers=1MB",
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
"neon.file_cache_prewarm_rate=1s",
],
)
p_con = primary.connect()
p_cur = p_con.cursor()
p_cur.execute("create extension neon")
p_cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
secondary = env.endpoints.new_replica_start(
origin=primary,
endpoint_id="secondary",
config_lines=[
"autovacuum = off",
"shared_buffers=1MB",
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
"neon.file_cache_prewarm_limit=1000",
],
)
p_cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
wait_replica_caughtup(primary, secondary)
s_con = secondary.connect()
s_cur = s_con.cursor()
s_cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'")
lfc_used_pages = s_cur.fetchall()[0][0]
assert lfc_used_pages > 10000
s_cur.execute("select sum(pk) from t")
assert s_cur.fetchall()[0][0] == n_records * (n_records + 1) / 2