diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c
index 84b26198a7..74ffbdb371 100644
--- a/pgxn/neon/pagestore_smgr.c
+++ b/pgxn/neon/pagestore_smgr.c
@@ -59,6 +59,7 @@
 #include "replication/walsender.h"
 #include "storage/bufmgr.h"
 #include "storage/buf_internals.h"
+#include "storage/fsm_internals.h"
 #include "storage/smgr.h"
 #include "storage/md.h"
 #include "pgstat.h"
@@ -2722,6 +2723,99 @@ smgr_init_neon(void)
 }
 
 
+/*
+ * Make sure the cached size of relation fork 'forknum' of 'rinfo' covers
+ * block 'blkno'.
+ *
+ * If a size is already cached, it is raised to blkno + 1 when it is smaller.
+ * Otherwise the size is requested from the page server at 'end_recptr' and
+ * cached, likewise raised to at least blkno + 1.  Whenever the cached size is
+ * changed, 'end_recptr' is recorded as the last-written LSN of the fork.
+ */
+static void
+neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, XLogRecPtr end_recptr)
+{
+	BlockNumber relsize;
+	/* Extend the relation if we know its size */
+	if (get_cached_relsize(rinfo, forknum, &relsize))
+	{
+		if (relsize < blkno + 1)
+		{
+			update_cached_relsize(rinfo, forknum, blkno + 1);
+			SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
+		}
+	}
+	else
+	{
+		/*
+		 * Size was not cached. We populate the cache now, with the size of the
+		 * relation measured after this WAL record is applied.
+		 *
+		 * This length is later reused when we open the smgr to read the block,
+		 * which is fine and expected.
+		 */
+
+		NeonResponse *response;
+		NeonNblocksResponse *nbresponse;
+		NeonNblocksRequest request = {
+			.req = (NeonRequest) {
+				.lsn = end_recptr,
+				.latest = false,
+				.tag = T_NeonNblocksRequest,
+			},
+			.rinfo = rinfo,
+			.forknum = forknum,
+		};
+
+		response = page_server_request(&request);
+
+		Assert(response->tag == T_NeonNblocksResponse);
+		nbresponse = (NeonNblocksResponse *) response;
+
+		relsize = Max(nbresponse->n_blocks, blkno+1);
+
+		set_cached_relsize(rinfo, forknum, relsize);
+		SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
+
+		elog(SmgrTrace, "Set length to %u", relsize);
+	}
+}
+
+#define FSM_TREE_DEPTH ((SlotsPerFSMPage >= 1626) ? 3 : 4)
+
+/*
+ * Map heap block 'heapblk' to the block number, within the FSM fork, of the
+ * leaf FSM page that covers it.
+ *
+ * TODO: Maybe it would be better to make the corresponding function from
+ * freespace.c public?
+ */
+static BlockNumber
+get_fsm_physical_block(BlockNumber heapblk)
+{
+	BlockNumber pages;
+	int			leafno;
+	int			l;
+
+	/*
+	 * Calculate the logical page number of the first leaf page below the
+	 * given page.
+	 */
+	leafno = heapblk / SlotsPerFSMPage;
+
+	/* Count upper level nodes required to address the leaf page */
+	pages = 0;
+	for (l = 0; l < FSM_TREE_DEPTH; l++)
+	{
+		pages += leafno + 1;
+		leafno /= SlotsPerFSMPage;
+	}
+
+	/* Turn the page count into 0-based block number */
+	return pages - 1;
+}
+
+
 /*
  * Return whether we can skip the redo for this block.
  *
@@ -2769,7 +2863,6 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
 	LWLock	   *partitionLock;
 	Buffer		buffer;
 	bool		no_redo_needed;
-	BlockNumber relsize;
 
 	if (old_redo_read_buffer_filter && old_redo_read_buffer_filter(record, block_id))
 		return true;
@@ -2819,49 +2912,10 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
 
 	LWLockRelease(partitionLock);
 
-	/* Extend the relation if we know its size */
-	if (get_cached_relsize(rinfo, forknum, &relsize))
+	neon_extend_rel_size(rinfo, forknum, blkno, end_recptr);
+	if (forknum == MAIN_FORKNUM)
 	{
-		if (relsize < blkno + 1)
-		{
-			update_cached_relsize(rinfo, forknum, blkno + 1);
-			SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
-		}
+		neon_extend_rel_size(rinfo, FSM_FORKNUM, get_fsm_physical_block(blkno), end_recptr);
 	}
-	else
-	{
-		/*
-		 * Size was not cached. We populate the cache now, with the size of the
-		 * relation measured after this WAL record is applied.
-		 *
-		 * This length is later reused when we open the smgr to read the block,
-		 * which is fine and expected.
-		 */
-
-		NeonResponse *response;
-		NeonNblocksResponse *nbresponse;
-		NeonNblocksRequest request = {
-			.req = (NeonRequest) {
-				.lsn = end_recptr,
-				.latest = false,
-				.tag = T_NeonNblocksRequest,
-			},
-			.rinfo = rinfo,
-			.forknum = forknum,
-		};
-
-		response = page_server_request(&request);
-
-		Assert(response->tag == T_NeonNblocksResponse);
-		nbresponse = (NeonNblocksResponse *) response;
-
-		Assert(nbresponse->n_blocks > blkno);
-
-		set_cached_relsize(rinfo, forknum, nbresponse->n_blocks);
-		SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
-
-		elog(SmgrTrace, "Set length to %d", nbresponse->n_blocks);
-	}
-
 	return no_redo_needed;
 }
diff --git a/test_runner/regress/test_physical_replication.py b/test_runner/regress/test_physical_replication.py
new file mode 100644
index 0000000000..034f2b669d
--- /dev/null
+++ b/test_runner/regress/test_physical_replication.py
@@ -0,0 +1,29 @@
+import random
+import time
+
+from fixtures.neon_fixtures import NeonEnv
+
+
+def test_physical_replication(neon_simple_env: NeonEnv):
+    env = neon_simple_env
+    n_records = 100000
+    with env.endpoints.create_start(
+        branch_name="main",
+        endpoint_id="primary",
+    ) as primary:
+        with primary.connect() as p_con:
+            with p_con.cursor() as p_cur:
+                p_cur.execute(
+                    "CREATE TABLE t(pk bigint primary key, payload text default repeat('?',200))"
+                )
+        time.sleep(1)
+        with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
+            with primary.connect() as p_con:
+                with p_con.cursor() as p_cur:
+                    with secondary.connect() as s_con:
+                        with s_con.cursor() as s_cur:
+                            for pk in range(n_records):
+                                p_cur.execute("insert into t (pk) values (%s)", (pk,))
+                                s_cur.execute(
+                                    "select * from t where pk=%s", (random.randrange(1, n_records),)
+                                )