Compare commits

...

10 Commits

Author SHA1 Message Date
Kosntantin Knizhnik
9f13a34837 Reset prefetch ring state in case of propagated error 2025-06-26 16:13:22 +03:00
Kosntantin Knizhnik
3635e5da7d Remove useless memset 2025-06-26 08:16:42 +03:00
Kosntantin Knizhnik
a645594eff Add more tracing of PS connection 2025-06-25 22:09:55 +03:00
Kosntantin Knizhnik
a2fd5dfce8 Add more checks for prefetch ring state 2025-06-25 09:37:36 +03:00
Kosntantin Knizhnik
d9a639e540 Check reqid before storing response in slot 2025-06-24 17:54:56 +03:00
Kosntantin Knizhnik
9a4b4cfe58 Add more trace and create debug image with asserts enabled 2025-06-23 15:46:23 +03:00
Kosntantin Knizhnik
fad1d4fcd9 Allocate error response in top memory context 2025-06-23 08:25:50 +03:00
Kosntantin Knizhnik
3cf8eb5347 Build with casserts 2025-06-22 22:17:26 +03:00
Kosntantin Knizhnik
a0a8f1903c Fix relkind check 2025-06-22 15:42:10 +03:00
Kosntantin Knizhnik
5138f49b08 Store relation kind in LFC chunk 2025-06-22 09:50:54 +03:00
7 changed files with 119 additions and 26 deletions

View File

@@ -17,8 +17,8 @@ BUILD_TYPE ?= debug
WITH_SANITIZERS ?= no
PG_CFLAGS = -fsigned-char
ifeq ($(BUILD_TYPE),release)
PG_CONFIGURE_OPTS = --enable-debug --with-openssl
PG_CFLAGS += -O2 -g3 $(CFLAGS)
PG_CONFIGURE_OPTS = --enable-debug --with-openssl --enable-cassert --enable-depend
PG_CFLAGS += -O0 -g3 $(CFLAGS)
PG_LDFLAGS = $(LDFLAGS)
# Unfortunately, `--profile=...` is a nightly feature
CARGO_BUILD_FLAGS += --release

View File

@@ -164,7 +164,7 @@ FROM build-deps AS pg-build
ARG PG_VERSION
COPY vendor/postgres-${PG_VERSION:?} postgres
RUN cd postgres && \
export CONFIGURE_CMD="./configure CFLAGS='-O2 -g3 -fsigned-char' --enable-debug --with-openssl --with-uuid=ossp \
export CONFIGURE_CMD="./configure CFLAGS='-O0 -g3 -fsigned-char' --enable-debug --enable-cassert --with-openssl --with-uuid=ossp \
--with-icu --with-libxml --with-libxslt --with-lz4" && \
if [ "${PG_VERSION:?}" != "v14" ]; then \
# zstd is available only from PG15

View File

@@ -412,6 +412,18 @@ compact_prefetch_buffers(void)
return false;
}
/*
 * Dump the prefetch ring to the server log for post-mortem debugging.
 *
 * Emits one LOG line with the four ring positions (ring_last, ring_receive,
 * ring_flush, ring_unused), followed by one LOG line for every slot in the
 * half-open range [ring_last, ring_unused) showing the slot's status and
 * request id.  The prefetch consistency checks call this right before they
 * raise PANIC, so that the ring contents end up in the log.
 *
 * Ring positions and request ids are printed in hexadecimal.  They are cast
 * to unsigned long long and printed with %llx so that the format specifier
 * matches the argument on every platform, not only on those where a 64-bit
 * integer happens to be "unsigned long".
 */
