mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-29 19:10:38 +00:00
Compare commits
28 Commits
conrad/lak
...
min_inflig
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3fe7f8802a | ||
|
|
7841303df9 | ||
|
|
0e9d3fd2b1 | ||
|
|
b2a87b501f | ||
|
|
06417f2ff9 | ||
|
|
252876515c | ||
|
|
d56a72afec | ||
|
|
3847ab73a7 | ||
|
|
3e5bbe7027 | ||
|
|
546a45f57a | ||
|
|
588cb289d5 | ||
|
|
b947deb07c | ||
|
|
2f455baa73 | ||
|
|
f2e65e1d2c | ||
|
|
df49def453 | ||
|
|
1ef0f71a95 | ||
|
|
96df649858 | ||
|
|
9bfba1b087 | ||
|
|
b41b85f8ec | ||
|
|
32b801ea1c | ||
|
|
89496a32d0 | ||
|
|
9617b8d328 | ||
|
|
a504516b8c | ||
|
|
fe7a4e1ab6 | ||
|
|
40cae8cc36 | ||
|
|
02fc8b7c70 | ||
|
|
60feb168e2 | ||
|
|
da596a5162 |
27
.github/workflows/pg-clients.yml
vendored
27
.github/workflows/pg-clients.yml
vendored
@@ -48,8 +48,20 @@ jobs:
|
||||
uses: ./.github/workflows/build-build-tools-image.yml
|
||||
secrets: inherit
|
||||
|
||||
generate-ch-tmppw:
|
||||
runs-on: ubuntu-22.04
|
||||
outputs:
|
||||
tmp_val: ${{ steps.pwgen.outputs.tmp_val }}
|
||||
steps:
|
||||
- name: Generate a random password
|
||||
id: pwgen
|
||||
run: |
|
||||
set +x
|
||||
p=$(dd if=/dev/random bs=14 count=1 2>/dev/null | base64)
|
||||
echo tmp_val="${p//\//}" >> "${GITHUB_OUTPUT}"
|
||||
|
||||
test-logical-replication:
|
||||
needs: [ build-build-tools-image ]
|
||||
needs: [ build-build-tools-image, generate-ch-tmppw ]
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
container:
|
||||
@@ -60,16 +72,20 @@ jobs:
|
||||
options: --init --user root
|
||||
services:
|
||||
clickhouse:
|
||||
image: clickhouse/clickhouse-server:24.6.3.64
|
||||
image: clickhouse/clickhouse-server:24.8
|
||||
env:
|
||||
CLICKHOUSE_PASSWORD: ${{ needs.generate-ch-tmppw.outputs.tmp_val }}
|
||||
ports:
|
||||
- 9000:9000
|
||||
- 8123:8123
|
||||
zookeeper:
|
||||
image: quay.io/debezium/zookeeper:2.7
|
||||
image: quay.io/debezium/zookeeper:3.1.3.Final
|
||||
ports:
|
||||
- 2181:2181
|
||||
- 2888:2888
|
||||
- 3888:3888
|
||||
kafka:
|
||||
image: quay.io/debezium/kafka:2.7
|
||||
image: quay.io/debezium/kafka:3.1.3.Final
|
||||
env:
|
||||
ZOOKEEPER_CONNECT: "zookeeper:2181"
|
||||
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
|
||||
@@ -79,7 +95,7 @@ jobs:
|
||||
ports:
|
||||
- 9092:9092
|
||||
debezium:
|
||||
image: quay.io/debezium/connect:2.7
|
||||
image: quay.io/debezium/connect:3.1.3.Final
|
||||
env:
|
||||
BOOTSTRAP_SERVERS: kafka:9092
|
||||
GROUP_ID: 1
|
||||
@@ -125,6 +141,7 @@ jobs:
|
||||
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
|
||||
env:
|
||||
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
|
||||
CLICKHOUSE_PASSWORD: ${{ needs.generate-ch-tmppw.outputs.tmp_val }}
|
||||
|
||||
- name: Delete Neon Project
|
||||
if: always()
|
||||
|
||||
@@ -52,7 +52,7 @@ pub(crate) fn regenerate(
|
||||
};
|
||||
|
||||
// Express a static value for how many shards we may schedule on one node
|
||||
const MAX_SHARDS: u32 = 5000;
|
||||
const MAX_SHARDS: u32 = 2500;
|
||||
|
||||
let mut doc = PageserverUtilization {
|
||||
disk_usage_bytes: used,
|
||||
|
||||
@@ -48,6 +48,8 @@ DATA = \
|
||||
neon--1.3--1.4.sql \
|
||||
neon--1.4--1.5.sql \
|
||||
neon--1.5--1.6.sql \
|
||||
neon--1.6--1.7.sql \
|
||||
neon--1.7--1.6.sql \
|
||||
neon--1.6--1.5.sql \
|
||||
neon--1.5--1.4.sql \
|
||||
neon--1.4--1.3.sql \
|
||||
|
||||
@@ -79,10 +79,6 @@
|
||||
#include "access/xlogrecovery.h"
|
||||
#endif
|
||||
|
||||
#if PG_VERSION_NUM < 160000
|
||||
typedef PGAlignedBlock PGIOAlignedBlock;
|
||||
#endif
|
||||
|
||||
#define NEON_PANIC_CONNECTION_STATE(shard_no, elvl, message, ...) \
|
||||
neon_shard_log(shard_no, elvl, "Broken connection state: " message, \
|
||||
##__VA_ARGS__)
|
||||
@@ -264,7 +260,7 @@ typedef struct PrefetchState
|
||||
|
||||
/* the buffers */
|
||||
prfh_hash *prf_hash;
|
||||
int max_shard_no;
|
||||
int max_unflushed_shard_no;
|
||||
/* Mark shards involved in prefetch */
|
||||
uint8 shard_bitmap[(MAX_SHARDS + 7)/8];
|
||||
PrefetchRequest prf_buffer[]; /* prefetch buffers */
|
||||
@@ -304,6 +300,7 @@ static void prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_
|
||||
static bool prefetch_wait_for(uint64 ring_index);
|
||||
static void prefetch_cleanup_trailing_unused(void);
|
||||
static inline void prefetch_set_unused(uint64 ring_index);
|
||||
static bool prefetch_flush_requests(void);
|
||||
|
||||
static bool neon_prefetch_response_usable(neon_request_lsns *request_lsns,
|
||||
PrefetchRequest *slot);
|
||||
@@ -473,13 +470,26 @@ communicator_prefetch_pump_state(void)
|
||||
{
|
||||
START_PREFETCH_RECEIVE_WORK();
|
||||
|
||||
if (MyPState->ring_receive == MyPState->ring_flush && MyPState->ring_flush < MyPState->ring_unused)
|
||||
{
|
||||
/*
|
||||
* Flush request to avoid requests pending for arbitrary long time,
|
||||
* pinning LSN and holding GC at PS.
|
||||
*/
|
||||
if (!prefetch_flush_requests())
|
||||
{
|
||||
END_PREFETCH_RECEIVE_WORK();
|
||||
return;
|
||||
}
|
||||
}
|
||||
while (MyPState->ring_receive != MyPState->ring_flush)
|
||||
{
|
||||
NeonResponse *response;
|
||||
PrefetchRequest *slot;
|
||||
MemoryContext old;
|
||||
uint64 my_ring_index = MyPState->ring_receive;
|
||||
|
||||
slot = GetPrfSlot(MyPState->ring_receive);
|
||||
slot = GetPrfSlot(my_ring_index);
|
||||
|
||||
old = MemoryContextSwitchTo(MyPState->errctx);
|
||||
response = page_server->try_receive(slot->shard_no);
|
||||
@@ -493,12 +503,12 @@ communicator_prefetch_pump_state(void)
|
||||
/* The slot should still be valid */
|
||||
if (slot->status != PRFS_REQUESTED ||
|
||||
slot->response != NULL ||
|
||||
slot->my_ring_index != MyPState->ring_receive)
|
||||
slot->my_ring_index != my_ring_index)
|
||||
{
|
||||
neon_shard_log(slot->shard_no, PANIC,
|
||||
"Incorrect prefetch slot state after receive: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "",
|
||||
slot->status, slot->response,
|
||||
slot->my_ring_index, MyPState->ring_receive);
|
||||
slot->my_ring_index, my_ring_index);
|
||||
}
|
||||
/* update prefetch state */
|
||||
MyPState->n_responses_buffered += 1;
|
||||
@@ -526,6 +536,19 @@ communicator_prefetch_pump_state(void)
|
||||
|
||||
END_PREFETCH_RECEIVE_WORK();
|
||||
|
||||
if (RecoveryInProgress())
|
||||
{
|
||||
/*
|
||||
* Update backend's min in-flight prefetch LSN.
|
||||
*/
|
||||
XLogRecPtr min_backend_prefetch_lsn = last_replay_lsn != InvalidXLogRecPtr ? last_replay_lsn : GetXLogReplayRecPtr(NULL);
|
||||
for (uint64_t ring_index = MyPState->ring_receive; ring_index < MyPState->ring_unused; ring_index++)
|
||||
{
|
||||
PrefetchRequest* slot = GetPrfSlot(ring_index);
|
||||
min_backend_prefetch_lsn = Min(slot->request_lsns.request_lsn, min_backend_prefetch_lsn);
|
||||
}
|
||||
MIN_BACKEND_REQUEST_LSN = min_backend_prefetch_lsn;
|
||||
}
|
||||
communicator_reconfigure_timeout_if_needed();
|
||||
}
|
||||
|
||||
@@ -565,7 +588,7 @@ readahead_buffer_resize(int newsize, void *extra)
|
||||
newPState->ring_last = newsize;
|
||||
newPState->ring_unused = newsize;
|
||||
newPState->ring_receive = newsize;
|
||||
newPState->max_shard_no = MyPState->max_shard_no;
|
||||
newPState->max_unflushed_shard_no = MyPState->max_unflushed_shard_no;
|
||||
memcpy(newPState->shard_bitmap, MyPState->shard_bitmap, sizeof(MyPState->shard_bitmap));
|
||||
|
||||
/*
|
||||
@@ -665,6 +688,7 @@ consume_prefetch_responses(void)
|
||||
{
|
||||
if (MyPState->ring_receive < MyPState->ring_unused)
|
||||
prefetch_wait_for(MyPState->ring_unused - 1);
|
||||
|
||||
/*
|
||||
* We know for sure we're not working on any prefetch pages after
|
||||
* this.
|
||||
@@ -694,7 +718,7 @@ prefetch_cleanup_trailing_unused(void)
|
||||
static bool
|
||||
prefetch_flush_requests(void)
|
||||
{
|
||||
for (shardno_t shard_no = 0; shard_no < MyPState->max_shard_no; shard_no++)
|
||||
for (shardno_t shard_no = 0; shard_no < MyPState->max_unflushed_shard_no; shard_no++)
|
||||
{
|
||||
if (BITMAP_ISSET(MyPState->shard_bitmap, shard_no))
|
||||
{
|
||||
@@ -703,7 +727,8 @@ prefetch_flush_requests(void)
|
||||
BITMAP_CLR(MyPState->shard_bitmap, shard_no);
|
||||
}
|
||||
}
|
||||
MyPState->max_shard_no = 0;
|
||||
MyPState->max_unflushed_shard_no = 0;
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -727,7 +752,6 @@ prefetch_wait_for(uint64 ring_index)
|
||||
{
|
||||
if (!prefetch_flush_requests())
|
||||
return false;
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
}
|
||||
|
||||
Assert(MyPState->ring_unused > ring_index);
|
||||
@@ -806,6 +830,7 @@ prefetch_read(PrefetchRequest *slot)
|
||||
old = MemoryContextSwitchTo(MyPState->errctx);
|
||||
response = (NeonResponse *) page_server->receive(shard_no);
|
||||
MemoryContextSwitchTo(old);
|
||||
|
||||
if (response)
|
||||
{
|
||||
check_getpage_response(slot, response);
|
||||
@@ -1014,11 +1039,16 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
Assert(mySlotNo == MyPState->ring_unused);
|
||||
|
||||
if (force_request_lsns)
|
||||
{
|
||||
slot->request_lsns = *force_request_lsns;
|
||||
}
|
||||
else
|
||||
{
|
||||
neon_get_request_lsns(BufTagGetNRelFileInfo(slot->buftag),
|
||||
slot->buftag.forkNum, slot->buftag.blockNum,
|
||||
&slot->request_lsns, 1);
|
||||
last_replay_lsn = InvalidXLogRecPtr;
|
||||
}
|
||||
request.hdr.lsn = slot->request_lsns.request_lsn;
|
||||
request.hdr.not_modified_since = slot->request_lsns.not_modified_since;
|
||||
|
||||
@@ -1037,7 +1067,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
MyPState->n_unused -= 1;
|
||||
MyPState->ring_unused += 1;
|
||||
BITMAP_SET(MyPState->shard_bitmap, slot->shard_no);
|
||||
MyPState->max_shard_no = Max(slot->shard_no+1, MyPState->max_shard_no);
|
||||
MyPState->max_unflushed_shard_no = Max(slot->shard_no+1, MyPState->max_unflushed_shard_no);
|
||||
|
||||
/* update slot state */
|
||||
slot->status = PRFS_REQUESTED;
|
||||
@@ -1045,6 +1075,25 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
Assert(!found);
|
||||
}
|
||||
|
||||
/*
|
||||
* Check that returned page LSN is consistent with request lsns
|
||||
*/
|
||||
static void
|
||||
check_page_lsn(NeonGetPageResponse* resp)
|
||||
{
|
||||
if (neon_protocol_version < 3) /* no information to check */
|
||||
return;
|
||||
if (PageGetLSN(resp->page) > resp->req.hdr.not_modified_since)
|
||||
neon_log(PANIC, "Invalid getpage response version: %X/%08X is higher than last modified LSN %X/%08X",
|
||||
LSN_FORMAT_ARGS(PageGetLSN(resp->page)),
|
||||
LSN_FORMAT_ARGS(resp->req.hdr.not_modified_since));
|
||||
|
||||
if (PageGetLSN(resp->page) > resp->req.hdr.lsn)
|
||||
neon_log(PANIC, "Invalid getpage response version: %X/%08X is higher than request LSN %X/%08X",
|
||||
LSN_FORMAT_ARGS(PageGetLSN(resp->page)),
|
||||
LSN_FORMAT_ARGS(resp->req.hdr.lsn));
|
||||
}
|
||||
|
||||
/*
|
||||
* Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted.
|
||||
* Present pages are marked in "mask" bitmap and total number of such pages is returned.
|
||||
@@ -1068,7 +1117,7 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
|
||||
for (int i = 0; i < nblocks; i++)
|
||||
{
|
||||
PrfHashEntry *entry;
|
||||
|
||||
NeonGetPageResponse* resp;
|
||||
hashkey.buftag.blockNum = blocknum + i;
|
||||
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
|
||||
|
||||
@@ -1101,8 +1150,9 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
|
||||
continue;
|
||||
}
|
||||
Assert(slot->response->tag == T_NeonGetPageResponse); /* checked by check_getpage_response when response was assigned to the slot */
|
||||
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
|
||||
|
||||
resp = (NeonGetPageResponse*)slot->response;
|
||||
check_page_lsn(resp);
|
||||
memcpy(buffers[i], resp->page, BLCKSZ);
|
||||
|
||||
/*
|
||||
* With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
|
||||
@@ -1395,7 +1445,6 @@ Retry:
|
||||
*/
|
||||
goto Retry;
|
||||
}
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
}
|
||||
|
||||
return last_ring_index;
|
||||
@@ -1465,10 +1514,12 @@ page_server_request(void const *req)
|
||||
MyNeonCounters->pageserver_open_requests--;
|
||||
} while (resp == NULL);
|
||||
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
|
||||
last_replay_lsn = InvalidXLogRecPtr;
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
|
||||
last_replay_lsn = InvalidXLogRecPtr;
|
||||
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
|
||||
HOLD_INTERRUPTS();
|
||||
page_server->disconnect(shard_no);
|
||||
@@ -1868,6 +1919,13 @@ nm_to_string(NeonMessage *msg)
|
||||
return s.data;
|
||||
}
|
||||
|
||||
static void
|
||||
reset_min_request_lsn(int code, Datum arg)
|
||||
{
|
||||
if (MyProcNumber != -1)
|
||||
MIN_BACKEND_REQUEST_LSN = InvalidXLogRecPtr;
|
||||
}
|
||||
|
||||
/*
|
||||
* communicator_init() -- Initialize per-backend private state
|
||||
*/
|
||||
@@ -1879,6 +1937,8 @@ communicator_init(void)
|
||||
if (MyPState != NULL)
|
||||
return;
|
||||
|
||||
before_shmem_exit(reset_min_request_lsn, 0);
|
||||
|
||||
/*
|
||||
* Sanity check that theperf counters array is sized correctly. We got
|
||||
* this wrong once, and the formula for max number of backends and aux
|
||||
@@ -1888,7 +1948,7 @@ communicator_init(void)
|
||||
* the check here. That's OK, we don't expect the logic to change in old
|
||||
* releases.
|
||||
*/
|
||||
#if PG_VERSION_NUM>=150000
|
||||
#if PG_MAJORVERSION_NUM >= 15
|
||||
if (MyNeonCounters >= &neon_per_backend_counters_shared[NUM_NEON_PERF_COUNTER_SLOTS])
|
||||
elog(ERROR, "MyNeonCounters points past end of array");
|
||||
#endif
|
||||
@@ -2227,6 +2287,7 @@ Retry:
|
||||
case T_NeonGetPageResponse:
|
||||
{
|
||||
NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp;
|
||||
check_page_lsn(getpage_resp);
|
||||
memcpy(buffer, getpage_resp->page, BLCKSZ);
|
||||
|
||||
/*
|
||||
@@ -2503,12 +2564,30 @@ communicator_reconfigure_timeout_if_needed(void)
|
||||
!AmPrewarmWorker && /* do not pump prefetch state in prewarm worker */
|
||||
readahead_getpage_pull_timeout_ms > 0;
|
||||
|
||||
if (!needs_set && MIN_BACKEND_REQUEST_LSN != InvalidXLogRecPtr)
|
||||
{
|
||||
if (last_replay_lsn == InvalidXLogRecPtr)
|
||||
MIN_BACKEND_REQUEST_LSN = InvalidXLogRecPtr;
|
||||
else
|
||||
needs_set = true; /* Can not reset MIN_BACKEND_REQUEST_LSN now, have to do it later */
|
||||
}
|
||||
if (needs_set != timeout_set)
|
||||
{
|
||||
/* The background writer doens't (shouldn't) read any pages */
|
||||
Assert(!AmBackgroundWriterProcess());
|
||||
/* The checkpointer doens't (shouldn't) read any pages */
|
||||
Assert(!AmCheckpointerProcess());
|
||||
/*
|
||||
* The background writer/checkpointer doens't (shouldn't) read any pages.
|
||||
* And definitely they should not run on replica.
|
||||
* The only case when we can get here is replica promotion.
|
||||
*/
|
||||
if (AmBackgroundWriterProcess() || AmCheckpointerProcess())
|
||||
{
|
||||
MIN_BACKEND_REQUEST_LSN = InvalidXLogRecPtr;
|
||||
if (timeout_set)
|
||||
{
|
||||
disable_timeout(PS_TIMEOUT_ID, false);
|
||||
timeout_set = false;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (unlikely(PS_TIMEOUT_ID == 0))
|
||||
{
|
||||
@@ -2541,14 +2620,6 @@ communicator_reconfigure_timeout_if_needed(void)
|
||||
static void
|
||||
pagestore_timeout_handler(void)
|
||||
{
|
||||
#if PG_MAJORVERSION_NUM <= 14
|
||||
/*
|
||||
* PG14: Setting a repeating timeout is not possible, so we signal here
|
||||
* that the timeout has already been reset, and by telling the system
|
||||
* that system will re-schedule it later if we need to.
|
||||
*/
|
||||
timeout_set = false;
|
||||
#endif
|
||||
timeout_signaled = true;
|
||||
InterruptPending = true;
|
||||
}
|
||||
@@ -2568,6 +2639,14 @@ communicator_processinterrupts(void)
|
||||
if (!readpage_reentrant_guard && readahead_getpage_pull_timeout_ms > 0)
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
#if PG_MAJORVERSION_NUM <= 14
|
||||
/*
|
||||
* PG14: Setting a repeating timeout is not possible, so we signal here
|
||||
* that the timeout has already been reset, and by telling the system
|
||||
* that system will re-schedule it later if we need to.
|
||||
*/
|
||||
timeout_set = false;
|
||||
#endif
|
||||
timeout_signaled = false;
|
||||
communicator_reconfigure_timeout_if_needed();
|
||||
}
|
||||
@@ -2577,3 +2656,28 @@ communicator_processinterrupts(void)
|
||||
|
||||
return prev_interrupt_cb();
|
||||
}
|
||||
|
||||
PG_FUNCTION_INFO_V1(neon_communicator_min_inflight_request_lsn);
|
||||
|
||||
Datum
|
||||
neon_communicator_min_inflight_request_lsn(PG_FUNCTION_ARGS)
|
||||
{
|
||||
if (RecoveryInProgress())
|
||||
{
|
||||
/* Do not hold GC for primary */
|
||||
PG_RETURN_INT64(UINT64_MAX);
|
||||
}
|
||||
else
|
||||
{
|
||||
XLogRecPtr min_lsn = GetXLogReplayRecPtr(NULL);
|
||||
size_t n_procs = ProcGlobal->allProcCount;
|
||||
for (size_t i = 0; i < n_procs; i++)
|
||||
{
|
||||
if (neon_per_backend_counters_shared[i].min_request_lsn != InvalidXLogRecPtr)
|
||||
{
|
||||
min_lsn = Min(min_lsn, neon_per_backend_counters_shared[i].min_request_lsn);
|
||||
}
|
||||
}
|
||||
PG_RETURN_INT64(min_lsn);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -635,6 +635,11 @@ lfc_init(void)
|
||||
NULL);
|
||||
}
|
||||
|
||||
/*
|
||||
* Dump a list of pages that are currently in the LFC
|
||||
*
|
||||
* This is used to get a snapshot that can be used to prewarm the LFC later.
|
||||
*/
|
||||
FileCacheState*
|
||||
lfc_get_state(size_t max_entries)
|
||||
{
|
||||
@@ -2267,4 +2272,3 @@ get_prewarm_info(PG_FUNCTION_ARGS)
|
||||
|
||||
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
|
||||
}
|
||||
|
||||
|
||||
3
pgxn/neon/neon--1.6--1.7.sql
Normal file
3
pgxn/neon/neon--1.6--1.7.sql
Normal file
@@ -0,0 +1,3 @@
|
||||
create function neon_communicator_min_inflight_request_lsn() returns pg_catalog.pg_lsn
|
||||
AS 'MODULE_PATHNAME', 'neon_communicator_min_inflight_request_lsn'
|
||||
LANGUAGE C;
|
||||
1
pgxn/neon/neon--1.7--1.6.sql
Normal file
1
pgxn/neon/neon--1.7--1.6.sql
Normal file
@@ -0,0 +1 @@
|
||||
drop function neon_communicator_min_inflight_request_lsn();
|
||||
@@ -1,7 +1,7 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* neon.c
|
||||
* Main entry point into the neon exension
|
||||
* Main entry point into the neon extension
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
@@ -508,7 +508,7 @@ _PG_init(void)
|
||||
|
||||
DefineCustomBoolVariable(
|
||||
"neon.disable_logical_replication_subscribers",
|
||||
"Disables incomming logical replication",
|
||||
"Disable incoming logical replication",
|
||||
NULL,
|
||||
&disable_logical_replication_subscribers,
|
||||
false,
|
||||
@@ -567,7 +567,7 @@ _PG_init(void)
|
||||
|
||||
DefineCustomEnumVariable(
|
||||
"neon.debug_compare_local",
|
||||
"Debug mode for compaing content of pages in prefetch ring/LFC/PS and local disk",
|
||||
"Debug mode for comparing content of pages in prefetch ring/LFC/PS and local disk",
|
||||
NULL,
|
||||
&debug_compare_local,
|
||||
DEBUG_COMPARE_LOCAL_NONE,
|
||||
@@ -735,7 +735,6 @@ neon_shmem_request_hook(void)
|
||||
static void
|
||||
neon_shmem_startup_hook(void)
|
||||
{
|
||||
/* Initialize */
|
||||
if (prev_shmem_startup_hook)
|
||||
prev_shmem_startup_hook();
|
||||
|
||||
|
||||
@@ -42,7 +42,6 @@ NeonPerfCountersShmemRequest(void)
|
||||
}
|
||||
|
||||
|
||||
|
||||
void
|
||||
NeonPerfCountersShmemInit(void)
|
||||
{
|
||||
|
||||
@@ -154,6 +154,11 @@ typedef struct
|
||||
* Histogram of query execution time.
|
||||
*/
|
||||
QTHistogramData query_time_hist;
|
||||
|
||||
/*
|
||||
* Minimal LSN of in-fligth request requests
|
||||
*/
|
||||
XLogRecPtr min_request_lsn;
|
||||
} neon_per_backend_counters;
|
||||
|
||||
/* Pointer to the shared memory array of neon_per_backend_counters structs */
|
||||
@@ -167,11 +172,13 @@ extern neon_per_backend_counters *neon_per_backend_counters_shared;
|
||||
*/
|
||||
#define NUM_NEON_PERF_COUNTER_SLOTS (MaxBackends + NUM_AUXILIARY_PROCS)
|
||||
|
||||
#if PG_VERSION_NUM >= 170000
|
||||
#define MyNeonCounters (&neon_per_backend_counters_shared[MyProcNumber])
|
||||
#else
|
||||
#define MyNeonCounters (&neon_per_backend_counters_shared[MyProc->pgprocno])
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Backend-local minimal in-flight request LSN.
|
||||
* We store it in neon_per_backend_counters_shared and not in separate array to minimize false cache sharing
|
||||
*/
|
||||
#define MIN_BACKEND_REQUEST_LSN MyNeonCounters->min_request_lsn
|
||||
|
||||
extern void inc_getpage_wait(uint64 latency);
|
||||
extern void inc_page_cache_read_wait(uint64 latency);
|
||||
|
||||
@@ -9,6 +9,10 @@
|
||||
#include "fmgr.h"
|
||||
#include "storage/buf_internals.h"
|
||||
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
typedef PGAlignedBlock PGIOAlignedBlock;
|
||||
#endif
|
||||
|
||||
#if PG_MAJORVERSION_NUM < 17
|
||||
#define NRelFileInfoBackendIsTemp(rinfo) (rinfo.backend != InvalidBackendId)
|
||||
#else
|
||||
@@ -158,6 +162,10 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
|
||||
#define AmAutoVacuumWorkerProcess() (IsAutoVacuumWorkerProcess())
|
||||
#endif
|
||||
|
||||
#if PG_MAJORVERSION_NUM < 17
|
||||
#define MyProcNumber (MyProc - &ProcGlobal->allProcs[0])
|
||||
#endif
|
||||
|
||||
#if PG_MAJORVERSION_NUM < 15
|
||||
extern void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags);
|
||||
extern TimeLineID GetWALInsertionTimeLine(void);
|
||||
|
||||
@@ -243,6 +243,7 @@ extern char *neon_timeline;
|
||||
extern char *neon_tenant;
|
||||
extern int32 max_cluster_size;
|
||||
extern int neon_protocol_version;
|
||||
extern XLogRecPtr last_replay_lsn;
|
||||
|
||||
extern shardno_t get_shard_number(BufferTag* tag);
|
||||
|
||||
|
||||
@@ -72,10 +72,6 @@
|
||||
#include "access/xlogrecovery.h"
|
||||
#endif
|
||||
|
||||
#if PG_VERSION_NUM < 160000
|
||||
typedef PGAlignedBlock PGIOAlignedBlock;
|
||||
#endif
|
||||
|
||||
#include "access/nbtree.h"
|
||||
#include "storage/bufpage.h"
|
||||
#include "access/xlog_internal.h"
|
||||
@@ -100,6 +96,8 @@ typedef enum
|
||||
|
||||
int debug_compare_local;
|
||||
|
||||
XLogRecPtr last_replay_lsn;
|
||||
|
||||
static NRelFileInfo unlogged_build_rel_info;
|
||||
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
||||
|
||||
@@ -163,7 +161,7 @@ log_newpages_copy(NRelFileInfo * rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
page_std);
|
||||
}
|
||||
|
||||
return ProcLastRecPtr;
|
||||
return GetXLogInsertRecPtr();
|
||||
}
|
||||
#endif /* PG_MAJORVERSION_NUM >= 17 */
|
||||
|
||||
@@ -592,6 +590,17 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
/* Request the page at the end of the last fully replayed LSN. */
|
||||
XLogRecPtr replay_lsn = GetXLogReplayRecPtr(NULL);
|
||||
|
||||
if (MIN_BACKEND_REQUEST_LSN == InvalidXLogRecPtr)
|
||||
{
|
||||
/* mark the backend's replay_lsn as "we have a request ongoing", blocking the expiration of any current LSN */
|
||||
MIN_BACKEND_REQUEST_LSN = replay_lsn;
|
||||
/* make sure memory operations are in correct order, even in concurrent systems */
|
||||
pg_memory_barrier();
|
||||
/* get the current LSN to register */
|
||||
replay_lsn = GetXLogReplayRecPtr(NULL);
|
||||
MIN_BACKEND_REQUEST_LSN = replay_lsn;
|
||||
}
|
||||
last_replay_lsn = replay_lsn;
|
||||
for (int i = 0; i < nblocks; i++)
|
||||
{
|
||||
neon_request_lsns *result = &output[i];
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#include "neon.h"
|
||||
#include "neon_pgversioncompat.h"
|
||||
|
||||
#include "miscadmin.h"
|
||||
#include "pagestore_client.h"
|
||||
#include RELFILEINFO_HDR
|
||||
#include "storage/smgr.h"
|
||||
@@ -23,10 +24,6 @@
|
||||
#include "utils/dynahash.h"
|
||||
#include "utils/guc.h"
|
||||
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
#include "miscadmin.h"
|
||||
#endif
|
||||
|
||||
typedef struct
|
||||
{
|
||||
NRelFileInfo rinfo;
|
||||
|
||||
@@ -9,9 +9,10 @@
|
||||
|
||||
```bash
|
||||
export BENCHMARK_CONNSTR=postgres://user:pass@ep-abc-xyz-123.us-east-2.aws.neon.build/neondb
|
||||
export CLICKHOUSE_PASSWORD=ch_password123
|
||||
|
||||
docker compose -f test_runner/logical_repl/clickhouse/docker-compose.yml up -d
|
||||
./scripts/pytest -m remote_cluster -k test_clickhouse
|
||||
./scripts/pytest -m remote_cluster -k 'test_clickhouse[release-pg17]'
|
||||
docker compose -f test_runner/logical_repl/clickhouse/docker-compose.yml down
|
||||
```
|
||||
|
||||
@@ -21,6 +22,6 @@ docker compose -f test_runner/logical_repl/clickhouse/docker-compose.yml down
|
||||
export BENCHMARK_CONNSTR=postgres://user:pass@ep-abc-xyz-123.us-east-2.aws.neon.build/neondb
|
||||
|
||||
docker compose -f test_runner/logical_repl/debezium/docker-compose.yml up -d
|
||||
./scripts/pytest -m remote_cluster -k test_debezium
|
||||
./scripts/pytest -m remote_cluster -k 'test_debezium[release-pg17]'
|
||||
docker compose -f test_runner/logical_repl/debezium/docker-compose.yml down
|
||||
```
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
services:
|
||||
clickhouse:
|
||||
image: clickhouse/clickhouse-server
|
||||
image: clickhouse/clickhouse-server:25.6
|
||||
user: "101:101"
|
||||
container_name: clickhouse
|
||||
hostname: clickhouse
|
||||
environment:
|
||||
- CLICKHOUSE_PASSWORD=${CLICKHOUSE_PASSWORD:-ch_password123}
|
||||
ports:
|
||||
- 127.0.0.1:8123:8123
|
||||
- 127.0.0.1:9000:9000
|
||||
|
||||
@@ -1,18 +1,28 @@
|
||||
services:
|
||||
zookeeper:
|
||||
image: quay.io/debezium/zookeeper:2.7
|
||||
image: quay.io/debezium/zookeeper:3.1.3.Final
|
||||
ports:
|
||||
- 127.0.0.1:2181:2181
|
||||
- 127.0.0.1:2888:2888
|
||||
- 127.0.0.1:3888:3888
|
||||
kafka:
|
||||
image: quay.io/debezium/kafka:2.7
|
||||
image: quay.io/debezium/kafka:3.1.3.Final
|
||||
depends_on: [zookeeper]
|
||||
environment:
|
||||
ZOOKEEPER_CONNECT: "zookeeper:2181"
|
||||
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
|
||||
KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:29092
|
||||
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092
|
||||
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
|
||||
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
|
||||
KAFKA_BROKER_ID: 1
|
||||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||
KAFKA_JMX_PORT: 9991
|
||||
ports:
|
||||
- 127.0.0.1:9092:9092
|
||||
- 9092:9092
|
||||
- 29092:29092
|
||||
debezium:
|
||||
image: quay.io/debezium/connect:2.7
|
||||
image: quay.io/debezium/connect:3.1.3.Final
|
||||
depends_on: [kafka]
|
||||
environment:
|
||||
BOOTSTRAP_SERVERS: kafka:9092
|
||||
GROUP_ID: 1
|
||||
|
||||
@@ -53,8 +53,13 @@ def test_clickhouse(remote_pg: RemotePostgres):
|
||||
cur.execute("CREATE TABLE table1 (id integer primary key, column1 varchar(10));")
|
||||
cur.execute("INSERT INTO table1 (id, column1) VALUES (1, 'abc'), (2, 'def');")
|
||||
conn.commit()
|
||||
client = clickhouse_connect.get_client(host=clickhouse_host)
|
||||
if "CLICKHOUSE_PASSWORD" not in os.environ:
|
||||
raise RuntimeError("CLICKHOUSE_PASSWORD is not set")
|
||||
client = clickhouse_connect.get_client(
|
||||
host=clickhouse_host, password=os.environ["CLICKHOUSE_PASSWORD"]
|
||||
)
|
||||
client.command("SET allow_experimental_database_materialized_postgresql=1")
|
||||
client.command("DROP DATABASE IF EXISTS db1_postgres")
|
||||
client.command(
|
||||
"CREATE DATABASE db1_postgres ENGINE = "
|
||||
f"MaterializedPostgreSQL('{conn_options['host']}', "
|
||||
|
||||
@@ -17,6 +17,7 @@ from fixtures.utils import wait_until
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fixtures.neon_fixtures import RemotePostgres
|
||||
from kafka import KafkaConsumer
|
||||
|
||||
|
||||
class DebeziumAPI:
|
||||
@@ -101,9 +102,13 @@ def debezium(remote_pg: RemotePostgres):
|
||||
assert len(dbz.list_connectors()) == 1
|
||||
from kafka import KafkaConsumer
|
||||
|
||||
kafka_host = "kafka" if (os.getenv("CI", "false") == "true") else "127.0.0.1"
|
||||
kafka_port = 9092 if (os.getenv("CI", "false") == "true") else 29092
|
||||
log.info("Connecting to Kafka: %s:%s", kafka_host, kafka_port)
|
||||
|
||||
consumer = KafkaConsumer(
|
||||
"dbserver1.inventory.customers",
|
||||
bootstrap_servers=["kafka:9092"],
|
||||
bootstrap_servers=[f"{kafka_host}:{kafka_port}"],
|
||||
auto_offset_reset="earliest",
|
||||
enable_auto_commit=False,
|
||||
)
|
||||
@@ -112,7 +117,7 @@ def debezium(remote_pg: RemotePostgres):
|
||||
assert resp.status_code == 204
|
||||
|
||||
|
||||
def get_kafka_msg(consumer, ts_ms, before=None, after=None) -> None:
|
||||
def get_kafka_msg(consumer: KafkaConsumer, ts_ms, before=None, after=None) -> None:
|
||||
"""
|
||||
Gets the message from Kafka and checks its validity
|
||||
Arguments:
|
||||
@@ -124,6 +129,7 @@ def get_kafka_msg(consumer, ts_ms, before=None, after=None) -> None:
|
||||
after: a dictionary, if not None, the after field from the kafka message must
|
||||
have the same values for the same keys
|
||||
"""
|
||||
log.info("Bootstrap servers: %s", consumer.config["bootstrap_servers"])
|
||||
msg = consumer.poll()
|
||||
assert msg, "Empty message"
|
||||
for val in msg.values():
|
||||
|
||||
@@ -55,7 +55,7 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
|
||||
# Ensure that the default version is also updated in the neon.control file
|
||||
assert cur.fetchone() == ("1.6",)
|
||||
cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE")
|
||||
all_versions = ["1.6", "1.5", "1.4", "1.3", "1.2", "1.1", "1.0"]
|
||||
all_versions = ["1.7", "1.6", "1.5", "1.4", "1.3", "1.2", "1.1", "1.0"]
|
||||
current_version = "1.6"
|
||||
for idx, begin_version in enumerate(all_versions):
|
||||
for target_version in all_versions[idx + 1 :]:
|
||||
|
||||
@@ -298,15 +298,26 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
|
||||
assert post_detach_samples == set()
|
||||
|
||||
|
||||
def test_pageserver_metrics_removed_after_offload(neon_env_builder: NeonEnvBuilder):
|
||||
@pytest.mark.parametrize("compaction", ["compaction_enabled", "compaction_disabled"])
|
||||
def test_pageserver_metrics_removed_after_offload(
|
||||
neon_env_builder: NeonEnvBuilder, compaction: str
|
||||
):
|
||||
"""Tests that when a timeline is offloaded, the tenant specific metrics are not left behind"""
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
|
||||
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_1, _ = env.create_tenant()
|
||||
tenant_1, _ = env.create_tenant(
|
||||
conf={
|
||||
# disable background compaction and GC so that we don't have leftover tasks
|
||||
# after offloading.
|
||||
"gc_period": "0s",
|
||||
"compaction_period": "0s",
|
||||
}
|
||||
if compaction == "compaction_disabled"
|
||||
else None
|
||||
)
|
||||
|
||||
timeline_1 = env.create_timeline("test_metrics_removed_after_offload_1", tenant_id=tenant_1)
|
||||
timeline_2 = env.create_timeline("test_metrics_removed_after_offload_2", tenant_id=tenant_1)
|
||||
@@ -351,6 +362,23 @@ def test_pageserver_metrics_removed_after_offload(neon_env_builder: NeonEnvBuild
|
||||
state=TimelineArchivalState.ARCHIVED,
|
||||
)
|
||||
env.pageserver.http_client().timeline_offload(tenant_1, timeline)
|
||||
# We need to wait until all background jobs are finished before we can check the metrics.
|
||||
# There're many of them: compaction, GC, etc.
|
||||
wait_until(
|
||||
lambda: all(
|
||||
sample.value == 0
|
||||
for sample in env.pageserver.http_client()
|
||||
.get_metrics()
|
||||
.query_all("pageserver_background_loop_semaphore_waiting_tasks")
|
||||
)
|
||||
and all(
|
||||
sample.value == 0
|
||||
for sample in env.pageserver.http_client()
|
||||
.get_metrics()
|
||||
.query_all("pageserver_background_loop_semaphore_running_tasks")
|
||||
)
|
||||
)
|
||||
|
||||
post_offload_samples = set(
|
||||
[x.name for x in get_ps_metric_samples_for_timeline(tenant_1, timeline)]
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user