Track size of FSM fork while applying records at replica (#5901)

## Problem

See https://neondb.slack.com/archives/C04DGM6SMTM/p1700560921471619

## Summary of changes

Update relation size cache for FSM fork in WAL records filter

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
This commit is contained in:
Konstantin Knizhnik
2023-12-05 18:49:24 +02:00
committed by GitHub
parent 483caa22c6
commit 7fab731f65
2 changed files with 113 additions and 43 deletions

View File

@@ -59,6 +59,7 @@
#include "replication/walsender.h"
#include "storage/bufmgr.h"
#include "storage/buf_internals.h"
#include "storage/fsm_internals.h"
#include "storage/smgr.h"
#include "storage/md.h"
#include "pgstat.h"
@@ -2722,6 +2723,86 @@ smgr_init_neon(void)
}
static void
neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, XLogRecPtr end_recptr)
{
BlockNumber relsize;
/* Extend the relation if we know its size */
if (get_cached_relsize(rinfo, forknum, &relsize))
{
if (relsize < blkno + 1)
{
update_cached_relsize(rinfo, forknum, blkno + 1);
SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
}
}
else
{
/*
* Size was not cached. We populate the cache now, with the size of the
* relation measured after this WAL record is applied.
*
* This length is later reused when we open the smgr to read the block,
* which is fine and expected.
*/
NeonResponse *response;
NeonNblocksResponse *nbresponse;
NeonNblocksRequest request = {
.req = (NeonRequest) {
.lsn = end_recptr,
.latest = false,
.tag = T_NeonNblocksRequest,
},
.rinfo = rinfo,
.forknum = forknum,
};
response = page_server_request(&request);
Assert(response->tag == T_NeonNblocksResponse);
nbresponse = (NeonNblocksResponse *) response;
relsize = Max(nbresponse->n_blocks, blkno+1);
set_cached_relsize(rinfo, forknum, relsize);
SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
elog(SmgrTrace, "Set length to %d", relsize);
}
}
#define FSM_TREE_DEPTH ((SlotsPerFSMPage >= 1626) ? 3 : 4)
/*
* TODO: May be it is better to make correspondent fgunctio from freespace.c public?
*/
static BlockNumber
get_fsm_physical_block(BlockNumber heapblk)
{
BlockNumber pages;
int leafno;
int l;
/*
* Calculate the logical page number of the first leaf page below the
* given page.
*/
leafno = heapblk / SlotsPerFSMPage;
/* Count upper level nodes required to address the leaf page */
pages = 0;
for (l = 0; l < FSM_TREE_DEPTH; l++)
{
pages += leafno + 1;
leafno /= SlotsPerFSMPage;
}
/* Turn the page count into 0-based block number */
return pages - 1;
}
/*
* Return whether we can skip the redo for this block.
*
@@ -2769,7 +2850,6 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
LWLock *partitionLock;
Buffer buffer;
bool no_redo_needed;
BlockNumber relsize;
if (old_redo_read_buffer_filter && old_redo_read_buffer_filter(record, block_id))
return true;
@@ -2819,49 +2899,10 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
LWLockRelease(partitionLock);
/* Extend the relation if we know its size */
if (get_cached_relsize(rinfo, forknum, &relsize))
neon_extend_rel_size(rinfo, forknum, blkno, end_recptr);
if (forknum == MAIN_FORKNUM)
{
if (relsize < blkno + 1)
{
update_cached_relsize(rinfo, forknum, blkno + 1);
SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
}
neon_extend_rel_size(rinfo, FSM_FORKNUM, get_fsm_physical_block(blkno), end_recptr);
}
else
{
/*
* Size was not cached. We populate the cache now, with the size of the
* relation measured after this WAL record is applied.
*
* This length is later reused when we open the smgr to read the block,
* which is fine and expected.
*/
NeonResponse *response;
NeonNblocksResponse *nbresponse;
NeonNblocksRequest request = {
.req = (NeonRequest) {
.lsn = end_recptr,
.latest = false,
.tag = T_NeonNblocksRequest,
},
.rinfo = rinfo,
.forknum = forknum,
};
response = page_server_request(&request);
Assert(response->tag == T_NeonNblocksResponse);
nbresponse = (NeonNblocksResponse *) response;
Assert(nbresponse->n_blocks > blkno);
set_cached_relsize(rinfo, forknum, nbresponse->n_blocks);
SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
elog(SmgrTrace, "Set length to %d", nbresponse->n_blocks);
}
return no_redo_needed;
}

View File

@@ -0,0 +1,29 @@
import random
import time
from fixtures.neon_fixtures import NeonEnv
def test_physical_replication(neon_simple_env: NeonEnv):
env = neon_simple_env
n_records = 100000
with env.endpoints.create_start(
branch_name="main",
endpoint_id="primary",
) as primary:
with primary.connect() as p_con:
with p_con.cursor() as p_cur:
p_cur.execute(
"CREATE TABLE t(pk bigint primary key, payload text default repeat('?',200))"
)
time.sleep(1)
with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
with primary.connect() as p_con:
with p_con.cursor() as p_cur:
with secondary.connect() as s_con:
with s_con.cursor() as s_cur:
for pk in range(n_records):
p_cur.execute("insert into t (pk) values (%s)", (pk,))
s_cur.execute(
"select * from t where pk=%s", (random.randrange(1, n_records),)
)