mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 23:12:54 +00:00
fix: Don't lose track of in-progress request if query is cancelled
This commit is contained in:
@@ -472,7 +472,6 @@ void
|
||||
communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkNumber forkNum,
|
||||
BlockNumber blockno, BlockNumber nblocks)
|
||||
{
|
||||
int request_idx;
|
||||
NeonIORequest request = {
|
||||
.tag = NeonIORequest_PrefetchV,
|
||||
.prefetch_v = {
|
||||
@@ -493,15 +492,8 @@ communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkNumber forkNu
|
||||
if (num_inflight_requests >= MAX_INFLIGHT_ASYNC_REQUESTS)
|
||||
process_inflight_requests();
|
||||
|
||||
request_idx = start_request(&request, &result);
|
||||
if (request_idx == -1)
|
||||
{
|
||||
/* -1 means the request was satisfied immediately. */
|
||||
/* FIXME: check and log errors */
|
||||
return;
|
||||
}
|
||||
inflight_requests[num_inflight_requests] = request_idx;
|
||||
num_inflight_requests++;
|
||||
/* Fire and forget the request */
|
||||
(void) start_request(&request, &result);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -522,7 +514,18 @@ communicator_new_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
|
||||
blockno);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Drain all in-flight requests from the queue.
|
||||
*
|
||||
* This is used to drain prefetch requests that have been acknowledged by the
|
||||
* communicator, before we perform a synchronous request. (With Postgres v18
|
||||
* and async IO, managing the in-flight requests will get more complicated,
|
||||
* but this will do for now.)
|
||||
*
|
||||
* We can also have some an in-flight request queued up, if the query is
|
||||
* cancelled while a synchronous request is being processed, in
|
||||
* wait_request_completion().
|
||||
*/
|
||||
static void
|
||||
process_inflight_requests(void)
|
||||
{
|
||||
@@ -557,6 +560,9 @@ perform_request(NeonIORequest *request, struct NeonIOResult *result_p)
|
||||
return;
|
||||
}
|
||||
wait_request_completion(request_idx, result_p);
|
||||
Assert(num_inflight_requests == 1);
|
||||
Assert(inflight_requests[0] == request_idx);
|
||||
num_inflight_requests = 0;
|
||||
}
|
||||
|
||||
static int
|
||||
@@ -564,6 +570,8 @@ start_request(NeonIORequest *request, struct NeonIOResult *immediate_result_p)
|
||||
{
|
||||
int request_idx;
|
||||
|
||||
Assert(num_inflight_requests < MAX_INFLIGHT_ASYNC_REQUESTS);
|
||||
|
||||
request_idx = bcomm_start_io_request(my_bs, request, immediate_result_p);
|
||||
if (request_idx == -1)
|
||||
{
|
||||
@@ -571,6 +579,9 @@ start_request(NeonIORequest *request, struct NeonIOResult *immediate_result_p)
|
||||
elog(DEBUG4, "communicator request %lu was satisfied immediately", request->rel_exists.request_id);
|
||||
return -1;
|
||||
}
|
||||
inflight_requests[num_inflight_requests] = request_idx;
|
||||
num_inflight_requests++;
|
||||
|
||||
elog(LOG, "started communicator request %lu at slot %d", request->rel_exists.request_id, request_idx);
|
||||
return request_idx;
|
||||
}
|
||||
@@ -600,6 +611,12 @@ wait_request_completion(int request_idx, struct NeonIOResult *result_p)
|
||||
*/
|
||||
long timeout_ms = 1000;
|
||||
|
||||
/*
|
||||
* If the query is cancelled, we will bail out here, and leave the
|
||||
* in-flight request in the request queue. It will be waited for
|
||||
* again and processed when the next request is issued, in
|
||||
* process_inflight_requests().
|
||||
*/
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/*
|
||||
|
||||
Reference in New Issue
Block a user