Compare commits

...

7 Commits

Author SHA1 Message Date
Konstantin Knizhnik
e21b2eb7dc Fix prefetch_set_unused 2024-03-29 14:58:46 +02:00
Konstantin Knizhnik
1cd71d29ac Fix bug in prefetch cleanup 2024-03-29 14:10:25 +02:00
Konstantin Knizhnik
86762ba313 Fix test 2024-03-28 23:09:33 +02:00
Konstantin Knizhnik
6a42b6b87f Add select.sql 2024-03-28 21:54:34 +02:00
Konstantin Knizhnik
48013845f0 Fix test 2024-03-28 20:58:52 +02:00
Konstantin Knizhnik
40226ce7da Check prefetch termination on PS disconnect 2024-03-28 18:26:00 +02:00
Konstantin Knizhnik
ea2083653c Drop connections with all shards involved in prefetch in case of error 2024-03-26 16:40:25 +02:00
7 changed files with 95 additions and 23 deletions

View File

@@ -913,6 +913,8 @@ pub struct PagestreamNblocksResponse {
#[derive(Debug)]
pub struct PagestreamGetPageResponse {
pub rel: RelTag,
pub blkno: u32,
pub page: Bytes,
}
@@ -1073,6 +1075,11 @@ impl PagestreamBeMessage {
Self::GetPage(resp) => {
bytes.put_u8(Tag::GetPage as u8);
bytes.put_u32(resp.rel.spcnode);
bytes.put_u32(resp.rel.dbnode);
bytes.put_u32(resp.rel.relnode);
bytes.put_u8(resp.rel.forknum);
bytes.put_u32(resp.blkno);
bytes.put(&resp.page[..]);
}
@@ -1114,9 +1121,20 @@ impl PagestreamBeMessage {
Self::Nblocks(PagestreamNblocksResponse { n_blocks })
}
Tag::GetPage => {
let rel = RelTag {
spcnode: buf.read_u32::<BigEndian>()?,
dbnode: buf.read_u32::<BigEndian>()?,
relnode: buf.read_u32::<BigEndian>()?,
forknum: buf.read_u8()?,
};
let blkno = buf.read_u32::<BigEndian>()?;
let mut page = vec![0; 8192]; // TODO: use MaybeUninit
buf.read_exact(&mut page)?;
PagestreamBeMessage::GetPage(PagestreamGetPageResponse { page: page.into() })
PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
rel,
blkno,
page: page.into(),
})
}
Tag::Error => {
let mut msg = Vec::new();

View File

@@ -1156,6 +1156,8 @@ impl PageServerHandler {
.await?;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
rel: req.rel,
blkno: req.blkno,
page,
}))
}

View File

@@ -111,6 +111,7 @@ static PageServer page_servers[MAX_SHARDS];
static bool pageserver_flush(shardno_t shard_no);
static void pageserver_disconnect(shardno_t shard_no);
static void pageserver_disconnect_shard(shardno_t shard_no);
static bool
PagestoreShmemIsValid(void)
@@ -487,9 +488,31 @@ retry:
return ret;
}
/*
* Reset prefetch and drop connection to the shard.
* It also drops connection to all other shards involved in prefetch.
*/
static void
pageserver_disconnect(shardno_t shard_no)
{
if (page_servers[shard_no].conn)
{
/*
* If the connection to any pageserver is lost, we throw away the
* whole prefetch queue, even for other pageservers. It should not
* cause big problems, because connection loss is supposed to be a
* rare event.
*/
prefetch_on_ps_disconnect();
}
pageserver_disconnect_shard(shard_no);
}
/*
* Disconnect from specified shard
*/
static void
pageserver_disconnect_shard(shardno_t shard_no)
{
/*
* If anything goes wrong while we were sending a request, it's not clear
@@ -503,14 +526,6 @@ pageserver_disconnect(shardno_t shard_no)
neon_shard_log(shard_no, LOG, "dropping connection to page server due to error");
PQfinish(page_servers[shard_no].conn);
page_servers[shard_no].conn = NULL;
/*
* If the connection to any pageserver is lost, we throw away the
* whole prefetch queue, even for other pageservers. It should not
* cause big problems, because connection loss is supposed to be a
* rare event.
*/
prefetch_on_ps_disconnect();
}
if (page_servers[shard_no].wes != NULL)
{
@@ -676,7 +691,8 @@ page_server_api api =
{
.send = pageserver_send,
.flush = pageserver_flush,
.receive = pageserver_receive
.receive = pageserver_receive,
.disconnect = pageserver_disconnect_shard
};
static bool

View File

@@ -139,6 +139,9 @@ typedef struct
typedef struct
{
NeonMessageTag tag;
NRelFileInfo rinfo;
ForkNumber forknum;
BlockNumber blkno;
char page[FLEXIBLE_ARRAY_MEMBER];
} NeonGetPageResponse;
@@ -180,6 +183,7 @@ typedef struct
bool (*send) (shardno_t shard_no, NeonRequest * request);
NeonResponse *(*receive) (shardno_t shard_no);
bool (*flush) (shardno_t shard_no);
void (*disconnect) (shardno_t shard_no);
} page_server_api;
extern void prefetch_on_ps_disconnect(void);

View File

