mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 06:30:43 +00:00
test: fix test_concurrent_timeline_delete_if_first_stuck_at_index_upload
manually cherry-picked 245a2a0592
This commit is contained in:
@@ -346,20 +346,31 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
|
||||
failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause"
|
||||
ps_http.configure_failpoints((failpoint_name, "pause"))
|
||||
|
||||
def first_call(result_queue):
|
||||
def delete_timeline_call(name, result_queue, barrier):
|
||||
if barrier:
|
||||
barrier.wait()
|
||||
try:
|
||||
log.info("first call start")
|
||||
log.info(f"{name} call start")
|
||||
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=10)
|
||||
log.info("first call success")
|
||||
log.info(f"{name} call success")
|
||||
result_queue.put("success")
|
||||
except Exception:
|
||||
log.exception("first call failed")
|
||||
log.exception(f"{name} call failed")
|
||||
result_queue.put("failure, see log for stack trace")
|
||||
|
||||
first_call_result: queue.Queue[str] = queue.Queue()
|
||||
first_call_thread = threading.Thread(target=first_call, args=(first_call_result,))
|
||||
delete_results: queue.Queue[str] = queue.Queue()
|
||||
first_call_thread = threading.Thread(
|
||||
target=delete_timeline_call,
|
||||
args=(
|
||||
"1st",
|
||||
delete_results,
|
||||
None,
|
||||
),
|
||||
)
|
||||
first_call_thread.start()
|
||||
|
||||
second_call_thread = None
|
||||
|
||||
try:
|
||||
|
||||
def first_call_hit_failpoint():
|
||||
@@ -369,30 +380,41 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
|
||||
|
||||
wait_until(50, 0.1, first_call_hit_failpoint)
|
||||
|
||||
# make the second call and assert behavior
|
||||
log.info("second call start")
|
||||
error_msg_re = "another task is already setting the deleted_flag, started at"
|
||||
with pytest.raises(PageserverApiException, match=error_msg_re) as second_call_err:
|
||||
ps_http.timeline_delete(env.initial_tenant, child_timeline_id)
|
||||
assert second_call_err.value.status_code == 500
|
||||
env.pageserver.allowed_errors.append(f".*{child_timeline_id}.*{error_msg_re}.*")
|
||||
# the second call will try to transition the timeline into Stopping state as well
|
||||
barrier = threading.Barrier(2)
|
||||
second_call_thread = threading.Thread(
|
||||
target=delete_timeline_call,
|
||||
args=(
|
||||
"2nd",
|
||||
delete_results,
|
||||
barrier,
|
||||
),
|
||||
)
|
||||
second_call_thread.start()
|
||||
|
||||
barrier.wait()
|
||||
|
||||
# release the pause
|
||||
ps_http.configure_failpoints((failpoint_name, "off"))
|
||||
|
||||
# both should had succeeded
|
||||
result = delete_results.get()
|
||||
assert result == "success"
|
||||
result = delete_results.get()
|
||||
assert result == "success"
|
||||
|
||||
# the second call will try to transition the timeline into Stopping state, but it's already in that state
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*{child_timeline_id}.*Ignoring new state, equal to the existing one: Stopping"
|
||||
)
|
||||
log.info("second call failed as expected")
|
||||
|
||||
# by now we know that the second call failed, let's ensure the first call will finish
|
||||
ps_http.configure_failpoints((failpoint_name, "off"))
|
||||
|
||||
result = first_call_result.get()
|
||||
assert result == "success"
|
||||
|
||||
finally:
|
||||
log.info("joining first call thread")
|
||||
log.info("joining 1st thread")
|
||||
# in any case, make sure the lifetime of the thread is bounded to this test
|
||||
first_call_thread.join()
|
||||
|
||||
if second_call_thread:
|
||||
log.info("joining 2nd thread")
|
||||
second_call_thread.join()
|
||||
|
||||
|
||||
def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user