storage_controller: reconcile completed imports at start-up (#11614)

## Problem

In https://github.com/neondatabase/neon/pull/11345, coordination of imports moved to the
storage controller. This involves notifying cplane, via an idempotent endpoint, when an
import has completed.

If the storage controller shut down in the middle of finalizing an import, the completion
notification would never be retried.

## Summary of changes

Reconcile imports at start-up by fetching the completed imports from the database and
spawning a background task which notifies cplane for each of them.
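
Condensed, the new start-up step looks roughly like this (a sketch based on the service diff below; error handling and surrounding plumbing are elided, and the free-standing function is illustrative only):

```rust
// Illustrative sketch only: in the actual change this logic lives inline
// in the storage controller's start-up path (see the service diff below).
async fn reconcile_imports_on_startup(service: Arc<Service>) {
    // Imports whose shards have all reached a terminal status, but whose
    // cplane notification may have been lost to a shutdown/restart.
    match service.persistence.list_complete_timeline_imports().await {
        Ok(imports) => {
            // Notify cplane in the background. The endpoint is idempotent,
            // so re-notifying an already-finalized import is harmless.
            tokio::task::spawn(async move {
                service.finalize_timeline_imports(imports).await;
            });
        }
        Err(err) => {
            tracing::error!("Could not retrieve completed imports from database: {err}");
        }
    }
}
```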

Closes: https://github.com/neondatabase/neon/issues/11570

Commit 5ba7315c84 (parent 6f7e3c18e4), authored by Vlad Lazar on 2025-04-24 19:39:19 +01:00 and committed via GitHub.
5 changed files with 169 additions and 19 deletions.


@@ -133,6 +133,7 @@ pub(crate) enum DatabaseOperation {
     InsertTimelineImport,
     UpdateTimelineImport,
     DeleteTimelineImport,
+    ListTimelineImports,
 }
 
 #[must_use]
@@ -1640,6 +1641,35 @@ impl Persistence {
            .await
    }
 
+    pub(crate) async fn list_complete_timeline_imports(
+        &self,
+    ) -> DatabaseResult<Vec<TimelineImport>> {
+        use crate::schema::timeline_imports::dsl;
+        let persistent = self
+            .with_measured_conn(DatabaseOperation::ListTimelineImports, move |conn| {
+                Box::pin(async move {
+                    let from_db: Vec<TimelineImportPersistence> =
+                        dsl::timeline_imports.load(conn).await?;
+                    Ok(from_db)
+                })
+            })
+            .await?;
+
+        let imports: Result<Vec<TimelineImport>, _> = persistent
+            .into_iter()
+            .map(TimelineImport::from_persistent)
+            .collect();
+        match imports {
+            Ok(ok) => Ok(ok
+                .into_iter()
+                .filter(|import| import.is_complete())
+                .collect()),
+            Err(err) => Err(DatabaseError::Logical(format!(
+                "failed to deserialize import: {err}"
+            ))),
+        }
+    }
+
     pub(crate) async fn delete_timeline_import(
         &self,
         tenant_id: TenantId,


@@ -878,6 +878,22 @@ impl Service {
            });
        }
 
+        // Fetch the list of completed imports and attempt to finalize them in the background.
+        // This handles the case where the previous storage controller instance shut down
+        // whilst finalizing imports.
+        let complete_imports = self.persistence.list_complete_timeline_imports().await;
+        match complete_imports {
+            Ok(ok) => {
+                tokio::task::spawn({
+                    let finalize_imports_self = self.clone();
+                    async move { finalize_imports_self.finalize_timeline_imports(ok).await }
+                });
+            }
+            Err(err) => {
+                tracing::error!("Could not retrieve completed imports from database: {err}");
+            }
+        }
+
        tracing::info!(
            "Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)"
        );
@@ -3869,13 +3885,10 @@ impl Service {
        self: &Arc<Self>,
        import: TimelineImport,
    ) -> anyhow::Result<()> {
-        // TODO(vlad): On start-up, load up the imports and notify cplane of the
-        // ones that have been completed. This assumes the new cplane API will
-        // be idempotent. If that's not possible, bang a flag in the database.
-        // https://github.com/neondatabase/neon/issues/11570
-
        tracing::info!("Finalizing timeline import");
 
+        pausable_failpoint!("timeline-import-pre-cplane-notification");
+
        let import_failed = import.completion_error().is_some();
        if !import_failed {
@@ -3926,6 +3939,15 @@ impl Service {
        Ok(())
    }
 
+    async fn finalize_timeline_imports(self: &Arc<Self>, imports: Vec<TimelineImport>) {
+        futures::future::join_all(
+            imports
+                .into_iter()
+                .map(|import| self.finalize_timeline_import(import)),
+        )
+        .await;
+    }
+
    async fn timeline_active_on_all_shards(
        self: &Arc<Self>,
        import: &TimelineImport,


@@ -103,7 +103,7 @@ impl TimelineImport {
                let crnt = occ.get_mut();
                if *crnt == status {
                    Ok(TimelineImportUpdateFollowUp::None)
-                } else if crnt.is_terminal() && !status.is_terminal() {
+                } else if crnt.is_terminal() && *crnt != status {
                    Err(TimelineImportUpdateError::UnexpectedUpdate)
                } else {
                    *crnt = status;
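
The effect of this one-line change: previously an update was rejected only when it moved a shard from a terminal status back to a non-terminal one, so one terminal status could silently overwrite a different terminal status. Now, once a shard's status is terminal, only an identical re-report (idempotent redelivery, as happens after a restart) is accepted; any other update is an error. A minimal standalone model of the new guard, using hypothetical simplified types rather than the real ones:

```rust
// Hypothetical simplified model of the updated guard; the real code operates
// on TimelineImport's per-shard status map and its own error types.
#[derive(Clone, PartialEq)]
enum ShardStatus {
    InProgress,
    Done,
    Error(String),
}

impl ShardStatus {
    fn is_terminal(&self) -> bool {
        !matches!(self, Self::InProgress)
    }
}

fn apply_update(crnt: &mut ShardStatus, status: ShardStatus) -> Result<(), &'static str> {
    if *crnt == status {
        // Idempotent redelivery of the same status: nothing to do.
        Ok(())
    } else if crnt.is_terminal() && *crnt != status {
        // Any change away from a terminal status is unexpected.
        Err("unexpected update")
    } else {
        *crnt = status;
        Ok(())
    }
}
```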


@@ -3384,6 +3384,9 @@ class VanillaPostgres(PgProtocol):
        """Return size of pgdatadir subdirectory in bytes."""
        return get_dir_size(self.pgdatadir / subdir)
 
+    def is_running(self) -> bool:
+        return self.running
+
    def __enter__(self) -> Self:
        return self


@@ -18,7 +18,12 @@ from fixtures.pageserver.http import (
 from fixtures.pg_version import PgVersion
 from fixtures.port_distributor import PortDistributor
 from fixtures.remote_storage import MockS3Server, RemoteStorageKind
-from fixtures.utils import shared_buffers_for_max_cu, skip_in_debug_build, wait_until
+from fixtures.utils import (
+    run_only_on_default_postgres,
+    shared_buffers_for_max_cu,
+    skip_in_debug_build,
+    wait_until,
+)
 from mypy_boto3_kms import KMSClient
 from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
 from mypy_boto3_s3 import S3Client
@@ -43,6 +48,24 @@ smoke_params = [
 ]
 
 
+def mock_import_bucket(vanilla_pg: VanillaPostgres, path: Path):
+    """
+    Mock the import S3 bucket into a local directory for a provided vanilla PG instance.
+    """
+    assert not vanilla_pg.is_running()
+    path.mkdir()
+    # what cplane writes before scheduling fast_import
+    specpath = path / "spec.json"
+    specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"}))
+
+    # what fast_import writes
+    vanilla_pg.pgdatadir.rename(path / "pgdata")
+    statusdir = path / "status"
+    statusdir.mkdir()
+    (statusdir / "pgdata").write_text(json.dumps({"done": True}))
+    (statusdir / "fast_import").write_text(json.dumps({"command": "pgdata", "done": True}))
+
+
 @skip_in_debug_build("MULTIPLE_RELATION_SEGMENTS has non trivial amount of data")
 @pytest.mark.parametrize("shard_count,stripe_size,rel_block_size", smoke_params)
 def test_pgdata_import_smoke(
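
For reference, the on-disk layout that mock_import_bucket produces (read directly off the helper above):

```
<path>/
├── spec.json        # written by cplane before scheduling fast_import
├── pgdata/          # the vanilla PG data directory, moved into place
└── status/
    ├── pgdata       # {"done": true}
    └── fast_import  # {"command": "pgdata", "done": true}
```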
@@ -155,17 +178,8 @@ def test_pgdata_import_smoke(
     # TODO: actually exercise fast_import here
     # TODO: test s3 remote storage
     #
-    importbucket = neon_env_builder.repo_dir / "importbucket"
-    importbucket.mkdir()
-    # what cplane writes before scheduling fast_import
-    specpath = importbucket / "spec.json"
-    specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"}))
-    # what fast_import writes
-    vanilla_pg.pgdatadir.rename(importbucket / "pgdata")
-    statusdir = importbucket / "status"
-    statusdir.mkdir()
-    (statusdir / "pgdata").write_text(json.dumps({"done": True}))
-    (statusdir / "fast_import").write_text(json.dumps({"command": "pgdata", "done": True}))
+    importbucket_path = neon_env_builder.repo_dir / "importbucket"
+    mock_import_bucket(vanilla_pg, importbucket_path)
 
     #
     # Do the import
@@ -192,7 +206,7 @@ def test_pgdata_import_smoke(
             "new_timeline_id": str(timeline_id),
             "import_pgdata": {
                 "idempotency_key": str(idempotency),
-                "location": {"LocalFs": {"path": str(importbucket.absolute())}},
+                "location": {"LocalFs": {"path": str(importbucket_path.absolute())}},
             },
         },
     )
@@ -319,6 +333,87 @@ def test_pgdata_import_smoke(
     br_initdb_endpoint.safe_psql("select * from othertable")
 
 
+@run_only_on_default_postgres(reason="PG version is irrelevant here")
+def test_import_completion_on_restart(
+    neon_env_builder: NeonEnvBuilder, vanilla_pg: VanillaPostgres, make_httpserver: HTTPServer
+):
+    """
+    Validate that the storage controller delivers the import completion notification
+    eventually even if it was restarted when the import initially completed.
+    """
+    # Set up mock control plane HTTP server to listen for import completions
+    import_completion_signaled = Event()
+
+    def handler(request: Request) -> Response:
+        log.info(f"control plane /import_complete request: {request.json}")
+        import_completion_signaled.set()
+        return Response(json.dumps({}), status=200)
+
+    cplane_mgmt_api_server = make_httpserver
+    cplane_mgmt_api_server.expect_request(
+        "/storage/api/v1/import_complete", method="PUT"
+    ).respond_with_handler(handler)
+
+    # Plug the cplane mock in
+    neon_env_builder.control_plane_hooks_api = (
+        f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
+    )
+
+    # The import will specify a local filesystem path mocking remote storage
+    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
+
+    vanilla_pg.start()
+    vanilla_pg.stop()
+
+    env = neon_env_builder.init_configs()
+    env.start()
+
+    importbucket_path = neon_env_builder.repo_dir / "test_import_completion_bucket"
+    mock_import_bucket(vanilla_pg, importbucket_path)
+
+    tenant_id = TenantId.generate()
+    timeline_id = TimelineId.generate()
+    idempotency = ImportPgdataIdemptencyKey.random()
+
+    # Pause before sending the notification
+    failpoint_name = "timeline-import-pre-cplane-notification"
+    env.storage_controller.configure_failpoints((failpoint_name, "pause"))
+
+    env.storage_controller.tenant_create(tenant_id)
+    env.storage_controller.timeline_create(
+        tenant_id,
+        {
+            "new_timeline_id": str(timeline_id),
+            "import_pgdata": {
+                "idempotency_key": str(idempotency),
+                "location": {"LocalFs": {"path": str(importbucket_path.absolute())}},
+            },
+        },
+    )
+
+    def hit_failpoint():
+        log.info("Checking log for pattern...")
+        try:
+            assert env.storage_controller.log_contains(f".*at failpoint {failpoint_name}.*")
+        except Exception:
+            log.exception("Failed to find pattern in log")
+            raise
+
+    wait_until(hit_failpoint)
+    assert not import_completion_signaled.is_set()
+
+    # Restart the storage controller before signalling control plane.
+    # This clears the failpoint and we expect that the import start-up reconciliation
+    # kicks in and notifies cplane.
+    env.storage_controller.stop()
+    env.storage_controller.start()
+
+    def cplane_notified():
+        assert import_completion_signaled.is_set()
+
+    wait_until(cplane_notified)
+
+
 def test_fast_import_with_pageserver_ingest(
     test_output_dir,
     vanilla_pg: VanillaPostgres,