diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index aad4cc97fc..711bf20bfa 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -913,6 +913,8 @@ pub struct PagestreamNblocksResponse { #[derive(Debug)] pub struct PagestreamGetPageResponse { + pub rel: RelTag, + pub blkno: u32, pub page: Bytes, } @@ -1073,6 +1075,11 @@ impl PagestreamBeMessage { Self::GetPage(resp) => { bytes.put_u8(Tag::GetPage as u8); + bytes.put_u32(resp.rel.spcnode); + bytes.put_u32(resp.rel.dbnode); + bytes.put_u32(resp.rel.relnode); + bytes.put_u8(resp.rel.forknum); + bytes.put_u32(resp.blkno); bytes.put(&resp.page[..]); } @@ -1114,9 +1121,20 @@ impl PagestreamBeMessage { Self::Nblocks(PagestreamNblocksResponse { n_blocks }) } Tag::GetPage => { + let rel = RelTag { + spcnode: buf.read_u32::()?, + dbnode: buf.read_u32::()?, + relnode: buf.read_u32::()?, + forknum: buf.read_u8()?, + }; + let blkno = buf.read_u32::()?; let mut page = vec![0; 8192]; // TODO: use MaybeUninit buf.read_exact(&mut page)?; - PagestreamBeMessage::GetPage(PagestreamGetPageResponse { page: page.into() }) + PagestreamBeMessage::GetPage(PagestreamGetPageResponse { + rel, + blkno, + page: page.into(), + }) } Tag::Error => { let mut msg = Vec::new(); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index f3ceb7d3e6..b0e71250fd 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1156,6 +1156,8 @@ impl PageServerHandler { .await?; Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse { + rel: req.rel, + blkno: req.blkno, page, })) } diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index 44ae766f76..ead18d5d56 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -139,6 +139,9 @@ typedef struct typedef struct { NeonMessageTag tag; + NRelFileInfo rinfo; + ForkNumber forknum; + BlockNumber blkno; char page[FLEXIBLE_ARRAY_MEMBER]; } NeonGetPageResponse; diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index ecc8ddb384..b9a039a370 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -1116,6 +1116,11 @@ nm_unpack_response(StringInfo s) msg_resp = MemoryContextAllocZero(MyPState->bufctx, PS_GETPAGERESPONSE_SIZE); msg_resp->tag = tag; + NInfoGetSpcOid(msg_resp->rinfo) = pq_getmsgint(s, 4); + NInfoGetDbOid(msg_resp->rinfo) = pq_getmsgint(s, 4); + NInfoGetRelNumber(msg_resp->rinfo) = pq_getmsgint(s, 4); + msg_resp->forknum = pq_getmsgbyte(s); + msg_resp->blkno = pq_getmsgint(s, 4); /* XXX: should be varlena */ memcpy(msg_resp->page, pq_getmsgbytes(s, BLCKSZ), BLCKSZ); pq_getmsgend(s); @@ -2208,10 +2213,20 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, switch (resp->tag) { case T_NeonGetPageResponse: - memcpy(buffer, ((NeonGetPageResponse *) resp)->page, BLCKSZ); + { + NeonGetPageResponse* r = (NeonGetPageResponse *) resp; + memcpy(buffer, r->page, BLCKSZ); + if (memcmp(&r->rinfo, &rinfo, sizeof rinfo) != 0 && forkNum != r->forknum || blkno != r->blkno) + ereport(ERROR, + (errcode(ERRCODE_IO_ERROR), + errmsg(NEON_TAG "[shard %d] get unexpected get page resonse for block %u in rel %u/%u/%u.%u instead of block block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X", + slot->shard_no, + r->blkno, RelFileInfoFmt(r->rinfo), r->forknum, + blkno, RelFileInfoFmt(rinfo), forkNum, + (uint32) (request_lsn >> 32), (uint32) request_lsn))); lfc_write(rinfo, forkNum, blkno, buffer); break; - + } case T_NeonErrorResponse: ereport(ERROR, (errcode(ERRCODE_IO_ERROR), diff --git a/test_runner/regress/test_pageserver_restarts_under_workload.py b/test_runner/regress/test_pageserver_restarts_under_workload.py index 65569f3bac..4225fcbc02 100644 --- a/test_runner/regress/test_pageserver_restarts_under_workload.py +++ b/test_runner/regress/test_pageserver_restarts_under_workload.py @@ -13,14 +13,15 @@ from fixtures.neon_fixtures import NeonEnv, PgBin def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgBin): env = neon_simple_env env.neon_cli.create_branch("test_pageserver_restarts") - endpoint = env.endpoints.create_start("test_pageserver_restarts") - n_restarts = 10 + endpoint = env.endpoints.create_start("test_pageserver_restarts", config_lines=["effective_io_concurrency=100"]) + + n_restarts = 100 scale = 10 def run_pgbench(connstr: str): log.info(f"Start a pgbench workload on pg {connstr}") pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr]) - pg_bin.run_capture(["pgbench", f"-T{n_restarts}", connstr]) + pg_bin.run_capture(["pgbench", f"-c 10 -f select.sql -T{n_restarts}", connstr]) thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True) thread.start()