mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-22 21:02:56 +00:00
Compare commits
7 Commits
diko/baseb
...
test_recon
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e21b2eb7dc | ||
|
|
1cd71d29ac | ||
|
|
86762ba313 | ||
|
|
6a42b6b87f | ||
|
|
48013845f0 | ||
|
|
40226ce7da | ||
|
|
ea2083653c |
@@ -913,6 +913,8 @@ pub struct PagestreamNblocksResponse {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamGetPageResponse {
|
||||
pub rel: RelTag,
|
||||
pub blkno: u32,
|
||||
pub page: Bytes,
|
||||
}
|
||||
|
||||
@@ -1073,6 +1075,11 @@ impl PagestreamBeMessage {
|
||||
|
||||
Self::GetPage(resp) => {
|
||||
bytes.put_u8(Tag::GetPage as u8);
|
||||
bytes.put_u32(resp.rel.spcnode);
|
||||
bytes.put_u32(resp.rel.dbnode);
|
||||
bytes.put_u32(resp.rel.relnode);
|
||||
bytes.put_u8(resp.rel.forknum);
|
||||
bytes.put_u32(resp.blkno);
|
||||
bytes.put(&resp.page[..]);
|
||||
}
|
||||
|
||||
@@ -1114,9 +1121,20 @@ impl PagestreamBeMessage {
|
||||
Self::Nblocks(PagestreamNblocksResponse { n_blocks })
|
||||
}
|
||||
Tag::GetPage => {
|
||||
let rel = RelTag {
|
||||
spcnode: buf.read_u32::<BigEndian>()?,
|
||||
dbnode: buf.read_u32::<BigEndian>()?,
|
||||
relnode: buf.read_u32::<BigEndian>()?,
|
||||
forknum: buf.read_u8()?,
|
||||
};
|
||||
let blkno = buf.read_u32::<BigEndian>()?;
|
||||
let mut page = vec![0; 8192]; // TODO: use MaybeUninit
|
||||
buf.read_exact(&mut page)?;
|
||||
PagestreamBeMessage::GetPage(PagestreamGetPageResponse { page: page.into() })
|
||||
PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
rel,
|
||||
blkno,
|
||||
page: page.into(),
|
||||
})
|
||||
}
|
||||
Tag::Error => {
|
||||
let mut msg = Vec::new();
|
||||
|
||||
@@ -1156,6 +1156,8 @@ impl PageServerHandler {
|
||||
.await?;
|
||||
|
||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
rel: req.rel,
|
||||
blkno: req.blkno,
|
||||
page,
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -111,6 +111,7 @@ static PageServer page_servers[MAX_SHARDS];
|
||||
|
||||
static bool pageserver_flush(shardno_t shard_no);
|
||||
static void pageserver_disconnect(shardno_t shard_no);
|
||||
static void pageserver_disconnect_shard(shardno_t shard_no);
|
||||
|
||||
static bool
|
||||
PagestoreShmemIsValid(void)
|
||||
@@ -487,9 +488,31 @@ retry:
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Reset prefetch and drop connection to the shard.
|
||||
* It also drops connection to all other shards involved in prefetch.
|
||||
*/
|
||||
static void
|
||||
pageserver_disconnect(shardno_t shard_no)
|
||||
{
|
||||
if (page_servers[shard_no].conn)
|
||||
{
|
||||
/*
|
||||
* If the connection to any pageserver is lost, we throw away the
|
||||
* whole prefetch queue, even for other pageservers. It should not
|
||||
* cause big problems, because connection loss is supposed to be a
|
||||
* rare event.
|
||||
*/
|
||||
prefetch_on_ps_disconnect();
|
||||
}
|
||||
pageserver_disconnect_shard(shard_no);
|
||||
}
|
||||
|
||||
/*
|
||||
* Disconnect from specified shard
|
||||
*/
|
||||
static void
|
||||
pageserver_disconnect_shard(shardno_t shard_no)
|
||||
{
|
||||
/*
|
||||
* If anything goes wrong while we were sending a request, it's not clear
|
||||
@@ -503,14 +526,6 @@ pageserver_disconnect(shardno_t shard_no)
|
||||
neon_shard_log(shard_no, LOG, "dropping connection to page server due to error");
|
||||
PQfinish(page_servers[shard_no].conn);
|
||||
page_servers[shard_no].conn = NULL;
|
||||
|
||||
/*
|
||||
* If the connection to any pageserver is lost, we throw away the
|
||||
* whole prefetch queue, even for other pageservers. It should not
|
||||
* cause big problems, because connection loss is supposed to be a
|
||||
* rare event.
|
||||
*/
|
||||
prefetch_on_ps_disconnect();
|
||||
}
|
||||
if (page_servers[shard_no].wes != NULL)
|
||||
{
|
||||
@@ -676,7 +691,8 @@ page_server_api api =
|
||||
{
|
||||
.send = pageserver_send,
|
||||
.flush = pageserver_flush,
|
||||
.receive = pageserver_receive
|
||||
.receive = pageserver_receive,
|
||||
.disconnect = pageserver_disconnect_shard
|
||||
};
|
||||
|
||||
static bool
|
||||
|
||||
@@ -139,6 +139,9 @@ typedef struct
|
||||
typedef struct
|
||||
{
|
||||
NeonMessageTag tag;
|
||||
NRelFileInfo rinfo;
|
||||
ForkNumber forknum;
|
||||
BlockNumber blkno;
|
||||
char page[FLEXIBLE_ARRAY_MEMBER];
|
||||
} NeonGetPageResponse;
|
||||
|
||||
@@ -180,6 +183,7 @@ typedef struct
|
||||
bool (*send) (shardno_t shard_no, NeonRequest * request);
|
||||
NeonResponse *(*receive) (shardno_t shard_no);
|
||||
bool (*flush) (shardno_t shard_no);
|
||||
void (*disconnect) (shardno_t shard_no);
|
||||
} page_server_api;
|
||||
|
||||
extern void prefetch_on_ps_disconnect(void);
|
||||
|
||||
@@ -613,6 +613,14 @@ prefetch_on_ps_disconnect(void)
|
||||
Assert(slot->status == PRFS_REQUESTED);
|
||||
Assert(slot->my_ring_index == ring_index);
|
||||
|
||||
/*
|
||||
* Drop connection to all shards which have prefetch requests.
|
||||
* It is not a problem to call disconnect multiple times on the same connection
|
||||
* because disconnect implementation in libpagestore.c will check if connection
|
||||
* is alive and do nothing of connection was already dropped.
|
||||
*/
|
||||
page_server->disconnect(slot->shard_no);
|
||||
|
||||
/* clean up the request */
|
||||
slot->status = PRFS_TAG_REMAINS;
|
||||
MyPState->n_requests_inflight -= 1;
|
||||
@@ -633,13 +641,12 @@ prefetch_on_ps_disconnect(void)
|
||||
static inline void
|
||||
prefetch_set_unused(uint64 ring_index)
|
||||
{
|
||||
PrefetchRequest *slot = GetPrfSlot(ring_index);
|
||||
PrefetchRequest *slot;
|
||||
|
||||
if (ring_index < MyPState->ring_last)
|
||||
return; /* Should already be unused */
|
||||
|
||||
Assert(MyPState->ring_unused > ring_index);
|
||||
|
||||
slot = GetPrfSlot(ring_index);
|
||||
if (slot->status == PRFS_UNUSED)
|
||||
return;
|
||||
|
||||
@@ -798,7 +805,8 @@ Retry:
|
||||
{
|
||||
if (*force_lsn > slot->effective_request_lsn)
|
||||
{
|
||||
prefetch_wait_for(ring_index);
|
||||
if (!prefetch_wait_for(ring_index))
|
||||
goto Retry;
|
||||
prefetch_set_unused(ring_index);
|
||||
entry = NULL;
|
||||
}
|
||||
@@ -813,7 +821,8 @@ Retry:
|
||||
{
|
||||
if (*force_lsn != slot->effective_request_lsn)
|
||||
{
|
||||
prefetch_wait_for(ring_index);
|
||||
if (!prefetch_wait_for(ring_index))
|
||||
goto Retry;
|
||||
prefetch_set_unused(ring_index);
|
||||
entry = NULL;
|
||||
}
|
||||
@@ -879,7 +888,8 @@ Retry:
|
||||
{
|
||||
case PRFS_REQUESTED:
|
||||
Assert(MyPState->ring_receive == cleanup_index);
|
||||
prefetch_wait_for(cleanup_index);
|
||||
if (!prefetch_wait_for(cleanup_index))
|
||||
goto Retry;
|
||||
prefetch_set_unused(cleanup_index);
|
||||
break;
|
||||
case PRFS_RECEIVED:
|
||||
@@ -1108,6 +1118,11 @@ nm_unpack_response(StringInfo s)
|
||||
|
||||
msg_resp = MemoryContextAllocZero(MyPState->bufctx, PS_GETPAGERESPONSE_SIZE);
|
||||
msg_resp->tag = tag;
|
||||
NInfoGetSpcOid(msg_resp->rinfo) = pq_getmsgint(s, 4);
|
||||
NInfoGetDbOid(msg_resp->rinfo) = pq_getmsgint(s, 4);
|
||||
NInfoGetRelNumber(msg_resp->rinfo) = pq_getmsgint(s, 4);
|
||||
msg_resp->forknum = pq_getmsgbyte(s);
|
||||
msg_resp->blkno = pq_getmsgint(s, 4);
|
||||
/* XXX: should be varlena */
|
||||
memcpy(msg_resp->page, pq_getmsgbytes(s, BLCKSZ), BLCKSZ);
|
||||
pq_getmsgend(s);
|
||||
@@ -2132,6 +2147,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
/*
|
||||
* Try to find prefetched page in the list of received pages.
|
||||
*/
|
||||
Retry:
|
||||
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &buftag);
|
||||
|
||||
if (entry != NULL)
|
||||
@@ -2153,7 +2169,8 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
*/
|
||||
if (slot->status == PRFS_REQUESTED)
|
||||
{
|
||||
prefetch_wait_for(slot->my_ring_index);
|
||||
if (!prefetch_wait_for(slot->my_ring_index))
|
||||
goto Retry;
|
||||
}
|
||||
/* drop caches */
|
||||
prefetch_set_unused(slot->my_ring_index);
|
||||
@@ -2200,10 +2217,20 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
switch (resp->tag)
|
||||
{
|
||||
case T_NeonGetPageResponse:
|
||||
memcpy(buffer, ((NeonGetPageResponse *) resp)->page, BLCKSZ);
|
||||
{
|
||||
NeonGetPageResponse* r = (NeonGetPageResponse *) resp;
|
||||
memcpy(buffer, r->page, BLCKSZ);
|
||||
if (memcmp(&r->rinfo, &rinfo, sizeof rinfo) != 0 && forkNum != r->forknum || blkno != r->blkno)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_IO_ERROR),
|
||||
errmsg(NEON_TAG "[shard %d] get unexpected get page resonse for block %u in rel %u/%u/%u.%u instead of block block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X",
|
||||
slot->shard_no,
|
||||
r->blkno, RelFileInfoFmt(r->rinfo), r->forknum,
|
||||
blkno, RelFileInfoFmt(rinfo), forkNum,
|
||||
(uint32) (request_lsn >> 32), (uint32) request_lsn)));
|
||||
lfc_write(rinfo, forkNum, blkno, buffer);
|
||||
break;
|
||||
|
||||
}
|
||||
case T_NeonErrorResponse:
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_IO_ERROR),
|
||||
|
||||
2
test_runner/regress/select.sql
Normal file
2
test_runner/regress/select.sql
Normal file
@@ -0,0 +1,2 @@
|
||||
select sum(abalance) from pgbench_accounts;
|
||||
|
||||
@@ -13,14 +13,17 @@ from fixtures.neon_fixtures import NeonEnv, PgBin
|
||||
def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgBin):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_pageserver_restarts")
|
||||
endpoint = env.endpoints.create_start("test_pageserver_restarts")
|
||||
n_restarts = 10
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_pageserver_restarts", config_lines=["effective_io_concurrency=100"]
|
||||
)
|
||||
|
||||
n_restarts = 100
|
||||
scale = 10
|
||||
|
||||
def run_pgbench(connstr: str):
|
||||
log.info(f"Start a pgbench workload on pg {connstr}")
|
||||
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
|
||||
pg_bin.run_capture(["pgbench", f"-T{n_restarts}", connstr])
|
||||
pg_bin.run_capture(["pgbench", "-c", "10", "-f", "test_runner/regress/select.sql", f"-T{n_restarts}", connstr])
|
||||
|
||||
thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True)
|
||||
thread.start()
|
||||
|
||||
Reference in New Issue
Block a user