@@ -613,6 +613,14 @@ prefetch_on_ps_disconnect(void)
Assert(slot->status == PRFS_REQUESTED);
Assert(slot->my_ring_index == ring_index);
/*
 * Drop connection to all shards which have prefetch requests.
 * It is not a problem to call disconnect multiple times on the same connection,
 * because the disconnect implementation in libpagestore.c checks whether the
 * connection is alive and does nothing if the connection was already dropped.
 */
page_server->disconnect(slot->shard_no);
/* clean up the request */
slot->status = PRFS_TAG_REMAINS;
MyPState->n_requests_inflight -= 1;
@@ -633,13 +641,12 @@ prefetch_on_ps_disconnect(void)
static inline void
prefetch_set_unused(uint64 ring_index)
{
PrefetchRequest *slot = GetPrfSlot(ring_index);
PrefetchRequest *slot;
if (ring_index < MyPState->ring_last)
return; /* Should already be unused */
Assert(MyPState->ring_unused > ring_index);
slot = GetPrfSlot(ring_index);
if (slot->status == PRFS_UNUSED)
return;
@@ -798,7 +805,8 @@ Retry:
{
if (*force_lsn > slot->effective_request_lsn)
{
prefetch_wait_for(ring_index);
if (!prefetch_wait_for(ring_index))
goto Retry;
prefetch_set_unused(ring_index);
entry = NULL;
}
@@ -813,7 +821,8 @@ Retry:
{
if (*force_lsn != slot->effective_request_lsn)
{
prefetch_wait_for(ring_index);
if (!prefetch_wait_for(ring_index))
goto Retry;
prefetch_set_unused(ring_index);
entry = NULL;
}
@@ -879,7 +888,8 @@ Retry:
{
case PRFS_REQUESTED:
Assert(MyPState->ring_receive == cleanup_index);
prefetch_wait_for(cleanup_index);
if (!prefetch_wait_for(cleanup_index))
goto Retry;
prefetch_set_unused(cleanup_index);
break;
case PRFS_RECEIVED:
@@ -1108,6 +1118,11 @@ nm_unpack_response(StringInfo s)
msg_resp = MemoryContextAllocZero(MyPState->bufctx, PS_GETPAGERESPONSE_SIZE);
msg_resp->tag = tag;
NInfoGetSpcOid(msg_resp->rinfo) = pq_getmsgint(s, 4);
NInfoGetDbOid(msg_resp->rinfo) = pq_getmsgint(s, 4);
NInfoGetRelNumber(msg_resp->rinfo) = pq_getmsgint(s, 4);
msg_resp->forknum = pq_getmsgbyte(s);
msg_resp->blkno = pq_getmsgint(s, 4);
/* XXX: should be varlena */
memcpy(msg_resp->page, pq_getmsgbytes(s, BLCKSZ), BLCKSZ);
pq_getmsgend(s);
@@ -2132,6 +2147,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
/*
* Try to find prefetched page in the list of received pages.
*/
Retry:
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &buftag);
if (entry != NULL)
@@ -2153,7 +2169,8 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
*/
if (slot->status == PRFS_REQUESTED)
{
prefetch_wait_for(slot->my_ring_index);
if (!prefetch_wait_for(slot->my_ring_index))
goto Retry;
}
/* drop caches */
prefetch_set_unused(slot->my_ring_index);
@@ -2200,10 +2217,20 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
switch (resp->tag)
{
case T_NeonGetPageResponse:
memcpy(buffer, ((NeonGetPageResponse *) resp)->page, BLCKSZ);
{
NeonGetPageResponse* r = (NeonGetPageResponse *) resp;
memcpy(buffer, r->page, BLCKSZ);
if (memcmp(&r->rinfo, &rinfo, sizeof rinfo) != 0 || forkNum != r->forknum || blkno != r->blkno)
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "[shard %d] got unexpected getpage response for block %u in rel %u/%u/%u.%u instead of block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X",
slot->shard_no,
r->blkno, RelFileInfoFmt(r->rinfo), r->forknum,
blkno, RelFileInfoFmt(rinfo), forkNum,
(uint32) (request_lsn >> 32), (uint32) request_lsn)));
lfc_write(rinfo, forkNum, blkno, buffer);
break;
}
case T_NeonErrorResponse:
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),

View File

@@ -0,0 +1,2 @@
select sum(abalance) from pgbench_accounts;

View File

@@ -13,14 +13,17 @@ from fixtures.neon_fixtures import NeonEnv, PgBin
def test_pageserver_restarts_under_workload(neon_simple_env: NeonEnv, pg_bin: PgBin):
env = neon_simple_env
env.neon_cli.create_branch("test_pageserver_restarts")
endpoint = env.endpoints.create_start("test_pageserver_restarts")
n_restarts = 10
endpoint = env.endpoints.create_start(
"test_pageserver_restarts", config_lines=["effective_io_concurrency=100"]
)
n_restarts = 100
scale = 10
def run_pgbench(connstr: str):
log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
pg_bin.run_capture(["pgbench", f"-T{n_restarts}", connstr])
pg_bin.run_capture(["pgbench", "-c", "10", "-f", "test_runner/regress/select.sql", f"-T{n_restarts}", connstr])
thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True)
thread.start()