Compare commits


9 Commits

Author SHA1 Message Date
Konstantin Knizhnik
5843b50183 Fix mistyping in test comments 2025-03-27 21:06:57 +01:00
Konstantin Knizhnik
24233d9976 Undo changes in pagestore_smgr.c 2025-03-27 21:06:20 +01:00
Konstantin Knizhnik
735ccee5b2 Do not overwrite buffer filled by prefetch_lookup in lfc_readv_select 2025-03-27 14:12:54 +01:00
Konstantin Knizhnik
a729bc98a9 Do not switch connection to non-blocking mode 2025-03-24 21:06:21 +01:00
Konstantin Knizhnik
de0a8d78c2 Fix switch to non-blocking mode 2025-03-24 21:06:21 +01:00
Konstantin Knizhnik
d7f7d33b0e Use non-blocking mode for compute<->PS protocol 2025-03-24 21:06:20 +01:00
Konstantin Knizhnik
cfbe7a0b3f Remove loop from pageserver_try_receive 2025-03-24 21:06:20 +01:00
Konstantin Knizhnik
049e1c508d Restrict interval of polling socket state 2025-03-24 21:06:20 +01:00
Arpad Müller
5f3551e405 Add "still waiting for task" for slow shutdowns (#11351)
To help with narrowing down
https://github.com/neondatabase/cloud/issues/26362, we make the case
noisier where we are waiting for the shutdown of a specific task (in the
case of that issue, the `gc_loop`).
2025-03-24 17:29:44 +00:00
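
For context, a minimal sketch of the waiting pattern this commit introduces: after a short initial timeout, keep polling the task's JoinHandle and log a periodic "still waiting" line until it completes. This assumes a tokio runtime and the tracing `info!` macro; the free function `wait_with_periodic_complaints` is hypothetical — in the repository the logic lives inside `shutdown_tasks`, as shown in the diff below.

use std::time::Duration;
use tokio::task::JoinHandle;
use tracing::info;

// After a short grace period, poll the join handle and emit a periodic
// "still waiting" log line until the task finally completes.
async fn wait_with_periodic_complaints(mut join_handle: JoinHandle<()>, task_name: &str) {
    const INITIAL_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(1);
    const PERIODIC_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(60);

    if tokio::time::timeout(INITIAL_COMPLAIN_TIMEOUT, &mut join_handle)
        .await
        .is_err()
    {
        info!("waiting for task {} to shut down", task_name);
        loop {
            tokio::select! {
                // the join result is deliberately ignored, as in shutdown_tasks
                _ = &mut join_handle => break,
                _ = tokio::time::sleep(PERIODIC_COMPLAIN_TIMEOUT) =>
                    info!("still waiting for task {} to shut down", task_name),
            }
        }
        info!("task {} completed", task_name);
    }
}
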
5 changed files with 71 additions and 48 deletions

View File

@@ -103,17 +103,12 @@ RUN set -e \
&& echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries \
&& apt update \
&& apt install -y \
bpftrace \
ca-certificates \
libreadline-dev \
libseccomp-dev \
iproute2 \
lsof \
openssl \
# System postgres for use with client libraries (e.g. in storage controller)
ca-certificates \
# System postgres for use with client libraries (e.g. in storage controller)
postgresql-15 \
screen \
tcpdump \
openssl \
&& rm -f /etc/apt/apt.conf.d/80-retries \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \
&& useradd -d /data neon \

View File

@@ -38,6 +38,7 @@ use std::panic::AssertUnwindSafe;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use futures::FutureExt;
use once_cell::sync::Lazy;
@@ -584,18 +585,25 @@ pub async fn shutdown_tasks(
// warn to catch these in tests; there shouldn't be any
warn!(name = task.name, tenant_shard_id = ?tenant_shard_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
}
if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle)
const INITIAL_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(1);
const PERIODIC_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(60);
if tokio::time::timeout(INITIAL_COMPLAIN_TIMEOUT, &mut join_handle)
.await
.is_err()
{
// allow some time to elapse before logging to cut down the number of log
// lines.
info!("waiting for task {} to shut down", task.name);
// we never handled this return value, but:
// - we don't deschedule which would lead to is_cancelled
// - panics are already logged (is_panicked)
// - task errors are already logged in the wrapper
let _ = join_handle.await;
loop {
tokio::select! {
// we never handled this return value, but:
// - we don't deschedule which would lead to is_cancelled
// - panics are already logged (is_panicked)
// - task errors are already logged in the wrapper
_ = &mut join_handle => break,
_ = tokio::time::sleep(PERIODIC_COMPLAIN_TIMEOUT) => info!("still waiting for task {} to shut down", task.name),
}
}
info!("task {} completed", task.name);
}
} else {

View File

@@ -694,7 +694,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
{
struct iovec iov[PG_IOV_MAX];
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - chunk_offs);
int iteration_hits = 0;
int iteration_misses = 0;
uint64 io_time_us = 0;
@@ -708,10 +708,13 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
n_blocks_to_read += (BITMAP_ISSET(mask, buf_offset + i) != 0);
iov[i].iov_base = buffers[buf_offset + i];
iov[i].iov_len = BLCKSZ;
BITMAP_CLR(mask, buf_offset + i);
}
if (n_blocks_to_read == 0)
{
for (int i = 0; i < blocks_in_chunk; i++)
{
BITMAP_CLR(mask, buf_offset + i);
}
buf_offset += blocks_in_chunk;
nblocks -= blocks_in_chunk;
blkno += blocks_in_chunk;
@@ -744,6 +747,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (entry == NULL)
{
/* Pages are not cached */
for (int i = 0; i < blocks_in_chunk; i++)
{
BITMAP_CLR(mask, buf_offset + i);
}
lfc_ctl->misses += blocks_in_chunk;
pgBufferUsage.file_cache.misses += blocks_in_chunk;
LWLockRelease(lfc_lock);
@@ -766,6 +773,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
{
FileCacheBlockState state = UNAVAILABLE;
bool sleeping = false;
if (!BITMAP_ISSET(mask, buf_offset + i))
continue;
while (lfc_ctl->generation == generation)
{
state = GET_STATE(entry, chunk_offs + i);
@@ -789,11 +800,13 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
if (state == AVAILABLE)
{
BITMAP_SET(mask, buf_offset + i);
iteration_hits++;
}
else
{
BITMAP_CLR(mask, buf_offset + i);
iteration_misses++;
}
}
LWLockRelease(lfc_lock);
@@ -801,15 +814,36 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (iteration_hits != 0)
{
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
rc = preadv(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
pgstat_report_wait_end();
if (rc != (BLCKSZ * blocks_in_chunk))
if (blocks_in_chunk == n_blocks_to_read)
{
lfc_disable("read");
return -1;
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
rc = preadv(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
pgstat_report_wait_end();
if (rc != (BLCKSZ * blocks_in_chunk))
{
lfc_disable("read");
return -1;
}
}
else
{
/* Some blocks are already prefetched into the provided buffers; we must not overwrite them, so we cannot use a vectored read */
for (int i = 0; i < blocks_in_chunk; i++)
{
if (BITMAP_ISSET(mask, buf_offset + i))
{
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
rc = pread(lfc_desc, iov[i].iov_base, BLCKSZ, ((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs + i) * BLCKSZ);
pgstat_report_wait_end();
if (rc != BLCKSZ)
{
lfc_disable("read");
return -1;
}
}
}
}
}
@@ -1000,12 +1034,12 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
LWLockRelease(lfc_lock);
return false;
}
lwlsn = neon_get_lwlsn(rinfo, forknum, blkno);
if (lwlsn > lsn)
{
elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_nodified_since LSN %X/%X",
elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_modified_since LSN %X/%X",
blkno, LSN_FORMAT_ARGS(lwlsn), LSN_FORMAT_ARGS(lsn));
LWLockRelease(lfc_lock);
return false;

View File

@@ -1142,37 +1142,23 @@ pageserver_try_receive(shardno_t shard_no)
NeonResponse *resp;
PageServer *shard = &page_servers[shard_no];
PGconn *pageserver_conn = shard->conn;
/* read response */
int rc;
int rc;
if (shard->state != PS_Connected)
return NULL;
Assert(pageserver_conn);
while (true)
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */);
if (rc == 0)
{
if (PQisBusy(shard->conn))
if (!PQconsumeInput(shard->conn))
{
WaitEvent event;
if (WaitEventSetWait(shard->wes_read, 0, &event, 1,
WAIT_EVENT_NEON_PS_READ) != 1
|| (event.events & WL_SOCKET_READABLE) == 0)
{
return NULL;
}
return NULL;
}
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */);
if (rc == 0)
{
if (!PQconsumeInput(shard->conn))
{
return NULL;
}
}
else
break;
}
if (rc == 0)
return NULL;
else if (rc > 0)

View File

@@ -97,5 +97,5 @@ def test_lfc_prefetch(neon_simple_env: NeonEnv):
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
log.info(f"Unused prefetches: {prefetch_expired}")
# No redundant prefethc requrests if prefetch results are stored in LFC
# No redundant prefetch requests if prefetch results are stored in LFC
assert prefetch_expired == 0