Use a cached WaitEventSet instead of WaitLatchOrSocket.

When we repeatedly wait for the same events, it's faster to create the
event set once and reuse it. While testing with a sequential scan test
case, I saw WaitLatchOrSocket consuming a lot of CPU:

> -   40.52%     0.14%  postgres  postgres           [.] WaitLatchOrSocket
>    - 40.38% WaitLatchOrSocket
>       + 17.83% AddWaitEventToSet
>       + 9.47% close@plt
>       + 8.29% CreateWaitEventSet
>       + 4.57% WaitEventSetWait

This eliminates most of that overhead.
This commit is contained in:
Heikki Linnakangas
2022-11-08 15:35:13 +02:00
parent a726b37bca
commit db7ca18c43

View File

@@ -40,6 +40,14 @@
bool connected = false;
PGconn *pageserver_conn = NULL;
/*
* WaitEventSet containing:
* - WL_SOCKET_READABLE on pageserver_conn,
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *pageserver_conn_wes = NULL;
char *page_server_connstring_raw;
int flush_every_n_requests = 8;
@@ -62,6 +70,7 @@ pageserver_connect()
PQfinish(pageserver_conn);
pageserver_conn = NULL;
ereport(ERROR,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg(NEON_TAG "could not establish connection to pageserver"),
@@ -77,22 +86,26 @@ pageserver_connect()
neon_log(ERROR, "could not send pagestream command to pageserver");
}
pageserver_conn_wes = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(pageserver_conn_wes, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_SOCKET_READABLE, PQsocket(pageserver_conn), NULL, NULL);
while (PQisBusy(pageserver_conn))
{
int wc;
WaitEvent event;
/* Sleep until there's something to do */
wc = WaitLatchOrSocket(MyLatch,
WL_LATCH_SET | WL_SOCKET_READABLE |
WL_EXIT_ON_PM_DEATH,
PQsocket(pageserver_conn),
-1L, PG_WAIT_EXTENSION);
wc = WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Data available in socket? */
if (wc & WL_SOCKET_READABLE)
if (event.events & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(pageserver_conn))
{
@@ -100,6 +113,7 @@ pageserver_connect()
PQfinish(pageserver_conn);
pageserver_conn = NULL;
FreeWaitEventSet(pageserver_conn_wes);
neon_log(ERROR, "could not complete handshake with pageserver: %s",
msg);
@@ -116,33 +130,30 @@ pageserver_connect()
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
*/
static int
call_PQgetCopyData(PGconn *conn, char **buffer)
call_PQgetCopyData(char **buffer)
{
int ret;
retry:
ret = PQgetCopyData(conn, buffer, 1 /* async */ );
ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ );
if (ret == 0)
{
int wc;
WaitEvent event;
/* Sleep until there's something to do */
wc = WaitLatchOrSocket(MyLatch,
WL_LATCH_SET | WL_SOCKET_READABLE |
WL_EXIT_ON_PM_DEATH,
PQsocket(conn),
-1L, PG_WAIT_EXTENSION);
wc = WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Data available in socket? */
if (wc & WL_SOCKET_READABLE)
if (event.events & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(conn))
if (!PQconsumeInput(pageserver_conn))
neon_log(ERROR, "could not get response from pageserver: %s",
PQerrorMessage(conn));
PQerrorMessage(pageserver_conn));
}
goto retry;
@@ -171,6 +182,8 @@ pageserver_disconnect(void)
prefetch_on_ps_disconnect();
}
if (pageserver_conn_wes != NULL)
FreeWaitEventSet(pageserver_conn_wes);
}
static void
@@ -222,7 +235,7 @@ pageserver_receive(void)
PG_TRY();
{
/* read response */
resp_buff.len = call_PQgetCopyData(pageserver_conn, &resp_buff.data);
resp_buff.len = call_PQgetCopyData(&resp_buff.data);
resp_buff.cursor = 0;
if (resp_buff.len < 0)