mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
Check prefetch termination on PS disconnect
This commit is contained in:
@@ -913,6 +913,8 @@ pub struct PagestreamNblocksResponse {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamGetPageResponse {
|
||||
pub rel: RelTag,
|
||||
pub blkno: u32,
|
||||
pub page: Bytes,
|
||||
}
|
||||
|
||||
@@ -1073,6 +1075,11 @@ impl PagestreamBeMessage {
|
||||
|
||||
Self::GetPage(resp) => {
|
||||
bytes.put_u8(Tag::GetPage as u8);
|
||||
bytes.put_u32(resp.rel.spcnode);
|
||||
bytes.put_u32(resp.rel.dbnode);
|
||||
bytes.put_u32(resp.rel.relnode);
|
||||
bytes.put_u8(resp.rel.forknum);
|
||||
bytes.put_u32(resp.blkno);
|
||||
bytes.put(&resp.page[..]);
|
||||
}
|
||||
|
||||
@@ -1114,9 +1121,20 @@ impl PagestreamBeMessage {
|
||||
Self::Nblocks(PagestreamNblocksResponse { n_blocks })
|
||||
}
|
||||
Tag::GetPage => {
|
||||
let rel = RelTag {
|
||||
spcnode: buf.read_u32::<BigEndian>()?,
|
||||
dbnode: buf.read_u32::<BigEndian>()?,
|
||||
relnode: buf.read_u32::<BigEndian>()?,
|
||||
forknum: buf.read_u8()?,
|
||||
};
|
||||
let blkno = buf.read_u32::<BigEndian>()?;
|
||||
let mut page = vec![0; 8192]; // TODO: use MaybeUninit
|
||||
buf.read_exact(&mut page)?;
|
||||
PagestreamBeMessage::GetPage(PagestreamGetPageResponse { page: page.into() })
|
||||
PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
rel,
|
||||
blkno,
|
||||
page: page.into(),
|
||||
})
|
||||
}
|
||||
Tag::Error => {
|
||||
let mut msg = Vec::new();
|
||||
|
||||
@@ -1156,6 +1156,8 @@ impl PageServerHandler {
|
||||
.await?;
|
||||
|
||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
rel: req.rel,
|
||||
blkno: req.blkno,
|
||||
page,
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -139,6 +139,9 @@ typedef struct
|
||||
typedef struct
|
||||
{
|
||||
NeonMessageTag tag;
|
||||
NRelFileInfo rinfo;
|
||||
ForkNumber forknum;
|
||||
BlockNumber blkno;
|
||||
char page[FLEXIBLE_ARRAY_MEMBER];
|
||||
} NeonGetPageResponse;
|
||||
|
||||
|
||||
@@ -1116,6 +1116,11 @@ nm_unpack_response(StringInfo s)
|
||||
|
||||
msg_resp = MemoryContextAllocZero(MyPState->bufctx, PS_GETPAGERESPONSE_SIZE);
|
||||
msg_resp->tag = tag;
|
||||
NInfoGetSpcOid(msg_resp->rinfo) = pq_getmsgint(s, 4);
|
||||
NInfoGetDbOid(msg_resp->rinfo) = pq_getmsgint(s, 4);
|
||||
NInfoGetRelNumber(msg_resp->rinfo) = pq_getmsgint(s, 4);
|
||||
msg_resp->forknum = pq_getmsgbyte(s);
|
||||
msg_resp->blkno = pq_getmsgint(s, 4);
|
||||
/* XXX: should be varlena */
|
||||
memcpy(msg_resp->page, pq_getmsgbytes(s, BLCKSZ), BLCKSZ);
|
||||
pq_getmsgend(s);
|
||||
@@ -2208,10 +2213,20 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
switch (resp->tag)
|
||||
{
|
||||
case T_NeonGetPageResponse:
|
||||
memcpy(buffer, ((NeonGetPageResponse *) resp)->page, BLCKSZ);
|
||||
{
|
||||
NeonGetPageResponse* r = (NeonGetPageResponse *) resp;
|
||||
memcpy(buffer, r->page, BLCKSZ);
|
||||
if (memcmp(&r->rinfo, &rinfo, sizeof rinfo) != 0 && forkNum != r->forknum || blkno != r->blkno)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_IO_ERROR),
|
||||
errmsg(NEON_TAG "[shard %d] get unexpected get page resonse for block %u in rel %u/%u/%u.%u instead of block block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X",
|
||||
slot->shard_no,
|
||||
r->blkno, RelFileInfoFmt(r->rinfo), r->forknum,
|
||||
blkno, RelFileInfoFmt(rinfo), forkNum,
|
||||
(uint32) (request_lsn >> 32), (uint32) request_lsn)));
|
||||
lfc_write(rinfo, forkNum, blkno, buffer);
|
||||
break;
|
||||
|
||||
}
|
||||
case T_NeonErrorResponse:
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_IO_ERROR),
|
||||
|
||||
@@ -13,14 +13,15 @@ from fixtures.neon_fixtures import NeonEnv, PgBin
|
||||
def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgBin):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_pageserver_restarts")
|
||||
endpoint = env.endpoints.create_start("test_pageserver_restarts")
|
||||
n_restarts = 10
|
||||
endpoint = env.endpoints.create_start("test_pageserver_restarts", config_lines=["effective_io_concurrency=100"])
|
||||
|
||||
n_restarts = 100
|
||||
scale = 10
|
||||
|
||||
def run_pgbench(connstr: str):
|
||||
log.info(f"Start a pgbench workload on pg {connstr}")
|
||||
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
|
||||
pg_bin.run_capture(["pgbench", f"-T{n_restarts}", connstr])
|
||||
pg_bin.run_capture(["pgbench", f"-c 10 -f select.sql -T{n_restarts}", connstr])
|
||||
|
||||
thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True)
|
||||
thread.start()
|
||||
|
||||
Reference in New Issue
Block a user