From 1bd86c5c6a364dddcb3ecae2a0bb5cbc973dafcb Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 24 Apr 2024 14:47:26 +0300 Subject: [PATCH] Rewrite unlogged relation build --- pgxn/neon/pagestore_client.h | 7 ++ pgxn/neon/pagestore_smgr.c | 81 +++++++++++++--- pgxn/neon/relsize_cache.c | 135 +++++++++++++++++++++++++- test_runner/fixtures/neon_fixtures.py | 4 +- vendor/revisions.json | 6 +- 5 files changed, 209 insertions(+), 24 deletions(-) diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index 8951e6607b..6c726c22d9 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -299,6 +299,13 @@ extern void set_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumb extern void update_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size); extern void forget_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum); +extern bool start_unlogged_build(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size); +extern bool is_unlogged_build(NRelFileInfo rinfo, ForkNumber forknum); +extern bool stop_unlogged_build(NRelFileInfo rinfo, ForkNumber forknum); +extern void resume_unlogged_build(void); + + + /* functions for local file cache */ #if PG_MAJORVERSION_NUM < 16 extern void lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 45b781c95f..d2dd350c76 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -60,6 +60,7 @@ #include "storage/fsm_internals.h" #include "storage/md.h" #include "storage/smgr.h" +#include "utils/rel.h" #include "pagestore_client.h" @@ -1473,7 +1474,11 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co } else if (forknum != FSM_FORKNUM && forknum != VISIBILITYMAP_FORKNUM) { - mdcreate(reln, forknum, true); + if (start_unlogged_build(InfoFromSMgrRel(reln), forknum, blocknum+1)) + { + mdcreate(reln, forknum, true); + resume_unlogged_build(); + } mdwrite(reln, forknum, blocknum, buffer, true); ereport(SmgrTrace, @@ -1488,22 +1493,39 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co } else if (lsn < FirstNormalUnloggedLSN) { + if (start_unlogged_build(InfoFromSMgrRel(reln),forknum, blocknum+1)) + { mdcreate(reln, forknum, true); - mdwrite(reln, forknum, blocknum, buffer, true); + resume_unlogged_build(); + } + mdwrite(reln, forknum, blocknum, buffer, true); - ereport(SmgrTrace, - (errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u is saved locally.", - blocknum, - RelFileInfoFmt(InfoFromSMgrRel(reln)), - forknum))); + ereport(SmgrTrace, + (errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u is saved locally.", + blocknum, + RelFileInfoFmt(InfoFromSMgrRel(reln)), + forknum))); } else { - ereport(SmgrTrace, - (errmsg(NEON_TAG "Evicting page %u of relation %u/%u/%u.%u with lsn=%X/%X", + if (is_unlogged_build(InfoFromSMgrRel(reln), forknum)) + { + resume_unlogged_build(); + mdwrite(reln, forknum, blocknum, buffer, true); + ereport(SmgrTrace, + (errmsg(NEON_TAG "Page %u with LSN=%X/%X of relation %u/%u/%u.%u is saved locally.", + blocknum, + LSN_FORMAT_ARGS(lsn), + RelFileInfoFmt(InfoFromSMgrRel(reln)), + forknum))); + } + else + ereport(SmgrTrace, + (errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u is already wal-logged at lsn=%X/%X", blocknum, RelFileInfoFmt(InfoFromSMgrRel(reln)), - forknum, LSN_FORMAT_ARGS(lsn)))); + forknum, LSN_FORMAT_ARGS(lsn) + ))); } /* @@ -1513,6 +1535,19 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co SetLastWrittenLSNForBlock(lsn, InfoFromSMgrRel(reln), forknum, blocknum); } +static void +neon_log_newpage_range_callback(Relation rel, ForkNumber forknum) +{ + SMgrRelation smgr = RelationGetSmgr(rel); + if (stop_unlogged_build(InfoFromSMgrRel(smgr), forknum)) + { + mdclose(smgr, forknum); + /* use isRedo == true, so that we drop it immediately */ + mdunlink(InfoBFromSMgrRel(smgr), forknum, true); + } +} + + /* * neon_init() -- Initialize private state */ @@ -1548,6 +1583,8 @@ neon_init(void) old_redo_read_buffer_filter = redo_read_buffer_filter; redo_read_buffer_filter = neon_redo_read_buffer_filter; + log_newpage_range_callback = neon_log_newpage_range_callback; + #ifdef DEBUG_COMPARE_LOCAL mdinit(); #endif @@ -2207,9 +2244,11 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum, if (!XLogInsertAllowed()) return; + set_cached_relsize(InfoFromSMgrRel(reln), forkNum, blocknum + nblocks); + +#if 0 /* ensure we have enough xlog buffers to log max-sized records */ XLogEnsureRecordSpace(Min(remblocks, (XLR_MAX_BLOCK_ID - 1)), 0); - /* * Iterate over all the pages. They are collected into batches of * XLR_MAX_BLOCK_ID pages, and a single WAL-record is written for each @@ -2242,6 +2281,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum, SetLastWrittenLSNForRelation(lsn, InfoFromSMgrRel(reln), forkNum); set_cached_relsize(InfoFromSMgrRel(reln), forkNum, blocknum); +#endif } #endif @@ -2534,16 +2574,27 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer request_lsns = neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno); neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsns, buffer); - if (forkNum == MAIN_FORKNUM && PageIsNew((Page)buffer) && mdexists(reln, forkNum)) + if (is_unlogged_build(InfoFromSMgrRel(reln), forkNum)) { - elog(SmgrTrace, "Read local page %d of relation %u/%u/%u.%u", - blkno, RelFileInfoFmt(InfoFromSMgrRel(reln)), forkNum); if (blkno >= mdnblocks(reln, forkNum)) + { + elog(SmgrTrace, "Get empty local page %d of relation %u/%u/%u.%u", + blkno, RelFileInfoFmt(InfoFromSMgrRel(reln)), forkNum); memset(buffer, 0, BLCKSZ); + } else + { + elog(SmgrTrace, "Read local page %d of relation %u/%u/%u.%u", + blkno, RelFileInfoFmt(InfoFromSMgrRel(reln)), forkNum); mdread(reln, forkNum, blkno, buffer); + } + resume_unlogged_build(); + } + else + { + request_lsn = neon_get_request_lsn(&latest, InfoFromSMgrRel(reln), forkNum, blkno); + neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsn, latest, buffer); } - #ifdef DEBUG_COMPARE_LOCAL if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln)) { diff --git a/pgxn/neon/relsize_cache.c b/pgxn/neon/relsize_cache.c index cc7ac2c394..178c521ed3 100644 --- a/pgxn/neon/relsize_cache.c +++ b/pgxn/neon/relsize_cache.c @@ -39,7 +39,8 @@ typedef struct typedef struct { RelTag tag; - BlockNumber size; + BlockNumber size : 31; + BlockNumber unlogged : 1; dlist_node lru_node; /* LRU list node */ } RelSizeEntry; @@ -156,6 +157,7 @@ set_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size) entry->size = size; if (!found) { + entry->unlogged = false; if (++relsize_ctl->size == relsize_hash_size) { /* @@ -191,10 +193,10 @@ update_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size) tag.forknum = forknum; LWLockAcquire(relsize_lock, LW_EXCLUSIVE); entry = hash_search(relsize_hash, &tag, HASH_ENTER, &found); - if (!found || entry->size < size) + if (!found) { + entry->unlogged = false; entry->size = size; - if (!found) - { + if (++relsize_ctl->size == relsize_hash_size) { RelSizeEntry *victim = dlist_container(RelSizeEntry, lru_node, dlist_pop_head_node(&relsize_ctl->lru)); @@ -204,6 +206,9 @@ update_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size) } else { + if (entry->size < size) + entry->size = size; + dlist_delete(&entry->lru_node); } relsize_ctl->writes += 1; @@ -232,6 +237,128 @@ forget_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum) } } +bool +start_unlogged_build(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size) +{ + bool start = false; + if (relsize_hash_size > 0) + { + RelTag tag; + RelSizeEntry *entry; + bool found; + + tag.rinfo = rinfo; + tag.forknum = forknum; + LWLockAcquire(relsize_lock, LW_EXCLUSIVE); + entry = hash_search(relsize_hash, &tag, HASH_ENTER, &found); + if (!found) { + entry->size = size; + start = true; + + if (++relsize_ctl->size == relsize_hash_size) + { + RelSizeEntry *victim = dlist_container(RelSizeEntry, lru_node, dlist_pop_head_node(&relsize_ctl->lru)); + hash_search(relsize_hash, &victim->tag, HASH_REMOVE, NULL); + relsize_ctl->size -= 1; + } + } + else + { + start = !entry->unlogged; + + if (entry->size < size) + entry->size = size; + + dlist_delete(&entry->lru_node); + } + entry->unlogged = true; + relsize_ctl->writes += 1; + dlist_push_tail(&relsize_ctl->lru, &entry->lru_node); + if (!start) + LWLockRelease(relsize_lock); + else + elog(LOG, "Start unlogged build for %u/%u/%u.%u", + RelFileInfoFmt(rinfo), forknum); + + } + return start; +} + +bool +is_unlogged_build(NRelFileInfo rinfo, ForkNumber forknum) +{ + bool unlogged = false; + + if (relsize_hash_size > 0) + { + RelTag tag; + RelSizeEntry *entry; + + tag.rinfo = rinfo; + tag.forknum = forknum; + LWLockAcquire(relsize_lock, LW_SHARED); + entry = hash_search(relsize_hash, &tag, HASH_FIND, NULL); + if (entry != NULL) + { + unlogged = entry->unlogged; + relsize_ctl->hits += 1; + /* Move entry to the LRU list tail */ + dlist_delete(&entry->lru_node); + dlist_push_tail(&relsize_ctl->lru, &entry->lru_node); + } + else + { + relsize_ctl->misses += 1; + } + if (!unlogged) + LWLockRelease(relsize_lock); + } + return unlogged; +} + +bool +stop_unlogged_build(NRelFileInfo rinfo, ForkNumber forknum) +{ + bool unlogged = false; + + if (relsize_hash_size > 0) + { + RelTag tag; + RelSizeEntry *entry; + + tag.rinfo = rinfo; + tag.forknum = forknum; + LWLockAcquire(relsize_lock, LW_EXCLUSIVE); + entry = hash_search(relsize_hash, &tag, HASH_FIND, NULL); + if (entry != NULL) + { + unlogged = entry->unlogged; + if (unlogged) + elog(LOG, "Stop unlogged build for %u/%u/%u.%u", + RelFileInfoFmt(rinfo), forknum); + entry->unlogged = false; + relsize_ctl->hits += 1; + /* Move entry to the LRU list tail */ + dlist_delete(&entry->lru_node); + dlist_push_tail(&relsize_ctl->lru, &entry->lru_node); + /* use isRedo == true, so that we drop it immediately */ + } + else + { + relsize_ctl->misses += 1; + } + LWLockRelease(relsize_lock); + } + return unlogged; +} + +void +resume_unlogged_build(void) +{ + LWLockRelease(relsize_lock); +} + + void relsize_hash_init(void) { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 7f89297eb0..33f5aa4225 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4316,7 +4316,7 @@ def check_restored_datadir_content( restored_files = [f for f in restored_files if f not in ignored_files] # check that file sets are equal - #assert pgdata_files == restored_files + # assert pgdata_files == restored_files # compare content of the files # filecmp returns (match, mismatch, error) lists @@ -4339,7 +4339,7 @@ def check_restored_datadir_content( cmd = f"diff {f1}.hex {f2}.hex" subprocess.run([cmd], stdout=stdout_f, shell=True) - #assert (mismatch, error) == ([], []) + # assert (mismatch, error) == ([], []) def logical_replication_sync(subscriber: VanillaPostgres, publisher: Endpoint) -> Lsn: diff --git a/vendor/revisions.json b/vendor/revisions.json index b39ad07101..93a42f27fd 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -1,5 +1,5 @@ { - "postgres-v16": "ed0c41966dc85ab21c09f787a475bdc5cc2cbf47", - "postgres-v15": "6b1b8c2ba07966f7377a76c73993ca6efdaa699c", - "postgres-v14": "417dce7bb986134213598c299b14e40d0f916345" + "postgres-v16": "a673c9fd4af439512013713b9b537487927bbc63", + "postgres-v15": "411a463af69cd7d5480e158da3d8c23f46cb31cb", + "postgres-v14": "5ed0718551ccce80bd3644dd0330deeb6ffd919b" }