safekeeper: streaming pull_timeline

- Add /snapshot http endpoing streaming tar archive timeline contents up to
  flush_lsn.
- Add check that term doesn't change, corresponding test passes now.
- Also prepares infra to hold off WAL removal during the basebackup.
- Sprinkle fsyncs to persist the pull_timeline result.

ref https://github.com/neondatabase/neon/issues/6340
This commit is contained in:
Arseny Sher
2024-06-03 15:47:21 +03:00
committed by Arseny Sher
parent ed9ffb9af2
commit d8b2a49c55
9 changed files with 367 additions and 195 deletions

View File

@@ -317,9 +317,9 @@ def test_broker(neon_env_builder: NeonEnvBuilder):
time.sleep(1)
# Ensure that safekeepers don't lose remote_consistent_lsn on restart.
# Control file is persisted each 5s. TODO: do that on shutdown and remove sleep.
time.sleep(6)
for sk in env.safekeepers:
# force persist cfile
sk.http_client().checkpoint(tenant_id, timeline_id)
sk.stop()
sk.start()
stat_after_restart = [cli.timeline_status(tenant_id, timeline_id) for cli in clients]
@@ -1749,11 +1749,11 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 4
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_pull_timeline")
timeline_id = env.initial_timeline
log.info("Use only first 3 safekeepers")
env.safekeepers[3].stop()
endpoint = env.endpoints.create("test_pull_timeline")
endpoint = env.endpoints.create("main")
endpoint.active_safekeepers = [1, 2, 3]
endpoint.start()
@@ -1787,7 +1787,7 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
show_statuses(env.safekeepers, tenant_id, timeline_id)
log.info("Restarting compute with new config to verify that it works")
endpoint.stop_and_destroy().create("test_pull_timeline")
endpoint.stop_and_destroy().create("main")
endpoint.active_safekeepers = [1, 3, 4]
endpoint.start()
@@ -1836,14 +1836,14 @@ def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder):
src_flush_lsn = src_sk.get_flush_lsn(tenant_id, timeline_id)
log.info(f"flush_lsn on src before pull_timeline: {src_flush_lsn}")
dst_http = dst_sk.http_client()
src_http = src_sk.http_client()
# run pull_timeline which will halt before downloading files
dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "pause"))
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause"))
pt_handle = PropagatingThread(
target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id)
)
pt_handle.start()
dst_sk.wait_until_paused("sk-pull-timeline-after-list-pausable")
src_sk.wait_until_paused("sk-snapshot-after-list-pausable")
# ensure segment exists
endpoint.safe_psql("insert into t select generate_series(1, 180000), 'papaya'")
@@ -1854,7 +1854,7 @@ def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder):
first_segment_p = src_sk.timeline_dir(tenant_id, timeline_id) / "000000010000000000000001"
log.info(f"first segment exist={os.path.exists(first_segment_p)}")
dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "off"))
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "off"))
pt_handle.join()
timeline_start_lsn = src_sk.get_timeline_start_lsn(tenant_id, timeline_id)
@@ -1883,7 +1883,6 @@ def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder):
# enough, so it won't be affected by term change anymore.
#
# Expected to fail while term check is not implemented.
@pytest.mark.xfail
def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
@@ -1900,14 +1899,14 @@ def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder):
ep.safe_psql("create table t(key int, value text)")
ep.safe_psql("insert into t select generate_series(1, 1000), 'pear'")
dst_http = dst_sk.http_client()
src_http = src_sk.http_client()
# run pull_timeline which will halt before downloading files
dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "pause"))
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause"))
pt_handle = PropagatingThread(
target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id)
)
pt_handle.start()
dst_sk.wait_until_paused("sk-pull-timeline-after-list-pausable")
src_sk.wait_until_paused("sk-snapshot-after-list-pausable")
src_http = src_sk.http_client()
term_before = src_http.timeline_status(tenant_id, timeline_id).term
@@ -1922,7 +1921,7 @@ def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder):
term_after = src_http.timeline_status(tenant_id, timeline_id).term
assert term_after > term_before, f"term_after={term_after}, term_before={term_before}"
dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "off"))
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "off"))
with pytest.raises(requests.exceptions.HTTPError):
pt_handle.join()