diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 6851f3ca2c..9ab0262407 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -1264,8 +1264,24 @@ impl Tenant {
             "Cannot create timelines on inactive tenant"
         );

-        if self.get_timeline(new_timeline_id, false).is_ok() {
+        if let Ok(existing) = self.get_timeline(new_timeline_id, false) {
             debug!("timeline {new_timeline_id} already exists");
+
+            if let Some(remote_client) = existing.remote_client.as_ref() {
+                // Wait for uploads to complete, so that when we return Ok, the timeline
+                // is known to be durable on remote storage. Just like we do at the end of
+                // this function, after we have created the timeline ourselves.
+                //
+                // We only really care that the initial version of `index_part.json` has
+                // been uploaded. That's enough to remember that the timeline
+                // exists. However, there is no function to wait specifically for that, so
+                // we just wait for all in-progress uploads to finish.
+                remote_client
+                    .wait_completion()
+                    .await
+                    .context("wait for timeline uploads to complete")?;
+            }
+
             return Ok(None);
         }

@@ -1307,6 +1323,17 @@ impl Tenant {
             }
         };

+        if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
+            // Wait for the upload of the `index_part.json` file to finish, so that when we return
+            // Ok, the timeline is durable in remote storage.
+            let kind = ancestor_timeline_id
+                .map(|_| "branched")
+                .unwrap_or("bootstrapped");
+            remote_client.wait_completion().await.with_context(|| {
+                format!("wait for {} timeline initial uploads to complete", kind)
+            })?;
+        }
+
         Ok(Some(loaded_timeline))
     }

@@ -2376,17 +2403,18 @@ impl Tenant {
             src_timeline.initdb_lsn,
             src_timeline.pg_version,
         );
-        let mut timelines = self.timelines.lock().unwrap();
-        let new_timeline = self
-            .prepare_timeline(
+
+        let new_timeline = {
+            let mut timelines = self.timelines.lock().unwrap();
+            self.prepare_timeline(
                 dst_id,
                 &metadata,
                 timeline_uninit_mark,
                 false,
                 Some(Arc::clone(src_timeline)),
             )?
-            .initialize_with_lock(ctx, &mut timelines, true, true)?;
-        drop(timelines);
+            .initialize_with_lock(ctx, &mut timelines, true, true)?
+        };

         // Root timeline gets its layers during creation and uploads them along with the metadata.
         // A branch timeline though, when created, can get no writes for some time, hence won't get any layers created.
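From the client's point of view, the change above means that a repeated creation request for an already existing timeline only comes back (as 409 Conflict) once the in-progress uploads, in particular the initial index_part.json, have completed. A minimal sketch of that contract against the test fixtures touched below; the helper name is hypothetical and the import paths are assumed, not taken from this patch:

    from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient  # assumed path
    from fixtures.types import Lsn, TenantId, TimelineId

    def retried_create_is_durable(
        client: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId, pg_version
    ):
        # Hypothetical helper, not part of this patch: re-issue the creation request
        # for a timeline that already exists.
        try:
            client.timeline_create(
                tenant_id=tenant_id,
                new_timeline_id=timeline_id,
                pg_version=pg_version,
            )
        except PageserverApiException as e:
            # The pageserver now calls remote_client.wait_completion() before answering,
            # so by the time the conflict is returned the initial index_part.json upload
            # has finished.
            assert e.status_code == 409
        # For an empty timeline, remote_consistent_lsn has therefore caught up to
        # last_record_lsn (the same check assert_nothing_to_upload() does below).
        detail = client.timeline_detail(tenant_id, timeline_id)
        assert Lsn(detail["last_record_lsn"]) == Lsn(detail["remote_consistent_lsn"])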
diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py
index 388e834b56..1ff057fae2 100644
--- a/test_runner/fixtures/pageserver/http.py
+++ b/test_runner/fixtures/pageserver/http.py
@@ -272,6 +272,7 @@ class PageserverHttpClient(requests.Session):
         new_timeline_id: Optional[TimelineId] = None,
         ancestor_timeline_id: Optional[TimelineId] = None,
         ancestor_start_lsn: Optional[Lsn] = None,
+        **kwargs,
     ) -> Dict[Any, Any]:
         body: Dict[str, Any] = {
             "new_timeline_id": str(new_timeline_id) if new_timeline_id else None,
@@ -281,7 +282,9 @@ class PageserverHttpClient(requests.Session):
         if pg_version != PgVersion.NOT_SET:
             body["pg_version"] = int(pg_version)

-        res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", json=body)
+        res = self.post(
+            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", json=body, **kwargs
+        )
         self.verbose_error(res)
         if res.status_code == 409:
             raise Exception(f"could not create timeline: already exists for id {new_timeline_id}")
diff --git a/test_runner/regress/test_disk_usage_eviction.py b/test_runner/regress/test_disk_usage_eviction.py
index 31c7ef2b17..e8ec657683 100644
--- a/test_runner/regress/test_disk_usage_eviction.py
+++ b/test_runner/regress/test_disk_usage_eviction.py
@@ -136,9 +136,7 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Ev
     env.pageserver.allowed_errors.append(r".* running disk usage based eviction due to pressure.*")

     # remove the initial tenant
-    ## why wait for upload queue? => https://github.com/neondatabase/neon/issues/3865
     assert env.initial_timeline
-    wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, env.initial_timeline)
     pageserver_http.tenant_detach(env.initial_tenant)
     assert isinstance(env.remote_storage, LocalFsStorage)
     tenant_remote_storage = env.remote_storage.root / "tenants" / str(env.initial_tenant)
diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py
index 6de5f7db04..cce9cdc175 100644
--- a/test_runner/regress/test_remote_storage.py
+++ b/test_runner/regress/test_remote_storage.py
@@ -2,11 +2,12 @@
 # env NEON_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ......

 import os
+import queue
 import shutil
 import threading
 import time
 from pathlib import Path
-from typing import Dict, List, Tuple
+from typing import Dict, List, Optional, Tuple

 import pytest
 from fixtures.log_helper import log
@@ -26,6 +27,7 @@ from fixtures.pageserver.utils import (
 )
 from fixtures.types import Lsn, TenantId, TimelineId
 from fixtures.utils import print_gc_result, query_scalar, wait_until
+from requests import ReadTimeout


 #
@@ -626,10 +628,7 @@ def test_empty_branch_remote_storage_upload(

     new_branch_name = "new_branch"
     new_branch_timeline_id = env.neon_cli.create_branch(new_branch_name, "main", env.initial_tenant)
-
-    with env.endpoints.create_start(new_branch_name, tenant_id=env.initial_tenant) as endpoint:
-        wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_branch_timeline_id)
-        wait_upload_queue_empty(client, env.initial_tenant, new_branch_timeline_id)
+    assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id)

     timelines_before_detach = set(
         map(
@@ -658,13 +657,19 @@ def test_empty_branch_remote_storage_upload(
     ), f"Expected to have same timelines after reattach, but got {timelines_after_detach}"


-# Branches off a root branch, but does not write anything to the new branch, so it has a metadata file only.
-# Ensures the branch is not on the remote storage and restarts the pageserver — the branch should be uploaded after the restart.
 @pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
 def test_empty_branch_remote_storage_upload_on_restart(
     neon_env_builder: NeonEnvBuilder,
     remote_storage_kind: RemoteStorageKind,
 ):
+    """
+    Branches off a root branch, but does not write anything to the new branch, so
+    it has a metadata file only.
+
+    Ensures the branch is not on the remote storage, then restarts the pageserver:
+    the upload should be scheduled by load, and create_timeline should wait for it
+    even though it gets 409 Conflict.
+    """
     neon_env_builder.enable_remote_storage(
         remote_storage_kind=remote_storage_kind,
         test_name="test_empty_branch_remote_storage_upload_on_restart",
@@ -673,35 +678,87 @@
     env = neon_env_builder.init_start()
     client = env.pageserver.http_client()

-    new_branch_name = "new_branch"
-    new_branch_timeline_id = env.neon_cli.create_branch(new_branch_name, "main", env.initial_tenant)
+    client.configure_failpoints(("before-upload-index", "return"))

-    with env.endpoints.create_start(new_branch_name, tenant_id=env.initial_tenant) as endpoint:
-        wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_branch_timeline_id)
-        wait_upload_queue_empty(client, env.initial_tenant, new_branch_timeline_id)
+    new_branch_timeline_id = TimelineId.generate()
+    with pytest.raises(ReadTimeout):
+        client.timeline_create(
+            tenant_id=env.initial_tenant,
+            ancestor_timeline_id=env.initial_timeline,
+            new_timeline_id=new_branch_timeline_id,
+            pg_version=env.pg_version,
+            timeout=4,
+        )
+
+    env.pageserver.allowed_errors.append(
+        f".*POST.* path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing"
+    )
+
+    # index upload is now hitting the failpoint, should not block the shutdown
     env.pageserver.stop()

-    # Remove new branch from the remote storage
-    assert isinstance(env.remote_storage, LocalFsStorage)
-    new_branch_on_remote_storage = (
-        env.remote_storage.root
-        / "tenants"
-        / str(env.initial_tenant)
-        / "timelines"
-        / str(new_branch_timeline_id)
+    timeline_path = (
+        Path("tenants") / str(env.initial_tenant) / "timelines" / str(new_branch_timeline_id)
     )

-    assert (
-        new_branch_on_remote_storage.is_dir()
-    ), f"'{new_branch_on_remote_storage}' path does not exist on the remote storage"
-    shutil.rmtree(new_branch_on_remote_storage)
-    env.pageserver.start()
+    local_metadata = env.repo_dir / timeline_path / "metadata"
+    assert local_metadata.is_file(), "timeout cancelled timeline branching, not the upload"

-    wait_upload_queue_empty(client, env.initial_tenant, new_branch_timeline_id)
+    assert isinstance(env.remote_storage, LocalFsStorage)
+    new_branch_on_remote_storage = env.remote_storage.root / timeline_path

     assert (
-        new_branch_on_remote_storage.is_dir()
-    ), f"New branch should have been reuploaded on pageserver restart to the remote storage path '{new_branch_on_remote_storage}'"
+        not new_branch_on_remote_storage.exists()
+    ), "failpoint should have prohibited index_part.json upload"
+
+    # during reconciliation we should have scheduled the uploads, and the
+    # retried create_timeline will wait for those to complete on the next
+    # client.timeline_create
+    env.pageserver.start(extra_env_vars={"FAILPOINTS": "before-upload-index=return"})
+
+    # sleep a bit to force the upload task to go into exponential backoff
+    time.sleep(1)
+
+    q: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
+    barrier = threading.Barrier(2)
+
+    def create_in_background():
+        barrier.wait()
+        try:
+            client.timeline_create(
+                tenant_id=env.initial_tenant,
+                ancestor_timeline_id=env.initial_timeline,
+                new_timeline_id=new_branch_timeline_id,
+                pg_version=env.pg_version,
+            )
+            q.put(None)
+        except PageserverApiException as e:
+            q.put(e)
+
+    create_thread = threading.Thread(target=create_in_background)
+    create_thread.start()
+
+    try:
+        # maximize chances of actually waiting for the uploads by create_timeline
+        barrier.wait()
+
+        assert not new_branch_on_remote_storage.exists(), "failpoint should have stopped uploading"
+
+        client.configure_failpoints(("before-upload-index", "off"))
+        conflict = q.get()
+
+        assert conflict, "create_timeline should not have succeeded"
+        assert (
+            conflict.status_code == 409
+        ), "timeline was created before restart, and uploads scheduled during initial load, so we expect 409 conflict"
+
+        assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id)
+
+        assert (
+            new_branch_on_remote_storage / "index_part.json"
+        ).is_file(), "uploads scheduled during initial load should have been awaited"
+    finally:
+        create_thread.join()


 def wait_upload_queue_empty(
@@ -752,4 +809,17 @@
     return int(val)


+def assert_nothing_to_upload(
+    client: PageserverHttpClient,
+    tenant_id: TenantId,
+    timeline_id: TimelineId,
+):
+    """
+    Check last_record_lsn == remote_consistent_lsn. The assertion only works for empty timelines,
+    which do not have anything to compact or gc.
+    """
+    detail = client.timeline_detail(tenant_id, timeline_id)
+    assert Lsn(detail["last_record_lsn"]) == Lsn(detail["remote_consistent_lsn"])
+
+
 # TODO Test that we correctly handle GC of files that are stuck in upload queue.
diff --git a/test_runner/regress/test_tenant_relocation.py b/test_runner/regress/test_tenant_relocation.py
index 81ad256f4b..2a5b30803b 100644
--- a/test_runner/regress/test_tenant_relocation.py
+++ b/test_runner/regress/test_tenant_relocation.py
@@ -23,7 +23,6 @@ from fixtures.pageserver.utils import (
     tenant_exists,
     wait_for_last_record_lsn,
     wait_for_upload,
-    wait_for_upload_queue_empty,
 )
 from fixtures.types import Lsn, TenantId, TimelineId
 from fixtures.utils import (
@@ -546,7 +545,7 @@ def test_emergency_relocate_with_branches_slow_replay(
     # - A logical replication message between the inserts, so that we can conveniently
     #   pause the WAL ingestion between the two inserts.
     # - Child branch, created after the inserts
-    tenant_id, timeline_id = env.neon_cli.create_tenant()
+    tenant_id, _ = env.neon_cli.create_tenant()
     main_endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)

     with main_endpoint.cursor() as cur:
@@ -559,14 +558,7 @@ def test_emergency_relocate_with_branches_slow_replay(
         current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
     main_endpoint.stop()

-    child_timeline_id = env.neon_cli.create_branch(
-        "child", tenant_id=tenant_id, ancestor_start_lsn=current_lsn
-    )
-
-    # Wait for the index_part.json file of both branches to be uploaded to remote storage.
-    # This is a work around for issue https://github.com/neondatabase/neon/issues/3865.
-    wait_for_upload_queue_empty(pageserver_http, tenant_id, timeline_id)
-    wait_for_upload_queue_empty(pageserver_http, tenant_id, child_timeline_id)
+    env.neon_cli.create_branch("child", tenant_id=tenant_id, ancestor_start_lsn=current_lsn)

     # Now kill the pageserver, remove the tenant directory, and restart. This simulates
     # the scenario that a pageserver dies unexpectedly and cannot be recovered, so we relocate
@@ -704,7 +696,7 @@ def test_emergency_relocate_with_branches_createdb(
     pageserver_http = env.pageserver.http_client()

     # create new nenant
-    tenant_id, timeline_id = env.neon_cli.create_tenant()
+    tenant_id, _ = env.neon_cli.create_tenant()
     main_endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)

     with main_endpoint.cursor() as cur:
@@ -712,9 +704,7 @@ def test_emergency_relocate_with_branches_createdb(
         cur.execute("CREATE DATABASE neondb")
         current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))

-    child_timeline_id = env.neon_cli.create_branch(
-        "child", tenant_id=tenant_id, ancestor_start_lsn=current_lsn
-    )
+    env.neon_cli.create_branch("child", tenant_id=tenant_id, ancestor_start_lsn=current_lsn)

     with main_endpoint.cursor(dbname="neondb") as cur:
         cur.execute("CREATE TABLE test_migrate_one AS SELECT generate_series(1,100)")
@@ -725,11 +715,6 @@ def test_emergency_relocate_with_branches_createdb(
         cur.execute("CREATE TABLE test_migrate_one AS SELECT generate_series(1,200)")
     child_endpoint.stop()

-    # Wait for the index_part.json file of both branches to be uploaded to remote storage.
-    # This is a work around for issue https://github.com/neondatabase/neon/issues/3865.
-    wait_for_upload_queue_empty(pageserver_http, tenant_id, timeline_id)
-    wait_for_upload_queue_empty(pageserver_http, tenant_id, child_timeline_id)
-
    # Kill the pageserver, remove the tenant directory, and restart
    env.pageserver.stop(immediate=True)
    shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_id))
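With the pageserver-side wait in place, the wait_for_upload_queue_empty() calls removed above are no longer needed: by the time create_branch returns, the branch's initial index_part.json is already in remote storage. A sketch of that invariant for the local_fs remote storage layout used in these tests; the helper name is illustrative, not part of the patch:

    from pathlib import Path

    from fixtures.types import TenantId, TimelineId

    def assert_index_part_uploaded(
        remote_storage_root: Path, tenant_id: TenantId, timeline_id: TimelineId
    ):
        # Illustrative helper: with the tenant.rs change, timeline/branch creation
        # only returns after the initial index_part.json upload, so for LocalFsStorage
        # the file is already present under the timeline's remote path.
        index_part = (
            remote_storage_root
            / "tenants"
            / str(tenant_id)
            / "timelines"
            / str(timeline_id)
            / "index_part.json"
        )
        assert index_part.is_file(), f"expected {index_part} right after branch creation"

Calling it with env.remote_storage.root and the TimelineId returned by env.neon_cli.create_branch(...) should pass immediately, without any extra waiting.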