diff --git a/Cargo.lock b/Cargo.lock
index d9ac167042..420def152d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1274,6 +1274,7 @@ dependencies = [
"chrono",
"clap",
"compute_api",
+ "fail",
"flate2",
"futures",
"hyper 0.14.30",
diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml
index c0c390caef..9525b27818 100644
--- a/compute_tools/Cargo.toml
+++ b/compute_tools/Cargo.toml
@@ -7,7 +7,7 @@ license.workspace = true
[features]
default = []
# Enables test specific features.
-testing = []
+testing = ["fail/failpoints"]
[dependencies]
base64.workspace = true
@@ -19,6 +19,7 @@ camino.workspace = true
chrono.workspace = true
cfg-if.workspace = true
clap.workspace = true
+fail.workspace = true
flate2.workspace = true
futures.workspace = true
hyper0 = { workspace = true, features = ["full"] }
diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs
index bb248734a8..95ade9a87d 100644
--- a/compute_tools/src/bin/compute_ctl.rs
+++ b/compute_tools/src/bin/compute_ctl.rs
@@ -67,12 +67,15 @@ use compute_tools::params::*;
use compute_tools::spec::*;
use compute_tools::swap::resize_swap;
use rlimit::{setrlimit, Resource};
+use utils::failpoint_support;
// this is an arbitrary build tag. Fine as a default / for testing purposes
// in-case of not-set environment var
const BUILD_TAG_DEFAULT: &str = "latest";
fn main() -> Result<()> {
+ let scenario = failpoint_support::init();
+
let (build_tag, clap_args) = init()?;
// enable core dumping for all child processes
@@ -100,6 +103,8 @@ fn main() -> Result<()> {
maybe_delay_exit(delay_exit);
+ scenario.teardown();
+
deinit_and_exit(wait_pg_result);
}
diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index d72a04f2f9..78f6033429 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -1181,8 +1181,19 @@ impl ComputeNode {
let mut conf = postgres::config::Config::from(conf);
conf.application_name("compute_ctl:migrations");
- let mut client = conf.connect(NoTls)?;
- handle_migrations(&mut client).context("apply_config handle_migrations")
+ match conf.connect(NoTls) {
+ Ok(mut client) => {
+ if let Err(e) = handle_migrations(&mut client) {
+ error!("Failed to run migrations: {}", e);
+ }
+ }
+ Err(e) => {
+ error!(
+ "Failed to connect to the compute for running migrations: {}",
+ e
+ );
+ }
+ };
});
Ok::<(), anyhow::Error>(())
diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs
index 7fa6426d8f..a4b1a63e6d 100644
--- a/compute_tools/src/http/api.rs
+++ b/compute_tools/src/http/api.rs
@@ -24,8 +24,11 @@ use metrics::proto::MetricFamily;
use metrics::Encoder;
use metrics::TextEncoder;
use tokio::task;
+use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use tracing_utils::http::OtelName;
+use utils::failpoint_support::failpoints_handler;
+use utils::http::error::ApiError;
use utils::http::request::must_get_query_param;
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
@@ -310,6 +313,18 @@ async fn routes(req: Request
, compute: &Arc) -> Response {
+ match failpoints_handler(req, CancellationToken::new()).await {
+ Ok(r) => r,
+ Err(ApiError::BadRequest(e)) => {
+ render_json_error(&e.to_string(), StatusCode::BAD_REQUEST)
+ }
+ Err(_) => {
+ render_json_error("Internal server error", StatusCode::INTERNAL_SERVER_ERROR)
+ }
+ }
+ }
+
// download extension files from remote extension storage on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);
diff --git a/compute_tools/src/migration.rs b/compute_tools/src/migration.rs
index 22ab145eda..07d738abe9 100644
--- a/compute_tools/src/migration.rs
+++ b/compute_tools/src/migration.rs
@@ -1,13 +1,16 @@
use anyhow::{Context, Result};
+use fail::fail_point;
use postgres::Client;
use tracing::info;
+/// Runs a series of migrations on a target database
pub(crate) struct MigrationRunner<'m> {
client: &'m mut Client,
migrations: &'m [&'m str],
}
impl<'m> MigrationRunner<'m> {
+ /// Create a new migration runner
pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
// The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
assert!(migrations.len() + 1 < i64::MAX as usize);
@@ -15,6 +18,7 @@ impl<'m> MigrationRunner<'m> {
Self { client, migrations }
}
+ /// Get the current value of neon_migration.migration_id
fn get_migration_id(&mut self) -> Result {
let query = "SELECT id FROM neon_migration.migration_id";
let row = self
@@ -25,9 +29,34 @@ impl<'m> MigrationRunner<'m> {
Ok(row.get::<&str, i64>("id"))
}
+    /// Update the neon_migration.migration_id value
+    ///
+    /// Returns an error if the UPDATE fails, or if the `compute-migration` fail
+    /// point (checked only when built with the `testing` feature) was configured
+    /// with this migration ID so that a series of migrations fails part-way.
     fn update_migration_id(&mut self, migration_id: i64) -> Result<()> {
         let setval = format!("UPDATE neon_migration.migration_id SET id={}", migration_id);
+        // Test-only fail point used to check that failing in the middle of a
+        // series of migrations behaves as expected. `fail_point!` returns from
+        // the enclosing function when it fires, hence the wrapping closure.
+        if cfg!(feature = "testing") {
+            let fail = (|| {
+                fail_point!("compute-migration", |fail_migration_id| {
+                    migration_id == fail_migration_id.unwrap().parse::<i64>().unwrap()
+                });
+
+                false
+            })();
+
+            if fail {
+                return Err(anyhow::anyhow!(
+                    "migration {} was configured to fail because of a failpoint",
+                    migration_id
+                ));
+            }
+        }
+
         self.client
             .simple_query(&setval)
             .context("run_migrations update id")?;
@@ -35,7 +64,8 @@ impl<'m> MigrationRunner<'m> {
         Ok(())
     }
- fn prepare_migrations(&mut self) -> Result<()> {
+ /// Prepare the target database for handling migrations
+ fn prepare_database(&mut self) -> Result<()> {
let query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
self.client.simple_query(query)?;
@@ -54,8 +84,9 @@ impl<'m> MigrationRunner<'m> {
Ok(())
}
+ /// Run the configured set of migrations
pub fn run_migrations(mut self) -> Result<()> {
- self.prepare_migrations()?;
+ self.prepare_database()?;
let mut current_migration = self.get_migration_id()? as usize;
while current_migration < self.migrations.len() {
@@ -69,6 +100,11 @@ impl<'m> MigrationRunner<'m> {
if migration.starts_with("-- SKIP") {
info!("Skipping migration id={}", migration_id!(current_migration));
+
+ // Even though we are skipping the migration, updating the
+ // migration ID should help keep logic easy to understand when
+ // trying to understand the state of a cluster.
+ self.update_migration_id(migration_id!(current_migration))?;
} else {
info!(
"Running migration id={}:\n{}\n",
@@ -87,7 +123,6 @@ impl<'m> MigrationRunner<'m> {
)
})?;
- // Migration IDs start at 1
self.update_migration_id(migration_id!(current_migration))?;
self.client
diff --git a/compute_tools/src/migrations/tests/0001-neon_superuser_bypass_rls.sql b/compute_tools/src/migrations/tests/0001-neon_superuser_bypass_rls.sql
new file mode 100644
index 0000000000..0c81cef1c4
--- /dev/null
+++ b/compute_tools/src/migrations/tests/0001-neon_superuser_bypass_rls.sql
@@ -0,0 +1,9 @@
+-- Verify that the neon_superuser role has the BYPASSRLS attribute.
+-- NOTE(review): if pg_roles has no neon_superuser row, bypassrls stays NULL
+-- and the IF below does not raise -- confirm that is acceptable.
+DO $$
+DECLARE
+ bypassrls boolean;
+BEGIN
+ SELECT rolbypassrls INTO bypassrls FROM pg_roles WHERE rolname = 'neon_superuser';
+ IF NOT bypassrls THEN
+ RAISE EXCEPTION 'neon_superuser cannot bypass RLS';
+ END IF;
+END $$;
diff --git a/compute_tools/src/migrations/tests/0002-alter_roles.sql b/compute_tools/src/migrations/tests/0002-alter_roles.sql
new file mode 100644
index 0000000000..433f7b34f7
--- /dev/null
+++ b/compute_tools/src/migrations/tests/0002-alter_roles.sql
@@ -0,0 +1,25 @@
+-- Check role attributes:
+--   * every role that pg_has_role reports as a member of neon_superuser must
+--     have INHERIT;
+--   * every role that is not such a member, and whose name does not start
+--     with 'pg_', must not have BYPASSRLS.
+DO $$
+DECLARE
+ role record;
+BEGIN
+ FOR role IN
+ SELECT rolname AS name, rolinherit AS inherit
+ FROM pg_roles
+ WHERE pg_has_role(rolname, 'neon_superuser', 'member')
+ LOOP
+ IF NOT role.inherit THEN
+ RAISE EXCEPTION '% cannot inherit', quote_ident(role.name);
+ END IF;
+ END LOOP;
+
+ FOR role IN
+ SELECT rolname AS name, rolbypassrls AS bypassrls
+ FROM pg_roles
+ WHERE NOT pg_has_role(rolname, 'neon_superuser', 'member')
+ AND NOT starts_with(rolname, 'pg_')
+ LOOP
+ IF role.bypassrls THEN
+ RAISE EXCEPTION '% can bypass RLS', quote_ident(role.name);
+ END IF;
+ END LOOP;
+END $$;
diff --git a/compute_tools/src/migrations/tests/0003-grant_pg_create_subscription_to_neon_superuser.sql b/compute_tools/src/migrations/tests/0003-grant_pg_create_subscription_to_neon_superuser.sql
new file mode 100644
index 0000000000..b164d61295
--- /dev/null
+++ b/compute_tools/src/migrations/tests/0003-grant_pg_create_subscription_to_neon_superuser.sql
@@ -0,0 +1,10 @@
+-- On servers with server_version_num >= 160000, verify that neon_superuser is
+-- a member of pg_create_subscription. Older servers return without checking.
+DO $$
+BEGIN
+ IF (SELECT current_setting('server_version_num')::numeric < 160000) THEN
+ RETURN;
+ END IF;
+
+ IF NOT (SELECT pg_has_role('neon_superuser', 'pg_create_subscription', 'member')) THEN
+ RAISE EXCEPTION 'neon_superuser cannot execute pg_create_subscription';
+ END IF;
+END $$;
diff --git a/compute_tools/src/migrations/tests/0004-grant_pg_monitor_to_neon_superuser.sql b/compute_tools/src/migrations/tests/0004-grant_pg_monitor_to_neon_superuser.sql
new file mode 100644
index 0000000000..acb8dd417d
--- /dev/null
+++ b/compute_tools/src/migrations/tests/0004-grant_pg_monitor_to_neon_superuser.sql
@@ -0,0 +1,19 @@
+DO $$ -- Verify that neon_superuser is a member of pg_monitor and may grant it
+DECLARE
+    monitor record;
+BEGIN
+    SELECT pg_has_role('neon_superuser', 'pg_monitor', 'member') AS member,
+            admin_option AS admin
+        INTO monitor
+        FROM pg_auth_members
+        WHERE roleid = 'pg_monitor'::regrole
+            AND member = 'neon_superuser'::regrole; -- the grant made to neon_superuser
+
+    IF monitor.member IS NOT TRUE THEN -- NULL (no grant row found) must fail too
+        RAISE EXCEPTION 'neon_superuser is not a member of pg_monitor';
+    END IF;
+
+    IF monitor.admin IS NOT TRUE THEN -- NULL (no grant row found) must fail too
+        RAISE EXCEPTION 'neon_superuser cannot grant pg_monitor';
+    END IF;
+END $$;
diff --git a/compute_tools/src/migrations/tests/0005-grant_all_on_tables_to_neon_superuser.sql b/compute_tools/src/migrations/tests/0005-grant_all_on_tables_to_neon_superuser.sql
new file mode 100644
index 0000000000..f99101bd65
--- /dev/null
+++ b/compute_tools/src/migrations/tests/0005-grant_all_on_tables_to_neon_superuser.sql
@@ -0,0 +1,2 @@
+-- This test was never written because at the time migration tests were added
+-- the accompanying migration was already skipped.
diff --git a/compute_tools/src/migrations/tests/0006-grant_all_on_sequences_to_neon_superuser.sql b/compute_tools/src/migrations/tests/0006-grant_all_on_sequences_to_neon_superuser.sql
new file mode 100644
index 0000000000..f99101bd65
--- /dev/null
+++ b/compute_tools/src/migrations/tests/0006-grant_all_on_sequences_to_neon_superuser.sql
@@ -0,0 +1,2 @@
+-- This test was never written because at the time migration tests were added
+-- the accompanying migration was already skipped.
diff --git a/compute_tools/src/migrations/tests/0007-grant_all_on_tables_to_neon_superuser_with_grant_option.sql b/compute_tools/src/migrations/tests/0007-grant_all_on_tables_to_neon_superuser_with_grant_option.sql
new file mode 100644
index 0000000000..f99101bd65
--- /dev/null
+++ b/compute_tools/src/migrations/tests/0007-grant_all_on_tables_to_neon_superuser_with_grant_option.sql
@@ -0,0 +1,2 @@
+-- This test was never written because at the time migration tests were added
+-- the accompanying migration was already skipped.
diff --git a/compute_tools/src/migrations/tests/0008-grant_all_on_sequences_to_neon_superuser_with_grant_option.sql b/compute_tools/src/migrations/tests/0008-grant_all_on_sequences_to_neon_superuser_with_grant_option.sql
new file mode 100644
index 0000000000..f99101bd65
--- /dev/null
+++ b/compute_tools/src/migrations/tests/0008-grant_all_on_sequences_to_neon_superuser_with_grant_option.sql
@@ -0,0 +1,2 @@
+-- This test was never written because at the time migration tests were added
+-- the accompanying migration was already skipped.
diff --git a/compute_tools/src/migrations/tests/0009-revoke_replication_for_previously_allowed_roles.sql b/compute_tools/src/migrations/tests/0009-revoke_replication_for_previously_allowed_roles.sql
new file mode 100644
index 0000000000..f99101bd65
--- /dev/null
+++ b/compute_tools/src/migrations/tests/0009-revoke_replication_for_previously_allowed_roles.sql
@@ -0,0 +1,2 @@
+-- This test was never written because at the time migration tests were added
+-- the accompanying migration was already skipped.
diff --git a/compute_tools/src/migrations/tests/0010-grant_snapshot_synchronization_funcs_to_neon_superuser.sql b/compute_tools/src/migrations/tests/0010-grant_snapshot_synchronization_funcs_to_neon_superuser.sql
new file mode 100644
index 0000000000..af7f50e95d
--- /dev/null
+++ b/compute_tools/src/migrations/tests/0010-grant_snapshot_synchronization_funcs_to_neon_superuser.sql
@@ -0,0 +1,13 @@
+-- Verify that neon_superuser has EXECUTE on every pg_catalog function named
+-- pg_export_snapshot or pg_log_standby_snapshot that is present in pg_proc
+-- (bool_and aggregates over the matching rows).
+-- NOTE(review): if no row matches, can_execute is NULL and the IF below does
+-- not raise -- confirm that is acceptable.
+DO $$
+DECLARE
+ can_execute boolean;
+BEGIN
+ SELECT bool_and(has_function_privilege('neon_superuser', oid, 'execute'))
+ INTO can_execute
+ FROM pg_proc
+ WHERE proname IN ('pg_export_snapshot', 'pg_log_standby_snapshot')
+ AND pronamespace = 'pg_catalog'::regnamespace;
+ IF NOT can_execute THEN
+ RAISE EXCEPTION 'neon_superuser cannot execute both pg_export_snapshot and pg_log_standby_snapshot';
+ END IF;
+END $$;
diff --git a/compute_tools/src/migrations/tests/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql b/compute_tools/src/migrations/tests/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql
new file mode 100644
index 0000000000..e55dcdc3b6
--- /dev/null
+++ b/compute_tools/src/migrations/tests/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql
@@ -0,0 +1,13 @@
+-- Verify that neon_superuser has EXECUTE on
+-- pg_catalog.pg_show_replication_origin_status.
+DO $$
+DECLARE
+ can_execute boolean;
+BEGIN
+ SELECT has_function_privilege('neon_superuser', oid, 'execute')
+ INTO can_execute
+ FROM pg_proc
+ WHERE proname = 'pg_show_replication_origin_status'
+ AND pronamespace = 'pg_catalog'::regnamespace;
+ IF NOT can_execute THEN
+ RAISE EXCEPTION 'neon_superuser cannot execute pg_show_replication_origin_status';
+ END IF;
+END $$;
diff --git a/libs/utils/src/failpoint_support.rs b/libs/utils/src/failpoint_support.rs
index 870684b399..701ba2d42c 100644
--- a/libs/utils/src/failpoint_support.rs
+++ b/libs/utils/src/failpoint_support.rs
@@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::*;
-/// Declare a failpoint that can use the `pause` failpoint action.
+/// Declare a failpoint that can use the `pause` failpoint action.
/// We don't want to block the executor thread, hence, spawn_blocking + await.
#[macro_export]
macro_rules! pausable_failpoint {
@@ -181,7 +181,7 @@ pub async fn failpoints_handler(
) -> Result, ApiError> {
if !fail::has_failpoints() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
- "Cannot manage failpoints because storage was compiled without failpoints support"
+ "Cannot manage failpoints because neon was compiled without failpoints support"
)));
}
diff --git a/test_runner/conftest.py b/test_runner/conftest.py
index 887bfef478..9e32469d69 100644
--- a/test_runner/conftest.py
+++ b/test_runner/conftest.py
@@ -8,6 +8,7 @@ pytest_plugins = (
"fixtures.compute_reconfigure",
"fixtures.storage_controller_proxy",
"fixtures.paths",
+ "fixtures.compute_migrations",
"fixtures.neon_fixtures",
"fixtures.benchmark_fixture",
"fixtures.pg_stats",
diff --git a/test_runner/fixtures/compute_migrations.py b/test_runner/fixtures/compute_migrations.py
new file mode 100644
index 0000000000..ea99785af0
--- /dev/null
+++ b/test_runner/fixtures/compute_migrations.py
@@ -0,0 +1,34 @@
+from __future__ import annotations
+
+import os
+from typing import TYPE_CHECKING
+
+import pytest
+
+from fixtures.paths import BASE_DIR
+
+if TYPE_CHECKING:
+ from collections.abc import Iterator
+ from pathlib import Path
+
+# Directory holding the compute migration files, and its "tests" subdirectory.
+COMPUTE_MIGRATIONS_DIR = BASE_DIR / "compute_tools" / "src" / "migrations"
+COMPUTE_MIGRATIONS_TEST_DIR = COMPUTE_MIGRATIONS_DIR / "tests"
+
+# Names of the files directly inside COMPUTE_MIGRATIONS_DIR, sorted by name.
+# Only the first os.walk() entry is used, so files in subdirectories (such as
+# "tests") are not included.
+COMPUTE_MIGRATIONS = sorted(next(os.walk(COMPUTE_MIGRATIONS_DIR))[2])
+NUM_COMPUTE_MIGRATIONS = len(COMPUTE_MIGRATIONS)
+
+
+@pytest.fixture(scope="session")
+def compute_migrations_dir() -> Iterator[Path]:
+ """
+ Retrieve the path to the compute migrations directory.
+ """
+ yield COMPUTE_MIGRATIONS_DIR
+
+
+@pytest.fixture(scope="session")
+def compute_migrations_test_dir() -> Iterator[Path]:
+ """
+ Retrieve the path to the compute migrations test directory.
+ """
+ yield COMPUTE_MIGRATIONS_TEST_DIR
diff --git a/test_runner/fixtures/endpoint/http.py b/test_runner/fixtures/endpoint/http.py
index 1cd9158c68..aa0d95fe80 100644
--- a/test_runner/fixtures/endpoint/http.py
+++ b/test_runner/fixtures/endpoint/http.py
@@ -55,3 +55,17 @@ class EndpointHttpClient(requests.Session):
res = self.get(f"http://localhost:{self.port}/metrics")
res.raise_for_status()
return res.text
+
+ def configure_failpoints(self, *args: tuple[str, str]) -> None:
+ """
+ POST the given (name, action) pairs to this endpoint's /failpoints route.
+
+ Each pair becomes a {"name": ..., "action": ...} object in the JSON list
+ body. An error response is raised through raise_for_status().
+ """
+ body: list[dict[str, str]] = []
+
+ for fp in args:
+ body.append(
+ {
+ "name": fp[0],
+ "action": fp[1],
+ }
+ )
+
+ res = self.post(f"http://localhost:{self.port}/failpoints", json=body)
+ res.raise_for_status()
diff --git a/test_runner/fixtures/neon_cli.py b/test_runner/fixtures/neon_cli.py
index a85a191455..adbd6414a7 100644
--- a/test_runner/fixtures/neon_cli.py
+++ b/test_runner/fixtures/neon_cli.py
@@ -522,14 +522,15 @@ class NeonLocalCli(AbstractNeonCli):
safekeepers: list[int] | None = None,
remote_ext_config: str | None = None,
pageserver_id: int | None = None,
- allow_multiple=False,
+ allow_multiple: bool = False,
basebackup_request_tries: int | None = None,
+ env: dict[str, str] | None = None,
) -> subprocess.CompletedProcess[str]:
args = [
"endpoint",
"start",
]
- extra_env_vars = {}
+ extra_env_vars = env or {}
if basebackup_request_tries is not None:
extra_env_vars["NEON_COMPUTE_TESTING_BASEBACKUP_TRIES"] = str(basebackup_request_tries)
if remote_ext_config is not None:
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 9f78ad120b..a0c642163d 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -54,6 +54,7 @@ from fixtures.common_types import (
TimelineArchivalState,
TimelineId,
)
+from fixtures.compute_migrations import NUM_COMPUTE_MIGRATIONS
from fixtures.endpoint.http import EndpointHttpClient
from fixtures.h2server import H2Server
from fixtures.log_helper import log
@@ -3855,6 +3856,7 @@ class Endpoint(PgProtocol, LogUtils):
safekeepers: list[int] | None = None,
allow_multiple: bool = False,
basebackup_request_tries: int | None = None,
+ env: dict[str, str] | None = None,
) -> Self:
"""
Start the Postgres instance.
@@ -3875,6 +3877,7 @@ class Endpoint(PgProtocol, LogUtils):
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
basebackup_request_tries=basebackup_request_tries,
+ env=env,
)
self._running.release(1)
self.log_config_value("shared_buffers")
@@ -3988,14 +3991,17 @@ class Endpoint(PgProtocol, LogUtils):
log.info("Updating compute spec to: %s", json.dumps(data_dict, indent=4))
json.dump(data_dict, file, indent=4)
- # Please note: Migrations only run if pg_skip_catalog_updates is false
- def wait_for_migrations(self, num_migrations: int = 11):
+ def wait_for_migrations(self, wait_for: int = NUM_COMPUTE_MIGRATIONS) -> None:
+ """
+ Wait for all compute migrations to be run. Remember that migrations only
+ run if "skip_pg_catalog_updates" is set to false in the compute spec.
+ """
with self.cursor() as cur:
def check_migrations_done():
cur.execute("SELECT id FROM neon_migration.migration_id")
migration_id: int = cur.fetchall()[0][0]
- assert migration_id >= num_migrations
+ assert migration_id >= wait_for
wait_until(check_migrations_done)
diff --git a/test_runner/fixtures/paths.py b/test_runner/fixtures/paths.py
index 80777d65e9..fc4fb3629b 100644
--- a/test_runner/fixtures/paths.py
+++ b/test_runner/fixtures/paths.py
@@ -21,8 +21,8 @@ if TYPE_CHECKING:
BASE_DIR = Path(__file__).parents[2]
-COMPUTE_CONFIG_DIR = BASE_DIR / "compute" / "etc"
DEFAULT_OUTPUT_DIR: str = "test_output"
+COMPUTE_CONFIG_DIR = BASE_DIR / "compute" / "etc"
def get_test_dir(request: FixtureRequest, top_output_dir: Path, prefix: str | None = None) -> Path:
diff --git a/test_runner/regress/test_compute_migrations.py b/test_runner/regress/test_compute_migrations.py
new file mode 100644
index 0000000000..803702a6f8
--- /dev/null
+++ b/test_runner/regress/test_compute_migrations.py
@@ -0,0 +1,90 @@
+from __future__ import annotations
+
+from pathlib import Path
+from typing import TYPE_CHECKING, cast
+
+import pytest
+from fixtures.compute_migrations import COMPUTE_MIGRATIONS, NUM_COMPUTE_MIGRATIONS
+
+if TYPE_CHECKING:
+ from fixtures.neon_fixtures import NeonEnv
+
+
+def test_compute_migrations_retry(neon_simple_env: NeonEnv, compute_migrations_dir: Path):
+    """
+    Test that compute_ctl can recover from migration failures next time it
+    starts, and that the persisted migration ID is correct in such cases.
+    """
+    env = neon_simple_env
+
+    endpoint = env.endpoints.create("main")
+    endpoint.respec(skip_pg_catalog_updates=False)
+
+    for i in range(1, NUM_COMPUTE_MIGRATIONS + 1):
+        endpoint.start(env={"FAILPOINTS": f"compute-migration=return({i})"})
+
+        # Make sure that the migrations ran
+        endpoint.wait_for_migrations(wait_for=i - 1)
+
+        # Confirm that we correctly recorded that in the
+        # neon_migration.migration_id table
+        with endpoint.cursor() as cur:
+            cur.execute("SELECT id FROM neon_migration.migration_id")
+            migration_id = cast("int", cur.fetchall()[0][0])
+            assert migration_id == i - 1
+
+        endpoint.stop()
+
+    endpoint.start()
+
+    # Now wait for the rest of the migrations
+    endpoint.wait_for_migrations()
+
+    with endpoint.cursor() as cur:
+        cur.execute("SELECT id FROM neon_migration.migration_id")
+        migration_id = cast("int", cur.fetchall()[0][0])
+        assert migration_id == NUM_COMPUTE_MIGRATIONS
+
+    for i, m in enumerate(COMPUTE_MIGRATIONS, start=1):
+        migration_query = (compute_migrations_dir / m).read_text(encoding="utf-8")
+        if migration_query.startswith("-- SKIP"):  # compute_ctl logs "Skipping" for these
+            pattern = rf"Skipping migration id={i}"
+        else:
+            pattern = rf"Running migration id={i}"
+
+        assert endpoint.log_contains(pattern)  # NOTE(review): assumes falsy when unmatched
+
+
+# One test case per entry of COMPUTE_MIGRATIONS; the case id is the 1-based
+# position of the migration file in that sorted list.
+@pytest.mark.parametrize(
+ "migration",
+ (pytest.param((i, m), id=str(i)) for i, m in enumerate(COMPUTE_MIGRATIONS, start=1)),
+)
+def test_compute_migrations_e2e(
+ neon_simple_env: NeonEnv,
+ compute_migrations_dir: Path,
+ compute_migrations_test_dir: Path,
+ migration: tuple[int, str],
+):
+ """
+ Test that the migrations perform as advertised.
+
+ For the given (migration ID, filename) pair: skip if the migration file
+ starts with "-- SKIP"; otherwise start an endpoint whose fail point stops
+ migrations at the following ID, wait for the given ID to be recorded, and
+ run the same-named script from the migrations test directory.
+ """
+ env = neon_simple_env
+
+ migration_id = migration[0]
+ migration_filename = migration[1]
+
+ migration_query = (compute_migrations_dir / migration_filename).read_text(encoding="utf-8")
+ if migration_query.startswith("-- SKIP"):
+ pytest.skip("The migration is marked as SKIP")
+
+ endpoint = env.endpoints.create("main")
+ endpoint.respec(skip_pg_catalog_updates=False)
+
+ # Stop applying migrations after the one we want to test, so that we can
+ # test the state of the cluster at the given migration ID
+ endpoint.start(env={"FAILPOINTS": f"compute-migration=return({migration_id + 1})"})
+
+ endpoint.wait_for_migrations(wait_for=migration_id)
+
+ # The check script shares the migration's filename. NOTE(review): presumably
+ # safe_psql raises when the script's RAISE EXCEPTION fires -- confirm.
+ check_query = (compute_migrations_test_dir / migration_filename).read_text(encoding="utf-8")
+ endpoint.safe_psql(check_query)
diff --git a/test_runner/regress/test_migrations.py b/test_runner/regress/test_migrations.py
deleted file mode 100644
index 7211619a99..0000000000
--- a/test_runner/regress/test_migrations.py
+++ /dev/null
@@ -1,33 +0,0 @@
-from __future__ import annotations
-
-import time
-from typing import TYPE_CHECKING
-
-if TYPE_CHECKING:
- from fixtures.neon_fixtures import NeonEnv
-
-
-def test_migrations(neon_simple_env: NeonEnv):
- env = neon_simple_env
-
- endpoint = env.endpoints.create("main")
- endpoint.respec(skip_pg_catalog_updates=False)
- endpoint.start()
-
- num_migrations = 11
- endpoint.wait_for_migrations(num_migrations=num_migrations)
-
- with endpoint.cursor() as cur:
- cur.execute("SELECT id FROM neon_migration.migration_id")
- migration_id = cur.fetchall()
- assert migration_id[0][0] == num_migrations
-
- endpoint.stop()
- endpoint.start()
- # We don't have a good way of knowing that the migrations code path finished executing
- # in compute_ctl in the case that no migrations are being run
- time.sleep(1)
- with endpoint.cursor() as cur:
- cur.execute("SELECT id FROM neon_migration.migration_id")
- migration_id = cur.fetchall()
- assert migration_id[0][0] == num_migrations