From b83d722369f1cb1d9a55ab8d39c36f30b0886ea4 Mon Sep 17 00:00:00 2001
From: Joonas Koivunen
Date: Wed, 21 Aug 2024 19:22:47 +0300
Subject: [PATCH] test: fix more flaky tests due to graceful shutdown (#8787)

Going through the list of recent flaky tests, trying to fix those
related to graceful shutdown:

- test_forward_compatibility: flush and wait for uploads to avoid
  graceful shutdown
- test_layer_bloating: at the end the endpoint and vanilla_pg are still
  up => immediate shutdown
- test_lagging_sk: pageserver shutdown is not related to the test
  => immediate shutdown
- test_lsn_lease_size: pageserver flushing is not needed => immediate
  shutdown

Additionally:

- remove `wait_for_upload` usage from the workload fixture

Cc: #8708
Fixes: #8710
---
 test_runner/fixtures/neon_fixtures.py      | 14 +++---------
 test_runner/fixtures/workload.py           |  7 +++---
 test_runner/regress/test_compatibility.py  | 12 ++++------
 test_runner/regress/test_import.py         |  5 ++---
 test_runner/regress/test_layer_bloating.py | 26 +++++++++++++---------
 test_runner/regress/test_tenant_size.py    |  3 +++
 test_runner/regress/test_wal_acceptor.py   |  2 ++
 7 files changed, 34 insertions(+), 35 deletions(-)
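
For reference, a minimal sketch (not part of the patch) of the upload-wait pattern
the hunks below converge on. It only uses helpers and parameters that appear in
this diff (flush_ep_to_pageserver, timeline_checkpoint with wait_until_uploaded=True,
env.initial_tenant / env.initial_timeline); the test name and body are hypothetical.

    from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver


    def test_needs_remote_data(neon_env_builder: NeonEnvBuilder):
        # Hypothetical test: write something, then make sure it is both ingested
        # and uploaded before any shutdown can happen.
        env = neon_env_builder.init_start()
        endpoint = env.endpoints.create_start("main")

        # ... generate data through the endpoint ...

        # Drain the endpoint's WAL into the pageserver, then checkpoint and wait
        # for the upload in one call instead of checkpoint + wait_for_upload.
        flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline)
        env.pageserver.http_client().timeline_checkpoint(
            env.initial_tenant, env.initial_timeline, wait_until_uploaded=True
        )

With the upload folded into the checkpoint call there is nothing left for a later
graceful shutdown to flush, which is what made these tests flaky.
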
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 9aa275d343..2bb698f175 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -61,8 +61,6 @@ from fixtures.pageserver.common_types import IndexPartDump, LayerName, parse_lay
 from fixtures.pageserver.http import PageserverHttpClient
 from fixtures.pageserver.utils import (
     wait_for_last_record_lsn,
-    wait_for_upload,
-    wait_for_upload_queue_empty,
 )
 from fixtures.pg_version import PgVersion
 from fixtures.port_distributor import PortDistributor
@@ -5347,9 +5345,7 @@ def last_flush_lsn_upload(
     for tenant_shard_id, pageserver in shards:
         ps_http = pageserver.http_client(auth_token=auth_token)
         wait_for_last_record_lsn(ps_http, tenant_shard_id, timeline_id, last_flush_lsn)
-        # force a checkpoint to trigger upload
-        ps_http.timeline_checkpoint(tenant_shard_id, timeline_id)
-        wait_for_upload(ps_http, tenant_shard_id, timeline_id, last_flush_lsn)
+        ps_http.timeline_checkpoint(tenant_shard_id, timeline_id, wait_until_uploaded=True)
     return last_flush_lsn
@@ -5434,9 +5430,5 @@ def generate_uploads_and_deletions(
     # ensures that the pageserver is in a fully idle state: there will be no more
     # background ingest, no more uploads pending, and therefore no non-determinism
     # in subsequent actions like pageserver restarts.
-    final_lsn = flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, pageserver.id)
-    ps_http.timeline_checkpoint(tenant_id, timeline_id)
-    # Finish uploads
-    wait_for_upload(ps_http, tenant_id, timeline_id, final_lsn)
-    # Finish all remote writes (including deletions)
-    wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)
+    flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, pageserver.id)
+    ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)
diff --git a/test_runner/fixtures/workload.py b/test_runner/fixtures/workload.py
index cc93762175..065a78bf9b 100644
--- a/test_runner/fixtures/workload.py
+++ b/test_runner/fixtures/workload.py
@@ -10,7 +10,7 @@ from fixtures.neon_fixtures import (
     tenant_get_shards,
     wait_for_last_flush_lsn,
 )
-from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
+from fixtures.pageserver.utils import wait_for_last_record_lsn
 
 # neon_local doesn't handle creating/modifying endpoints concurrently, so we use a mutex
 # to ensure we don't do that: this enables running lots of Workloads in parallel safely.
@@ -174,8 +174,9 @@ class Workload:
 
             if upload:
                 # Wait for written data to be uploaded to S3 (force a checkpoint to trigger upload)
-                ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id)
-                wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
+                ps_http.timeline_checkpoint(
+                    tenant_shard_id, self.timeline_id, wait_until_uploaded=True
+                )
                 log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
             else:
                 log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")
diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py
index 30ff40b7df..de27191945 100644
--- a/test_runner/regress/test_compatibility.py
+++ b/test_runner/regress/test_compatibility.py
@@ -9,14 +9,12 @@ from typing import List, Optional
 
 import pytest
 import toml
-from fixtures.common_types import Lsn, TenantId, TimelineId
+from fixtures.common_types import TenantId, TimelineId
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin
+from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin, flush_ep_to_pageserver
 from fixtures.pageserver.http import PageserverApiException
 from fixtures.pageserver.utils import (
     timeline_delete_wait_completed,
-    wait_for_last_record_lsn,
-    wait_for_upload,
 )
 from fixtures.pg_version import PgVersion
 from fixtures.remote_storage import RemoteStorageKind, S3Storage, s3_storage
@@ -122,11 +120,9 @@ def test_create_snapshot(
     timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]
     pageserver_http = env.pageserver.http_client()
 
-    lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
-    wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, lsn)
-    pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
-    wait_for_upload(pageserver_http, tenant_id, timeline_id, lsn)
+    flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id)
+    pageserver_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)
 
     env.endpoints.stop_all()
     for sk in env.safekeepers:
diff --git a/test_runner/regress/test_import.py b/test_runner/regress/test_import.py
index 4dae9176b8..4385cfca76 100644
--- a/test_runner/regress/test_import.py
+++ b/test_runner/regress/test_import.py
@@ -18,7 +18,6 @@ from fixtures.neon_fixtures import (
 from fixtures.pageserver.utils import (
     timeline_delete_wait_completed,
     wait_for_last_record_lsn,
-    wait_for_upload,
 )
 from fixtures.remote_storage import RemoteStorageKind
 from fixtures.utils import assert_pageserver_backups_equal, subprocess_capture
@@ -144,7 +143,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
 
     # Wait for data to land in s3
     wait_for_last_record_lsn(client, tenant, timeline, Lsn(end_lsn))
-    wait_for_upload(client, tenant, timeline, Lsn(end_lsn))
+    client.timeline_checkpoint(tenant, timeline, compact=False, wait_until_uploaded=True)
 
     # Check it worked
     endpoint = env.endpoints.create_start(branch_name, tenant_id=tenant)
@@ -290,7 +289,7 @@ def _import(
 
     # Wait for data to land in s3
     wait_for_last_record_lsn(client, tenant, timeline, lsn)
-    wait_for_upload(client, tenant, timeline, lsn)
+    client.timeline_checkpoint(tenant, timeline, compact=False, wait_until_uploaded=True)
 
     # Check it worked
     endpoint = env.endpoints.create_start(branch_name, tenant_id=tenant, lsn=lsn)
diff --git a/test_runner/regress/test_layer_bloating.py b/test_runner/regress/test_layer_bloating.py
index 77dc8a35b5..b8126395fd 100644
--- a/test_runner/regress/test_layer_bloating.py
+++ b/test_runner/regress/test_layer_bloating.py
@@ -1,27 +1,31 @@
 import os
-import time
 
 import pytest
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
-    NeonEnv,
+    NeonEnvBuilder,
     logical_replication_sync,
     wait_for_last_flush_lsn,
 )
 from fixtures.pg_version import PgVersion
 
 
-def test_layer_bloating(neon_simple_env: NeonEnv, vanilla_pg):
-    env = neon_simple_env
-
-    if env.pg_version != PgVersion.V16:
+def test_layer_bloating(neon_env_builder: NeonEnvBuilder, vanilla_pg):
+    if neon_env_builder.pg_version != PgVersion.V16:
         pytest.skip("pg_log_standby_snapshot() function is available only in PG16")
-    timeline = env.neon_cli.create_branch("test_logical_replication", "empty")
-    endpoint = env.endpoints.create_start(
-        "test_logical_replication", config_lines=["log_statement=all"]
+    env = neon_env_builder.init_start(
+        initial_tenant_conf={
+            "gc_period": "0s",
+            "compaction_period": "0s",
+            "compaction_threshold": 99999,
+            "image_creation_threshold": 99999,
+        }
     )
+    timeline = env.initial_timeline
+    endpoint = env.endpoints.create_start("main", config_lines=["log_statement=all"])
+
     pg_conn = endpoint.connect()
     cur = pg_conn.cursor()
@@ -54,7 +58,7 @@ def test_layer_bloating(neon_simple_env: NeonEnv, vanilla_pg):
     # Wait logical replication to sync
     logical_replication_sync(vanilla_pg, endpoint)
     wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, timeline)
-    time.sleep(10)
+    env.pageserver.http_client().timeline_checkpoint(env.initial_tenant, timeline, compact=False)
 
     # Check layer file sizes
     timeline_path = f"{env.pageserver.workdir}/tenants/{env.initial_tenant}/timelines/{timeline}/"
@@ -63,3 +67,5 @@
         if filename.startswith("00000"):
             log.info(f"layer {filename} size is {os.path.getsize(timeline_path + filename)}")
             assert os.path.getsize(timeline_path + filename) < 512_000_000
+
+    env.stop(immediate=True)
diff --git a/test_runner/regress/test_tenant_size.py b/test_runner/regress/test_tenant_size.py
index b1ade77a14..f872116a1c 100644
--- a/test_runner/regress/test_tenant_size.py
+++ b/test_runner/regress/test_tenant_size.py
@@ -757,6 +757,9 @@ def test_lsn_lease_size(neon_env_builder: NeonEnvBuilder, test_output_dir: Path,
     assert_size_approx_equal_for_lease_test(lease_res, ro_branch_res)
 
+    # we are writing a lot, and flushing all of that to disk is not important for this test
+    env.stop(immediate=True)
+
 
 def insert_with_action(
     env: NeonEnv,
diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py
index bb3b16f3e1..19df834b81 100644
--- a/test_runner/regress/test_wal_acceptor.py
+++ b/test_runner/regress/test_wal_acceptor.py
@@ -1300,6 +1300,8 @@ def test_lagging_sk(neon_env_builder: NeonEnvBuilder):
     # Check that WALs are the same.
     cmp_sk_wal([sk1, sk2, sk3], tenant_id, timeline_id)
 
+    env.stop(immediate=True)
+
 
 # Smaller version of test_one_sk_down testing peer recovery in isolation: that
 # it works without compute at all.
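
The opposite pattern, used by the test_lagging_sk, test_lsn_lease_size, and
test_layer_bloating hunks above, is a minimal sketch under the same assumptions:
when nothing after the assertions depends on the pageserver flushing its state,
end the test with an immediate stop so teardown cannot flake on a graceful
shutdown. Only env.stop(immediate=True) comes from this patch; the test name and
body are hypothetical.

    from fixtures.neon_fixtures import NeonEnvBuilder


    def test_does_not_need_flush(neon_env_builder: NeonEnvBuilder):
        # Hypothetical test: every check reads its data back before this point.
        env = neon_env_builder.init_start()
        endpoint = env.endpoints.create_start("main")

        # ... generate WAL, run the actual assertions ...

        # Immediate stop: skip the graceful shutdown, since any WAL or uploads
        # still in flight are irrelevant to what the test verified.
        env.stop(immediate=True)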