Address review comments

This commit is contained in:
Kosntantin Knizhnik
2025-07-22 18:19:12 +03:00
committed by Konstantin Knizhnik
parent 546a45f57a
commit 3e5bbe7027
4 changed files with 15 additions and 4 deletions

View File

@@ -541,7 +541,7 @@ communicator_prefetch_pump_state(void)
/* /*
* Update backend's min in-flight prefetch LSN. * Update backend's min in-flight prefetch LSN.
*/ */
XLogRecPtr min_backend_prefetch_lsn = GetXLogReplayRecPtr(NULL); XLogRecPtr min_backend_prefetch_lsn = last_replay_lsn != InvalidXLogRecPtr ? last_replay_lsn : GetXLogReplayRecPtr(NULL);
for (uint64_t ring_index = MyPState->ring_receive; ring_index < MyPState->ring_unused; ring_index++) for (uint64_t ring_index = MyPState->ring_receive; ring_index < MyPState->ring_unused; ring_index++)
{ {
PrefetchRequest* slot = GetPrfSlot(ring_index); PrefetchRequest* slot = GetPrfSlot(ring_index);
@@ -1039,11 +1039,16 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(mySlotNo == MyPState->ring_unused); Assert(mySlotNo == MyPState->ring_unused);
if (force_request_lsns) if (force_request_lsns)
{
slot->request_lsns = *force_request_lsns; slot->request_lsns = *force_request_lsns;
}
else else
{
neon_get_request_lsns(BufTagGetNRelFileInfo(slot->buftag), neon_get_request_lsns(BufTagGetNRelFileInfo(slot->buftag),
slot->buftag.forkNum, slot->buftag.blockNum, slot->buftag.forkNum, slot->buftag.blockNum,
&slot->request_lsns, 1); &slot->request_lsns, 1);
last_replay_lsn = InvalidXLogRecPtr;
}
request.hdr.lsn = slot->request_lsns.request_lsn; request.hdr.lsn = slot->request_lsns.request_lsn;
request.hdr.not_modified_since = slot->request_lsns.not_modified_since; request.hdr.not_modified_since = slot->request_lsns.not_modified_since;
@@ -1509,10 +1514,12 @@ page_server_request(void const *req)
MyNeonCounters->pageserver_open_requests--; MyNeonCounters->pageserver_open_requests--;
} while (resp == NULL); } while (resp == NULL);
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no)); cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
last_replay_lsn = InvalidXLogRecPtr;
} }
PG_CATCH(); PG_CATCH();
{ {
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no)); cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
last_replay_lsn = InvalidXLogRecPtr;
/* Nothing should cancel disconnect: we should not leave connection in opaque state */ /* Nothing should cancel disconnect: we should not leave connection in opaque state */
HOLD_INTERRUPTS(); HOLD_INTERRUPTS();
page_server->disconnect(shard_no); page_server->disconnect(shard_no);
@@ -2554,8 +2561,8 @@ communicator_reconfigure_timeout_if_needed(void)
{ {
/* /*
* The background writer/checkpointer doens't (shouldn't) read any pages. * The background writer/checkpointer doens't (shouldn't) read any pages.
* And definitely they should run on replica. * And definitely they should not run on replica.
* The only cae when we can get here is replica promotion. * The only case when we can get here is replica promotion.
*/ */
if (AmBackgroundWriterProcess() || AmCheckpointerProcess()) if (AmBackgroundWriterProcess() || AmCheckpointerProcess())
{ {

View File

@@ -243,6 +243,7 @@ extern char *neon_timeline;
extern char *neon_tenant; extern char *neon_tenant;
extern int32 max_cluster_size; extern int32 max_cluster_size;
extern int neon_protocol_version; extern int neon_protocol_version;
extern XLogRecPtr last_replay_lsn;
extern shardno_t get_shard_number(BufferTag* tag); extern shardno_t get_shard_number(BufferTag* tag);

View File

@@ -96,6 +96,8 @@ typedef enum
int debug_compare_local; int debug_compare_local;
XLogRecPtr last_replay_lsn;
static NRelFileInfo unlogged_build_rel_info; static NRelFileInfo unlogged_build_rel_info;
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS; static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
@@ -598,6 +600,7 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
replay_lsn = GetXLogReplayRecPtr(NULL); replay_lsn = GetXLogReplayRecPtr(NULL);
MIN_BACKEND_PREFETCH_LSN = replay_lsn; MIN_BACKEND_PREFETCH_LSN = replay_lsn;
} }
last_replay_lsn = replay_lsn;
for (int i = 0; i < nblocks; i++) for (int i = 0; i < nblocks; i++)
{ {
neon_request_lsns *result = &output[i]; neon_request_lsns *result = &output[i];

View File

@@ -55,7 +55,7 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
# Ensure that the default version is also updated in the neon.control file # Ensure that the default version is also updated in the neon.control file
assert cur.fetchone() == ("1.6",) assert cur.fetchone() == ("1.6",)
cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE") cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE")
all_versions = ["1.6", "1.5", "1.4", "1.3", "1.2", "1.1", "1.0"] all_versions = ["1.7", "1.6", "1.5", "1.4", "1.3", "1.2", "1.1", "1.0"]
current_version = "1.6" current_version = "1.6"
for idx, begin_version in enumerate(all_versions): for idx, begin_version in enumerate(all_versions):
for target_version in all_versions[idx + 1 :]: for target_version in all_versions[idx + 1 :]: