Fix neon.flush_output_after GUC.

The neon.flush_output_after GUC was not effective, at least not in a
sequential scan, because neon_read_at_lsn() flushed the prefetch queue on
every call anyway. Change neon_read_at_lsn() so that it only flushes the
queue if the GetPage request that it needs to wait for hasn't already been
flushed. To make that possible, move the tracking of unflushed requests
into a new ring_flush variable, alongside the other ring buffer indexes.

While we're at it, mark neon.flush_output_after as PGC_USERSET, so that it
can be changed per session with "SET neon.flush_output_after = ...". That
makes it easier to test different values.
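
For context, here is a small self-contained sketch of the idea, not the
extension code itself: the helper names (register_prefetch, wait_for,
flush) and the numbers (nblocks, prefetch_distance) are made up for
illustration, while ring_unused, ring_flush and flush_every_n_requests
mirror the variables touched by the patch. Prefetch requests are flushed
in batches of flush_every_n_requests, and a reader that waits for a
request which has already been flushed does not flush again.

/*
 * Standalone sketch (illustration only, not the extension code): model how
 * the ring_flush index lets the reader skip redundant flushes.
 */
#include <stdint.h>
#include <stdio.h>

static uint64_t ring_unused = 0;        /* first unused slot */
static uint64_t ring_flush = 0;         /* next request to flush */
static int flush_every_n_requests = 8;  /* the variable behind neon.flush_output_after */
static int n_flushes = 0;

static void
flush(void)
{
    n_flushes++;
    ring_flush = ring_unused;           /* everything queued so far is now on the wire */
}

static void
register_prefetch(void)
{
    ring_unused++;
    /* batch flush, like the new check in prefetch_register_buffer() */
    if (flush_every_n_requests > 0 &&
        ring_unused - ring_flush >= (uint64_t) flush_every_n_requests)
        flush();
}

static void
wait_for(uint64_t ring_index)
{
    /* only flush if the awaited request hasn't already been flushed */
    if (ring_index >= ring_flush)
        flush();
}

int
main(void)
{
    const uint64_t nblocks = 64;            /* made-up workload size */
    const uint64_t prefetch_distance = 16;  /* made-up prefetch distance */
    uint64_t next_to_prefetch = 0;

    for (uint64_t blkno = 0; blkno < nblocks; blkno++)
    {
        /* keep the queue 'prefetch_distance' requests ahead of the reads */
        while (next_to_prefetch < nblocks &&
               next_to_prefetch < blkno + prefetch_distance)
        {
            register_prefetch();
            next_to_prefetch++;
        }
        wait_for(blkno);                    /* like neon_read_at_lsn() waiting for its page */
    }
    printf("flushes for %d reads: %d\n", (int) nblocks, n_flushes);
    return 0;
}

With these numbers the 64 reads trigger 8 flushes instead of one per read,
which is the behaviour the ring_flush check in the patch below restores.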
Heikki Linnakangas
2022-11-08 12:11:17 +02:00
parent c1a76eb0e5
commit c5c4e47edd
3 changed files with 18 additions and 10 deletions


@@ -42,7 +42,6 @@ PGconn *pageserver_conn = NULL;
 char *page_server_connstring_raw;
-int n_unflushed_requests = 0;
 int flush_every_n_requests = 8;
 static void pageserver_flush(void);
@@ -205,11 +204,6 @@ pageserver_send(NeonRequest * request)
     }
     pfree(req_buff.data);
-    n_unflushed_requests++;
-    if (flush_every_n_requests > 0 && n_unflushed_requests >= flush_every_n_requests)
-        pageserver_flush();
     if (message_level_is_interesting(PageStoreTrace))
     {
         char *msg = nm_to_string((NeonMessage *) request);
@@ -274,7 +268,6 @@ pageserver_flush(void)
         pageserver_disconnect();
         neon_log(ERROR, "failed to flush page requests: %s", msg);
     }
-    n_unflushed_requests = 0;
 }
 page_server_api api = {
@@ -436,7 +429,7 @@ pg_init_libpagestore(void)
         NULL,
         &flush_every_n_requests,
         8, -1, INT_MAX,
-        PGC_SIGHUP,
+        PGC_USERSET,
         0, /* no flags required */
         NULL, NULL, NULL);


@@ -150,6 +150,7 @@ extern void prefetch_on_ps_disconnect(void);
 extern page_server_api * page_server;
 extern char *page_server_connstring;
+extern int flush_every_n_requests;
 extern bool seqscan_prefetch_enabled;
 extern int seqscan_prefetch_distance;
 extern char *neon_timeline;


@@ -192,7 +192,7 @@ typedef struct PrfHashEntry {
  * It maintains a (ring) buffer of in-flight requests and responses.
  *
  * We maintain several indexes into the ring buffer:
- * ring_unused >= ring_receive >= ring_last >= 0
+ * ring_unused >= ring_flush >= ring_receive >= ring_last >= 0
  *
  * ring_unused points to the first unused slot of the buffer
  * ring_receive is the next request that is to be received
@@ -208,6 +208,7 @@ typedef struct PrefetchState {
     /* buffer indexes */
     uint64 ring_unused;  /* first unused slot */
+    uint64 ring_flush;   /* next request to flush */
     uint64 ring_receive; /* next slot that is to receive a response */
     uint64 ring_last;    /* min slot with a response value */
@@ -577,6 +578,13 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
     prefetch_do_request(slot, force_latest, force_lsn);
     Assert(slot->status == PRFS_REQUESTED);
     Assert(ring_index < MyPState->ring_unused);
+    if (flush_every_n_requests > 0 && MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
+    {
+        page_server->flush();
+        MyPState->ring_flush = MyPState->ring_unused;
+    }
     return ring_index;
 }
@@ -585,6 +593,7 @@ page_server_request(void const *req)
 {
     page_server->send((NeonRequest *) req);
     page_server->flush();
+    MyPState->ring_flush = MyPState->ring_unused;
     consume_prefetch_responses();
     return page_server->receive();
 }
@@ -1581,6 +1590,7 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
         if (entry->slot->status == PRFS_REQUESTED)
         {
             page_server->flush();
+            MyPState->ring_flush = MyPState->ring_unused;
             prefetch_wait_for(entry->slot->my_ring_index);
         }
         /* drop caches */
@@ -1606,7 +1616,11 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
     Assert(slot->status != PRFS_UNUSED);
     Assert(&MyPState->prf_buffer[(ring_index % READ_BUFFER_SIZE)] == slot);
-    page_server->flush();
+    if (ring_index >= MyPState->ring_flush)
+    {
+        page_server->flush();
+        MyPState->ring_flush = MyPState->ring_unused;
+    }
     prefetch_wait_for(ring_index);
     Assert(slot->status == PRFS_RECEIVED);