mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-04 19:20:36 +00:00
Compare commits
10 Commits
fix_motd_r
...
lfc_check_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9f13a34837 | ||
|
|
3635e5da7d | ||
|
|
a645594eff | ||
|
|
a2fd5dfce8 | ||
|
|
d9a639e540 | ||
|
|
9a4b4cfe58 | ||
|
|
fad1d4fcd9 | ||
|
|
3cf8eb5347 | ||
|
|
a0a8f1903c | ||
|
|
5138f49b08 |
4
Makefile
4
Makefile
@@ -17,8 +17,8 @@ BUILD_TYPE ?= debug
|
||||
WITH_SANITIZERS ?= no
|
||||
PG_CFLAGS = -fsigned-char
|
||||
ifeq ($(BUILD_TYPE),release)
|
||||
PG_CONFIGURE_OPTS = --enable-debug --with-openssl
|
||||
PG_CFLAGS += -O2 -g3 $(CFLAGS)
|
||||
PG_CONFIGURE_OPTS = --enable-debug --with-openssl --enable-cassert --enable-depend
|
||||
PG_CFLAGS += -O0 -g3 $(CFLAGS)
|
||||
PG_LDFLAGS = $(LDFLAGS)
|
||||
# Unfortunately, `--profile=...` is a nightly feature
|
||||
CARGO_BUILD_FLAGS += --release
|
||||
|
||||
@@ -164,7 +164,7 @@ FROM build-deps AS pg-build
|
||||
ARG PG_VERSION
|
||||
COPY vendor/postgres-${PG_VERSION:?} postgres
|
||||
RUN cd postgres && \
|
||||
export CONFIGURE_CMD="./configure CFLAGS='-O2 -g3 -fsigned-char' --enable-debug --with-openssl --with-uuid=ossp \
|
||||
export CONFIGURE_CMD="./configure CFLAGS='-O0 -g3 -fsigned-char' --enable-debug --enable-cassert --with-openssl --with-uuid=ossp \
|
||||
--with-icu --with-libxml --with-libxslt --with-lz4" && \
|
||||
if [ "${PG_VERSION:?}" != "v14" ]; then \
|
||||
# zstd is available only from PG15
|
||||
|
||||
@@ -412,6 +412,18 @@ compact_prefetch_buffers(void)
|
||||
return false;
|
||||
}
|
||||
|
||||
static void
|
||||
dump_prefetch_state(void)
|
||||
{
|
||||
neon_log(LOG, "PREFETCH STATE: ring_last=%lx, ring_receive=%lx, ring_flush=%lx, ring_unused=%lx",
|
||||
MyPState->ring_last, MyPState->ring_receive, MyPState->ring_flush, MyPState->ring_unused);
|
||||
for (uint64 i = MyPState->ring_last; i < MyPState->ring_unused; i++)
|
||||
{
|
||||
PrefetchRequest *slot = GetPrfSlot(i);
|
||||
neon_log(LOG, "PREFETCH STATE: slot %lx status=%d, reqid=%lx", i, slot->status, slot->reqid);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* If there might be responses still in the TCP buffer, then we should try to
|
||||
* use those, to reduce any TCP backpressure on the OS/PS side.
|
||||
@@ -446,15 +458,24 @@ communicator_prefetch_pump_state(void)
|
||||
if (response == NULL)
|
||||
break;
|
||||
|
||||
if (response->tag != T_NeonGetPageResponse && response->tag != T_NeonErrorResponse)
|
||||
{
|
||||
dump_prefetch_state();
|
||||
neon_shard_log(slot->shard_no, PANIC, "Unexpected prefetch response %d, ring_receive=%ld, ring_flush=%ld, ring_unused=%ld",
|
||||
response->tag, MyPState->ring_receive, MyPState->ring_flush, MyPState->ring_unused);
|
||||
}
|
||||
|
||||
/* The slot should still be valid */
|
||||
if (slot->status != PRFS_REQUESTED ||
|
||||
slot->response != NULL ||
|
||||
slot->my_ring_index != MyPState->ring_receive)
|
||||
neon_shard_log(slot->shard_no, ERROR,
|
||||
{
|
||||
dump_prefetch_state();
|
||||
neon_shard_log(slot->shard_no, PANIC,
|
||||
"Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu",
|
||||
slot->status, slot->response,
|
||||
(long) slot->my_ring_index, (long) MyPState->ring_receive);
|
||||
|
||||
}
|
||||
/* update prefetch state */
|
||||
MyPState->n_responses_buffered += 1;
|
||||
MyPState->n_requests_inflight -= 1;
|
||||
@@ -462,6 +483,13 @@ communicator_prefetch_pump_state(void)
|
||||
MyNeonCounters->getpage_prefetches_buffered =
|
||||
MyPState->n_responses_buffered;
|
||||
|
||||
if (response->reqid != slot->reqid && response->tag != T_NeonErrorResponse)
|
||||
{
|
||||
dump_prefetch_state();
|
||||
ereport(PANIC,
|
||||
(errmsg(NEON_TAG "[shard %d, reqid %lx] pump state receive unexpected response %d with reqid %lx", slot->shard_no, slot->reqid, response->tag, response->reqid),
|
||||
errbacktrace()));
|
||||
}
|
||||
/* update slot state */
|
||||
slot->status = PRFS_RECEIVED;
|
||||
slot->response = response;
|
||||
@@ -722,10 +750,13 @@ prefetch_read(PrefetchRequest *slot)
|
||||
if (slot->status != PRFS_REQUESTED ||
|
||||
slot->response != NULL ||
|
||||
slot->my_ring_index != MyPState->ring_receive)
|
||||
neon_shard_log(slot->shard_no, ERROR,
|
||||
{
|
||||
dump_prefetch_state();
|
||||
neon_shard_log(slot->shard_no, PANIC,
|
||||
"Incorrect prefetch read: status=%d response=%p my=%lu receive=%lu",
|
||||
slot->status, slot->response,
|
||||
(long)slot->my_ring_index, (long)MyPState->ring_receive);
|
||||
}
|
||||
|
||||
/*
|
||||
* Copy the request info so that if an error happens and the prefetch
|
||||
@@ -745,10 +776,28 @@ prefetch_read(PrefetchRequest *slot)
|
||||
if (slot->status != PRFS_REQUESTED ||
|
||||
slot->response != NULL ||
|
||||
slot->my_ring_index != MyPState->ring_receive)
|
||||
neon_shard_log(shard_no, ERROR,
|
||||
{
|
||||
dump_prefetch_state();
|
||||
neon_shard_log(shard_no, PANIC,
|
||||
"Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu",
|
||||
slot->status, slot->response,
|
||||
(long) slot->my_ring_index, (long) MyPState->ring_receive);
|
||||
}
|
||||
|
||||
if (response->tag != T_NeonGetPageResponse && response->tag != T_NeonErrorResponse)
|
||||
{
|
||||
dump_prefetch_state();
|
||||
neon_shard_log(shard_no, PANIC, "Unexpected prefetch response %d, ring_receive=%ld, ring_flush=%ld, ring_unused=%ld",
|
||||
response->tag, MyPState->ring_receive, MyPState->ring_flush, MyPState->ring_unused);
|
||||
}
|
||||
|
||||
if (response->reqid != slot->reqid)
|
||||
{
|
||||
dump_prefetch_state();
|
||||
ereport(PANIC,
|
||||
(errmsg(NEON_TAG "[shard %d, reqid %lx] prefetch_read receive unexpected response %d with reqid %lx", slot->shard_no, slot->reqid, response->tag, response->reqid),
|
||||
errbacktrace()));
|
||||
}
|
||||
|
||||
/* update prefetch state */
|
||||
MyPState->n_responses_buffered += 1;
|
||||
@@ -1371,7 +1420,7 @@ page_server_request(void const *req)
|
||||
tag.blockNum = ((NeonGetPageRequest *) req)->blkno;
|
||||
break;
|
||||
default:
|
||||
neon_log(ERROR, "Unexpected request tag: %d", messageTag(req));
|
||||
neon_log(PANIC, "Unexpected request tag: %d", messageTag(req));
|
||||
}
|
||||
shard_no = get_shard_number(&tag);
|
||||
|
||||
@@ -1388,13 +1437,13 @@ page_server_request(void const *req)
|
||||
{
|
||||
PG_TRY();
|
||||
{
|
||||
consume_prefetch_responses();
|
||||
while (!page_server->send(shard_no, (NeonRequest *) req)
|
||||
|| !page_server->flush(shard_no))
|
||||
{
|
||||
/* do nothing */
|
||||
}
|
||||
MyNeonCounters->pageserver_open_requests++;
|
||||
consume_prefetch_responses();
|
||||
resp = page_server->receive(shard_no);
|
||||
MyNeonCounters->pageserver_open_requests--;
|
||||
}
|
||||
@@ -1404,6 +1453,7 @@ page_server_request(void const *req)
|
||||
* Cancellation in this code needs to be handled better at some
|
||||
* point, but this currently seems fine for now.
|
||||
*/
|
||||
prefetch_on_ps_disconnect();
|
||||
page_server->disconnect(shard_no);
|
||||
MyNeonCounters->pageserver_open_requests = 0;
|
||||
|
||||
@@ -1502,7 +1552,7 @@ nm_pack_request(NeonRequest *msg)
|
||||
case T_NeonDbSizeResponse:
|
||||
case T_NeonGetSlruSegmentResponse:
|
||||
default:
|
||||
neon_log(ERROR, "unexpected neon message tag 0x%02x", msg->tag);
|
||||
neon_log(PANIC, "unexpected neon message tag 0x%02x", msg->tag);
|
||||
break;
|
||||
}
|
||||
return s;
|
||||
@@ -1654,7 +1704,7 @@ nm_unpack_response(StringInfo s)
|
||||
case T_NeonDbSizeRequest:
|
||||
case T_NeonGetSlruSegmentRequest:
|
||||
default:
|
||||
neon_log(ERROR, "unexpected neon message tag 0x%02x", tag);
|
||||
neon_log(PANIC, "unexpected neon message tag 0x%02x", tag);
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -2383,12 +2433,11 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
|
||||
.segno = segno
|
||||
};
|
||||
|
||||
consume_prefetch_responses();
|
||||
|
||||
do
|
||||
{
|
||||
while (!page_server->send(shard_no, &request.hdr) || !page_server->flush(shard_no));
|
||||
|
||||
consume_prefetch_responses();
|
||||
|
||||
resp = page_server->receive(shard_no);
|
||||
} while (resp == NULL);
|
||||
|
||||
|
||||
@@ -116,13 +116,20 @@ typedef enum FileCacheBlockState
|
||||
REQUESTED /* some other backend is waiting for block to be loaded */
|
||||
} FileCacheBlockState;
|
||||
|
||||
typedef enum RelationKind
|
||||
{
|
||||
RELKIND_UNKNOWN,
|
||||
RELKIND_HEAP,
|
||||
RELKIND_INDEX
|
||||
} RelationKind;
|
||||
|
||||
typedef struct FileCacheEntry
|
||||
{
|
||||
BufferTag key;
|
||||
uint32 hash;
|
||||
uint32 offset;
|
||||
uint32 access_count;
|
||||
uint32 access_count:30;
|
||||
uint32 relkind:2;
|
||||
dlist_node list_node; /* LRU/holes list node */
|
||||
uint32 state[FLEXIBLE_ARRAY_MEMBER]; /* two bits per block */
|
||||
} FileCacheEntry;
|
||||
@@ -491,6 +498,7 @@ lfc_change_limit_hook(int newval, void *extra)
|
||||
hole->hash = hash;
|
||||
hole->offset = offset;
|
||||
hole->access_count = 0;
|
||||
hole->relkind = RELKIND_UNKNOWN;
|
||||
CriticalAssert(!found);
|
||||
dlist_push_tail(&lfc_ctl->holes, &hole->list_node);
|
||||
|
||||
@@ -1450,6 +1458,7 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash)
|
||||
}
|
||||
|
||||
entry->access_count = 1;
|
||||
entry->relkind = RELKIND_UNKNOWN;
|
||||
entry->hash = hash;
|
||||
lfc_ctl->pinned += 1;
|
||||
|
||||
@@ -1644,7 +1653,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
uint64 generation;
|
||||
uint32 entry_offset;
|
||||
int buf_offset = 0;
|
||||
|
||||
RelationKind relkind = RELKIND_UNKNOWN;
|
||||
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
|
||||
return;
|
||||
|
||||
@@ -1661,7 +1670,27 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
return;
|
||||
}
|
||||
generation = lfc_ctl->generation;
|
||||
|
||||
for (int i = 0; i < nblocks; i++)
|
||||
{
|
||||
Page page = (Page)buffers[i];
|
||||
if (!PageIsNew(page))
|
||||
{
|
||||
RelationKind pagekind = PageGetSpecialSize(page) != 0 ? RELKIND_INDEX : RELKIND_HEAP;
|
||||
if (relkind == RELKIND_UNKNOWN)
|
||||
{
|
||||
relkind = pagekind;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (relkind != pagekind)
|
||||
{
|
||||
ereport(PANIC,
|
||||
(errmsg("Inconsistent writing %s page %u %u/%u/%u.%u to LFC", pagekind == RELKIND_INDEX ? "index" : "heap", blkno+i, RelFileInfoFmt(rinfo), forkNum),
|
||||
errbacktrace()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
/*
|
||||
* For every chunk that has blocks we're interested in, we
|
||||
* 1. get the chunk header
|
||||
@@ -1711,6 +1740,16 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
lfc_ctl->pinned += 1;
|
||||
dlist_delete(&entry->list_node);
|
||||
}
|
||||
if (relkind != RELKIND_UNKNOWN)
|
||||
{
|
||||
if (entry->relkind != RELKIND_UNKNOWN && entry->relkind != relkind)
|
||||
{
|
||||
ereport(PANIC,
|
||||
(errmsg("Writing unexpected %s page %u %u/%u/%u.%u to LFC", relkind == RELKIND_INDEX ? "index" : "heap", blkno, RelFileInfoFmt(rinfo), forkNum),
|
||||
errbacktrace()));
|
||||
}
|
||||
entry->relkind = relkind;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -1725,6 +1764,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
nblocks -= blocks_in_chunk;
|
||||
continue;
|
||||
}
|
||||
entry->relkind = RELKIND_UNKNOWN;
|
||||
}
|
||||
|
||||
entry_offset = entry->offset;
|
||||
|
||||
@@ -126,7 +126,7 @@ static PagestoreShmemState *pagestore_shared;
|
||||
static uint64 pagestore_local_counter = 0;
|
||||
|
||||
typedef enum PSConnectionState {
|
||||
PS_Disconnected, /* no connection yet */
|
||||
PS_Disconnected = 1, /* no connection yet */
|
||||
PS_Connecting_Startup, /* connection starting up */
|
||||
PS_Connecting_PageStream, /* negotiating pagestream */
|
||||
PS_Connected, /* connected, pagestream established */
|
||||
@@ -373,8 +373,9 @@ get_shard_number(BufferTag *tag)
|
||||
}
|
||||
|
||||
static inline void
|
||||
CLEANUP_AND_DISCONNECT(PageServer *shard)
|
||||
CLEANUP_AND_DISCONNECT(PageServer *shard)
|
||||
{
|
||||
neon_log(LOG, "Cleanup and disconnect shard %d with state %d", (int)(shard - page_servers), shard->state);
|
||||
if (shard->wes_read)
|
||||
{
|
||||
FreeWaitEventSet(shard->wes_read);
|
||||
@@ -395,7 +396,7 @@ CLEANUP_AND_DISCONNECT(PageServer *shard)
|
||||
* complete the connection (e.g. due to receiving an earlier cancellation
|
||||
* during connection start).
|
||||
* Returns true if successfully connected; false if the connection failed.
|
||||
*
|
||||
*
|
||||
* Throws errors in unrecoverable situations, or when this backend's query
|
||||
* is canceled.
|
||||
*/
|
||||
@@ -404,7 +405,7 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
{
|
||||
PageServer *shard = &page_servers[shard_no];
|
||||
char connstr[MAX_PAGESERVER_CONNSTRING_SIZE];
|
||||
|
||||
neon_log(LOG, "Initiate connect to shard %d state %d", shard_no, shard->state);
|
||||
/*
|
||||
* Get the connection string for this shard. If the shard map has been
|
||||
* updated since we last looked, this will also disconnect any existing
|
||||
@@ -968,6 +969,7 @@ retry:
|
||||
static void
|
||||
pageserver_disconnect(shardno_t shard_no)
|
||||
{
|
||||
neon_log(LOG, "pageserver_disconnect shard %d", shard_no);
|
||||
/*
|
||||
* If the connection to any pageserver is lost, we throw away the
|
||||
* whole prefetch queue, even for other pageservers. It should not
|
||||
@@ -998,7 +1000,7 @@ pageserver_disconnect_shard(shardno_t shard_no)
|
||||
* to attach wait events to the WaitEventSets.
|
||||
*/
|
||||
CLEANUP_AND_DISCONNECT(shard);
|
||||
|
||||
neon_log(LOG, "Disconnect shard %d", shard_no);
|
||||
shard->state = PS_Disconnected;
|
||||
}
|
||||
|
||||
@@ -1354,6 +1356,10 @@ pg_init_libpagestore(void)
|
||||
{
|
||||
pagestore_prepare_shmem();
|
||||
|
||||
|
||||
for (int i = 0; i < MAX_SHARDS; i++)
|
||||
page_servers[i].state = PS_Disconnected;
|
||||
|
||||
DefineCustomStringVariable("neon.pageserver_connstring",
|
||||
"connection string to the page server",
|
||||
NULL,
|
||||
@@ -1527,6 +1533,4 @@ pg_init_libpagestore(void)
|
||||
smgr_init_hook = smgr_init_neon;
|
||||
dbsize_hook = neon_dbsize;
|
||||
}
|
||||
|
||||
memset(page_servers, 0, sizeof(page_servers));
|
||||
}
|
||||
|
||||
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 7a4c0eacae...85091d9c28
2
vendor/revisions.json
vendored
2
vendor/revisions.json
vendored
@@ -5,7 +5,7 @@
|
||||
],
|
||||
"v16": [
|
||||
"16.9",
|
||||
"7a4c0eacaeb9b97416542fa19103061c166460b1"
|
||||
"85091d9c28958f19f24aee14526735392da11656"
|
||||
],
|
||||
"v15": [
|
||||
"15.13",
|
||||
|
||||
Reference in New Issue
Block a user