static void
dump_prefetch_state(void)
{
	neon_log(LOG, "PREFETCH STATE: ring_last=%llx, ring_receive=%llx, ring_flush=%llx, ring_unused=%llx",
			 (unsigned long long) MyPState->ring_last,
			 (unsigned long long) MyPState->ring_receive,
			 (unsigned long long) MyPState->ring_flush,
			 (unsigned long long) MyPState->ring_unused);

	for (uint64 i = MyPState->ring_last; i < MyPState->ring_unused; i++)
	{
		PrefetchRequest *slot = GetPrfSlot(i);

		neon_log(LOG, "PREFETCH STATE: slot %llx status=%d, reqid=%llx",
				 (unsigned long long) i,
				 (int) slot->status,
				 (unsigned long long) slot->reqid);
	}
}
/*
* If there might be responses still in the TCP buffer, then we should try to
* use those, to reduce any TCP backpressure on the OS/PS side.
@@ -446,15 +458,24 @@ communicator_prefetch_pump_state(void)
if (response == NULL)
break;
if (response->tag != T_NeonGetPageResponse && response->tag != T_NeonErrorResponse)
{
dump_prefetch_state();
neon_shard_log(slot->shard_no, PANIC, "Unexpected prefetch response %d, ring_receive=%ld, ring_flush=%ld, ring_unused=%ld",
response->tag, MyPState->ring_receive, MyPState->ring_flush, MyPState->ring_unused);
}
/* The slot should still be valid */
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
neon_shard_log(slot->shard_no, ERROR,
{
dump_prefetch_state();
neon_shard_log(slot->shard_no, PANIC,
"Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
(long) slot->my_ring_index, (long) MyPState->ring_receive);
}
/* update prefetch state */
MyPState->n_responses_buffered += 1;
MyPState->n_requests_inflight -= 1;
@@ -462,6 +483,13 @@ communicator_prefetch_pump_state(void)
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
if (response->reqid != slot->reqid && response->tag != T_NeonErrorResponse)
{
dump_prefetch_state();
ereport(PANIC,
(errmsg(NEON_TAG "[shard %d, reqid %lx] pump state receive unexpected response %d with reqid %lx", slot->shard_no, slot->reqid, response->tag, response->reqid),
errbacktrace()));
}
/* update slot state */
slot->status = PRFS_RECEIVED;
slot->response = response;
@@ -722,10 +750,13 @@ prefetch_read(PrefetchRequest *slot)
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
neon_shard_log(slot->shard_no, ERROR,
{
dump_prefetch_state();
neon_shard_log(slot->shard_no, PANIC,
"Incorrect prefetch read: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
(long)slot->my_ring_index, (long)MyPState->ring_receive);
}
/*
* Copy the request info so that if an error happens and the prefetch
@@ -745,10 +776,28 @@ prefetch_read(PrefetchRequest *slot)
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
neon_shard_log(shard_no, ERROR,
{
dump_prefetch_state();
neon_shard_log(shard_no, PANIC,
"Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
(long) slot->my_ring_index, (long) MyPState->ring_receive);
}
if (response->tag != T_NeonGetPageResponse && response->tag != T_NeonErrorResponse)
{
dump_prefetch_state();
neon_shard_log(shard_no, PANIC, "Unexpected prefetch response %d, ring_receive=%ld, ring_flush=%ld, ring_unused=%ld",
response->tag, MyPState->ring_receive, MyPState->ring_flush, MyPState->ring_unused);
}
if (response->reqid != slot->reqid)
{
dump_prefetch_state();
ereport(PANIC,
(errmsg(NEON_TAG "[shard %d, reqid %lx] prefetch_read receive unexpected response %d with reqid %lx", slot->shard_no, slot->reqid, response->tag, response->reqid),
errbacktrace()));
}
/* update prefetch state */
MyPState->n_responses_buffered += 1;
@@ -1371,7 +1420,7 @@ page_server_request(void const *req)
tag.blockNum = ((NeonGetPageRequest *) req)->blkno;
break;
default:
neon_log(ERROR, "Unexpected request tag: %d", messageTag(req));
neon_log(PANIC, "Unexpected request tag: %d", messageTag(req));
}
shard_no = get_shard_number(&tag);
@@ -1388,13 +1437,13 @@ page_server_request(void const *req)
{
PG_TRY();
{
consume_prefetch_responses();
while (!page_server->send(shard_no, (NeonRequest *) req)
|| !page_server->flush(shard_no))
{
/* do nothing */
}
MyNeonCounters->pageserver_open_requests++;
consume_prefetch_responses();
resp = page_server->receive(shard_no);
MyNeonCounters->pageserver_open_requests--;
}
@@ -1404,6 +1453,7 @@ page_server_request(void const *req)
* Cancellation in this code needs to be handled better at some
* point, but this currently seems fine for now.
*/
prefetch_on_ps_disconnect();
page_server->disconnect(shard_no);
MyNeonCounters->pageserver_open_requests = 0;
@@ -1502,7 +1552,7 @@ nm_pack_request(NeonRequest *msg)
case T_NeonDbSizeResponse:
case T_NeonGetSlruSegmentResponse:
default:
neon_log(ERROR, "unexpected neon message tag 0x%02x", msg->tag);
neon_log(PANIC, "unexpected neon message tag 0x%02x", msg->tag);
break;
}
return s;
@@ -1654,7 +1704,7 @@ nm_unpack_response(StringInfo s)
case T_NeonDbSizeRequest:
case T_NeonGetSlruSegmentRequest:
default:
neon_log(ERROR, "unexpected neon message tag 0x%02x", tag);
neon_log(PANIC, "unexpected neon message tag 0x%02x", tag);
break;
}
@@ -2383,12 +2433,11 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
.segno = segno
};
consume_prefetch_responses();
do
{
while (!page_server->send(shard_no, &request.hdr) || !page_server->flush(shard_no));
consume_prefetch_responses();
resp = page_server->receive(shard_no);
} while (resp == NULL);

