Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-05 20:42:54 +00:00)
test: fix more tests flaky due to graceful shutdown (#8787)

Going through the list of recently flaky tests and fixing those related to graceful shutdown:

- test_forward_compatibility: flush and wait for uploads, to avoid relying on graceful shutdown
- test_layer_bloating: the endpoint and vanilla Postgres are still up at the end => immediate shutdown
- test_lagging_sk: pageserver shutdown is not related to the test => immediate shutdown
- test_lsn_lease_size: pageserver flushing is not needed => immediate shutdown

Additionally:

- remove `wait_for_upload` usage from the workload fixture

Cc: #8708
Fixes: #8710
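The recurring change below is mechanical: wherever a test forced a checkpoint and then polled for the upload to land, the two steps collapse into one blocking checkpoint call. A minimal before/after sketch using names from the diff (the surrounding fixture objects are assumed to be set up as in the tests):

    # Before: trigger a checkpoint, then poll the pageserver until it
    # reports the upload has reached the flush LSN.
    ps_http.timeline_checkpoint(tenant_shard_id, timeline_id)
    wait_for_upload(ps_http, tenant_shard_id, timeline_id, last_flush_lsn)

    # After: one call that checkpoints and blocks until the resulting
    # uploads have completed.
    ps_http.timeline_checkpoint(tenant_shard_id, timeline_id, wait_until_uploaded=True)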
@@ -61,8 +61,6 @@ from fixtures.pageserver.common_types import IndexPartDump, LayerName, parse_lay
 from fixtures.pageserver.http import PageserverHttpClient
 from fixtures.pageserver.utils import (
     wait_for_last_record_lsn,
-    wait_for_upload,
-    wait_for_upload_queue_empty,
 )
 from fixtures.pg_version import PgVersion
 from fixtures.port_distributor import PortDistributor

@@ -5347,9 +5345,7 @@ def last_flush_lsn_upload(
     for tenant_shard_id, pageserver in shards:
         ps_http = pageserver.http_client(auth_token=auth_token)
         wait_for_last_record_lsn(ps_http, tenant_shard_id, timeline_id, last_flush_lsn)
-        # force a checkpoint to trigger upload
-        ps_http.timeline_checkpoint(tenant_shard_id, timeline_id)
-        wait_for_upload(ps_http, tenant_shard_id, timeline_id, last_flush_lsn)
+        ps_http.timeline_checkpoint(tenant_shard_id, timeline_id, wait_until_uploaded=True)
     return last_flush_lsn
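
For context on the helper being dropped: `wait_for_upload` polls until the pageserver's remote consistent LSN passes a target, which says nothing about other writes still sitting in the upload queue. A plausible shape for such a polling helper (an illustrative sketch, not the actual fixture code; `get_remote_lsn` is a hypothetical stand-in for querying the pageserver):

    import time

    def wait_for_remote_lsn(get_remote_lsn, target_lsn, timeout=20.0, interval=0.5):
        # Poll until the remote consistent LSN reaches the target, or time out.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if get_remote_lsn() >= target_lsn:
                return
            time.sleep(interval)
        raise TimeoutError(f"remote LSN did not reach {target_lsn}")

Passing `wait_until_uploaded=True` to `timeline_checkpoint` moves this wait into the pageserver itself: the HTTP call does not return until the checkpoint's uploads are done, so there is no client-side polling left to race with shutdown.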
@@ -5434,9 +5430,5 @@ def generate_uploads_and_deletions(
     # ensures that the pageserver is in a fully idle state: there will be no more
     # background ingest, no more uploads pending, and therefore no non-determinism
     # in subsequent actions like pageserver restarts.
-    final_lsn = flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, pageserver.id)
-    ps_http.timeline_checkpoint(tenant_id, timeline_id)
-    # Finish uploads
-    wait_for_upload(ps_http, tenant_id, timeline_id, final_lsn)
-    # Finish all remote writes (including deletions)
-    wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)
+    flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, pageserver.id)
+    ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)

@@ -10,7 +10,7 @@ from fixtures.neon_fixtures import (
     tenant_get_shards,
     wait_for_last_flush_lsn,
 )
-from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
+from fixtures.pageserver.utils import wait_for_last_record_lsn

 # neon_local doesn't handle creating/modifying endpoints concurrently, so we use a mutex
 # to ensure we don't do that: this enables running lots of Workloads in parallel safely.

@@ -174,8 +174,9 @@ class Workload:

         if upload:
             # Wait for written data to be uploaded to S3 (force a checkpoint to trigger upload)
-            ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id)
-            wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
+            ps_http.timeline_checkpoint(
+                tenant_shard_id, self.timeline_id, wait_until_uploaded=True
+            )
             log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
         else:
             log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")

@@ -9,14 +9,12 @@ from typing import List, Optional

 import pytest
 import toml
-from fixtures.common_types import Lsn, TenantId, TimelineId
+from fixtures.common_types import TenantId, TimelineId
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin
+from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin, flush_ep_to_pageserver
 from fixtures.pageserver.http import PageserverApiException
 from fixtures.pageserver.utils import (
     timeline_delete_wait_completed,
-    wait_for_last_record_lsn,
-    wait_for_upload,
 )
 from fixtures.pg_version import PgVersion
 from fixtures.remote_storage import RemoteStorageKind, S3Storage, s3_storage

@@ -122,11 +120,9 @@ def test_create_snapshot(
     timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]

     pageserver_http = env.pageserver.http_client()
-    lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
-
-    wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, lsn)
-    pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
-    wait_for_upload(pageserver_http, tenant_id, timeline_id, lsn)
+    flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id)
+    pageserver_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)

     env.endpoints.stop_all()
     for sk in env.safekeepers:

@@ -18,7 +18,6 @@ from fixtures.neon_fixtures import (
 from fixtures.pageserver.utils import (
     timeline_delete_wait_completed,
     wait_for_last_record_lsn,
-    wait_for_upload,
 )
 from fixtures.remote_storage import RemoteStorageKind
 from fixtures.utils import assert_pageserver_backups_equal, subprocess_capture

@@ -144,7 +143,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build

     # Wait for data to land in s3
     wait_for_last_record_lsn(client, tenant, timeline, Lsn(end_lsn))
-    wait_for_upload(client, tenant, timeline, Lsn(end_lsn))
+    client.timeline_checkpoint(tenant, timeline, compact=False, wait_until_uploaded=True)

     # Check it worked
     endpoint = env.endpoints.create_start(branch_name, tenant_id=tenant)

@@ -290,7 +289,7 @@ def _import(

     # Wait for data to land in s3
     wait_for_last_record_lsn(client, tenant, timeline, lsn)
-    wait_for_upload(client, tenant, timeline, lsn)
+    client.timeline_checkpoint(tenant, timeline, compact=False, wait_until_uploaded=True)

     # Check it worked
     endpoint = env.endpoints.create_start(branch_name, tenant_id=tenant, lsn=lsn)

@@ -1,27 +1,31 @@
 import os
 import time

 import pytest
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnv,
     NeonEnvBuilder,
     logical_replication_sync,
     wait_for_last_flush_lsn,
 )
 from fixtures.pg_version import PgVersion


-def test_layer_bloating(neon_simple_env: NeonEnv, vanilla_pg):
-    env = neon_simple_env
-
-    if env.pg_version != PgVersion.V16:
+def test_layer_bloating(neon_env_builder: NeonEnvBuilder, vanilla_pg):
+    if neon_env_builder.pg_version != PgVersion.V16:
         pytest.skip("pg_log_standby_snapshot() function is available only in PG16")

-    timeline = env.neon_cli.create_branch("test_logical_replication", "empty")
-    endpoint = env.endpoints.create_start(
-        "test_logical_replication", config_lines=["log_statement=all"]
+    env = neon_env_builder.init_start(
+        initial_tenant_conf={
+            "gc_period": "0s",
+            "compaction_period": "0s",
+            "compaction_threshold": 99999,
+            "image_creation_threshold": 99999,
+        }
     )

+    timeline = env.initial_timeline
+    endpoint = env.endpoints.create_start("main", config_lines=["log_statement=all"])
+
     pg_conn = endpoint.connect()
     cur = pg_conn.cursor()

@@ -54,7 +58,7 @@ def test_layer_bloating(neon_simple_env: NeonEnv, vanilla_pg):
     # Wait logical replication to sync
     logical_replication_sync(vanilla_pg, endpoint)
     wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, timeline)
-    time.sleep(10)
+    env.pageserver.http_client().timeline_checkpoint(env.initial_tenant, timeline, compact=False)

     # Check layer file sizes
     timeline_path = f"{env.pageserver.workdir}/tenants/{env.initial_tenant}/timelines/{timeline}/"

@@ -63,3 +67,5 @@ def test_layer_bloating(neon_simple_env: NeonEnv, vanilla_pg):
         if filename.startswith("00000"):
             log.info(f"layer {filename} size is {os.path.getsize(timeline_path + filename)}")
             assert os.path.getsize(timeline_path + filename) < 512_000_000
+
+    env.stop(immediate=True)

@@ -757,6 +757,9 @@ def test_lsn_lease_size(neon_env_builder: NeonEnvBuilder, test_output_dir: Path,

     assert_size_approx_equal_for_lease_test(lease_res, ro_branch_res)

+    # we are writing a lot, and flushing all of that to disk is not important for this test
+    env.stop(immediate=True)
+

 def insert_with_action(
     env: NeonEnv,

@@ -1300,6 +1300,8 @@ def test_lagging_sk(neon_env_builder: NeonEnvBuilder):
     # Check that WALs are the same.
     cmp_sk_wal([sk1, sk2, sk3], tenant_id, timeline_id)

+    env.stop(immediate=True)
+

 # Smaller version of test_one_sk_down testing peer recovery in isolation: that
 # it works without compute at all.
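
Finally, the `env.stop(immediate=True)` additions: once a test has asserted everything it cares about, a graceful stop only adds time and shutdown-ordering hazards (in test_layer_bloating, for example, the endpoint and vanilla Postgres are still running at the end). The intended contrast, as a sketch (assuming, per the commit message, that `immediate` skips the final flush):

    # Graceful (default): the pageserver flushes in-memory data and finishes
    # uploads before exiting -- slow, and racy while computes still emit WAL.
    env.stop()

    # Immediate: tear the processes down without the final flush; fine when
    # on-disk state no longer matters to the test.
    env.stop(immediate=True)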