From 5ba7315c8484f2f84ca33bd31c7346a1db08b725 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Thu, 24 Apr 2025 19:39:19 +0100 Subject: [PATCH] storage_controller: reconcile completed imports at start-up (#11614) ## Problem In https://github.com/neondatabase/neon/pull/11345, coordination of imports moved to the storage controller. This involves calling an idempotent endpoint to notify cplane once an import has completed. If the storage controller shut down in the middle of finalizing an import, that notification was never retried. ## Summary of changes Reconcile imports at start-up by fetching the completed imports from the database and spawning a background task which notifies cplane. Closes: https://github.com/neondatabase/neon/issues/11570 --- storage_controller/src/persistence.rs | 30 ++++++ storage_controller/src/service.rs | 32 +++++- storage_controller/src/timeline_import.rs | 2 +- test_runner/fixtures/neon_fixtures.py | 3 + test_runner/regress/test_import_pgdata.py | 121 +++++++++++++++++++--- 5 files changed, 169 insertions(+), 19 deletions(-) diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 30c0e03d03..64a8846a9d 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -133,6 +133,7 @@ pub(crate) enum DatabaseOperation { InsertTimelineImport, UpdateTimelineImport, DeleteTimelineImport, + ListTimelineImports, } #[must_use] @@ -1640,6 +1641,35 @@ impl Persistence { .await } + pub(crate) async fn list_complete_timeline_imports( + &self, + ) -> DatabaseResult> { + use crate::schema::timeline_imports::dsl; + let persistent = self + .with_measured_conn(DatabaseOperation::ListTimelineImports, move |conn| { + Box::pin(async move { + let from_db: Vec = + dsl::timeline_imports.load(conn).await?; + Ok(from_db) + }) + }) + .await?; + + let imports: Result, _> = persistent + .into_iter() + .map(TimelineImport::from_persistent) + .collect(); + match imports { + Ok(ok) => Ok(ok +
.into_iter() + .filter(|import| import.is_complete()) + .collect()), + Err(err) => Err(DatabaseError::Logical(format!( + "failed to deserialize import: {err}" + ))), + } + } + pub(crate) async fn delete_timeline_import( &self, tenant_id: TenantId, diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index acd399dca2..7e5e3fd8f4 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -878,6 +878,22 @@ impl Service { }); } + // Fetch the list of completed imports and attempt to finalize them in the background. + // This handles the case where the previous storage controller instance shut down + // whilst finalizing imports. + let complete_imports = self.persistence.list_complete_timeline_imports().await; + match complete_imports { + Ok(ok) => { + tokio::task::spawn({ + let finalize_imports_self = self.clone(); + async move { finalize_imports_self.finalize_timeline_imports(ok).await } + }); + } + Err(err) => { + tracing::error!("Could not retrieve completed imports from database: {err}"); + } + } + tracing::info!( "Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)" ); @@ -3869,13 +3885,10 @@ impl Service { self: &Arc, import: TimelineImport, ) -> anyhow::Result<()> { - // TODO(vlad): On start-up, load up the imports and notify cplane of the - // ones that have been completed. This assumes the new cplane API will - // be idempotent. If that's not possible, bang a flag in the database. 
- // https://github.com/neondatabase/neon/issues/11570 - tracing::info!("Finalizing timeline import"); + pausable_failpoint!("timeline-import-pre-cplane-notification"); + let import_failed = import.completion_error().is_some(); if !import_failed { @@ -3926,6 +3939,15 @@ impl Service { Ok(()) } + async fn finalize_timeline_imports(self: &Arc, imports: Vec) { + futures::future::join_all( + imports + .into_iter() + .map(|import| self.finalize_timeline_import(import)), + ) + .await; + } + async fn timeline_active_on_all_shards( self: &Arc, import: &TimelineImport, diff --git a/storage_controller/src/timeline_import.rs b/storage_controller/src/timeline_import.rs index 364299c9f0..b6dd4b252e 100644 --- a/storage_controller/src/timeline_import.rs +++ b/storage_controller/src/timeline_import.rs @@ -103,7 +103,7 @@ impl TimelineImport { let crnt = occ.get_mut(); if *crnt == status { Ok(TimelineImportUpdateFollowUp::None) - } else if crnt.is_terminal() && !status.is_terminal() { + } else if crnt.is_terminal() && *crnt != status { Err(TimelineImportUpdateError::UnexpectedUpdate) } else { *crnt = status; diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 6dbc2f6aa3..48aa739ce4 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -3384,6 +3384,9 @@ class VanillaPostgres(PgProtocol): """Return size of pgdatadir subdirectory in bytes.""" return get_dir_size(self.pgdatadir / subdir) + def is_running(self) -> bool: + return self.running + def __enter__(self) -> Self: return self diff --git a/test_runner/regress/test_import_pgdata.py b/test_runner/regress/test_import_pgdata.py index 971245f393..a26c3994a5 100644 --- a/test_runner/regress/test_import_pgdata.py +++ b/test_runner/regress/test_import_pgdata.py @@ -18,7 +18,12 @@ from fixtures.pageserver.http import ( from fixtures.pg_version import PgVersion from fixtures.port_distributor import PortDistributor from fixtures.remote_storage import 
MockS3Server, RemoteStorageKind -from fixtures.utils import shared_buffers_for_max_cu, skip_in_debug_build, wait_until +from fixtures.utils import ( + run_only_on_default_postgres, + shared_buffers_for_max_cu, + skip_in_debug_build, + wait_until, +) from mypy_boto3_kms import KMSClient from mypy_boto3_kms.type_defs import EncryptResponseTypeDef from mypy_boto3_s3 import S3Client @@ -43,6 +48,24 @@ smoke_params = [ ] +def mock_import_bucket(vanilla_pg: VanillaPostgres, path: Path): + """ + Mock the import S3 bucket into a local directory for a provided vanilla PG instance. + """ + assert not vanilla_pg.is_running() + + path.mkdir() + # what cplane writes before scheduling fast_import + specpath = path / "spec.json" + specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"})) + # what fast_import writes + vanilla_pg.pgdatadir.rename(path / "pgdata") + statusdir = path / "status" + statusdir.mkdir() + (statusdir / "pgdata").write_text(json.dumps({"done": True})) + (statusdir / "fast_import").write_text(json.dumps({"command": "pgdata", "done": True})) + + @skip_in_debug_build("MULTIPLE_RELATION_SEGMENTS has non trivial amount of data") @pytest.mark.parametrize("shard_count,stripe_size,rel_block_size", smoke_params) def test_pgdata_import_smoke( @@ -155,17 +178,8 @@ def test_pgdata_import_smoke( # TODO: actually exercise fast_import here # TODO: test s3 remote storage # - importbucket = neon_env_builder.repo_dir / "importbucket" - importbucket.mkdir() - # what cplane writes before scheduling fast_import - specpath = importbucket / "spec.json" - specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"})) - # what fast_import writes - vanilla_pg.pgdatadir.rename(importbucket / "pgdata") - statusdir = importbucket / "status" - statusdir.mkdir() - (statusdir / "pgdata").write_text(json.dumps({"done": True})) - (statusdir / "fast_import").write_text(json.dumps({"command": "pgdata", "done": True})) + 
importbucket_path = neon_env_builder.repo_dir / "importbucket" + mock_import_bucket(vanilla_pg, importbucket_path) # # Do the import @@ -192,7 +206,7 @@ def test_pgdata_import_smoke( "new_timeline_id": str(timeline_id), "import_pgdata": { "idempotency_key": str(idempotency), - "location": {"LocalFs": {"path": str(importbucket.absolute())}}, + "location": {"LocalFs": {"path": str(importbucket_path.absolute())}}, }, }, ) @@ -319,6 +333,87 @@ def test_pgdata_import_smoke( br_initdb_endpoint.safe_psql("select * from othertable") +@run_only_on_default_postgres(reason="PG version is irrelevant here") +def test_import_completion_on_restart( + neon_env_builder: NeonEnvBuilder, vanilla_pg: VanillaPostgres, make_httpserver: HTTPServer +): + """ + Validate that the storage controller delivers the import completion notification + eventually even if it was restarted when the import initially completed. + """ + # Set up mock control plane HTTP server to listen for import completions + import_completion_signaled = Event() + + def handler(request: Request) -> Response: + log.info(f"control plane /import_complete request: {request.json}") + import_completion_signaled.set() + return Response(json.dumps({}), status=200) + + cplane_mgmt_api_server = make_httpserver + cplane_mgmt_api_server.expect_request( + "/storage/api/v1/import_complete", method="PUT" + ).respond_with_handler(handler) + + # Plug the cplane mock in + neon_env_builder.control_plane_hooks_api = ( + f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/" + ) + + # The import will specifiy a local filesystem path mocking remote storage + neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) + + vanilla_pg.start() + vanilla_pg.stop() + + env = neon_env_builder.init_configs() + env.start() + + importbucket_path = neon_env_builder.repo_dir / "test_import_completion_bucket" + mock_import_bucket(vanilla_pg, importbucket_path) + + tenant_id = TenantId.generate() + 
timeline_id = TimelineId.generate() + idempotency = ImportPgdataIdemptencyKey.random() + + # Pause before sending the notification + failpoint_name = "timeline-import-pre-cplane-notification" + env.storage_controller.configure_failpoints((failpoint_name, "pause")) + + env.storage_controller.tenant_create(tenant_id) + env.storage_controller.timeline_create( + tenant_id, + { + "new_timeline_id": str(timeline_id), + "import_pgdata": { + "idempotency_key": str(idempotency), + "location": {"LocalFs": {"path": str(importbucket_path.absolute())}}, + }, + }, + ) + + def hit_failpoint(): + log.info("Checking log for pattern...") + try: + assert env.storage_controller.log_contains(f".*at failpoint {failpoint_name}.*") + except Exception: + log.exception("Failed to find pattern in log") + raise + + wait_until(hit_failpoint) + assert not import_completion_signaled.is_set() + + # Restart the storage controller before signalling control plane. + # This clears the failpoint and we expect that the import start-up reconciliation + # kicks in and notifies cplane. + env.storage_controller.stop() + env.storage_controller.start() + + def cplane_notified(): + assert import_completion_signaled.is_set() + + wait_until(cplane_notified) + + def test_fast_import_with_pageserver_ingest( test_output_dir, vanilla_pg: VanillaPostgres,