mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
pageserver: refactor ingest inplace to decouple decoding and handling (#9472)
## Problem WAL ingest couples decoding of special records with their handling (updates to the storage engine mostly). This is a roadblock for our plan to move WAL filtering (and implicitly decoding) to safekeepers since they cannot do writes to the storage engine. ## Summary of changes This PR decouples the decoding of the special WAL records from their application. The changes are done in place and I've done my best to refrain from refactorings and attempted to preserve the original code as much as possible. Related: https://github.com/neondatabase/neon/issues/9335 Epic: https://github.com/neondatabase/neon/issues/9329
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -435,7 +435,9 @@ def test_emergency_relocate_with_branches_slow_replay(
|
||||
|
||||
# This fail point will pause the WAL ingestion on the main branch, after the
|
||||
# the first insert
|
||||
pageserver_http.configure_failpoints([("wal-ingest-logical-message-sleep", "return(5000)")])
|
||||
pageserver_http.configure_failpoints(
|
||||
[("pageserver-wal-ingest-logical-message-sleep", "return(5000)")]
|
||||
)
|
||||
|
||||
# Attach and wait a few seconds to give it time to load the tenants, attach to the
|
||||
# safekeepers, and to stream and ingest the WAL up to the pause-point.
|
||||
@@ -453,11 +455,13 @@ def test_emergency_relocate_with_branches_slow_replay(
|
||||
assert cur.fetchall() == [("before pause",), ("after pause",)]
|
||||
|
||||
# Sanity check that the failpoint was reached
|
||||
env.pageserver.assert_log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done')
|
||||
env.pageserver.assert_log_contains(
|
||||
'failpoint "pageserver-wal-ingest-logical-message-sleep": sleep done'
|
||||
)
|
||||
assert time.time() - before_attach_time > 5
|
||||
|
||||
# Clean up
|
||||
pageserver_http.configure_failpoints(("wal-ingest-logical-message-sleep", "off"))
|
||||
pageserver_http.configure_failpoints(("pageserver-wal-ingest-logical-message-sleep", "off"))
|
||||
|
||||
|
||||
# Simulate hard crash of pageserver and re-attach a tenant with a branch
|
||||
@@ -581,7 +585,9 @@ def test_emergency_relocate_with_branches_createdb(
|
||||
# bug reproduced easily even without this, as there is always some delay between
|
||||
# loading the timeline and establishing the connection to the safekeeper to stream and
|
||||
# ingest the WAL, but let's make this less dependent on accidental timing.
|
||||
pageserver_http.configure_failpoints([("wal-ingest-logical-message-sleep", "return(5000)")])
|
||||
pageserver_http.configure_failpoints(
|
||||
[("pageserver-wal-ingest-logical-message-sleep", "return(5000)")]
|
||||
)
|
||||
before_attach_time = time.time()
|
||||
env.pageserver.tenant_attach(tenant_id)
|
||||
|
||||
@@ -590,8 +596,10 @@ def test_emergency_relocate_with_branches_createdb(
|
||||
assert query_scalar(cur, "SELECT count(*) FROM test_migrate_one") == 200
|
||||
|
||||
# Sanity check that the failpoint was reached
|
||||
env.pageserver.assert_log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done')
|
||||
env.pageserver.assert_log_contains(
|
||||
'failpoint "pageserver-wal-ingest-logical-message-sleep": sleep done'
|
||||
)
|
||||
assert time.time() - before_attach_time > 5
|
||||
|
||||
# Clean up
|
||||
pageserver_http.configure_failpoints(("wal-ingest-logical-message-sleep", "off"))
|
||||
pageserver_http.configure_failpoints(("pageserver-wal-ingest-logical-message-sleep", "off"))
|
||||
|
||||
Reference in New Issue
Block a user