Rewrite unlogged relation build

This commit is contained in:
Konstantin Knizhnik
2024-04-24 14:47:26 +03:00
parent e4fc6c3162
commit 1bd86c5c6a
5 changed files with 209 additions and 24 deletions

View File

@@ -299,6 +299,13 @@ extern void set_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumb
extern void update_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size);
extern void forget_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum);
extern bool start_unlogged_build(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size);
extern bool is_unlogged_build(NRelFileInfo rinfo, ForkNumber forknum);
extern bool stop_unlogged_build(NRelFileInfo rinfo, ForkNumber forknum);
extern void resume_unlogged_build(void);
/* functions for local file cache */
#if PG_MAJORVERSION_NUM < 16
extern void lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,

View File

@@ -60,6 +60,7 @@
#include "storage/fsm_internals.h"
#include "storage/md.h"
#include "storage/smgr.h"
#include "utils/rel.h"
#include "pagestore_client.h"
@@ -1473,7 +1474,11 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co
}
else if (forknum != FSM_FORKNUM && forknum != VISIBILITYMAP_FORKNUM)
{
mdcreate(reln, forknum, true);
if (start_unlogged_build(InfoFromSMgrRel(reln), forknum, blocknum+1))
{
mdcreate(reln, forknum, true);
resume_unlogged_build();
}
mdwrite(reln, forknum, blocknum, buffer, true);
ereport(SmgrTrace,
@@ -1488,22 +1493,39 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co
}
else if (lsn < FirstNormalUnloggedLSN)
{
if (start_unlogged_build(InfoFromSMgrRel(reln),forknum, blocknum+1))
{
mdcreate(reln, forknum, true);
mdwrite(reln, forknum, blocknum, buffer, true);
resume_unlogged_build();
}
mdwrite(reln, forknum, blocknum, buffer, true);
ereport(SmgrTrace,
(errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u is saved locally.",
blocknum,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum)));
ereport(SmgrTrace,
(errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u is saved locally.",
blocknum,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum)));
}
else
{
ereport(SmgrTrace,
(errmsg(NEON_TAG "Evicting page %u of relation %u/%u/%u.%u with lsn=%X/%X",
if (is_unlogged_build(InfoFromSMgrRel(reln), forknum))
{
resume_unlogged_build();
mdwrite(reln, forknum, blocknum, buffer, true);
ereport(SmgrTrace,
(errmsg(NEON_TAG "Page %u with LSN=%X/%X of relation %u/%u/%u.%u is saved locally.",
blocknum,
LSN_FORMAT_ARGS(lsn),
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum)));
}
else
ereport(SmgrTrace,
(errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u is already wal-logged at lsn=%X/%X",
blocknum,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum, LSN_FORMAT_ARGS(lsn))));
forknum, LSN_FORMAT_ARGS(lsn)
)));
}
/*
@@ -1513,6 +1535,19 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co
SetLastWrittenLSNForBlock(lsn, InfoFromSMgrRel(reln), forknum, blocknum);
}
static void
neon_log_newpage_range_callback(Relation rel, ForkNumber forknum)
{
SMgrRelation smgr = RelationGetSmgr(rel);
if (stop_unlogged_build(InfoFromSMgrRel(smgr), forknum))
{
mdclose(smgr, forknum);
/* use isRedo == true, so that we drop it immediately */
mdunlink(InfoBFromSMgrRel(smgr), forknum, true);
}
}
/*
* neon_init() -- Initialize private state
*/
@@ -1548,6 +1583,8 @@ neon_init(void)
old_redo_read_buffer_filter = redo_read_buffer_filter;
redo_read_buffer_filter = neon_redo_read_buffer_filter;
log_newpage_range_callback = neon_log_newpage_range_callback;
#ifdef DEBUG_COMPARE_LOCAL
mdinit();
#endif
@@ -2207,9 +2244,11 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
if (!XLogInsertAllowed())
return;
set_cached_relsize(InfoFromSMgrRel(reln), forkNum, blocknum + nblocks);
#if 0
/* ensure we have enough xlog buffers to log max-sized records */
XLogEnsureRecordSpace(Min(remblocks, (XLR_MAX_BLOCK_ID - 1)), 0);
/*
* Iterate over all the pages. They are collected into batches of
* XLR_MAX_BLOCK_ID pages, and a single WAL-record is written for each
@@ -2242,6 +2281,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
SetLastWrittenLSNForRelation(lsn, InfoFromSMgrRel(reln), forkNum);
set_cached_relsize(InfoFromSMgrRel(reln), forkNum, blocknum);
#endif
}
#endif
@@ -2534,16 +2574,27 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
request_lsns = neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno);
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsns, buffer);
if (forkNum == MAIN_FORKNUM && PageIsNew((Page)buffer) && mdexists(reln, forkNum))
if (is_unlogged_build(InfoFromSMgrRel(reln), forkNum))
{
elog(SmgrTrace, "Read local page %d of relation %u/%u/%u.%u",
blkno, RelFileInfoFmt(InfoFromSMgrRel(reln)), forkNum);
if (blkno >= mdnblocks(reln, forkNum))
{
elog(SmgrTrace, "Get empty local page %d of relation %u/%u/%u.%u",
blkno, RelFileInfoFmt(InfoFromSMgrRel(reln)), forkNum);
memset(buffer, 0, BLCKSZ);
}
else
{
elog(SmgrTrace, "Read local page %d of relation %u/%u/%u.%u",
blkno, RelFileInfoFmt(InfoFromSMgrRel(reln)), forkNum);
mdread(reln, forkNum, blkno, buffer);
}
resume_unlogged_build();
}
else
{
request_lsn = neon_get_request_lsn(&latest, InfoFromSMgrRel(reln), forkNum, blkno);
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsn, latest, buffer);
}
#ifdef DEBUG_COMPARE_LOCAL
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
{

View File

@@ -39,7 +39,8 @@ typedef struct
typedef struct
{
RelTag tag;
BlockNumber size;
BlockNumber size : 31;
BlockNumber unlogged : 1;
dlist_node lru_node; /* LRU list node */
} RelSizeEntry;
@@ -156,6 +157,7 @@ set_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size)
entry->size = size;
if (!found)
{
entry->unlogged = false;
if (++relsize_ctl->size == relsize_hash_size)
{
/*
@@ -191,10 +193,10 @@ update_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size)
tag.forknum = forknum;
LWLockAcquire(relsize_lock, LW_EXCLUSIVE);
entry = hash_search(relsize_hash, &tag, HASH_ENTER, &found);
if (!found || entry->size < size)
if (!found) {
entry->unlogged = false;
entry->size = size;
if (!found)
{
if (++relsize_ctl->size == relsize_hash_size)
{
RelSizeEntry *victim = dlist_container(RelSizeEntry, lru_node, dlist_pop_head_node(&relsize_ctl->lru));
@@ -204,6 +206,9 @@ update_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size)
}
else
{
if (entry->size < size)
entry->size = size;
dlist_delete(&entry->lru_node);
}
relsize_ctl->writes += 1;
@@ -232,6 +237,128 @@ forget_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum)
}
}
bool
start_unlogged_build(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size)
{
bool start = false;
if (relsize_hash_size > 0)
{
RelTag tag;
RelSizeEntry *entry;
bool found;
tag.rinfo = rinfo;
tag.forknum = forknum;
LWLockAcquire(relsize_lock, LW_EXCLUSIVE);
entry = hash_search(relsize_hash, &tag, HASH_ENTER, &found);
if (!found) {
entry->size = size;
start = true;
if (++relsize_ctl->size == relsize_hash_size)
{
RelSizeEntry *victim = dlist_container(RelSizeEntry, lru_node, dlist_pop_head_node(&relsize_ctl->lru));
hash_search(relsize_hash, &victim->tag, HASH_REMOVE, NULL);
relsize_ctl->size -= 1;
}
}
else
{
start = !entry->unlogged;
if (entry->size < size)
entry->size = size;
dlist_delete(&entry->lru_node);
}
entry->unlogged = true;
relsize_ctl->writes += 1;
dlist_push_tail(&relsize_ctl->lru, &entry->lru_node);
if (!start)
LWLockRelease(relsize_lock);
else
elog(LOG, "Start unlogged build for %u/%u/%u.%u",
RelFileInfoFmt(rinfo), forknum);
}
return start;
}
bool
is_unlogged_build(NRelFileInfo rinfo, ForkNumber forknum)
{
bool unlogged = false;
if (relsize_hash_size > 0)
{
RelTag tag;
RelSizeEntry *entry;
tag.rinfo = rinfo;
tag.forknum = forknum;
LWLockAcquire(relsize_lock, LW_SHARED);
entry = hash_search(relsize_hash, &tag, HASH_FIND, NULL);
if (entry != NULL)
{
unlogged = entry->unlogged;
relsize_ctl->hits += 1;
/* Move entry to the LRU list tail */
dlist_delete(&entry->lru_node);
dlist_push_tail(&relsize_ctl->lru, &entry->lru_node);
}
else
{
relsize_ctl->misses += 1;
}
if (!unlogged)
LWLockRelease(relsize_lock);
}
return unlogged;
}
bool
stop_unlogged_build(NRelFileInfo rinfo, ForkNumber forknum)
{
bool unlogged = false;
if (relsize_hash_size > 0)
{
RelTag tag;
RelSizeEntry *entry;
tag.rinfo = rinfo;
tag.forknum = forknum;
LWLockAcquire(relsize_lock, LW_EXCLUSIVE);
entry = hash_search(relsize_hash, &tag, HASH_FIND, NULL);
if (entry != NULL)
{
unlogged = entry->unlogged;
if (unlogged)
elog(LOG, "Stop unlogged build for %u/%u/%u.%u",
RelFileInfoFmt(rinfo), forknum);
entry->unlogged = false;
relsize_ctl->hits += 1;
/* Move entry to the LRU list tail */
dlist_delete(&entry->lru_node);
dlist_push_tail(&relsize_ctl->lru, &entry->lru_node);
/* use isRedo == true, so that we drop it immediately */
}
else
{
relsize_ctl->misses += 1;
}
LWLockRelease(relsize_lock);
}
return unlogged;
}
void
resume_unlogged_build(void)
{
LWLockRelease(relsize_lock);
}
void
relsize_hash_init(void)
{

View File

@@ -4316,7 +4316,7 @@ def check_restored_datadir_content(
restored_files = [f for f in restored_files if f not in ignored_files]
# check that file sets are equal
#assert pgdata_files == restored_files
# assert pgdata_files == restored_files
# compare content of the files
# filecmp returns (match, mismatch, error) lists
@@ -4339,7 +4339,7 @@ def check_restored_datadir_content(
cmd = f"diff {f1}.hex {f2}.hex"
subprocess.run([cmd], stdout=stdout_f, shell=True)
#assert (mismatch, error) == ([], [])
# assert (mismatch, error) == ([], [])
def logical_replication_sync(subscriber: VanillaPostgres, publisher: Endpoint) -> Lsn:

View File

@@ -1,5 +1,5 @@
{
"postgres-v16": "ed0c41966dc85ab21c09f787a475bdc5cc2cbf47",
"postgres-v15": "6b1b8c2ba07966f7377a76c73993ca6efdaa699c",
"postgres-v14": "417dce7bb986134213598c299b14e40d0f916345"
"postgres-v16": "a673c9fd4af439512013713b9b537487927bbc63",
"postgres-v15": "411a463af69cd7d5480e158da3d8c23f46cb31cb",
"postgres-v14": "5ed0718551ccce80bd3644dd0330deeb6ffd919b"
}