diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs
index 30c0e03d03..64a8846a9d 100644
--- a/storage_controller/src/persistence.rs
+++ b/storage_controller/src/persistence.rs
@@ -133,6 +133,7 @@ pub(crate) enum DatabaseOperation {
     InsertTimelineImport,
     UpdateTimelineImport,
     DeleteTimelineImport,
+    ListTimelineImports,
 }
 
 #[must_use]
@@ -1640,6 +1641,35 @@ impl Persistence {
         .await
     }
 
+    pub(crate) async fn list_complete_timeline_imports(
+        &self,
+    ) -> DatabaseResult<Vec<TimelineImport>> {
+        use crate::schema::timeline_imports::dsl;
+        let persistent = self
+            .with_measured_conn(DatabaseOperation::ListTimelineImports, move |conn| {
+                Box::pin(async move {
+                    let from_db: Vec<TimelineImportPersistence> =
+                        dsl::timeline_imports.load(conn).await?;
+                    Ok(from_db)
+                })
+            })
+            .await?;
+
+        let imports: Result<Vec<TimelineImport>, _> = persistent
+            .into_iter()
+            .map(TimelineImport::from_persistent)
+            .collect();
+        match imports {
+            Ok(ok) => Ok(ok
+                .into_iter()
+                .filter(|import| import.is_complete())
+                .collect()),
+            Err(err) => Err(DatabaseError::Logical(format!(
+                "failed to deserialize import: {err}"
+            ))),
+        }
+    }
+
     pub(crate) async fn delete_timeline_import(
         &self,
         tenant_id: TenantId,
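
The deserialize-then-filter shape of `list_complete_timeline_imports` is easier to see in isolation. Below is a minimal, self-contained sketch of the same idiom; `PersistentRow` and `Import` are hypothetical stand-ins for `TimelineImportPersistence` and `TimelineImport`, not the real types:

```rust
// Minimal sketch: load rows, deserialize them all-or-nothing, then filter.
#[derive(Debug)]
struct PersistentRow {
    state: String,
}

struct Import {
    complete: bool,
}

impl Import {
    // Stand-in for `TimelineImport::from_persistent`.
    fn from_persistent(row: PersistentRow) -> Result<Self, String> {
        match row.state.as_str() {
            "complete" => Ok(Import { complete: true }),
            "in-progress" => Ok(Import { complete: false }),
            other => Err(format!("unknown state: {other}")),
        }
    }

    fn is_complete(&self) -> bool {
        self.complete
    }
}

fn main() {
    let rows = vec![
        PersistentRow { state: "complete".into() },
        PersistentRow { state: "in-progress".into() },
    ];

    // Collecting `Result`s short-circuits on the first bad row, so one
    // corrupt record fails the whole listing instead of being silently
    // dropped.
    let imports: Result<Vec<Import>, String> =
        rows.into_iter().map(Import::from_persistent).collect();

    match imports {
        Ok(all) => {
            let complete: Vec<Import> = all.into_iter().filter(Import::is_complete).collect();
            println!("{} complete import(s)", complete.len());
        }
        Err(err) => eprintln!("failed to deserialize import: {err}"),
    }
}
```
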
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index acd399dca2..7e5e3fd8f4 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -878,6 +878,22 @@ impl Service {
             });
         }
 
+        // Fetch the list of completed imports and attempt to finalize them in the background.
+        // This handles the case where the previous storage controller instance shut down
+        // whilst finalizing imports.
+        let complete_imports = self.persistence.list_complete_timeline_imports().await;
+        match complete_imports {
+            Ok(ok) => {
+                tokio::task::spawn({
+                    let finalize_imports_self = self.clone();
+                    async move { finalize_imports_self.finalize_timeline_imports(ok).await }
+                });
+            }
+            Err(err) => {
+                tracing::error!("Could not retrieve completed imports from database: {err}");
+            }
+        }
+
         tracing::info!(
             "Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)"
         );
@@ -3869,13 +3885,10 @@ impl Service {
         self: &Arc<Self>,
         import: TimelineImport,
     ) -> anyhow::Result<()> {
-        // TODO(vlad): On start-up, load up the imports and notify cplane of the
-        // ones that have been completed. This assumes the new cplane API will
-        // be idempotent. If that's not possible, bang a flag in the database.
-        // https://github.com/neondatabase/neon/issues/11570
-
         tracing::info!("Finalizing timeline import");
 
+        pausable_failpoint!("timeline-import-pre-cplane-notification");
+
         let import_failed = import.completion_error().is_some();
 
         if !import_failed {
@@ -3926,6 +3939,15 @@ impl Service {
         Ok(())
     }
 
+    async fn finalize_timeline_imports(self: &Arc<Self>, imports: Vec<TimelineImport>) {
+        futures::future::join_all(
+            imports
+                .into_iter()
+                .map(|import| self.finalize_timeline_import(import)),
+        )
+        .await;
+    }
+
     async fn timeline_active_on_all_shards(
         self: &Arc<Self>,
         import: &TimelineImport,
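
Note that startup does not await the finalizations: they are spawned onto a background task and driven concurrently with `join_all`, so a slow or unreachable control plane cannot stall the rest of startup. The removed TODO's assumption still applies, namely that the cplane notification is idempotent, which is what makes re-delivery after a restart safe. A minimal sketch of the spawn-plus-`join_all` pattern (assuming `tokio` and `futures` as dependencies; `Import`, `finalize`, and `finalize_all` are hypothetical stand-ins for the real types and methods):

```rust
use std::sync::Arc;

struct Import {
    id: u32,
}

struct Service;

impl Service {
    async fn finalize(self: &Arc<Self>, import: Import) {
        // The real code notifies cplane here and then deletes the
        // persisted import record.
        println!("finalized import {}", import.id);
    }

    async fn finalize_all(self: &Arc<Self>, imports: Vec<Import>) {
        // Drive every finalization concurrently on this one task; the
        // imports are independent, so completion order is irrelevant.
        futures::future::join_all(imports.into_iter().map(|i| self.finalize(i))).await;
    }
}

#[tokio::main]
async fn main() {
    let svc = Arc::new(Service);
    let recovered = vec![Import { id: 1 }, Import { id: 2 }];

    // Startup continues immediately; recovery happens in the background.
    let task = tokio::task::spawn({
        let svc = svc.clone();
        async move { svc.finalize_all(recovered).await }
    });
    task.await.unwrap();
}
```
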
+ """ + assert not vanilla_pg.is_running() + + path.mkdir() + # what cplane writes before scheduling fast_import + specpath = path / "spec.json" + specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"})) + # what fast_import writes + vanilla_pg.pgdatadir.rename(path / "pgdata") + statusdir = path / "status" + statusdir.mkdir() + (statusdir / "pgdata").write_text(json.dumps({"done": True})) + (statusdir / "fast_import").write_text(json.dumps({"command": "pgdata", "done": True})) + + @skip_in_debug_build("MULTIPLE_RELATION_SEGMENTS has non trivial amount of data") @pytest.mark.parametrize("shard_count,stripe_size,rel_block_size", smoke_params) def test_pgdata_import_smoke( @@ -155,17 +178,8 @@ def test_pgdata_import_smoke( # TODO: actually exercise fast_import here # TODO: test s3 remote storage # - importbucket = neon_env_builder.repo_dir / "importbucket" - importbucket.mkdir() - # what cplane writes before scheduling fast_import - specpath = importbucket / "spec.json" - specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"})) - # what fast_import writes - vanilla_pg.pgdatadir.rename(importbucket / "pgdata") - statusdir = importbucket / "status" - statusdir.mkdir() - (statusdir / "pgdata").write_text(json.dumps({"done": True})) - (statusdir / "fast_import").write_text(json.dumps({"command": "pgdata", "done": True})) + importbucket_path = neon_env_builder.repo_dir / "importbucket" + mock_import_bucket(vanilla_pg, importbucket_path) # # Do the import @@ -192,7 +206,7 @@ def test_pgdata_import_smoke( "new_timeline_id": str(timeline_id), "import_pgdata": { "idempotency_key": str(idempotency), - "location": {"LocalFs": {"path": str(importbucket.absolute())}}, + "location": {"LocalFs": {"path": str(importbucket_path.absolute())}}, }, }, ) @@ -319,6 +333,87 @@ def test_pgdata_import_smoke( br_initdb_endpoint.safe_psql("select * from othertable") +@run_only_on_default_postgres(reason="PG version is irrelevant here") +def test_import_completion_on_restart( + neon_env_builder: NeonEnvBuilder, vanilla_pg: VanillaPostgres, make_httpserver: HTTPServer +): + """ + Validate that the storage controller delivers the import completion notification + eventually even if it was restarted when the import initially completed. 
diff --git a/test_runner/regress/test_import_pgdata.py b/test_runner/regress/test_import_pgdata.py
index 971245f393..a26c3994a5 100644
--- a/test_runner/regress/test_import_pgdata.py
+++ b/test_runner/regress/test_import_pgdata.py
@@ -18,7 +18,12 @@ from fixtures.pageserver.http import (
 from fixtures.pg_version import PgVersion
 from fixtures.port_distributor import PortDistributor
 from fixtures.remote_storage import MockS3Server, RemoteStorageKind
-from fixtures.utils import shared_buffers_for_max_cu, skip_in_debug_build, wait_until
+from fixtures.utils import (
+    run_only_on_default_postgres,
+    shared_buffers_for_max_cu,
+    skip_in_debug_build,
+    wait_until,
+)
 from mypy_boto3_kms import KMSClient
 from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
 from mypy_boto3_s3 import S3Client
@@ -43,6 +48,24 @@ smoke_params = [
 ]
 
 
+def mock_import_bucket(vanilla_pg: VanillaPostgres, path: Path):
+    """
+    Mock the import S3 bucket in a local directory for the provided vanilla PG instance.
+    """
+    assert not vanilla_pg.is_running()
+
+    path.mkdir()
+    # what cplane writes before scheduling fast_import
+    specpath = path / "spec.json"
+    specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"}))
+    # what fast_import writes
+    vanilla_pg.pgdatadir.rename(path / "pgdata")
+    statusdir = path / "status"
+    statusdir.mkdir()
+    (statusdir / "pgdata").write_text(json.dumps({"done": True}))
+    (statusdir / "fast_import").write_text(json.dumps({"command": "pgdata", "done": True}))
+
+
 @skip_in_debug_build("MULTIPLE_RELATION_SEGMENTS has non trivial amount of data")
 @pytest.mark.parametrize("shard_count,stripe_size,rel_block_size", smoke_params)
 def test_pgdata_import_smoke(
@@ -155,17 +178,8 @@ def test_pgdata_import_smoke(
     # TODO: actually exercise fast_import here
     # TODO: test s3 remote storage
     #
-    importbucket = neon_env_builder.repo_dir / "importbucket"
-    importbucket.mkdir()
-    # what cplane writes before scheduling fast_import
-    specpath = importbucket / "spec.json"
-    specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"}))
-    # what fast_import writes
-    vanilla_pg.pgdatadir.rename(importbucket / "pgdata")
-    statusdir = importbucket / "status"
-    statusdir.mkdir()
-    (statusdir / "pgdata").write_text(json.dumps({"done": True}))
-    (statusdir / "fast_import").write_text(json.dumps({"command": "pgdata", "done": True}))
+    importbucket_path = neon_env_builder.repo_dir / "importbucket"
+    mock_import_bucket(vanilla_pg, importbucket_path)
 
     #
     # Do the import
@@ -192,7 +206,7 @@ def test_pgdata_import_smoke(
             "new_timeline_id": str(timeline_id),
             "import_pgdata": {
                 "idempotency_key": str(idempotency),
-                "location": {"LocalFs": {"path": str(importbucket.absolute())}},
+                "location": {"LocalFs": {"path": str(importbucket_path.absolute())}},
             },
         },
     )
@@ -319,6 +333,87 @@ def test_pgdata_import_smoke(
     br_initdb_endpoint.safe_psql("select * from othertable")
 
 
+@run_only_on_default_postgres(reason="PG version is irrelevant here")
+def test_import_completion_on_restart(
+    neon_env_builder: NeonEnvBuilder, vanilla_pg: VanillaPostgres, make_httpserver: HTTPServer
+):
+    """
+    Validate that the storage controller eventually delivers the import completion
+    notification even if it was restarted just as the import completed.
+    """
+    # Set up mock control plane HTTP server to listen for import completions
+    import_completion_signaled = Event()
+
+    def handler(request: Request) -> Response:
+        log.info(f"control plane /import_complete request: {request.json}")
+        import_completion_signaled.set()
+        return Response(json.dumps({}), status=200)
+
+    cplane_mgmt_api_server = make_httpserver
+    cplane_mgmt_api_server.expect_request(
+        "/storage/api/v1/import_complete", method="PUT"
+    ).respond_with_handler(handler)
+
+    # Plug the cplane mock in
+    neon_env_builder.control_plane_hooks_api = (
+        f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
+    )
+
+    # The import will specify a local filesystem path mocking remote storage
+    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
+
+    vanilla_pg.start()
+    vanilla_pg.stop()
+
+    env = neon_env_builder.init_configs()
+    env.start()
+
+    importbucket_path = neon_env_builder.repo_dir / "test_import_completion_bucket"
+    mock_import_bucket(vanilla_pg, importbucket_path)
+
+    tenant_id = TenantId.generate()
+    timeline_id = TimelineId.generate()
+    idempotency = ImportPgdataIdemptencyKey.random()
+
+    # Pause before sending the notification
+    failpoint_name = "timeline-import-pre-cplane-notification"
+    env.storage_controller.configure_failpoints((failpoint_name, "pause"))
+
+    env.storage_controller.tenant_create(tenant_id)
+    env.storage_controller.timeline_create(
+        tenant_id,
+        {
+            "new_timeline_id": str(timeline_id),
+            "import_pgdata": {
+                "idempotency_key": str(idempotency),
+                "location": {"LocalFs": {"path": str(importbucket_path.absolute())}},
+            },
+        },
+    )
+
+    def hit_failpoint():
+        log.info("Checking log for pattern...")
+        try:
+            assert env.storage_controller.log_contains(f".*at failpoint {failpoint_name}.*")
+        except Exception:
+            log.exception("Failed to find pattern in log")
+            raise
+
+    wait_until(hit_failpoint)
+    assert not import_completion_signaled.is_set()
+
+    # Restart the storage controller before signalling control plane.
+    # This clears the failpoint and we expect that the import start-up reconciliation
+    # kicks in and notifies cplane.
+    env.storage_controller.stop()
+    env.storage_controller.start()
+
+    def cplane_notified():
+        assert import_completion_signaled.is_set()
+
+    wait_until(cplane_notified)
+
+
 def test_fast_import_with_pageserver_ingest(
     test_output_dir,
     vanilla_pg: VanillaPostgres,
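
The new test hinges on two properties of pause failpoints: they block the task at the failpoint until the failpoint is cleared, and their state is purely in-memory, so restarting the storage controller clears them implicitly and lets the startup recovery path deliver the notification. The sketch below is a conceptual mock of that behaviour (assuming `tokio` as a dependency), not neon's actual `pausable_failpoint!` machinery:

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::time::{Duration, sleep};

#[derive(Default)]
struct Failpoint {
    paused: AtomicBool, // in-memory only: a process restart resets it
}

impl Failpoint {
    async fn pausable(&self) {
        // Poll-with-sleep keeps the mock dependency-free; a real
        // implementation would park on a condition variable instead.
        while self.paused.load(Ordering::Acquire) {
            sleep(Duration::from_millis(10)).await;
        }
    }
}

#[tokio::main]
async fn main() {
    let fp = Arc::new(Failpoint::default());
    fp.paused.store(true, Ordering::Release); // configure_failpoints(.., "pause")

    let worker = tokio::spawn({
        let fp = fp.clone();
        async move {
            fp.pausable().await; // blocked here, like pre-cplane-notification
            println!("cplane notified");
        }
    });

    // A restart would reset `paused`; here we clear it explicitly.
    sleep(Duration::from_millis(50)).await;
    fp.paused.store(false, Ordering::Release);

    worker.await.unwrap();
}
```
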