From 3da70abfa5092bacdebed03d6f771dd28334d479 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 20 Mar 2025 17:21:00 +0200 Subject: [PATCH] Fix pageserver_try_receive (#11096) ## Problem See https://neondb.slack.com/archives/C04DGM6SMTM/p1741176713523469 The problem is that this function uses `PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async = true */)` to try to fetch the next message. However, `PQgetCopyData` returns 0 if the whole message is not yet present in the input buffer, and the input buffer may contain only part of a message, so the result is not fetched. ## Summary of changes Use `PQisBusy` + `WaitEventSetWait` to check whether data is available, and in that case loop over `PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */)` and `PQconsumeInput` until the whole message has been read or no more input is available. --------- Co-authored-by: Konstantin Knizhnik --- pgxn/neon/file_cache.c | 10 ++++++++ pgxn/neon/libpagestore.c | 25 +++++++++++++++++-- pgxn/neon/pagestore_smgr.c | 16 ++++++++++++ .../test_lfc_working_set_approximation.py | 7 ++---- 4 files changed, 51 insertions(+), 7 deletions(-) diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index d855b933ec..e555e069d0 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -1013,6 +1013,9 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found); + tag.blockNum = blkno; + addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag))); + if (found) { state = GET_STATE(entry, chunk_offs); @@ -1163,6 +1166,13 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found); + /* Approximate working set for the blocks assumed in this entry */ + for (int i = 0; i < blocks_in_chunk; i++) + { + tag.blockNum = blkno + i; + addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag))); + } + if (found) { /* diff --git a/pgxn/neon/libpagestore.c 
b/pgxn/neon/libpagestore.c index 637281fe4a..cff4de8619 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -1085,8 +1085,29 @@ pageserver_try_receive(shardno_t shard_no) Assert(pageserver_conn); - rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async = true */); - + while (true) + { + if (PQisBusy(shard->conn)) + { + WaitEvent event; + if (WaitEventSetWait(shard->wes_read, 0, &event, 1, + WAIT_EVENT_NEON_PS_READ) != 1 + || (event.events & WL_SOCKET_READABLE) == 0) + { + return NULL; + } + } + rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */); + if (rc == 0) + { + if (!PQconsumeInput(shard->conn)) + { + return NULL; + } + } + else + break; + } if (rc == 0) return NULL; else if (rc > 0) diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index d8444bc747..1736165c7d 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -1040,6 +1040,16 @@ prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blocknum, n continue; } memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ); + + + /* + * With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received + * from page server. But if lfc_store_prefetch_result=false then it is not yet stored in LFC and we have to do it here + * under buffer lock. + */ + if (!lfc_store_prefetch_result) + lfc_write(rinfo, forknum, blocknum + i, buffers[i]); + prefetch_set_unused(ring_index); BITMAP_SET(mask, i); @@ -3277,6 +3287,12 @@ Retry: } } memcpy(buffer, getpage_resp->page, BLCKSZ); + + /* + * With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received + * from page server. But if lfc_store_prefetch_result=false then it is not yet stored in LFC and we have to do it here + * under buffer lock. 
+ */ if (!lfc_store_prefetch_result) lfc_write(rinfo, forkNum, blockno, buffer); break; diff --git a/test_runner/regress/test_lfc_working_set_approximation.py b/test_runner/regress/test_lfc_working_set_approximation.py index 17068849d4..ae0f26c69f 100644 --- a/test_runner/regress/test_lfc_working_set_approximation.py +++ b/test_runner/regress/test_lfc_working_set_approximation.py @@ -19,10 +19,7 @@ def test_lfc_working_set_approximation(neon_simple_env: NeonEnv): log.info("Creating endpoint with 1MB shared_buffers and 64 MB LFC") endpoint = env.endpoints.create_start( "main", - config_lines=[ - "neon.max_file_cache_size='128MB'", - "neon.file_cache_size_limit='64MB'", - ], + config_lines=["neon.max_file_cache_size='128MB'", "neon.file_cache_size_limit='64MB'"], ) cur = endpoint.connect().cursor() @@ -116,4 +113,4 @@ def test_sliding_working_set_approximation(neon_simple_env: NeonEnv): log.info(f"Table size {size} blocks") assert estimation_1k >= 20 and estimation_1k <= 40 - assert estimation_10k >= 200 and estimation_10k <= 400 + assert estimation_10k >= 200 and estimation_10k <= 440