From 74e0d85a04d2ea401cf6ec36d83a579fe5b2849a Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Sun, 6 Jul 2025 13:04:03 +0300 Subject: [PATCH] fix: Don't lose track of in-progress request if query is cancelled --- pgxn/neon/communicator_new.c | 39 ++++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c index e28ab00f69..226a55ac01 100644 --- a/pgxn/neon/communicator_new.c +++ b/pgxn/neon/communicator_new.c @@ -472,7 +472,6 @@ void communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blockno, BlockNumber nblocks) { - int request_idx; NeonIORequest request = { .tag = NeonIORequest_PrefetchV, .prefetch_v = { @@ -493,15 +492,8 @@ communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkNumber forkNu if (num_inflight_requests >= MAX_INFLIGHT_ASYNC_REQUESTS) process_inflight_requests(); - request_idx = start_request(&request, &result); - if (request_idx == -1) - { - /* -1 means the request was satisfied immediately. */ - /* FIXME: check and log errors */ - return; - } - inflight_requests[num_inflight_requests] = request_idx; - num_inflight_requests++; + /* Fire and forget the request */ + (void) start_request(&request, &result); } /* @@ -522,7 +514,18 @@ communicator_new_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, blockno); } - +/* + * Drain all in-flight requests from the queue. + * + * This is used to drain prefetch requests that have been acknowledged by the + * communicator, before we perform a synchronous request. (With Postgres v18 + * and async IO, managing the in-flight requests will get more complicated, + * but this will do for now.) + * + * We can also have some an in-flight request queued up, if the query is + * cancelled while a synchronous request is being processed, in + * wait_request_completion(). + */ static void process_inflight_requests(void) { @@ -557,6 +560,9 @@ perform_request(NeonIORequest *request, struct NeonIOResult *result_p) return; } wait_request_completion(request_idx, result_p); + Assert(num_inflight_requests == 1); + Assert(inflight_requests[0] == request_idx); + num_inflight_requests = 0; } static int @@ -564,6 +570,8 @@ start_request(NeonIORequest *request, struct NeonIOResult *immediate_result_p) { int request_idx; + Assert(num_inflight_requests < MAX_INFLIGHT_ASYNC_REQUESTS); + request_idx = bcomm_start_io_request(my_bs, request, immediate_result_p); if (request_idx == -1) { @@ -571,6 +579,9 @@ start_request(NeonIORequest *request, struct NeonIOResult *immediate_result_p) elog(DEBUG4, "communicator request %lu was satisfied immediately", request->rel_exists.request_id); return -1; } + inflight_requests[num_inflight_requests] = request_idx; + num_inflight_requests++; + elog(LOG, "started communicator request %lu at slot %d", request->rel_exists.request_id, request_idx); return request_idx; } @@ -600,6 +611,12 @@ wait_request_completion(int request_idx, struct NeonIOResult *result_p) */ long timeout_ms = 1000; + /* + * If the query is cancelled, we will bail out here, and leave the + * in-flight request in the request queue. It will be waited for + * again and processed when the next request is issued, in + * process_inflight_requests(). + */ CHECK_FOR_INTERRUPTS(); /*