mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-05 20:42:54 +00:00
Add On-demand WAL Download to logicalfuncs (#7960)
We implemented on-demand WAL download for walsender, but other things that may want to read the WAL from safekeepers don't do that yet. This PR makes it do that by adding the same set of hooks to logicalfuncs. Addresses https://github.com/neondatabase/neon/issues/7959 Also relies on: https://github.com/neondatabase/postgres/pull/438 https://github.com/neondatabase/postgres/pull/437 https://github.com/neondatabase/postgres/pull/436
This commit is contained in:
2
Makefile
2
Makefile
@@ -124,6 +124,8 @@ postgres-%: postgres-configure-% \
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pageinspect install
|
||||
+@echo "Compiling amcheck $*"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/amcheck install
|
||||
+@echo "Compiling test_decoding $*"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/test_decoding install
|
||||
|
||||
.PHONY: postgres-clean-%
|
||||
postgres-clean-%:
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
#include "catalog/pg_type.h"
|
||||
#include "postmaster/bgworker.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
#include "replication/logical.h"
|
||||
#include "replication/slot.h"
|
||||
#include "replication/walsender.h"
|
||||
#include "storage/procsignal.h"
|
||||
@@ -280,6 +281,7 @@ _PG_init(void)
|
||||
pg_init_libpagestore();
|
||||
pg_init_walproposer();
|
||||
WalSender_Custom_XLogReaderRoutines = NeonOnDemandXLogReaderRoutines;
|
||||
LogicalFuncs_Custom_XLogReaderRoutines = NeonOnDemandXLogReaderRoutines;
|
||||
|
||||
InitLogicalReplicationMonitor();
|
||||
|
||||
|
||||
@@ -24,8 +24,12 @@
|
||||
#include "walproposer.h"
|
||||
|
||||
static NeonWALReader *wal_reader = NULL;
|
||||
|
||||
struct WalSnd;
|
||||
extern struct WalSnd *MyWalSnd;
|
||||
extern XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
|
||||
extern bool GetDonorShmem(XLogRecPtr *donor_lsn);
|
||||
extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI);
|
||||
|
||||
static XLogRecPtr
|
||||
NeonWALReadWaitForWAL(XLogRecPtr loc)
|
||||
@@ -36,7 +40,28 @@ NeonWALReadWaitForWAL(XLogRecPtr loc)
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
}
|
||||
|
||||
return WalSndWaitForWal(loc);
|
||||
// Walsender sends keepalives and stuff, so better use its normal wait
|
||||
if (MyWalSnd != NULL)
|
||||
return WalSndWaitForWal(loc);
|
||||
|
||||
for (;;)
|
||||
{
|
||||
XLogRecPtr flush_ptr;
|
||||
if (!RecoveryInProgress())
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
flush_ptr = GetFlushRecPtr(NULL);
|
||||
#else
|
||||
flush_ptr = GetFlushRecPtr();
|
||||
#endif
|
||||
else
|
||||
flush_ptr = GetXLogReplayRecPtr(NULL);
|
||||
|
||||
if (loc <= flush_ptr)
|
||||
return flush_ptr;
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
pg_usleep(1000);
|
||||
}
|
||||
}
|
||||
|
||||
static int
|
||||
|
||||
@@ -221,6 +221,35 @@ def test_obsolete_slot_drop(neon_simple_env: NeonEnv, vanilla_pg):
|
||||
wait_until(number_of_iterations=10, interval=2, func=partial(slot_removed, endpoint))
|
||||
|
||||
|
||||
def test_ondemand_wal_download_in_replication_slot_funcs(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.neon_cli.create_branch("init")
|
||||
endpoint = env.endpoints.create_start("init")
|
||||
|
||||
with endpoint.connect().cursor() as cur:
|
||||
cur.execute("create table wal_generator (id serial primary key, data text)")
|
||||
cur.execute(
|
||||
"SELECT * FROM pg_create_logical_replication_slot('slotty_mcslotface', 'test_decoding')"
|
||||
)
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO wal_generator (data)
|
||||
SELECT repeat('A', 1024) -- Generates a kilobyte of data per row
|
||||
FROM generate_series(1, 16384) AS seq; -- Inserts enough rows to exceed 16MB of data
|
||||
"""
|
||||
)
|
||||
|
||||
endpoint.stop_and_destroy()
|
||||
endpoint = env.endpoints.create_start("init")
|
||||
|
||||
with endpoint.connect().cursor() as cur:
|
||||
cur.execute(
|
||||
"SELECT * FROM pg_logical_slot_peek_binary_changes('slotty_mcslotface', NULL, NULL, 'include-xids', '0')"
|
||||
)
|
||||
|
||||
|
||||
# Tests that walsender correctly blocks until WAL is downloaded from safekeepers
|
||||
def test_lr_with_slow_safekeeper(neon_env_builder: NeonEnvBuilder, vanilla_pg):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
@@ -247,6 +276,7 @@ FROM generate_series(1, 16384) AS seq; -- Inserts enough rows to exceed 16MB of
|
||||
connstr = endpoint.connstr().replace("'", "''")
|
||||
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub")
|
||||
logical_replication_sync(vanilla_pg, endpoint)
|
||||
|
||||
vanilla_pg.stop()
|
||||
|
||||
# Pause the safekeepers so that they can't send WAL (except to pageserver)
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 17e0f5ff4e...4c51945a61
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: c2c3d40534...e22098d86d
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: b228f20372...9837db1578
6
vendor/revisions.json
vendored
6
vendor/revisions.json
vendored
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"v16": ["16.3", "b228f20372ebcabfd7946647cb7adbd38bacb14a"],
|
||||
"v15": ["15.7", "c2c3d40534db97d83dd7e185d1971e707fa2f445"],
|
||||
"v14": ["14.12", "17e0f5ff4e1905691aa40e1e08f9b79b14c99652"]
|
||||
"v16": ["16.3", "9837db157837fcf43ef7348be0017d3a2238cd27"],
|
||||
"v15": ["15.7", "e22098d86d6c40276b6bd75c29133a33fb283ab6"],
|
||||
"v14": ["14.12", "4c51945a6167ca06c0169e7a4ca5a8e7ffa3faba"]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user