fixup(walreceiver-after-ingest changes): pausable_failpoint too much of a bottleneck

This commit is contained in:
Christian Schwarz
2024-03-23 15:08:56 +00:00
parent 497acb41f2
commit b6ed8e0cb5
3 changed files with 32 additions and 10 deletions

View File

@@ -325,7 +325,17 @@ pub(super) async fn handle_walreceiver_connection(
filtered_records += 1;
}
pausable_failpoint!("walreceiver-after-ingest");
// don't simply use pausable_failpoint here because its spawn_blocking
// slows down the tests too much.
fail::fail_point!("walreceiver-after-ingest-blocking");
if let Err(()) = (|| {
fail::fail_point!("walreceiver-after-ingest-pause-activate", |_| {
Err(())
});
Ok(())
})() {
pausable_failpoint!("walreceiver-after-ingest-pause");
}
last_rec_lsn = lsn;

View File

@@ -116,7 +116,7 @@ def test_backpressure_received_lsn_lag(neon_env_builder: NeonEnvBuilder):
# Configure failpoint to slow down walreceiver ingest
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
pscur.execute("failpoints walreceiver-after-ingest=sleep(20)")
pscur.execute("failpoints walreceiver-after-ingest-blocking=sleep(20)")
# FIXME
# Wait for the check thread to start

View File

@@ -931,7 +931,7 @@ def test_timeline_logical_size_task_priority(neon_env_builder: NeonEnvBuilder):
env.pageserver.stop()
env.pageserver.start(
extra_env_vars={
"FAILPOINTS": "initial-size-calculation-permit-pause=pause;walreceiver-after-ingest=pause"
"FAILPOINTS": "initial-size-calculation-permit-pause=pause;walreceiver-after-ingest-pause-activate=return(1),walreceiver-after-ingest-pause=pause"
}
)
@@ -953,7 +953,11 @@ def test_timeline_logical_size_task_priority(neon_env_builder: NeonEnvBuilder):
assert details["current_logical_size_is_accurate"] is True
client.configure_failpoints(
[("initial-size-calculation-permit-pause", "off"), ("walreceiver-after-ingest", "off")]
[
("initial-size-calculation-permit-pause", "off"),
("walreceiver-after-ingest-pause-activate", "off"),
("walreceiver-after-ingest-pause", "off"),
]
)
@@ -983,7 +987,7 @@ def test_eager_attach_does_not_queue_up(neon_env_builder: NeonEnvBuilder):
# pause at logical size calculation, also pause before walreceiver can give feedback so it will give priority to logical size calculation
env.pageserver.start(
extra_env_vars={
"FAILPOINTS": "timeline-calculate-logical-size-pause=pause;walreceiver-after-ingest=pause"
"FAILPOINTS": "timeline-calculate-logical-size-pause=pause;walreceiver-after-ingest-pause-activate=return(1),walreceiver-after-ingest-pause=pause"
}
)
@@ -1029,7 +1033,11 @@ def test_eager_attach_does_not_queue_up(neon_env_builder: NeonEnvBuilder):
other_is_attaching()
client.configure_failpoints(
[("timeline-calculate-logical-size-pause", "off"), ("walreceiver-after-ingest", "off")]
[
("timeline-calculate-logical-size-pause", "off"),
("walreceiver-after-ingest-pause-activate", "off"),
("walreceiver-after-ingest-pause", "off"),
]
)
@@ -1057,10 +1065,9 @@ def test_lazy_attach_activation(neon_env_builder: NeonEnvBuilder, activation_met
env.pageserver.stop()
# pause at logical size calculation, also pause before walreceiver can give feedback so it will give priority to logical size calculation
paused_failpoints = ["timeline-calculate-logical-size-pause", "walreceiver-after-ingest"]
env.pageserver.start(
extra_env_vars={
"FAILPOINTS": ";".join([f"{fp}=pause" for fp in paused_failpoints]),
"FAILPOINTS": "timeline-calculate-logical-size-pause=pause;walreceiver-after-ingest-pause-activate=return(1),walreceiver-after-ingest-pause=pause"
}
)
@@ -1113,5 +1120,10 @@ def test_lazy_attach_activation(neon_env_builder: NeonEnvBuilder, activation_met
else:
raise RuntimeError(activation_method)
ps_http = env.pageserver.http_client()
ps_http.configure_failpoints([(fp, "off") for fp in paused_failpoints])
client.configure_failpoints(
[
("timeline-calculate-logical-size-pause", "off"),
("walreceiver-after-ingest-pause-activate", "off"),
("walreceiver-after-ingest-pause", "off"),
]
)