Fix pageserver_try_receive (#11096)

## Problem

See https://neondb.slack.com/archives/C04DGM6SMTM/p1741176713523469

The problem is that this function is using `PQgetCopyData(shard->conn,
&resp_buff.data, 1 /* async = true */)`
to try to fetch next message. But this function returns 0 if the whole
message is not present in the buffer.
And input buffer may contain only part of message so result is not
fetched.

## Summary of changes

Use `PQisBusy` + `WaitEventSetWait` to check if data is available and
`PQgetCopyData(shard->conn, &resp_buff.data, 0)` to read whole message
in this case.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
This commit is contained in:
Konstantin Knizhnik
2025-03-20 17:21:00 +02:00
committed by GitHub
parent e5aef3747c
commit 3da70abfa5
4 changed files with 51 additions and 7 deletions

View File

@@ -1013,6 +1013,9 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
tag.blockNum = blkno;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
if (found)
{
state = GET_STATE(entry, chunk_offs);
@@ -1163,6 +1166,13 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
/* Approximate working set for the blocks assumed in this entry */
for (int i = 0; i < blocks_in_chunk; i++)
{
tag.blockNum = blkno + i;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
}
if (found)
{
/*

View File

@@ -1085,8 +1085,29 @@ pageserver_try_receive(shardno_t shard_no)
Assert(pageserver_conn);
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async = true */);
while (true)
{
if (PQisBusy(shard->conn))
{
WaitEvent event;
if (WaitEventSetWait(shard->wes_read, 0, &event, 1,
WAIT_EVENT_NEON_PS_READ) != 1
|| (event.events & WL_SOCKET_READABLE) == 0)
{
return NULL;
}
}
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */);
if (rc == 0)
{
if (!PQconsumeInput(shard->conn))
{
return NULL;
}
}
else
break;
}
if (rc == 0)
return NULL;
else if (rc > 0)

View File

@@ -1040,6 +1040,16 @@ prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blocknum, n
continue;
}
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
/*
* With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
* from page server. But if lfc_store_prefetch_result=false then it is not yet stored in LFC and we have to do it here
* under buffer lock.
*/
if (!lfc_store_prefetch_result)
lfc_write(rinfo, forknum, blocknum + i, buffers[i]);
prefetch_set_unused(ring_index);
BITMAP_SET(mask, i);
@@ -3277,6 +3287,12 @@ Retry:
}
}
memcpy(buffer, getpage_resp->page, BLCKSZ);
/*
* With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
* from page server. But if lfc_store_prefetch_result=false then it is not yet stored in LFC and we have to do it here
* under buffer lock.
*/
if (!lfc_store_prefetch_result)
lfc_write(rinfo, forkNum, blockno, buffer);
break;

View File

@@ -19,10 +19,7 @@ def test_lfc_working_set_approximation(neon_simple_env: NeonEnv):
log.info("Creating endpoint with 1MB shared_buffers and 64 MB LFC")
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"neon.max_file_cache_size='128MB'",
"neon.file_cache_size_limit='64MB'",
],
config_lines=["neon.max_file_cache_size='128MB'", "neon.file_cache_size_limit='64MB'"],
)
cur = endpoint.connect().cursor()
@@ -116,4 +113,4 @@ def test_sliding_working_set_approximation(neon_simple_env: NeonEnv):
log.info(f"Table size {size} blocks")
assert estimation_1k >= 20 and estimation_1k <= 40
assert estimation_10k >= 200 and estimation_10k <= 400
assert estimation_10k >= 200 and estimation_10k <= 440