Fix wait_for_last_record_lsn() and wait_for_upload() python functions.

The contract for wait_for() was not very clear. It waits until the
given function returns successfully, without an exception, but the
wait_for_last_record_lsn() and wait_for_upload() functions used "a <
b" as the condition, i.e. they thought that wait_for() would poll
until the function returns true.

Inline the logic from wait_for() into those two functions, it's not
that complicated, and you get a more specific error message too, if it
fails. Also add a comment to wait_for() to make it more clear how it
works.

Also change remote_consistent_lsn() to return 0 instead of raising an
exception, if remote is None. That can happen if nothing has been
uploaded to remote storage for the timeline yet. It happened once in
the CI, and I was able to reproduce that locally too by adding a sleep
to the storage sync thread, to delay the first upload.
This commit is contained in:
Heikki Linnakangas
2022-05-17 16:21:13 +03:00
committed by Heikki Linnakangas
parent 070c255522
commit f03779bf1a
3 changed files with 44 additions and 15 deletions

View File

@@ -6,7 +6,7 @@ from contextlib import closing
from pathlib import Path
import time
from uuid import UUID
from fixtures.zenith_fixtures import ZenithEnvBuilder, assert_local, wait_for, wait_for_last_record_lsn, wait_for_upload
from fixtures.zenith_fixtures import ZenithEnvBuilder, assert_local, wait_until, wait_for_last_record_lsn, wait_for_upload
from fixtures.log_helper import log
from fixtures.utils import lsn_from_hex, lsn_to_hex
import pytest
@@ -109,9 +109,9 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
client.timeline_attach(UUID(tenant_id), UUID(timeline_id))
log.info("waiting for timeline redownload")
wait_for(number_of_iterations=10,
interval=1,
func=lambda: assert_local(client, UUID(tenant_id), UUID(timeline_id)))
wait_until(number_of_iterations=10,
interval=1,
func=lambda: assert_local(client, UUID(tenant_id), UUID(timeline_id)))
detail = client.timeline_detail(UUID(tenant_id), UUID(timeline_id))
assert detail['local'] is not None

View File

@@ -10,7 +10,7 @@ from typing import Optional
import signal
import pytest
from fixtures.zenith_fixtures import PgProtocol, PortDistributor, Postgres, ZenithEnvBuilder, Etcd, ZenithPageserverHttpClient, assert_local, wait_for, wait_for_last_record_lsn, wait_for_upload, zenith_binpath, pg_distrib_dir
from fixtures.zenith_fixtures import PgProtocol, PortDistributor, Postgres, ZenithEnvBuilder, Etcd, ZenithPageserverHttpClient, assert_local, wait_until, wait_for_last_record_lsn, wait_for_upload, zenith_binpath, pg_distrib_dir
from fixtures.utils import lsn_from_hex
@@ -191,7 +191,7 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
# call to attach timeline to new pageserver
new_pageserver_http.timeline_attach(tenant, timeline)
# new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint
new_timeline_detail = wait_for(
new_timeline_detail = wait_until(
number_of_iterations=5,
interval=1,
func=lambda: assert_local(new_pageserver_http, tenant, timeline))

View File

@@ -34,7 +34,12 @@ from typing_extensions import Literal
import requests
import backoff # type: ignore
from .utils import (etcd_path, get_self_dir, mkdir_if_needed, subprocess_capture, lsn_from_hex)
from .utils import (etcd_path,
get_self_dir,
mkdir_if_needed,
subprocess_capture,
lsn_from_hex,
lsn_to_hex)
from fixtures.log_helper import log
"""
This file contains pytest fixtures. A fixture is a test resource that can be
@@ -2065,7 +2070,11 @@ def check_restored_datadir_content(test_output_dir: str, env: ZenithEnv, pg: Pos
assert (mismatch, error) == ([], [])
def wait_for(number_of_iterations: int, interval: int, func):
def wait_until(number_of_iterations: int, interval: int, func):
"""
Wait until 'func' returns successfully, without exception. Returns the last return value
from the the function.
"""
last_exception = None
for i in range(number_of_iterations):
try:
@@ -2092,9 +2101,15 @@ def remote_consistent_lsn(pageserver_http_client: ZenithPageserverHttpClient,
timeline: uuid.UUID) -> int:
detail = pageserver_http_client.timeline_detail(tenant, timeline)
lsn_str = detail['remote']['remote_consistent_lsn']
assert isinstance(lsn_str, str)
return lsn_from_hex(lsn_str)
if detail['remote'] is None:
# No remote information at all. This happens right after creating
# a timeline, before any part of it it has been uploaded to remote
# storage yet.
return 0
else:
lsn_str = detail['remote']['remote_consistent_lsn']
assert isinstance(lsn_str, str)
return lsn_from_hex(lsn_str)
def wait_for_upload(pageserver_http_client: ZenithPageserverHttpClient,
@@ -2102,8 +2117,15 @@ def wait_for_upload(pageserver_http_client: ZenithPageserverHttpClient,
timeline: uuid.UUID,
lsn: int):
"""waits for local timeline upload up to specified lsn"""
wait_for(10, 1, lambda: remote_consistent_lsn(pageserver_http_client, tenant, timeline) >= lsn)
for i in range(10):
current_lsn = remote_consistent_lsn(pageserver_http_client, tenant, timeline)
if current_lsn >= lsn:
return
log.info("waiting for remote_consistent_lsn to reach {}, now {}, iteration {}".format(
lsn_to_hex(lsn), lsn_to_hex(current_lsn), i + 1))
time.sleep(1)
raise Exception("timed out while waiting for remote_consistent_lsn to reach {}, was {}".format(
lsn_to_hex(lsn), lsn_to_hex(current_lsn)))
def last_record_lsn(pageserver_http_client: ZenithPageserverHttpClient,
@@ -2121,5 +2143,12 @@ def wait_for_last_record_lsn(pageserver_http_client: ZenithPageserverHttpClient,
timeline: uuid.UUID,
lsn: int):
"""waits for pageserver to catch up to a certain lsn"""
wait_for(10, 1, lambda: last_record_lsn(pageserver_http_client, tenant, timeline) >= lsn)
for i in range(10):
current_lsn = last_record_lsn(pageserver_http_client, tenant, timeline)
if current_lsn >= lsn:
return
log.info("waiting for last_record_lsn to reach {}, now {}, iteration {}".format(
lsn_to_hex(lsn), lsn_to_hex(current_lsn), i + 1))
time.sleep(1)
raise Exception("timed out while waiting for last_record_lsn to reach {}, was {}".format(
lsn_to_hex(lsn), lsn_to_hex(current_lsn)))