diff --git a/pgxn/neon/communicator.c b/pgxn/neon/communicator.c index 7c84be7d15..bd53855eab 100644 --- a/pgxn/neon/communicator.c +++ b/pgxn/neon/communicator.c @@ -65,6 +65,7 @@ #include "port/pg_iovec.h" #include "postmaster/interrupt.h" #include "replication/walsender.h" +#include "storage/ipc.h" #include "utils/timeout.h" #include "bitmap.h" @@ -412,6 +413,47 @@ compact_prefetch_buffers(void) return false; } +/* + * Check that prefetch response matches the slot + */ +static void +check_getpage_response(PrefetchRequest* slot, NeonResponse* resp) +{ + if (resp->tag != T_NeonGetPageResponse && resp->tag != T_NeonErrorResponse) + { + neon_shard_log(slot->shard_no, PANIC, "Unexpected prefetch response %d, ring_receive=%ld, ring_flush=%ld, ring_unused=%ld", + resp->tag, MyPState->ring_receive, MyPState->ring_flush, MyPState->ring_unused); + } + if (neon_protocol_version >= 3) + { + NRelFileInfo rinfo = BufTagGetNRelFileInfo(slot->buftag); + if (resp->tag == T_NeonGetPageResponse) + { + NeonGetPageResponse * getpage_resp = (NeonGetPageResponse *)resp; + if (resp->reqid != slot->reqid || + resp->lsn != slot->request_lsns.request_lsn || + resp->not_modified_since != slot->request_lsns.not_modified_since || + !RelFileInfoEquals(getpage_resp->req.rinfo, rinfo) || + getpage_resp->req.forknum != slot->buftag.forkNum || + getpage_resp->req.blkno != slot->buftag.blockNum) + { + NEON_PANIC_CONNECTION_STATE(slot->shard_no, PANIC, + "Receive unexpected getpage response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}", + resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(getpage_resp->req.rinfo), getpage_resp->req.forknum, getpage_resp->req.blkno, + slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since), RelFileInfoFmt(rinfo), slot->buftag.forkNum, slot->buftag.blockNum); + } + } + else if (resp->reqid != slot->reqid || + resp->lsn != slot->request_lsns.request_lsn || + resp->not_modified_since != slot->request_lsns.not_modified_since) + { + elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match exists request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", + resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), + slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since)); + } + } +} + /* * If there might be responses still in the TCP buffer, then we should try to * use those, to reduce any TCP backpressure on the OS/PS side. @@ -446,15 +488,18 @@ communicator_prefetch_pump_state(void) if (response == NULL) break; + check_getpage_response(slot, response); + /* The slot should still be valid */ if (slot->status != PRFS_REQUESTED || slot->response != NULL || slot->my_ring_index != MyPState->ring_receive) - neon_shard_log(slot->shard_no, ERROR, + { + neon_shard_log(slot->shard_no, PANIC, "Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu", slot->status, slot->response, (long) slot->my_ring_index, (long) MyPState->ring_receive); - + } /* update prefetch state */ MyPState->n_responses_buffered += 1; MyPState->n_requests_inflight -= 1; @@ -593,6 +638,21 @@ readahead_buffer_resize(int newsize, void *extra) } +/* + * Callback to be called on backend exit to ensure correct state of compute-PS communication + * in case of backend cancel + */ +static void +prefetch_on_exit(int code, Datum arg) +{ + if (code != 0) /* do disconnect only on abnormal backend termination */ + { + shardno_t shard_no = DatumGetInt32(arg); + prefetch_on_ps_disconnect(); + page_server->disconnect(shard_no); + } +} + /* * Make sure that there are no responses still in the buffer. @@ -605,6 +665,11 @@ consume_prefetch_responses(void) { if (MyPState->ring_receive < MyPState->ring_unused) prefetch_wait_for(MyPState->ring_unused - 1); + /* + * We know for sure we're not working on any prefetch pages after + * this. + */ + END_PREFETCH_RECEIVE_WORK(); } static void @@ -722,10 +787,12 @@ prefetch_read(PrefetchRequest *slot) if (slot->status != PRFS_REQUESTED || slot->response != NULL || slot->my_ring_index != MyPState->ring_receive) - neon_shard_log(slot->shard_no, ERROR, + { + neon_shard_log(slot->shard_no, PANIC, "Incorrect prefetch read: status=%d response=%p my=%lu receive=%lu", slot->status, slot->response, (long)slot->my_ring_index, (long)MyPState->ring_receive); + } /* * Copy the request info so that if an error happens and the prefetch @@ -741,14 +808,18 @@ prefetch_read(PrefetchRequest *slot) MemoryContextSwitchTo(old); if (response) { + check_getpage_response(slot, response); + /* The slot should still be valid */ if (slot->status != PRFS_REQUESTED || slot->response != NULL || slot->my_ring_index != MyPState->ring_receive) - neon_shard_log(shard_no, ERROR, + { + neon_shard_log(shard_no, PANIC, "Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu", slot->status, slot->response, (long) slot->my_ring_index, (long) MyPState->ring_receive); + } /* update prefetch state */ MyPState->n_responses_buffered += 1; @@ -820,11 +891,10 @@ communicator_prefetch_receive(BufferTag tag) void prefetch_on_ps_disconnect(void) { - bool save_readpage_reentrant_guard = readpage_reentrant_guard; MyPState->ring_flush = MyPState->ring_unused; - /* Prohibit callig of prefetch_pump_state */ - START_PREFETCH_RECEIVE_WORK(); + /* Nothing should cancel disconnect: we should not leave connection in opaque state */ + HOLD_INTERRUPTS(); while (MyPState->ring_receive < MyPState->ring_unused) { @@ -854,9 +924,6 @@ prefetch_on_ps_disconnect(void) MyNeonCounters->getpage_prefetch_discards_total += 1; } - /* Restore guard */ - readpage_reentrant_guard = save_readpage_reentrant_guard; - /* * We can have gone into retry due to network error, so update stats with * the latest available @@ -865,6 +932,8 @@ prefetch_on_ps_disconnect(void) MyPState->n_requests_inflight; MyNeonCounters->getpage_prefetches_buffered = MyPState->n_responses_buffered; + + RESUME_INTERRUPTS(); } /* @@ -1027,16 +1096,11 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe /* * Ignore errors */ - if (slot->response->tag != T_NeonGetPageResponse) + if (slot->response->tag == T_NeonErrorResponse) { - if (slot->response->tag != T_NeonErrorResponse) - { - NEON_PANIC_CONNECTION_STATE(slot->shard_no, PANIC, - "Expected GetPage (0x%02x) or Error (0x%02x) response to GetPageRequest, but got 0x%02x", - T_NeonGetPageResponse, T_NeonErrorResponse, slot->response->tag); - } continue; } + Assert(slot->response->tag == T_NeonGetPageResponse); /* checked by check_getpage_response when response was assigned to the slot */ memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ); @@ -1351,7 +1415,7 @@ equal_requests(NeonRequest* a, NeonRequest* b) static NeonResponse * page_server_request(void const *req) { - NeonResponse *resp; + NeonResponse *resp = NULL; BufferTag tag = {0}; shardno_t shard_no; @@ -1371,7 +1435,7 @@ page_server_request(void const *req) tag.blockNum = ((NeonGetPageRequest *) req)->blkno; break; default: - neon_log(ERROR, "Unexpected request tag: %d", messageTag(req)); + neon_log(PANIC, "Unexpected request tag: %d", messageTag(req)); } shard_no = get_shard_number(&tag); @@ -1384,9 +1448,12 @@ page_server_request(void const *req) shard_no = 0; } - do + consume_prefetch_responses(); + + PG_TRY(); { - PG_TRY(); + before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no)); + do { while (!page_server->send(shard_no, (NeonRequest *) req) || !page_server->flush(shard_no)) @@ -1394,30 +1461,24 @@ page_server_request(void const *req) /* do nothing */ } MyNeonCounters->pageserver_open_requests++; - consume_prefetch_responses(); resp = page_server->receive(shard_no); MyNeonCounters->pageserver_open_requests--; - } - PG_CATCH(); - { - /* - * Cancellation in this code needs to be handled better at some - * point, but this currently seems fine for now. - */ - page_server->disconnect(shard_no); - MyNeonCounters->pageserver_open_requests = 0; + } while (resp == NULL); + cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no)); + } + PG_CATCH(); + { + cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no)); + /* Nothing should cancel disconnect: we should not leave connection in opaque state */ + HOLD_INTERRUPTS(); + page_server->disconnect(shard_no); + MyNeonCounters->pageserver_open_requests = 0; + RESUME_INTERRUPTS(); - /* - * We know for sure we're not working on any prefetch pages after - * this. - */ - END_PREFETCH_RECEIVE_WORK(); + PG_RE_THROW(); + } + PG_END_TRY(); - PG_RE_THROW(); - } - PG_END_TRY(); - - } while (resp == NULL); return resp; } @@ -1502,7 +1563,7 @@ nm_pack_request(NeonRequest *msg) case T_NeonDbSizeResponse: case T_NeonGetSlruSegmentResponse: default: - neon_log(ERROR, "unexpected neon message tag 0x%02x", msg->tag); + neon_log(PANIC, "unexpected neon message tag 0x%02x", msg->tag); break; } return s; @@ -1654,7 +1715,7 @@ nm_unpack_response(StringInfo s) case T_NeonDbSizeRequest: case T_NeonGetSlruSegmentRequest: default: - neon_log(ERROR, "unexpected neon message tag 0x%02x", tag); + neon_log(PANIC, "unexpected neon message tag 0x%02x", tag); break; } @@ -1983,7 +2044,7 @@ communicator_exists(NRelFileInfo rinfo, ForkNumber forkNum, neon_request_lsns *r !RelFileInfoEquals(exists_resp->req.rinfo, request.rinfo) || exists_resp->req.forknum != request.forknum) { - NEON_PANIC_CONNECTION_STATE(-1, PANIC, + NEON_PANIC_CONNECTION_STATE(0, PANIC, "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to exits request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}", resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(exists_resp->req.rinfo), exists_resp->req.forknum, request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), RelFileInfoFmt(request.rinfo), request.forknum); @@ -2014,7 +2075,7 @@ communicator_exists(NRelFileInfo rinfo, ForkNumber forkNum, neon_request_lsns *r break; default: - NEON_PANIC_CONNECTION_STATE(-1, PANIC, + NEON_PANIC_CONNECTION_STATE(0, PANIC, "Expected Exists (0x%02x) or Error (0x%02x) response to ExistsRequest, but got 0x%02x", T_NeonExistsResponse, T_NeonErrorResponse, resp->tag); } @@ -2158,6 +2219,7 @@ Retry: Assert(memcmp(&hashkey.buftag, &slot->buftag, sizeof(BufferTag)) == 0); Assert(hashkey.buftag.blockNum == base_blockno + i); + /* We already checked that response match request when storing it in slot */ resp = slot->response; switch (resp->tag) @@ -2165,21 +2227,6 @@ Retry: case T_NeonGetPageResponse: { NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp; - if (neon_protocol_version >= 3) - { - if (resp->reqid != slot->reqid || - resp->lsn != slot->request_lsns.request_lsn || - resp->not_modified_since != slot->request_lsns.not_modified_since || - !RelFileInfoEquals(getpage_resp->req.rinfo, rinfo) || - getpage_resp->req.forknum != forkNum || - getpage_resp->req.blkno != base_blockno + i) - { - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(getpage_resp->req.rinfo), getpage_resp->req.forknum, getpage_resp->req.blkno, - slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since), RelFileInfoFmt(rinfo), forkNum, base_blockno + i); - } - } memcpy(buffer, getpage_resp->page, BLCKSZ); /* @@ -2192,17 +2239,6 @@ Retry: break; } case T_NeonErrorResponse: - if (neon_protocol_version >= 3) - { - if (resp->reqid != slot->reqid || - resp->lsn != slot->request_lsns.request_lsn || - resp->not_modified_since != slot->request_lsns.not_modified_since) - { - elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), - slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since)); - } - } ereport(ERROR, (errcode(ERRCODE_IO_ERROR), errmsg(NEON_TAG "[shard %d, reqid %lx] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X", @@ -2257,7 +2293,7 @@ communicator_nblocks(NRelFileInfo rinfo, ForkNumber forknum, neon_request_lsns * !RelFileInfoEquals(relsize_resp->req.rinfo, request.rinfo) || relsize_resp->req.forknum != forknum) { - NEON_PANIC_CONNECTION_STATE(-1, PANIC, + NEON_PANIC_CONNECTION_STATE(0, PANIC, "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}", resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(relsize_resp->req.rinfo), relsize_resp->req.forknum, request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), RelFileInfoFmt(request.rinfo), forknum); @@ -2288,7 +2324,7 @@ communicator_nblocks(NRelFileInfo rinfo, ForkNumber forknum, neon_request_lsns * break; default: - NEON_PANIC_CONNECTION_STATE(-1, PANIC, + NEON_PANIC_CONNECTION_STATE(0, PANIC, "Expected Nblocks (0x%02x) or Error (0x%02x) response to NblocksRequest, but got 0x%02x", T_NeonNblocksResponse, T_NeonErrorResponse, resp->tag); } @@ -2327,7 +2363,7 @@ communicator_dbsize(Oid dbNode, neon_request_lsns *request_lsns) if (!equal_requests(resp, &request.hdr) || dbsize_resp->req.dbNode != dbNode) { - NEON_PANIC_CONNECTION_STATE(-1, PANIC, + NEON_PANIC_CONNECTION_STATE(0, PANIC, "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, dbNode=%u} to get DB size request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, dbNode=%u}", resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), dbsize_resp->req.dbNode, request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), dbNode); @@ -2356,7 +2392,7 @@ communicator_dbsize(Oid dbNode, neon_request_lsns *request_lsns) break; default: - NEON_PANIC_CONNECTION_STATE(-1, PANIC, + NEON_PANIC_CONNECTION_STATE(0, PANIC, "Expected DbSize (0x%02x) or Error (0x%02x) response to DbSizeRequest, but got 0x%02x", T_NeonDbSizeResponse, T_NeonErrorResponse, resp->tag); } @@ -2372,7 +2408,7 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re { int n_blocks; shardno_t shard_no = 0; /* All SLRUs are at shard 0 */ - NeonResponse *resp; + NeonResponse *resp = NULL; NeonGetSlruSegmentRequest request; request = (NeonGetSlruSegmentRequest) { @@ -2383,14 +2419,29 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re .segno = segno }; - do + consume_prefetch_responses(); + + PG_TRY(); { - while (!page_server->send(shard_no, &request.hdr) || !page_server->flush(shard_no)); + before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no)); + do + { + while (!page_server->send(shard_no, &request.hdr) || !page_server->flush(shard_no)); + resp = page_server->receive(shard_no); + } while (resp == NULL); + cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no)); + } + PG_CATCH(); + { + cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no)); + /* Nothing should cancel disconnect: we should not leave connection in opaque state */ + HOLD_INTERRUPTS(); + page_server->disconnect(shard_no); + RESUME_INTERRUPTS(); - consume_prefetch_responses(); - - resp = page_server->receive(shard_no); - } while (resp == NULL); + PG_RE_THROW(); + } + PG_END_TRY(); switch (resp->tag) { @@ -2403,7 +2454,7 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re slru_resp->req.kind != kind || slru_resp->req.segno != segno) { - NEON_PANIC_CONNECTION_STATE(-1, PANIC, + NEON_PANIC_CONNECTION_STATE(0, PANIC, "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%u} to get SLRU segment request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%lluu}", resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), slru_resp->req.kind, slru_resp->req.segno, request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), kind, (unsigned long long) segno); @@ -2435,7 +2486,7 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re break; default: - NEON_PANIC_CONNECTION_STATE(-1, PANIC, + NEON_PANIC_CONNECTION_STATE(0, PANIC, "Expected GetSlruSegment (0x%02x) or Error (0x%02x) response to GetSlruSegmentRequest, but got 0x%02x", T_NeonGetSlruSegmentResponse, T_NeonErrorResponse, resp->tag); }