mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-05 17:30:38 +00:00
Compare commits
7 Commits
conrad/pro
...
test_recon
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e21b2eb7dc | ||
|
|
1cd71d29ac | ||
|
|
86762ba313 | ||
|
|
6a42b6b87f | ||
|
|
48013845f0 | ||
|
|
40226ce7da | ||
|
|
ea2083653c |
@@ -913,6 +913,8 @@ pub struct PagestreamNblocksResponse {
|
|||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct PagestreamGetPageResponse {
|
pub struct PagestreamGetPageResponse {
|
||||||
|
pub rel: RelTag,
|
||||||
|
pub blkno: u32,
|
||||||
pub page: Bytes,
|
pub page: Bytes,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1073,6 +1075,11 @@ impl PagestreamBeMessage {
|
|||||||
|
|
||||||
Self::GetPage(resp) => {
|
Self::GetPage(resp) => {
|
||||||
bytes.put_u8(Tag::GetPage as u8);
|
bytes.put_u8(Tag::GetPage as u8);
|
||||||
|
bytes.put_u32(resp.rel.spcnode);
|
||||||
|
bytes.put_u32(resp.rel.dbnode);
|
||||||
|
bytes.put_u32(resp.rel.relnode);
|
||||||
|
bytes.put_u8(resp.rel.forknum);
|
||||||
|
bytes.put_u32(resp.blkno);
|
||||||
bytes.put(&resp.page[..]);
|
bytes.put(&resp.page[..]);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1114,9 +1121,20 @@ impl PagestreamBeMessage {
|
|||||||
Self::Nblocks(PagestreamNblocksResponse { n_blocks })
|
Self::Nblocks(PagestreamNblocksResponse { n_blocks })
|
||||||
}
|
}
|
||||||
Tag::GetPage => {
|
Tag::GetPage => {
|
||||||
|
let rel = RelTag {
|
||||||
|
spcnode: buf.read_u32::<BigEndian>()?,
|
||||||
|
dbnode: buf.read_u32::<BigEndian>()?,
|
||||||
|
relnode: buf.read_u32::<BigEndian>()?,
|
||||||
|
forknum: buf.read_u8()?,
|
||||||
|
};
|
||||||
|
let blkno = buf.read_u32::<BigEndian>()?;
|
||||||
let mut page = vec![0; 8192]; // TODO: use MaybeUninit
|
let mut page = vec![0; 8192]; // TODO: use MaybeUninit
|
||||||
buf.read_exact(&mut page)?;
|
buf.read_exact(&mut page)?;
|
||||||
PagestreamBeMessage::GetPage(PagestreamGetPageResponse { page: page.into() })
|
PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||||
|
rel,
|
||||||
|
blkno,
|
||||||
|
page: page.into(),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
Tag::Error => {
|
Tag::Error => {
|
||||||
let mut msg = Vec::new();
|
let mut msg = Vec::new();
|
||||||
|
|||||||
@@ -1156,6 +1156,8 @@ impl PageServerHandler {
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||||
|
rel: req.rel,
|
||||||
|
blkno: req.blkno,
|
||||||
page,
|
page,
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -111,6 +111,7 @@ static PageServer page_servers[MAX_SHARDS];
|
|||||||
|
|
||||||
static bool pageserver_flush(shardno_t shard_no);
|
static bool pageserver_flush(shardno_t shard_no);
|
||||||
static void pageserver_disconnect(shardno_t shard_no);
|
static void pageserver_disconnect(shardno_t shard_no);
|
||||||
|
static void pageserver_disconnect_shard(shardno_t shard_no);
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
PagestoreShmemIsValid(void)
|
PagestoreShmemIsValid(void)
|
||||||
@@ -487,9 +488,31 @@ retry:
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Reset prefetch and drop connection to the shard.
|
||||||
|
* It also drops connection to all other shards involved in prefetch.
|
||||||
|
*/
|
||||||
static void
|
static void
|
||||||
pageserver_disconnect(shardno_t shard_no)
|
pageserver_disconnect(shardno_t shard_no)
|
||||||
|
{
|
||||||
|
if (page_servers[shard_no].conn)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* If the connection to any pageserver is lost, we throw away the
|
||||||
|
* whole prefetch queue, even for other pageservers. It should not
|
||||||
|
* cause big problems, because connection loss is supposed to be a
|
||||||
|
* rare event.
|
||||||
|
*/
|
||||||
|
prefetch_on_ps_disconnect();
|
||||||
|
}
|
||||||
|
pageserver_disconnect_shard(shard_no);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Disconnect from specified shard
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
pageserver_disconnect_shard(shardno_t shard_no)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* If anything goes wrong while we were sending a request, it's not clear
|
* If anything goes wrong while we were sending a request, it's not clear
|
||||||
@@ -503,14 +526,6 @@ pageserver_disconnect(shardno_t shard_no)
|
|||||||
neon_shard_log(shard_no, LOG, "dropping connection to page server due to error");
|
neon_shard_log(shard_no, LOG, "dropping connection to page server due to error");
|
||||||
PQfinish(page_servers[shard_no].conn);
|
PQfinish(page_servers[shard_no].conn);
|
||||||
page_servers[shard_no].conn = NULL;
|
page_servers[shard_no].conn = NULL;
|
||||||
|
|
||||||
/*
|
|
||||||
* If the connection to any pageserver is lost, we throw away the
|
|
||||||
* whole prefetch queue, even for other pageservers. It should not
|
|
||||||
* cause big problems, because connection loss is supposed to be a
|
|
||||||
* rare event.
|
|
||||||
*/
|
|
||||||
prefetch_on_ps_disconnect();
|
|
||||||
}
|
}
|
||||||
if (page_servers[shard_no].wes != NULL)
|
if (page_servers[shard_no].wes != NULL)
|
||||||
{
|
{
|
||||||
@@ -676,7 +691,8 @@ page_server_api api =
|
|||||||
{
|
{
|
||||||
.send = pageserver_send,
|
.send = pageserver_send,
|
||||||
.flush = pageserver_flush,
|
.flush = pageserver_flush,
|
||||||
.receive = pageserver_receive
|
.receive = pageserver_receive,
|
||||||
|
.disconnect = pageserver_disconnect_shard
|
||||||
};
|
};
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
|
|||||||
@@ -139,6 +139,9 @@ typedef struct
|
|||||||
typedef struct
|
typedef struct
|
||||||
{
|
{
|
||||||
NeonMessageTag tag;
|
NeonMessageTag tag;
|
||||||
|
NRelFileInfo rinfo;
|
||||||
|
ForkNumber forknum;
|
||||||
|
BlockNumber blkno;
|
||||||
char page[FLEXIBLE_ARRAY_MEMBER];
|
char page[FLEXIBLE_ARRAY_MEMBER];
|
||||||
} NeonGetPageResponse;
|
} NeonGetPageResponse;
|
||||||
|
|
||||||
@@ -180,6 +183,7 @@ typedef struct
|
|||||||
bool (*send) (shardno_t shard_no, NeonRequest * request);
|
bool (*send) (shardno_t shard_no, NeonRequest * request);
|
||||||
NeonResponse *(*receive) (shardno_t shard_no);
|
NeonResponse *(*receive) (shardno_t shard_no);
|
||||||
bool (*flush) (shardno_t shard_no);
|
bool (*flush) (shardno_t shard_no);
|
||||||
|
void (*disconnect) (shardno_t shard_no);
|
||||||
} page_server_api;
|
} page_server_api;
|
||||||
|
|
||||||
extern void prefetch_on_ps_disconnect(void);
|
extern void prefetch_on_ps_disconnect(void);
|
||||||
|
|||||||
@@ -613,6 +613,14 @@ prefetch_on_ps_disconnect(void)
|
|||||||
Assert(slot->status == PRFS_REQUESTED);
|
Assert(slot->status == PRFS_REQUESTED);
|
||||||
Assert(slot->my_ring_index == ring_index);
|
Assert(slot->my_ring_index == ring_index);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Drop connection to all shards which have prefetch requests.
|
||||||
|
* It is not a problem to call disconnect multiple times on the same connection
|
||||||
|
* because disconnect implementation in libpagestore.c will check if connection
|
||||||
|
* is alive and do nothing if the connection was already dropped.
|
||||||
|
*/
|
||||||
|
page_server->disconnect(slot->shard_no);
|
||||||
|
|
||||||
/* clean up the request */
|
/* clean up the request */
|
||||||
slot->status = PRFS_TAG_REMAINS;
|
slot->status = PRFS_TAG_REMAINS;
|
||||||
MyPState->n_requests_inflight -= 1;
|
MyPState->n_requests_inflight -= 1;
|
||||||
@@ -633,13 +641,12 @@ prefetch_on_ps_disconnect(void)
|
|||||||
static inline void
|
static inline void
|
||||||
prefetch_set_unused(uint64 ring_index)
|
prefetch_set_unused(uint64 ring_index)
|
||||||
{
|
{
|
||||||
PrefetchRequest *slot = GetPrfSlot(ring_index);
|
PrefetchRequest *slot;
|
||||||
|
|
||||||
if (ring_index < MyPState->ring_last)
|
if (ring_index < MyPState->ring_last)
|
||||||
return; /* Should already be unused */
|
return; /* Should already be unused */
|
||||||
|
|
||||||
Assert(MyPState->ring_unused > ring_index);
|
slot = GetPrfSlot(ring_index);
|
||||||
|
|
||||||
if (slot->status == PRFS_UNUSED)
|
if (slot->status == PRFS_UNUSED)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
@@ -798,7 +805,8 @@ Retry:
|
|||||||
{
|
{
|
||||||
if (*force_lsn > slot->effective_request_lsn)
|
if (*force_lsn > slot->effective_request_lsn)
|
||||||
{
|
{
|
||||||
prefetch_wait_for(ring_index);
|
if (!prefetch_wait_for(ring_index))
|
||||||
|
goto Retry;
|
||||||
prefetch_set_unused(ring_index);
|
prefetch_set_unused(ring_index);
|
||||||
entry = NULL;
|
entry = NULL;
|
||||||
}
|
}
|
||||||
@@ -813,7 +821,8 @@ Retry:
|
|||||||
{
|
{
|
||||||
if (*force_lsn != slot->effective_request_lsn)
|
if (*force_lsn != slot->effective_request_lsn)
|
||||||
{
|
{
|
||||||
prefetch_wait_for(ring_index);
|
if (!prefetch_wait_for(ring_index))
|
||||||
|
goto Retry;
|
||||||
prefetch_set_unused(ring_index);
|
prefetch_set_unused(ring_index);
|
||||||
entry = NULL;
|
entry = NULL;
|
||||||
}
|
}
|
||||||
@@ -879,7 +888,8 @@ Retry:
|
|||||||
{
|
{
|
||||||
case PRFS_REQUESTED:
|
case PRFS_REQUESTED:
|
||||||
Assert(MyPState->ring_receive == cleanup_index);
|
Assert(MyPState->ring_receive == cleanup_index);
|
||||||
prefetch_wait_for(cleanup_index);
|
if (!prefetch_wait_for(cleanup_index))
|
||||||
|
goto Retry;
|
||||||
prefetch_set_unused(cleanup_index);
|
prefetch_set_unused(cleanup_index);
|
||||||
break;
|
break;
|
||||||
case PRFS_RECEIVED:
|
case PRFS_RECEIVED:
|
||||||
@@ -1108,6 +1118,11 @@ nm_unpack_response(StringInfo s)
|
|||||||
|
|
||||||
msg_resp = MemoryContextAllocZero(MyPState->bufctx, PS_GETPAGERESPONSE_SIZE);
|
msg_resp = MemoryContextAllocZero(MyPState->bufctx, PS_GETPAGERESPONSE_SIZE);
|
||||||
msg_resp->tag = tag;
|
msg_resp->tag = tag;
|
||||||
|
NInfoGetSpcOid(msg_resp->rinfo) = pq_getmsgint(s, 4);
|
||||||
|
NInfoGetDbOid(msg_resp->rinfo) = pq_getmsgint(s, 4);
|
||||||
|
NInfoGetRelNumber(msg_resp->rinfo) = pq_getmsgint(s, 4);
|
||||||
|
msg_resp->forknum = pq_getmsgbyte(s);
|
||||||
|
msg_resp->blkno = pq_getmsgint(s, 4);
|
||||||
/* XXX: should be varlena */
|
/* XXX: should be varlena */
|
||||||
memcpy(msg_resp->page, pq_getmsgbytes(s, BLCKSZ), BLCKSZ);
|
memcpy(msg_resp->page, pq_getmsgbytes(s, BLCKSZ), BLCKSZ);
|
||||||
pq_getmsgend(s);
|
pq_getmsgend(s);
|
||||||
@@ -2132,6 +2147,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
|||||||
/*
|
/*
|
||||||
* Try to find prefetched page in the list of received pages.
|
* Try to find prefetched page in the list of received pages.
|
||||||
*/
|
*/
|
||||||
|
Retry:
|
||||||
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &buftag);
|
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &buftag);
|
||||||
|
|
||||||
if (entry != NULL)
|
if (entry != NULL)
|
||||||
@@ -2153,7 +2169,8 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
|||||||
*/
|
*/
|
||||||
if (slot->status == PRFS_REQUESTED)
|
if (slot->status == PRFS_REQUESTED)
|
||||||
{
|
{
|
||||||
prefetch_wait_for(slot->my_ring_index);
|
if (!prefetch_wait_for(slot->my_ring_index))
|
||||||
|
goto Retry;
|
||||||
}
|
}
|
||||||
/* drop caches */
|
/* drop caches */
|
||||||
prefetch_set_unused(slot->my_ring_index);
|
prefetch_set_unused(slot->my_ring_index);
|
||||||
@@ -2200,10 +2217,20 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
|||||||
switch (resp->tag)
|
switch (resp->tag)
|
||||||
{
|
{
|
||||||
case T_NeonGetPageResponse:
|
case T_NeonGetPageResponse:
|
||||||
memcpy(buffer, ((NeonGetPageResponse *) resp)->page, BLCKSZ);
|
{
|
||||||
|
NeonGetPageResponse* r = (NeonGetPageResponse *) resp;
|
||||||
|
memcpy(buffer, r->page, BLCKSZ);
|
||||||
|
if (memcmp(&r->rinfo, &rinfo, sizeof rinfo) != 0 && forkNum != r->forknum || blkno != r->blkno)
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_IO_ERROR),
|
||||||
|
errmsg(NEON_TAG "[shard %d] get unexpected get page resonse for block %u in rel %u/%u/%u.%u instead of block block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X",
|
||||||
|
slot->shard_no,
|
||||||
|
r->blkno, RelFileInfoFmt(r->rinfo), r->forknum,
|
||||||
|
blkno, RelFileInfoFmt(rinfo), forkNum,
|
||||||
|
(uint32) (request_lsn >> 32), (uint32) request_lsn)));
|
||||||
lfc_write(rinfo, forkNum, blkno, buffer);
|
lfc_write(rinfo, forkNum, blkno, buffer);
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
case T_NeonErrorResponse:
|
case T_NeonErrorResponse:
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_IO_ERROR),
|
(errcode(ERRCODE_IO_ERROR),
|
||||||
|
|||||||
2
test_runner/regress/select.sql
Normal file
2
test_runner/regress/select.sql
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
select sum(abalance) from pgbench_accounts;
|
||||||
|
|
||||||
@@ -13,14 +13,17 @@ from fixtures.neon_fixtures import NeonEnv, PgBin
|
|||||||
def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgBin):
|
def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgBin):
|
||||||
env = neon_simple_env
|
env = neon_simple_env
|
||||||
env.neon_cli.create_branch("test_pageserver_restarts")
|
env.neon_cli.create_branch("test_pageserver_restarts")
|
||||||
endpoint = env.endpoints.create_start("test_pageserver_restarts")
|
endpoint = env.endpoints.create_start(
|
||||||
n_restarts = 10
|
"test_pageserver_restarts", config_lines=["effective_io_concurrency=100"]
|
||||||
|
)
|
||||||
|
|
||||||
|
n_restarts = 100
|
||||||
scale = 10
|
scale = 10
|
||||||
|
|
||||||
def run_pgbench(connstr: str):
|
def run_pgbench(connstr: str):
|
||||||
log.info(f"Start a pgbench workload on pg {connstr}")
|
log.info(f"Start a pgbench workload on pg {connstr}")
|
||||||
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
|
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
|
||||||
pg_bin.run_capture(["pgbench", f"-T{n_restarts}", connstr])
|
pg_bin.run_capture(["pgbench", "-c", "10", "-f", "test_runner/regress/select.sql", f"-T{n_restarts}", connstr])
|
||||||
|
|
||||||
thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True)
|
thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True)
|
||||||
thread.start()
|
thread.start()
|
||||||
|
|||||||
Reference in New Issue
Block a user