From d0aaec2abbf502a962351b5939f1fae974053cd5 Mon Sep 17 00:00:00 2001
From: Vlad Lazar
Date: Fri, 9 May 2025 11:55:26 +0100
Subject: [PATCH] storage_controller: create imported timelines on safekeepers
 (#11801)

## Problem

SK timeline creations were skipped for imported timelines since we didn't
know the correct start LSN of the timeline at that point.

## Summary of changes

Create imported timelines on the safekeepers as part of the import finalize
step. We use the last record LSN of shard 0 as the start LSN for the
safekeeper timeline. (Sketches of the retry shape and of the shard-0
aggregation are appended after the patch, in Appendix A and B.)

Closes https://github.com/neondatabase/neon/issues/11569
---
 storage_controller/src/service.rs          | 51 ++++++++++++++-----
 .../src/service/safekeeper_service.rs      | 36 +++++++++++++
 test_runner/regress/test_import_pgdata.py  | 33 ++++++------
 3 files changed, 90 insertions(+), 30 deletions(-)

diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index fdb791c2cf..193050460d 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -3886,10 +3886,10 @@
             None
         } else if safekeepers {
-            // Note that we do not support creating the timeline on the safekeepers
-            // for imported timelines. The `start_lsn` of the timeline is not known
-            // until the import finshes.
-            // https://github.com/neondatabase/neon/issues/11569
+            // Note that for imported timelines, we do not create the timeline on the safekeepers
+            // straight away. Instead, we do it once the import has finalized, so that we know
+            // which start LSN to provide for the safekeepers. This is done in
+            // [`Self::finalize_timeline_import`].
             let res = self
                 .tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
                 .instrument(tracing::info_span!("timeline_create_safekeepers", %tenant_id, timeline_id=%timeline_info.timeline_id))
@@ -3966,11 +3966,22 @@
             let active = self.timeline_active_on_all_shards(&import).await?;
 
             match active {
-                true => {
+                Some(timeline_info) => {
                     tracing::info!("Timeline became active on all shards");
+
+                    if self.config.timelines_onto_safekeepers {
+                        // Now that we know the start LSN of this timeline, create it on the
+                        // safekeepers.
+                        self.tenant_timeline_create_safekeepers_until_success(
+                            import.tenant_id,
+                            timeline_info,
+                        )
+                        .await?;
+                    }
+
                     break;
                 }
-                false => {
+                None => {
                     tracing::info!("Timeline not active on all shards yet");
 
                     tokio::select! {
@@ -4004,9 +4015,6 @@
             .range_mut(TenantShardId::tenant_range(import.tenant_id))
             .for_each(|(_id, shard)| shard.importing = TimelineImportState::Idle);
 
-        // TODO(vlad): Timeline creations in import mode do not return a correct initdb lsn,
-        // so we can't create the timeline on the safekeepers. Fix by moving creation here.
-        // https://github.com/neondatabase/neon/issues/11569
         tracing::info!(%import_failed, "Timeline import complete");
 
         Ok(())
@@ -4021,10 +4029,16 @@ impl Service {
             .await;
     }
 
+    /// If the timeline is active on all shards, returns the [`TimelineInfo`]
+    /// collected from shard 0.
+    ///
+    /// An error is returned if the shard layout has changed during the import.
+    /// This is guarded against within the storage controller and the pageserver,
+    /// and is therefore unexpected.
     async fn timeline_active_on_all_shards(
         self: &Arc<Self>,
         import: &TimelineImport,
-    ) -> anyhow::Result<bool> {
+    ) -> anyhow::Result<Option<TimelineInfo>> {
         let targets = {
             let locked = self.inner.read().unwrap();
             let mut targets = Vec::new();
@@ -4048,13 +4062,17 @@ impl Service {
                     .expect("Pageservers may not be deleted while referenced");
                 targets.push((*tenant_shard_id, node.clone()));
             } else {
-                return Ok(false);
+                return Ok(None);
             }
         }
 
             targets
         };
 
+        if targets.is_empty() {
+            anyhow::bail!("No shards found to finalize import for");
+        }
+
         let results = self
             .tenant_for_shards_api(
                 targets,
@@ -4070,10 +4088,17 @@ impl Service {
             )
             .await;
 
-        Ok(results.into_iter().all(|res| match res {
+        let all_active = results.iter().all(|res| match res {
             Ok(info) => info.state == TimelineState::Active,
             Err(_) => false,
-        }))
+        });
+
+        if all_active {
+            // Both unwraps are validated above
+            Ok(Some(results.into_iter().next().unwrap().unwrap()))
+        } else {
+            Ok(None)
+        }
     }
 
     pub(crate) async fn tenant_timeline_archival_config(
diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs
index 5eecf0d415..5c15660ba3 100644
--- a/storage_controller/src/service/safekeeper_service.rs
+++ b/storage_controller/src/service/safekeeper_service.rs
@@ -323,6 +323,42 @@ impl Service {
         })
     }
 
+    pub(crate) async fn tenant_timeline_create_safekeepers_until_success(
+        self: &Arc<Self>,
+        tenant_id: TenantId,
+        timeline_info: TimelineInfo,
+    ) -> anyhow::Result<()> {
+        const BACKOFF: Duration = Duration::from_secs(5);
+
+        loop {
+            if self.cancel.is_cancelled() {
+                anyhow::bail!("Shut down requested while finalizing import");
+            }
+
+            let res = self
+                .tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
+                .await;
+
+            match res {
+                Ok(_) => {
+                    tracing::info!("Timeline created on safekeepers");
+                    break;
+                }
+                Err(err) => {
+                    tracing::error!("Failed to create timeline on safekeepers: {err}");
+                    tokio::select! {
+                        _ = self.cancel.cancelled() => {
+                            anyhow::bail!("Shut down requested while finalizing import");
+                        },
+                        _ = tokio::time::sleep(BACKOFF) => {}
+                    };
+                }
+            }
+        }
+
+        Ok(())
+    }
+
     /// Directly insert the timeline into the database without reconciling it with safekeepers.
     ///
     /// Useful if the timeline already exists on the specified safekeepers,
diff --git a/test_runner/regress/test_import_pgdata.py b/test_runner/regress/test_import_pgdata.py
index 2fda1991f7..05e63ad955 100644
--- a/test_runner/regress/test_import_pgdata.py
+++ b/test_runner/regress/test_import_pgdata.py
@@ -24,6 +24,7 @@ from fixtures.utils import (
     skip_in_debug_build,
     wait_until,
 )
+from fixtures.workload import Workload
 from mypy_boto3_kms import KMSClient
 from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
 from mypy_boto3_s3 import S3Client
@@ -97,6 +98,10 @@ def test_pgdata_import_smoke(
         f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
     )
 
+    if neon_env_builder.storage_controller_config is None:
+        neon_env_builder.storage_controller_config = {}
+    neon_env_builder.storage_controller_config["timelines_onto_safekeepers"] = True
+
     env = neon_env_builder.init_start()
 
     # The test needs LocalFs support, which is only built in testing mode.
@@ -286,34 +291,28 @@ def test_pgdata_import_smoke(
     #
     # validate that we can write
     #
-    rw_endpoint = env.endpoints.create_start(
-        branch_name=import_branch_name,
-        endpoint_id="rw",
-        tenant_id=tenant_id,
-        config_lines=ep_config,
-    )
-    rw_endpoint.safe_psql("create table othertable(values text)")
-    rw_lsn = Lsn(rw_endpoint.safe_psql_scalar("select pg_current_wal_flush_lsn()"))
+    workload = Workload(env, tenant_id, timeline_id, branch_name=import_branch_name)
+    workload.init()
+    workload.write_rows(64)
+    workload.validate()
 
-    # TODO: consider using `class Workload` here
-    # to do compaction and whatnot?
+    rw_lsn = Lsn(workload.endpoint().safe_psql_scalar("select pg_current_wal_flush_lsn()"))
 
     #
     # validate that we can branch (important use case)
     #
 
     # ... at the tip
-    _ = env.create_branch(
+    child_timeline_id = env.create_branch(
         new_branch_name="br-tip",
         ancestor_branch_name=import_branch_name,
         tenant_id=tenant_id,
         ancestor_start_lsn=rw_lsn,
     )
-    br_tip_endpoint = env.endpoints.create_start(
-        branch_name="br-tip", endpoint_id="br-tip-ro", tenant_id=tenant_id, config_lines=ep_config
-    )
-    validate_vanilla_equivalence(br_tip_endpoint)
-    br_tip_endpoint.safe_psql("select * from othertable")
+    child_workload = workload.branch(timeline_id=child_timeline_id, branch_name="br-tip")
+    child_workload.validate()
+
+    validate_vanilla_equivalence(child_workload.endpoint())
 
     # ... at the initdb lsn
     _ = env.create_branch(
@@ -330,7 +329,7 @@
     )
     validate_vanilla_equivalence(br_initdb_endpoint)
     with pytest.raises(psycopg2.errors.UndefinedTable):
-        br_initdb_endpoint.safe_psql("select * from othertable")
+        br_initdb_endpoint.safe_psql(f"select * from {workload.table}")
 
 
 @run_only_on_default_postgres(reason="PG version is irrelevant here")
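
## Appendix A: the retry shape

`tenant_timeline_create_safekeepers_until_success` is a cancellable retry-until-success loop: keep calling the creation endpoint, sleep a fixed backoff between attempts, and abort promptly on shutdown. Below is a minimal standalone sketch of that shape, not the production code. It assumes the `tokio`, `tokio-util`, and `anyhow` crates; `create_on_safekeepers` is a hypothetical stand-in for the real `tenant_timeline_create_safekeepers` call, the patch's `self.cancel` is modeled with tokio-util's `CancellationToken`, and the 5-second backoff mirrors the patch.

```rust
use std::time::Duration;

use tokio_util::sync::CancellationToken;

const BACKOFF: Duration = Duration::from_secs(5);

/// Hypothetical stand-in for `tenant_timeline_create_safekeepers`:
/// fails transiently on the first two attempts, then succeeds.
async fn create_on_safekeepers(attempt: u32) -> anyhow::Result<()> {
    if attempt < 2 {
        anyhow::bail!("transient safekeeper error");
    }
    Ok(())
}

/// Retry until the operation succeeds, sleeping a fixed backoff between
/// attempts and bailing out promptly if shutdown is requested.
async fn create_until_success(cancel: &CancellationToken) -> anyhow::Result<()> {
    let mut attempt = 0;
    loop {
        if cancel.is_cancelled() {
            anyhow::bail!("Shut down requested while finalizing import");
        }

        match create_on_safekeepers(attempt).await {
            Ok(()) => return Ok(()),
            Err(err) => {
                eprintln!("attempt {attempt} failed: {err}; retrying in {BACKOFF:?}");
                attempt += 1;
                tokio::select! {
                    // Wake up immediately on shutdown instead of sleeping
                    // out the full backoff.
                    _ = cancel.cancelled() => {
                        anyhow::bail!("Shut down requested while finalizing import");
                    }
                    _ = tokio::time::sleep(BACKOFF) => {}
                }
            }
        }
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    create_until_success(&CancellationToken::new()).await
}
```

The design point mirrored from the patch is the `tokio::select!` around the sleep: a shutdown request cancels the wait immediately rather than after the backoff expires.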
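## Appendix B: shard-0 aggregation

The finalize step only proceeds once `timeline_active_on_all_shards` hands back shard 0's `TimelineInfo`: every per-shard probe must succeed and report `Active`, and only then is the first (shard 0) result unwrapped and returned. A minimal sketch of that aggregation follows, using simplified stand-ins (`TimelineDetail`, `State`, a plain `u64` LSN) for the real `TimelineInfo`, `TimelineState`, and `Lsn` types.

```rust
/// Simplified stand-ins for the pageserver's `TimelineState` and `TimelineInfo`.
#[derive(Debug, Clone, PartialEq)]
enum State {
    Loading,
    Active,
}

#[derive(Debug, Clone)]
struct TimelineDetail {
    state: State,
    last_record_lsn: u64, // the LSN that becomes the safekeeper start LSN
}

/// Returns shard 0's detail (the first entry) only if every shard probe
/// succeeded and reported an Active timeline; otherwise `None`.
fn active_on_all_shards(
    results: Vec<Result<TimelineDetail, String>>,
) -> Option<TimelineDetail> {
    let all_active = results
        .iter()
        .all(|res| matches!(res, Ok(info) if info.state == State::Active));

    if all_active {
        // The caller has already bailed out when there are zero shards, so
        // `next()` yields an entry and every entry is `Ok` here: the same
        // invariant the patch relies on for its two unwraps.
        results
            .into_iter()
            .next()
            .map(|res| res.expect("all entries are Ok"))
    } else {
        None
    }
}

fn main() {
    let ready = vec![
        Ok(TimelineDetail { state: State::Active, last_record_lsn: 0x1000 }),
        Ok(TimelineDetail { state: State::Active, last_record_lsn: 0x2000 }),
    ];
    // Shard 0's last record LSN is what the safekeeper timeline starts from.
    assert_eq!(active_on_all_shards(ready).unwrap().last_record_lsn, 0x1000);

    let not_ready = vec![
        Ok(TimelineDetail { state: State::Active, last_record_lsn: 0x1000 }),
        Ok(TimelineDetail { state: State::Loading, last_record_lsn: 0x500 }),
    ];
    assert!(active_on_all_shards(not_ready).is_none());
}
```

Note the ordering in the patch itself: the empty-targets `anyhow::bail!` guard runs before the aggregation, which is what makes the subsequent unwraps safe.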