From 7b2f9dc9080821985525fd81fd33e10967062fb1 Mon Sep 17 00:00:00 2001
From: Kirill Bulatov
Date: Mon, 3 Oct 2022 13:33:55 +0300
Subject: [PATCH] Reuse existing tenants during attach (#2540)

---
 pageserver/src/storage_sync.rs               |  1 +
 pageserver/src/tenant.rs                     | 46 ++++-----
 pageserver/src/tenant_mgr.rs                 | 27 +++---
 .../test_tenants_with_remote_storage.py      | 96 +++++++++++++++++++
 4 files changed, 136 insertions(+), 34 deletions(-)

diff --git a/pageserver/src/storage_sync.rs b/pageserver/src/storage_sync.rs
index 776d9214d4..bee460d173 100644
--- a/pageserver/src/storage_sync.rs
+++ b/pageserver/src/storage_sync.rs
@@ -639,6 +639,7 @@ pub fn spawn_storage_sync_task(
                 (storage, remote_index_clone, sync_queue),
                 max_sync_errors,
             )
+            .instrument(info_span!("storage_sync_loop"))
             .await;
             Ok(())
         },
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index c9ad3bf232..672ee3a488 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -400,16 +400,19 @@ impl Tenant {
                 timeline_id,
                 metadata.pg_version()
             );
-            let timeline = self
-                .initialize_new_timeline(timeline_id, metadata, &mut timelines_accessor)
-                .with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
-
-            match timelines_accessor.entry(timeline.timeline_id) {
-                Entry::Occupied(_) => bail!(
-                    "Found freshly initialized timeline {} in the tenant map",
-                    timeline.timeline_id
+            let ancestor = metadata
+                .ancestor_timeline()
+                .and_then(|ancestor_timeline_id| timelines_accessor.get(&ancestor_timeline_id))
+                .cloned();
+            match timelines_accessor.entry(timeline_id) {
+                Entry::Occupied(_) => warn!(
+                    "Timeline {}/{} already exists in the tenant map, skipping its initialization",
+                    self.tenant_id, timeline_id
                 ),
                 Entry::Vacant(v) => {
+                    let timeline = self
+                        .initialize_new_timeline(timeline_id, metadata, ancestor)
+                        .with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
                     v.insert(timeline);
                 }
             }
@@ -609,21 +612,14 @@ impl Tenant {
         &self,
         new_timeline_id: TimelineId,
         new_metadata: TimelineMetadata,
-        timelines: &mut MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
+        ancestor: Option<Arc<Timeline>>,
     ) -> anyhow::Result<Arc<Timeline>> {
-        let ancestor = match new_metadata.ancestor_timeline() {
-            Some(ancestor_timeline_id) => Some(
-                timelines
-                    .get(&ancestor_timeline_id)
-                    .cloned()
-                    .with_context(|| {
-                        format!(
-                            "Timeline's {new_timeline_id} ancestor {ancestor_timeline_id} was not found"
-                        )
-                    })?,
-            ),
-            None => None,
-        };
+        if let Some(ancestor_timeline_id) = new_metadata.ancestor_timeline() {
+            anyhow::ensure!(
+                ancestor.is_some(),
+                "Timeline's {new_timeline_id} ancestor {ancestor_timeline_id} was not found"
+            )
+        }
 
         let new_disk_consistent_lsn = new_metadata.disk_consistent_lsn();
         let pg_version = new_metadata.pg_version();
@@ -1080,8 +1076,12 @@ impl Tenant {
             )
         })?;
 
+        let ancestor = new_metadata
+            .ancestor_timeline()
+            .and_then(|ancestor_timeline_id| timelines.get(&ancestor_timeline_id))
+            .cloned();
         let new_timeline = self
-            .initialize_new_timeline(new_timeline_id, new_metadata, timelines)
+            .initialize_new_timeline(new_timeline_id, new_metadata, ancestor)
             .with_context(|| {
                 format!(
                     "Failed to initialize timeline {}/{}",
diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs
index fcb2c18b79..1efd3d4af4 100644
--- a/pageserver/src/tenant_mgr.rs
+++ b/pageserver/src/tenant_mgr.rs
@@ -107,6 +107,9 @@ pub fn init_tenant_mgr(
 /// Ignores other timelines that might be present for tenant, but were not passed as a parameter.
 /// Attempts to load as many entities as possible: if a certain timeline fails during the load, the tenant is marked as "Broken",
 /// and the load continues.
+///
+/// Attach happens on pageserver startup and after successful timeline downloads
+/// (a subset of timeline files, always including the metadata file, after which the new timeline needs to be registered).
 pub fn attach_local_tenants(
     conf: &'static PageServerConf,
     remote_index: &RemoteIndex,
@@ -122,18 +125,20 @@ pub fn attach_local_tenants(
         );
         debug!("Timelines to attach: {local_timelines:?}");
 
-        let tenant = load_local_tenant(conf, tenant_id, remote_index);
-        {
-            match tenants_state::write_tenants().entry(tenant_id) {
-                hash_map::Entry::Occupied(_) => {
-                    error!("Cannot attach tenant {tenant_id}: there's already an entry in the tenant state");
-                    continue;
-                }
-                hash_map::Entry::Vacant(v) => {
-                    v.insert(Arc::clone(&tenant));
-                }
+        let mut tenants_accessor = tenants_state::write_tenants();
+        let tenant = match tenants_accessor.entry(tenant_id) {
+            hash_map::Entry::Occupied(o) => {
+                info!("Tenant {tenant_id} was found in pageserver's memory");
+                Arc::clone(o.get())
             }
-        }
+            hash_map::Entry::Vacant(v) => {
+                info!("Tenant {tenant_id} was not found in pageserver's memory, loading it");
+                let tenant = load_local_tenant(conf, tenant_id, remote_index);
+                v.insert(Arc::clone(&tenant));
+                tenant
+            }
+        };
+        drop(tenants_accessor);
 
         if tenant.current_state() == TenantState::Broken {
             warn!("Skipping timeline load for broken tenant {tenant_id}")
diff --git a/test_runner/regress/test_tenants_with_remote_storage.py b/test_runner/regress/test_tenants_with_remote_storage.py
index 83affac062..d8424e22c8 100644
--- a/test_runner/regress/test_tenants_with_remote_storage.py
+++ b/test_runner/regress/test_tenants_with_remote_storage.py
@@ -7,19 +7,25 @@
 #
 
 import asyncio
+import os
+from pathlib import Path
 from typing import List, Tuple
 
 import pytest
+from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnv,
     NeonEnvBuilder,
+    NeonPageserverHttpClient,
     Postgres,
     RemoteStorageKind,
     available_remote_storages,
     wait_for_last_record_lsn,
     wait_for_upload,
+    wait_until,
 )
 from fixtures.types import Lsn, TenantId, TimelineId
+from fixtures.utils import query_scalar
 
 
 async def tenant_workload(env: NeonEnv, pg: Postgres):
@@ -93,3 +99,93 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Rem
     # run final checkpoint manually to flush all the data to remote storage
     pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
     wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)
+
+
+@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
+def test_tenants_attached_after_download(
+    neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
+):
+    neon_env_builder.enable_remote_storage(
+        remote_storage_kind=remote_storage_kind,
+        test_name="test_tenants_attached_after_download",
+    )
+
+    data_id = 1
+    data_secret = "very secret secret"
+
+    ##### First start, insert secret data and upload it to the remote storage
+    env = neon_env_builder.init_start()
+    pageserver_http = env.pageserver.http_client()
+    pg = env.postgres.create_start("main")
+
+    client = env.pageserver.http_client()
+
+    tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
+    timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
+
+    for checkpoint_number in range(1, 3):
+        with pg.cursor() as cur:
+            cur.execute(
+                f"""
+                CREATE TABLE t{checkpoint_number}(id int primary key, secret text);
+                INSERT INTO t{checkpoint_number} VALUES ({data_id}, '{data_secret}|{checkpoint_number}');
+            """
+            )
+            current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
+
+        # wait until pageserver receives that data
+        wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn)
+
+        # run checkpoint manually to be sure that data landed in remote storage
+        pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
+
+        log.info(f"waiting for checkpoint {checkpoint_number} upload")
+        # wait until pageserver successfully uploaded a checkpoint to remote storage
+        wait_for_upload(client, tenant_id, timeline_id, current_lsn)
+        log.info(f"upload of checkpoint {checkpoint_number} is done")
+
+    ##### Stop the pageserver, erase its layer file to force it being downloaded from S3
+    env.postgres.stop_all()
+    env.pageserver.stop()
+
+    timeline_dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
+    local_layer_deleted = False
+    for path in Path.iterdir(timeline_dir):
+        if path.name.startswith("00000"):
+            # Looks like a layer file. Remove it
+            os.remove(path)
+            local_layer_deleted = True
+            break
+    assert local_layer_deleted, f"Found no local layer files to delete in directory {timeline_dir}"
+
+    ##### Start the pageserver, forcing it to download the layer file and load the timeline into memory
+    env.pageserver.start()
+    client = env.pageserver.http_client()
+
+    wait_until(
+        number_of_iterations=5,
+        interval=1,
+        func=lambda: expect_tenant_to_download_timeline(client, tenant_id),
+    )
+
+    restored_timelines = client.timeline_list(tenant_id)
+    assert (
+        len(restored_timelines) == 1
+    ), f"Tenant {tenant_id} should have its timeline reattached after its layer is downloaded from the remote storage"
+    restored_timeline = restored_timelines[0]
+    assert restored_timeline["timeline_id"] == str(
+        timeline_id
+    ), f"Tenant {tenant_id} should have its old timeline {timeline_id} restored from the remote storage"
+
+
+def expect_tenant_to_download_timeline(
+    client: NeonPageserverHttpClient,
+    tenant_id: TenantId,
+):
+    for tenant in client.tenant_list():
+        if tenant["id"] == str(tenant_id):
+            assert not tenant.get(
+                "has_in_progress_downloads", True
+            ), f"Tenant {tenant_id} should have no downloads in progress"
+            return
+    assert False, f"Tenant {tenant_id} is missing on pageserver"
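
The heart of the tenant_mgr.rs change above is a reuse-or-load pattern on the in-memory tenant map: look the tenant up first, and only load it from local disk when it is not registered yet, so repeated attaches (startup, later timeline downloads) all resolve to the same tenant object. Below is a minimal standalone sketch of that pattern; the Tenant struct, the u32 tenant id, and the load_tenant_from_disk helper are illustrative stand-ins, not the pageserver's real types or API.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;

// Illustrative stand-in for the pageserver's tenant object.
#[derive(Debug)]
struct Tenant {
    id: u32,
}

// Stand-in for loading tenant state from local disk; here it just constructs a value.
fn load_tenant_from_disk(id: u32) -> Arc<Tenant> {
    Arc::new(Tenant { id })
}

// Reuse an already-registered tenant, or load and register it if it is not in memory yet.
fn attach_tenant(tenants: &mut HashMap<u32, Arc<Tenant>>, id: u32) -> Arc<Tenant> {
    match tenants.entry(id) {
        Entry::Occupied(o) => Arc::clone(o.get()),
        Entry::Vacant(v) => {
            let tenant = load_tenant_from_disk(id);
            v.insert(Arc::clone(&tenant));
            tenant
        }
    }
}

fn main() {
    let mut tenants = HashMap::new();
    let first = attach_tenant(&mut tenants, 1);
    let second = attach_tenant(&mut tenants, 1);
    // The second attach reuses the existing entry instead of re-initializing the tenant.
    assert!(Arc::ptr_eq(&first, &second));
    println!("tenant {} attached once, reused afterwards", first.id);
}

Both calls to attach_tenant return the same Arc, which is what lets a tenant that is already attached keep its existing timelines while newly downloaded ones are registered with it.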