Compare commits

...

8 Commits

Author SHA1 Message Date
Konstantin Knizhnik
5843b50183 Fix mistyping in test comments 2025-03-27 21:06:57 +01:00
Konstantin Knizhnik
24233d9976 Undo changes in pagestore_smgr.c 2025-03-27 21:06:20 +01:00
Konstantin Knizhnik
735ccee5b2 Do not overwrite buffer filled by prefetch_lookup in lfc_readv_select 2025-03-27 14:12:54 +01:00
Konstantin Knizhnik
a729bc98a9 Do not switch connection to non-blocking mode 2025-03-24 21:06:21 +01:00
Konstantin Knizhnik
de0a8d78c2 Fix switch to non-blocking mode 2025-03-24 21:06:21 +01:00
Konstantin Knizhnik
d7f7d33b0e Use non -blocking mode for compute<->PS protocol 2025-03-24 21:06:20 +01:00
Konstantin Knizhnik
cfbe7a0b3f Remove loop from pageserver_try_receive 2025-03-24 21:06:20 +01:00
Konstantin Knizhnik
049e1c508d Rerstrict interval of polling socket state 2025-03-24 21:06:20 +01:00
3 changed files with 54 additions and 34 deletions

View File

@@ -694,7 +694,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
{
struct iovec iov[PG_IOV_MAX];
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - chunk_offs);
int iteration_hits = 0;
int iteration_misses = 0;
uint64 io_time_us = 0;
@@ -708,10 +708,13 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
n_blocks_to_read += (BITMAP_ISSET(mask, buf_offset + i) != 0);
iov[i].iov_base = buffers[buf_offset + i];
iov[i].iov_len = BLCKSZ;
BITMAP_CLR(mask, buf_offset + i);
}
if (n_blocks_to_read == 0)
{
for (int i = 0; i < blocks_in_chunk; i++)
{
BITMAP_CLR(mask, buf_offset + i);
}
buf_offset += blocks_in_chunk;
nblocks -= blocks_in_chunk;
blkno += blocks_in_chunk;
@@ -744,6 +747,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (entry == NULL)
{
/* Pages are not cached */
for (int i = 0; i < blocks_in_chunk; i++)
{
BITMAP_CLR(mask, buf_offset + i);
}
lfc_ctl->misses += blocks_in_chunk;
pgBufferUsage.file_cache.misses += blocks_in_chunk;
LWLockRelease(lfc_lock);
@@ -766,6 +773,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
{
FileCacheBlockState state = UNAVAILABLE;
bool sleeping = false;
if (!BITMAP_ISSET(mask, buf_offset + i))
continue;
while (lfc_ctl->generation == generation)
{
state = GET_STATE(entry, chunk_offs + i);
@@ -789,11 +800,13 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
if (state == AVAILABLE)
{
BITMAP_SET(mask, buf_offset + i);
iteration_hits++;
}
else
{
BITMAP_CLR(mask, buf_offset + i);
iteration_misses++;
}
}
LWLockRelease(lfc_lock);
@@ -801,15 +814,36 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (iteration_hits != 0)
{
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
rc = preadv(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
pgstat_report_wait_end();
if (rc != (BLCKSZ * blocks_in_chunk))
if (blocks_in_chunk == n_blocks_to_read)
{
lfc_disable("read");
return -1;
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
rc = preadv(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
pgstat_report_wait_end();
if (rc != (BLCKSZ * blocks_in_chunk))
{
lfc_disable("read");
return -1;
}
}
else
{
/* Some blocks are already prefetched in provided buffers, we should not rewrite them, so we can not use vector read */
for (int i = 0; i < blocks_in_chunk; i++)
{
if (BITMAP_ISSET(mask, buf_offset + i))
{
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
rc = pread(lfc_desc, iov[i].iov_base, BLCKSZ, ((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs + i) * BLCKSZ);
pgstat_report_wait_end();
if (rc != BLCKSZ)
{
lfc_disable("read");
return -1;
}
}
}
}
}
@@ -1000,12 +1034,12 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
LWLockRelease(lfc_lock);
return false;
}
lwlsn = neon_get_lwlsn(rinfo, forknum, blkno);
if (lwlsn > lsn)
{
elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_nodified_since LSN %X/%X",
elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_modified_since LSN %X/%X",
blkno, LSN_FORMAT_ARGS(lwlsn), LSN_FORMAT_ARGS(lsn));
LWLockRelease(lfc_lock);
return false;

View File

@@ -1142,37 +1142,23 @@ pageserver_try_receive(shardno_t shard_no)
NeonResponse *resp;
PageServer *shard = &page_servers[shard_no];
PGconn *pageserver_conn = shard->conn;
/* read response */
int rc;
int rc;
if (shard->state != PS_Connected)
return NULL;
Assert(pageserver_conn);
while (true)
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */);
if (rc == 0)
{
if (PQisBusy(shard->conn))
if (!PQconsumeInput(shard->conn))
{
WaitEvent event;
if (WaitEventSetWait(shard->wes_read, 0, &event, 1,
WAIT_EVENT_NEON_PS_READ) != 1
|| (event.events & WL_SOCKET_READABLE) == 0)
{
return NULL;
}
return NULL;
}
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */);
if (rc == 0)
{
if (!PQconsumeInput(shard->conn))
{
return NULL;
}
}
else
break;
}
if (rc == 0)
return NULL;
else if (rc > 0)

View File

@@ -97,5 +97,5 @@ def test_lfc_prefetch(neon_simple_env: NeonEnv):
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
log.info(f"Unused prefetches: {prefetch_expired}")
# No redundant prefethc requrests if prefetch results are stored in LFC
# No redundant prefetch requests if prefetch results are stored in LFC
assert prefetch_expired == 0