Compare commits

..

1 Commits

Author SHA1 Message Date
Folke Behrens
a7946dffec neon-image: Add debugging tools to image 2025-03-24 19:52:27 +01:00
5 changed files with 47 additions and 70 deletions

View File

@@ -103,12 +103,17 @@ RUN set -e \
&& echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries \
&& apt update \
&& apt install -y \
bpftrace \
ca-certificates \
libreadline-dev \
libseccomp-dev \
ca-certificates \
# System postgres for use with client libraries (e.g. in storage controller)
postgresql-15 \
iproute2 \
lsof \
openssl \
# System postgres for use with client libraries (e.g. in storage controller)
postgresql-15 \
screen \
tcpdump \
&& rm -f /etc/apt/apt.conf.d/80-retries \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \
&& useradd -d /data neon \

View File

@@ -38,7 +38,6 @@ use std::panic::AssertUnwindSafe;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use futures::FutureExt;
use once_cell::sync::Lazy;
@@ -585,25 +584,18 @@ pub async fn shutdown_tasks(
// warn to catch these in tests; there shouldn't be any
warn!(name = task.name, tenant_shard_id = ?tenant_shard_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
}
const INITIAL_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(1);
const PERIODIC_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(60);
if tokio::time::timeout(INITIAL_COMPLAIN_TIMEOUT, &mut join_handle)
if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle)
.await
.is_err()
{
// allow some time to elapse before logging to cut down the number of log
// lines.
info!("waiting for task {} to shut down", task.name);
loop {
tokio::select! {
// we never handled this return value, but:
// - we don't deschedule which would lead to is_cancelled
// - panics are already logged (is_panicked)
// - task errors are already logged in the wrapper
_ = &mut join_handle => break,
_ = tokio::time::sleep(PERIODIC_COMPLAIN_TIMEOUT) => info!("still waiting for task {} to shut down", task.name),
}
}
// we never handled this return value, but:
// - we don't deschedule which would lead to is_cancelled
// - panics are already logged (is_panicked)
// - task errors are already logged in the wrapper
let _ = join_handle.await;
info!("task {} completed", task.name);
}
} else {

View File

@@ -694,7 +694,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
{
struct iovec iov[PG_IOV_MAX];
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - chunk_offs);
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
int iteration_hits = 0;
int iteration_misses = 0;
uint64 io_time_us = 0;
@@ -708,13 +708,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
n_blocks_to_read += (BITMAP_ISSET(mask, buf_offset + i) != 0);
iov[i].iov_base = buffers[buf_offset + i];
iov[i].iov_len = BLCKSZ;
BITMAP_CLR(mask, buf_offset + i);
}
if (n_blocks_to_read == 0)
{
for (int i = 0; i < blocks_in_chunk; i++)
{
BITMAP_CLR(mask, buf_offset + i);
}
buf_offset += blocks_in_chunk;
nblocks -= blocks_in_chunk;
blkno += blocks_in_chunk;
@@ -747,10 +744,6 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (entry == NULL)
{
/* Pages are not cached */
for (int i = 0; i < blocks_in_chunk; i++)
{
BITMAP_CLR(mask, buf_offset + i);
}
lfc_ctl->misses += blocks_in_chunk;
pgBufferUsage.file_cache.misses += blocks_in_chunk;
LWLockRelease(lfc_lock);
@@ -773,10 +766,6 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
{
FileCacheBlockState state = UNAVAILABLE;
bool sleeping = false;
if (!BITMAP_ISSET(mask, buf_offset + i))
continue;
while (lfc_ctl->generation == generation)
{
state = GET_STATE(entry, chunk_offs + i);
@@ -800,13 +789,11 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
if (state == AVAILABLE)
{
BITMAP_SET(mask, buf_offset + i);
iteration_hits++;
}
else
{
BITMAP_CLR(mask, buf_offset + i);
iteration_misses++;
}
}
LWLockRelease(lfc_lock);
@@ -814,36 +801,15 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (iteration_hits != 0)
{
if (blocks_in_chunk == n_blocks_to_read)
{
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
rc = preadv(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
pgstat_report_wait_end();
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
rc = preadv(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
pgstat_report_wait_end();
if (rc != (BLCKSZ * blocks_in_chunk))
{
lfc_disable("read");
return -1;
}
}
else
if (rc != (BLCKSZ * blocks_in_chunk))
{
/* Some blocks are already prefetched in provided buffers, we should not rewrite them, so we can not use vector read */
for (int i = 0; i < blocks_in_chunk; i++)
{
if (BITMAP_ISSET(mask, buf_offset + i))
{
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
rc = pread(lfc_desc, iov[i].iov_base, BLCKSZ, ((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs + i) * BLCKSZ);
pgstat_report_wait_end();
if (rc != BLCKSZ)
{
lfc_disable("read");
return -1;
}
}
}
lfc_disable("read");
return -1;
}
}
@@ -1034,12 +1000,12 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
LWLockRelease(lfc_lock);
return false;
}
lwlsn = neon_get_lwlsn(rinfo, forknum, blkno);
if (lwlsn > lsn)
{
elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_modified_since LSN %X/%X",
elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_nodified_since LSN %X/%X",
blkno, LSN_FORMAT_ARGS(lwlsn), LSN_FORMAT_ARGS(lsn));
LWLockRelease(lfc_lock);
return false;

View File

@@ -1142,23 +1142,37 @@ pageserver_try_receive(shardno_t shard_no)
NeonResponse *resp;
PageServer *shard = &page_servers[shard_no];
PGconn *pageserver_conn = shard->conn;
int rc;
/* read response */
int rc;
if (shard->state != PS_Connected)
return NULL;
Assert(pageserver_conn);
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */);
if (rc == 0)
while (true)
{
if (!PQconsumeInput(shard->conn))
if (PQisBusy(shard->conn))
{
return NULL;
WaitEvent event;
if (WaitEventSetWait(shard->wes_read, 0, &event, 1,
WAIT_EVENT_NEON_PS_READ) != 1
|| (event.events & WL_SOCKET_READABLE) == 0)
{
return NULL;
}
}
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */);
if (rc == 0)
{
if (!PQconsumeInput(shard->conn))
{
return NULL;
}
}
else
break;
}
if (rc == 0)
return NULL;
else if (rc > 0)

View File

@@ -97,5 +97,5 @@ def test_lfc_prefetch(neon_simple_env: NeonEnv):
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
log.info(f"Unused prefetches: {prefetch_expired}")
# No redundant prefetch requests if prefetch results are stored in LFC
# No redundant prefethc requrests if prefetch results are stored in LFC
assert prefetch_expired == 0