View File

@@ -116,13 +116,20 @@ typedef enum FileCacheBlockState
REQUESTED /* some other backend is waiting for block to be loaded */
} FileCacheBlockState;
/*
 * Coarse kind of the pages stored in a file cache chunk.
 *
 * The value is kept in the two-bit "relkind" field of FileCacheEntry, so
 * every enumerator must fit into the range 0..3.
 */
typedef enum RelationKind
{
	RELKIND_UNKNOWN = 0,		/* kind not determined (initial state of an entry) */
	RELKIND_HEAP = 1,			/* written page has an empty special area */
	RELKIND_INDEX = 2			/* written page has a non-empty special area */
} RelationKind;
typedef struct FileCacheEntry
{
BufferTag key;
uint32 hash;
uint32 offset;
uint32 access_count;
uint32 access_count:30;
uint32 relkind:2;
dlist_node list_node; /* LRU/holes list node */
uint32 state[FLEXIBLE_ARRAY_MEMBER]; /* two bits per block */
} FileCacheEntry;
@@ -491,6 +498,7 @@ lfc_change_limit_hook(int newval, void *extra)
hole->hash = hash;
hole->offset = offset;
hole->access_count = 0;
hole->relkind = RELKIND_UNKNOWN;
CriticalAssert(!found);
dlist_push_tail(&lfc_ctl->holes, &hole->list_node);
@@ -1450,6 +1458,7 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash)
}
entry->access_count = 1;
entry->relkind = RELKIND_UNKNOWN;
entry->hash = hash;
lfc_ctl->pinned += 1;
@@ -1644,7 +1653,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
uint64 generation;
uint32 entry_offset;
int buf_offset = 0;
RelationKind relkind = RELKIND_UNKNOWN;
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return;
@@ -1661,7 +1670,27 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
return;
}
generation = lfc_ctl->generation;
for (int i = 0; i < nblocks; i++)
{
Page page = (Page)buffers[i];
if (!PageIsNew(page))
{
RelationKind pagekind = PageGetSpecialSize(page) != 0 ? RELKIND_INDEX : RELKIND_HEAP;
if (relkind == RELKIND_UNKNOWN)
{
relkind = pagekind;
}
else
{
if (relkind != pagekind)
{
ereport(PANIC,
(errmsg("Inconsistent writing %s page %u %u/%u/%u.%u to LFC", pagekind == RELKIND_INDEX ? "index" : "heap", blkno+i, RelFileInfoFmt(rinfo), forkNum),
errbacktrace()));
}
}
}
}
/*
* For every chunk that has blocks we're interested in, we
* 1. get the chunk header
@@ -1711,6 +1740,16 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
lfc_ctl->pinned += 1;
dlist_delete(&entry->list_node);
}
if (relkind != RELKIND_UNKNOWN)
{
if (entry->relkind != RELKIND_UNKNOWN && entry->relkind != relkind)
{
ereport(PANIC,
(errmsg("Writing unexpected %s page %u %u/%u/%u.%u to LFC", relkind == RELKIND_INDEX ? "index" : "heap", blkno, RelFileInfoFmt(rinfo), forkNum),
errbacktrace()));
}
entry->relkind = relkind;
}
}
else
{
@@ -1725,6 +1764,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
nblocks -= blocks_in_chunk;
continue;
}
entry->relkind = RELKIND_UNKNOWN;
}
entry_offset = entry->offset;

View File

@@ -126,7 +126,7 @@ static PagestoreShmemState *pagestore_shared;
static uint64 pagestore_local_counter = 0;
typedef enum PSConnectionState {
PS_Disconnected, /* no connection yet */
PS_Disconnected = 1, /* no connection yet */
PS_Connecting_Startup, /* connection starting up */
PS_Connecting_PageStream, /* negotiating pagestream */
PS_Connected, /* connected, pagestream established */
@@ -373,8 +373,9 @@ get_shard_number(BufferTag *tag)
}
static inline void
CLEANUP_AND_DISCONNECT(PageServer *shard)
CLEANUP_AND_DISCONNECT(PageServer *shard)
{
neon_log(LOG, "Cleanup and disconnect shard %d with state %d", (int)(shard - page_servers), shard->state);
if (shard->wes_read)
{
FreeWaitEventSet(shard->wes_read);
@@ -395,7 +396,7 @@ CLEANUP_AND_DISCONNECT(PageServer *shard)
* complete the connection (e.g. due to receiving an earlier cancellation
* during connection start).
* Returns true if successfully connected; false if the connection failed.
*
*
* Throws errors in unrecoverable situations, or when this backend's query
* is canceled.
*/
@@ -404,7 +405,7 @@ pageserver_connect(shardno_t shard_no, int elevel)
{
PageServer *shard = &page_servers[shard_no];
char connstr[MAX_PAGESERVER_CONNSTRING_SIZE];
neon_log(LOG, "Initiate connect to shard %d state %d", shard_no, shard->state);
/*
* Get the connection string for this shard. If the shard map has been
* updated since we last looked, this will also disconnect any existing
@@ -968,6 +969,7 @@ retry:
static void
pageserver_disconnect(shardno_t shard_no)
{
neon_log(LOG, "pageserver_disconnect shard %d", shard_no);
/*
* If the connection to any pageserver is lost, we throw away the
* whole prefetch queue, even for other pageservers. It should not
@@ -998,7 +1000,7 @@ pageserver_disconnect_shard(shardno_t shard_no)
* to attach wait events to the WaitEventSets.
*/
CLEANUP_AND_DISCONNECT(shard);
neon_log(LOG, "Disconnect shard %d", shard_no);
shard->state = PS_Disconnected;
}
@@ -1354,6 +1356,10 @@ pg_init_libpagestore(void)
{
pagestore_prepare_shmem();
for (int i = 0; i < MAX_SHARDS; i++)
page_servers[i].state = PS_Disconnected;
DefineCustomStringVariable("neon.pageserver_connstring",
"connection string to the page server",
NULL,
@@ -1527,6 +1533,4 @@ pg_init_libpagestore(void)
smgr_init_hook = smgr_init_neon;
dbsize_hook = neon_dbsize;
}
memset(page_servers, 0, sizeof(page_servers));
}

View File

@@ -5,7 +5,7 @@
],
"v16": [
"16.9",
"7a4c0eacaeb9b97416542fa19103061c166460b1"
"85091d9c28958f19f24aee14526735392da11656"
],
"v15": [
"15.13",