diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
index 7e0b0e9b25..9b151d2449 100644
--- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
@@ -100,6 +100,7 @@ pub(super) async fn connection_manager_loop_step(
     // with other streams on this client (other connection managers). When
     // object goes out of scope, stream finishes in drop() automatically.
     let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
+    let mut broker_reset_interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
     debug!("Subscribed for broker timeline updates");
 
     loop {
@@ -156,7 +157,10 @@ pub(super) async fn connection_manager_loop_step(
             // Got a new update from the broker
             broker_update = broker_subscription.message() /* TODO: review cancellation-safety */ => {
                 match broker_update {
-                    Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
+                    Ok(Some(broker_update)) => {
+                        broker_reset_interval.reset();
+                        connection_manager_state.register_timeline_update(broker_update);
+                    },
                     Err(status) => {
                         match status.code() {
                             Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") || status.message().contains("error reading a body from connection") => {
@@ -178,6 +182,14 @@ pub(super) async fn connection_manager_loop_step(
                 }
             },
 
+            _ = broker_reset_interval.tick() => {
+                if wait_lsn_status.borrow().is_some() {
+                    tracing::warn!("No broker updates received for a while, but waiting for WAL. Re-setting stream ...")
+                }
+
+                broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
+            },
+
             new_event = async {
                 // Reminder: this match arm needs to be cancellation-safe.
                 loop {
diff --git a/test_runner/fixtures/pageserver/allowed_errors.py b/test_runner/fixtures/pageserver/allowed_errors.py
index 9b564f0a60..6a715c4b93 100755
--- a/test_runner/fixtures/pageserver/allowed_errors.py
+++ b/test_runner/fixtures/pageserver/allowed_errors.py
@@ -111,6 +111,7 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
     ".*stalling layer flushes for compaction backpressure.*",
     ".*layer roll waiting for flush due to compaction backpressure.*",
     ".*BatchSpanProcessor.*",
+    ".*No broker updates received for a while.*",
     *(
         [
             r".*your platform is not a supported production platform, ignoing request for O_DIRECT; this could hide alignment bugs.*"
diff --git a/test_runner/regress/test_branching.py b/test_runner/regress/test_branching.py
index 920c538069..011c6896bd 100644
--- a/test_runner/regress/test_branching.py
+++ b/test_runner/regress/test_branching.py
@@ -416,6 +416,8 @@ def test_duplicate_creation(neon_env_builder: NeonEnvBuilder):
         # timeline creation (uploads). mask it out here to avoid flakyness.
         del success_result["remote_consistent_lsn_visible"]
         del repeat_result["remote_consistent_lsn_visible"]
+        del success_result["walreceiver_status"]
+        del repeat_result["walreceiver_status"]
         assert repeat_result == success_result
     finally:
         env.pageserver.stop(immediate=True)
diff --git a/test_runner/regress/test_wal_receiver.py b/test_runner/regress/test_wal_receiver.py
index d281c055b0..72fc58d761 100644
--- a/test_runner/regress/test_wal_receiver.py
+++ b/test_runner/regress/test_wal_receiver.py
@@ -13,50 +13,6 @@ if TYPE_CHECKING:
     from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
 
 
-# Checks that pageserver's walreceiver state is printed in the logs during WAL wait timeout.
-# Ensures that walreceiver does not run without any data inserted and only starts after the insertion.
-def test_pageserver_lsn_wait_error_start(neon_env_builder: NeonEnvBuilder):
-    # we assert below that the walreceiver is not active before data writes.
-    # with manually created timelines, it is active.
-    # FIXME: remove this test once we remove timelines_onto_safekeepers
-    neon_env_builder.storage_controller_config = {
-        "timelines_onto_safekeepers": False,
-    }
-
-    # Trigger WAL wait timeout faster
-    neon_env_builder.pageserver_config_override = "wait_lsn_timeout = '1s'"
-    env = neon_env_builder.init_start()
-    env.pageserver.http_client()
-
-    # In this test we force 'Timed out while waiting for WAL record error' while
-    # fetching basebackup and don't want any retries.
-    os.environ["NEON_COMPUTE_TESTING_BASEBACKUP_RETRIES"] = "1"
-
-    tenant_id, timeline_id = env.create_tenant()
-    expected_timeout_error = f"Timed out while waiting for WAL record at LSN {future_lsn} to arrive"
-    env.pageserver.allowed_errors.append(f".*{expected_timeout_error}.*")
-
-    try:
-        trigger_wait_lsn_timeout(env, tenant_id)
-    except Exception as e:
-        exception_string = str(e)
-        assert expected_timeout_error in exception_string, "Should time out during waiting for WAL"
-        assert "WalReceiver status: Not active" in exception_string, (
-            "Walreceiver should not be active before any data writes"
-        )
-
-    insert_test_elements(env, tenant_id, start=0, count=1_000)
-    try:
-        trigger_wait_lsn_timeout(env, tenant_id)
-    except Exception as e:
-        exception_string = str(e)
-        assert expected_timeout_error in exception_string, "Should time out during waiting for WAL"
-        assert "WalReceiver status: Not active" not in exception_string, (
-            "Should not be inactive anymore after INSERTs are made"
-        )
-        assert "WalReceiver status" in exception_string, "But still should have some other status"
-
-
 # Checks that all active safekeepers are shown in pageserver's walreceiver state printed on WAL wait timeout.
 # Kills one of the safekeepers and ensures that only the active ones are printed in the state.
 def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuilder):
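
Note (not part of the diff): a minimal, standalone sketch of the watchdog pattern the connection_manager.rs hunks introduce. A tokio::time::interval acts as an inactivity timer: it is reset on every broker update, and when it fires while a wait-for-WAL is pending, the broker stream is re-created. fake_broker_update() and waiting_for_wal() below are hypothetical stand-ins for broker_subscription.message() and the wait_lsn_status watch channel; this illustrates the technique only, it is not the pageserver code.

// Cargo.toml dependency assumed: tokio = { version = "1", features = ["full"] }
use std::time::Duration;

// Stand-in for `broker_subscription.message()`: pretend an update arrives every second.
async fn fake_broker_update() -> Option<u64> {
    tokio::time::sleep(Duration::from_secs(1)).await;
    Some(42)
}

// Stand-in for `wait_lsn_status.borrow().is_some()`.
fn waiting_for_wal() -> bool {
    true
}

#[tokio::main]
async fn main() {
    // Fires 30s after the last reset; every received update pushes the deadline out.
    let mut watchdog = tokio::time::interval(Duration::from_secs(30));
    watchdog.reset(); // skip the immediate first tick that `interval()` schedules

    for _ in 0..5 {
        tokio::select! {
            update = fake_broker_update() => {
                if let Some(update) = update {
                    watchdog.reset();
                    println!("got broker update {update}");
                }
            }
            _ = watchdog.tick() => {
                if waiting_for_wal() {
                    eprintln!("no broker updates for 30s while waiting for WAL; re-subscribing");
                }
                // the real code re-creates `broker_subscription` here
            }
        }
    }
}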