diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 62e541ebce..e67430d061 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -692,10 +692,11 @@ impl ComputeNode { // Proceed with post-startup configuration. Note, that order of operations is important. let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec; create_neon_superuser(spec, &mut client)?; + cleanup_instance(&mut client)?; handle_roles(spec, &mut client)?; handle_databases(spec, &mut client)?; handle_role_deletions(spec, self.connstr.as_str(), &mut client)?; - handle_grants(spec, self.connstr.as_str())?; + handle_grants(spec, &mut client, self.connstr.as_str())?; handle_extensions(spec, &mut client)?; create_availability_check_data(&mut client)?; @@ -731,10 +732,11 @@ impl ComputeNode { // Disable DDL forwarding because control plane already knows about these roles/databases. if spec.mode == ComputeMode::Primary { client.simple_query("SET neon.forward_ddl = false")?; + cleanup_instance(&mut client)?; handle_roles(&spec, &mut client)?; handle_databases(&spec, &mut client)?; handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?; - handle_grants(&spec, self.connstr.as_str())?; + handle_grants(&spec, &mut client, self.connstr.as_str())?; handle_extensions(&spec, &mut client)?; } diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs index b94a97a126..b79e516650 100644 --- a/compute_tools/src/pg_helpers.rs +++ b/compute_tools/src/pg_helpers.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::fmt::Write; use std::fs; use std::fs::File; @@ -205,22 +206,37 @@ pub fn get_existing_roles(xact: &mut Transaction<'_>) -> Result> { } /// Build a list of existing Postgres databases -pub fn get_existing_dbs(client: &mut Client) -> Result> { - let postgres_dbs = client +pub fn get_existing_dbs(client: &mut Client) -> Result> { + // `pg_database.datconnlimit = -2` means that the database is in the + // 
invalid state. See: + // https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9 + let postgres_dbs: Vec = client .query( - "SELECT datname, datdba::regrole::text as owner - FROM pg_catalog.pg_database;", + "SELECT + datname AS name, + datdba::regrole::text AS owner, + NOT datallowconn AS restrict_conn, + datconnlimit = - 2 AS invalid + FROM + pg_catalog.pg_database;", &[], )? .iter() .map(|row| Database { - name: row.get("datname"), + name: row.get("name"), owner: row.get("owner"), + restrict_conn: row.get("restrict_conn"), + invalid: row.get("invalid"), options: None, }) .collect(); - Ok(postgres_dbs) + let dbs_map = postgres_dbs + .iter() + .map(|db| (db.name.clone(), db.clone())) + .collect::>(); + + Ok(dbs_map) } /// Wait for Postgres to become ready to accept connections. It's ready to diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index 74e7796be7..591cbe90c0 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -13,7 +13,7 @@ use crate::params::PG_HBA_ALL_MD5; use crate::pg_helpers::*; use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse}; -use compute_api::spec::{ComputeSpec, Database, PgIdent, Role}; +use compute_api::spec::{ComputeSpec, PgIdent, Role}; // Do control plane request and return response if any. In case of error it // returns a bool flag indicating whether it makes sense to retry the request @@ -161,6 +161,38 @@ pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> { Ok(()) } +/// Compute could be unexpectedly shut down, for example, during the +/// database dropping. This leaves the database in the invalid state, +/// which prevents new db creation with the same name. This function +/// will clean it up before proceeding with catalog updates. All +/// possible future cleanup operations may go here too. 
+#[instrument(skip_all)] +pub fn cleanup_instance(client: &mut Client) -> Result<()> { + let existing_dbs = get_existing_dbs(client)?; + + for (_, db) in existing_dbs { + if db.invalid { + // After recent commit in Postgres, interrupted DROP DATABASE + // leaves the database in the invalid state. According to the + // commit message, the only option for user is to drop it again. + // See: + // https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9 + // + // Postgres Neon extension is done the way, that db is de-registered + // in the control plane metadata only after it is dropped. So there is + // a chance that it still thinks that db should exist. This means + // that it will be re-created by `handle_databases()`. Yet, it's fine + // as user can just repeat drop (in vanilla Postgres they would need + // to do the same, btw). + let query = format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote()); + info!("dropping invalid database {}", db.name); + client.execute(query.as_str(), &[])?; + } + } + + Ok(()) +} + /// Given a cluster spec json and open transaction it handles roles creation, /// deletion and update. #[instrument(skip_all)] @@ -379,13 +411,13 @@ fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent /// which together provide us idempotency. 
#[instrument(skip_all)] pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> { - let existing_dbs: Vec = get_existing_dbs(client)?; + let existing_dbs = get_existing_dbs(client)?; // Print a list of existing Postgres databases (only in debug mode) if span_enabled!(Level::INFO) { info!("postgres databases:"); - for r in &existing_dbs { - info!(" {}:{}", r.name, r.owner); + for (dbname, db) in &existing_dbs { + info!(" {}:{}", dbname, db.owner); } } @@ -439,8 +471,7 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> { "rename_db" => { let new_name = op.new_name.as_ref().unwrap(); - // XXX: with a limited number of roles it is fine, but consider making it a HashMap - if existing_dbs.iter().any(|r| r.name == op.name) { + if existing_dbs.get(&op.name).is_some() { let query: String = format!( "ALTER DATABASE {} RENAME TO {}", op.name.pg_quote(), @@ -457,14 +488,12 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> { } // Refresh Postgres databases info to handle possible renames - let existing_dbs: Vec = get_existing_dbs(client)?; + let existing_dbs = get_existing_dbs(client)?; info!("cluster spec databases:"); for db in &spec.cluster.databases { let name = &db.name; - - // XXX: with a limited number of databases it is fine, but consider making it a HashMap - let pg_db = existing_dbs.iter().find(|r| r.name == *name); + let pg_db = existing_dbs.get(name); enum DatabaseAction { None, @@ -530,13 +559,32 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> { /// Grant CREATE ON DATABASE to the database owner and do some other alters and grants /// to allow users creating trusted extensions and re-creating `public` schema, for example. 
#[instrument(skip_all)] -pub fn handle_grants(spec: &ComputeSpec, connstr: &str) -> Result<()> { - info!("cluster spec grants:"); +pub fn handle_grants(spec: &ComputeSpec, client: &mut Client, connstr: &str) -> Result<()> { + info!("modifying database permissions"); + let existing_dbs = get_existing_dbs(client)?; // Do some per-database access adjustments. We'd better do this at db creation time, // but CREATE DATABASE isn't transactional. So we cannot create db + do some grants // atomically. for db in &spec.cluster.databases { + match existing_dbs.get(&db.name) { + Some(pg_db) => { + if pg_db.restrict_conn || pg_db.invalid { + info!( + "skipping grants for db {} (invalid: {}, connections not allowed: {})", + db.name, pg_db.invalid, pg_db.restrict_conn + ); + continue; + } + } + None => { + bail!( + "database {} doesn't exist in Postgres after handle_databases()", + db.name + ); + } + } + let mut conf = Config::from_str(connstr)?; conf.dbname(&db.name); @@ -575,6 +623,11 @@ pub fn handle_grants(spec: &ComputeSpec, connstr: &str) -> Result<()> { // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user. // This is needed because since postgres 15 this privilege is removed by default. + // TODO: web_access isn't created for almost 1 year. It could be that we have + // active users of 1 year old projects, but hopefully not, so check it and + // remove this code if possible. The worst thing that could happen is that + // user won't be able to use public schema in NEW databases created in the + // very OLD project. let grant_query = "DO $$\n\ BEGIN\n\ IF EXISTS(\n\ diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index b41ca8c9cf..cfbd50d38a 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -200,6 +200,12 @@ pub struct Database { pub name: PgIdent, pub owner: PgIdent, pub options: GenericOptions, + // These are derived flags, not present in the spec file. + // They are never set by the control plane. 
+ #[serde(skip_deserializing, default)] + pub restrict_conn: bool, + #[serde(skip_deserializing, default)] + pub invalid: bool, } /// Common type representing both SQL statement params with or without value, diff --git a/pgxn/neon/control_plane_connector.c b/pgxn/neon/control_plane_connector.c index 8b0035b8e8..debbbce117 100644 --- a/pgxn/neon/control_plane_connector.c +++ b/pgxn/neon/control_plane_connector.c @@ -741,13 +741,6 @@ NeonProcessUtility( break; case T_DropdbStmt: HandleDropDb(castNode(DropdbStmt, parseTree)); - /* - * We do this here to hack around the fact that Postgres performs the drop - * INSIDE of standard_ProcessUtility, which means that if we try to - * abort the drop normally it'll be too late. DROP DATABASE can't be inside - * of a transaction block anyway, so this should be fine to do. - */ - NeonXactCallback(XACT_EVENT_PRE_COMMIT, NULL); break; case T_CreateRoleStmt: HandleCreateRole(castNode(CreateRoleStmt, parseTree)); diff --git a/test_runner/regress/test_ddl_forwarding.py b/test_runner/regress/test_ddl_forwarding.py index d4cf1b4739..6bd09c7030 100644 --- a/test_runner/regress/test_ddl_forwarding.py +++ b/test_runner/regress/test_ddl_forwarding.py @@ -4,7 +4,7 @@ from typing import Any, Dict, List, Optional, Tuple, Type import psycopg2 import pytest from fixtures.log_helper import log -from fixtures.neon_fixtures import VanillaPostgres +from fixtures.neon_fixtures import NeonEnv, VanillaPostgres from pytest_httpserver import HTTPServer from werkzeug.wrappers.request import Request from werkzeug.wrappers.response import Response @@ -205,6 +205,10 @@ def test_ddl_forwarding(ddl: DdlForwardingContext): ddl.wait() assert ddl.dbs == {"stork": "cork"} + cur.execute("DROP DATABASE stork") + ddl.wait() + assert ddl.dbs == {} + with pytest.raises(psycopg2.InternalError): ddl.failures(True) cur.execute("CREATE DATABASE failure WITH OWNER=cork") @@ -217,6 +221,94 @@ def test_ddl_forwarding(ddl: DdlForwardingContext): ddl.failures(True) 
cur.execute("DROP DATABASE failure") ddl.wait() - ddl.pg.connect(dbname="failure") # Ensure we can connect after a failed drop + assert ddl.dbs == {"failure": "cork"} + ddl.failures(False) + + # Check that db is still in the Postgres after failure + cur.execute("SELECT datconnlimit FROM pg_database WHERE datname = 'failure'") + result = cur.fetchone() + if not result: + raise AssertionError("Database 'failure' not found") + # -2 means invalid database + # It should be invalid because cplane request failed + assert result[0] == -2, "Database 'failure' is not invalid" + + # Check that repeated drop succeeds + cur.execute("DROP DATABASE failure") + ddl.wait() + assert ddl.dbs == {} + + # DB should be absent in the Postgres + cur.execute("SELECT count(*) FROM pg_database WHERE datname = 'failure'") + result = cur.fetchone() + if not result: + raise AssertionError("Could not count databases") + assert result[0] == 0, "Database 'failure' still exists after drop" conn.close() + + +# Assert that specified database has a specific connlimit, throwing an AssertionError otherwise +# -2 means invalid database +# -1 means no specific per-db limit (default) +def assert_db_connlimit(endpoint: Any, db_name: str, connlimit: int, msg: str): + with endpoint.cursor() as cur: + cur.execute("SELECT datconnlimit FROM pg_database WHERE datname = %s", (db_name,)) + result = cur.fetchone() + if not result: + raise AssertionError(f"Database '{db_name}' not found") + assert result[0] == connlimit, msg + + +# Test that compute_ctl can deal with invalid databases (drop them). +# If Postgres extension cannot reach cplane, then DROP will be aborted +# and database will be marked as invalid. Then there are two recovery +# flows: +# 1. User can just repeat DROP DATABASE command until it succeeds +# 2. User can ignore, then compute_ctl will drop invalid databases +# automatically during full configuration +# Here we test the latter. 
The first one is tested in test_ddl_forwarding
+def test_ddl_forwarding_invalid_db(neon_simple_env: NeonEnv):
+    env = neon_simple_env
+    env.neon_cli.create_branch("test_ddl_forwarding_invalid_db", "empty")
+    endpoint = env.endpoints.create_start(
+        "test_ddl_forwarding_invalid_db",
+        # Some non-existent url
+        config_lines=["neon.console_url=http://localhost:9999/unknown/api/v0/roles_and_databases"],
+    )
+    log.info("postgres is running on 'test_ddl_forwarding_invalid_db' branch")
+
+    with endpoint.cursor() as cur:
+        cur.execute("SET neon.forward_ddl = false")
+        cur.execute("CREATE DATABASE failure")
+        cur.execute("COMMIT")
+
+    assert_db_connlimit(
+        endpoint, "failure", -1, "Database 'failure' doesn't have a valid connlimit"
+    )
+
+    with pytest.raises(psycopg2.InternalError):
+        with endpoint.cursor() as cur:
+            cur.execute("DROP DATABASE failure")
+            cur.execute("COMMIT")
+
+    # Should be invalid after failed drop
+    assert_db_connlimit(endpoint, "failure", -2, "Database 'failure' isn't invalid")
+
+    endpoint.stop()
+    endpoint.start()
+
+    # Still invalid after restart without full configuration
+    assert_db_connlimit(endpoint, "failure", -2, "Database 'failure' isn't invalid")
+
+    endpoint.stop()
+    endpoint.respec(skip_pg_catalog_updates=False)
+    endpoint.start()
+
+    # Should be cleaned up by compute_ctl during full configuration
+    with endpoint.cursor() as cur:
+        cur.execute("SELECT count(*) FROM pg_database WHERE datname = 'failure'")
+        result = cur.fetchone()
+        if not result:
+            raise AssertionError("Could not count databases")
+        assert result[0] == 0, "Database 'failure' still exists after restart"