[compute_ctl+pgxn] Handle invalid databases after failed drop (#5561)

## Problem

In 89275f6c1e we fixed an issue, when we were dropping db in Postgres
even though cplane request failed. Yet, it introduced a new problem that
we now de-register db in cplane even if we didn't actually drop it in
Postgres.

## Summary of changes

Here we revert extension change, so we now again may leave db in invalid
state after failed drop. Instead, `compute_ctl` is now responsible for
cleaning up invalid databases during full configuration. Thus, there are
two ways of recovering from failed DROP DATABASE:
1. User can just repeat DROP DATABASE, same as in Vanilla Postgres.
2. If they didn't, then on next full configuration (dbs / roles changes
   in the API; password reset; or data availability check) invalid db
   will be cleaned up in the Postgres and re-created by `compute_ctl`. So
   again it follows pretty much the same semantics as Vanilla Postgres --
   you need to drop it again after failed drop.

That way, we have a recovery trajectory for both problems.

See this commit for info about `invalid` db state:
  a4b4cc1d60

According to it:
> An invalid database cannot be connected to anymore, but can still be
dropped.

While on it, this commit also fixes another issue, when `compute_ctl`
was trying to connect to databases with `ALLOW CONNECTIONS false`. Now
it will just skip them.

Fixes #5435
This commit is contained in:
Alexey Kondratov
2023-10-16 20:46:45 +02:00
committed by GitHub
parent ded7f48565
commit 0ca342260c
6 changed files with 191 additions and 29 deletions

View File

@@ -692,10 +692,11 @@ impl ComputeNode {
// Proceed with post-startup configuration. Note, that order of operations is important.
let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
create_neon_superuser(spec, &mut client)?;
cleanup_instance(&mut client)?;
handle_roles(spec, &mut client)?;
handle_databases(spec, &mut client)?;
handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
handle_grants(spec, self.connstr.as_str())?;
handle_grants(spec, &mut client, self.connstr.as_str())?;
handle_extensions(spec, &mut client)?;
create_availability_check_data(&mut client)?;
@@ -731,10 +732,11 @@ impl ComputeNode {
// Disable DDL forwarding because control plane already knows about these roles/databases.
if spec.mode == ComputeMode::Primary {
client.simple_query("SET neon.forward_ddl = false")?;
cleanup_instance(&mut client)?;
handle_roles(&spec, &mut client)?;
handle_databases(&spec, &mut client)?;
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
handle_grants(&spec, self.connstr.as_str())?;
handle_grants(&spec, &mut client, self.connstr.as_str())?;
handle_extensions(&spec, &mut client)?;
}

View File

@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::fmt::Write;
use std::fs;
use std::fs::File;
@@ -205,22 +206,37 @@ pub fn get_existing_roles(xact: &mut Transaction<'_>) -> Result<Vec<Role>> {
}
/// Build a list of existing Postgres databases
pub fn get_existing_dbs(client: &mut Client) -> Result<Vec<Database>> {
let postgres_dbs = client
pub fn get_existing_dbs(client: &mut Client) -> Result<HashMap<String, Database>> {
// `pg_database.datconnlimit = -2` means that the database is in the
// invalid state. See:
// https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
let postgres_dbs: Vec<Database> = client
.query(
"SELECT datname, datdba::regrole::text as owner
FROM pg_catalog.pg_database;",
"SELECT
datname AS name,
datdba::regrole::text AS owner,
NOT datallowconn AS restrict_conn,
datconnlimit = - 2 AS invalid
FROM
pg_catalog.pg_database;",
&[],
)?
.iter()
.map(|row| Database {
name: row.get("datname"),
name: row.get("name"),
owner: row.get("owner"),
restrict_conn: row.get("restrict_conn"),
invalid: row.get("invalid"),
options: None,
})
.collect();
Ok(postgres_dbs)
let dbs_map = postgres_dbs
.iter()
.map(|db| (db.name.clone(), db.clone()))
.collect::<HashMap<_, _>>();
Ok(dbs_map)
}
/// Wait for Postgres to become ready to accept connections. It's ready to

View File

@@ -13,7 +13,7 @@ use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;
use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
use compute_api::spec::{ComputeSpec, Database, PgIdent, Role};
use compute_api::spec::{ComputeSpec, PgIdent, Role};
// Do control plane request and return response if any. In case of error it
// returns a bool flag indicating whether it makes sense to retry the request
@@ -161,6 +161,38 @@ pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
Ok(())
}
/// Compute could be unexpectedly shut down, for example, during the
/// database dropping. This leaves the database in the invalid state,
/// which prevents new db creation with the same name. This function
/// will clean it up before proceeding with catalog updates. All
/// possible future cleanup operations may go here too.
#[instrument(skip_all)]
pub fn cleanup_instance(client: &mut Client) -> Result<()> {
let existing_dbs = get_existing_dbs(client)?;
for (_, db) in existing_dbs {
if db.invalid {
// After recent commit in Postgres, interrupted DROP DATABASE
// leaves the database in the invalid state. According to the
// commit message, the only option for user is to drop it again.
// See:
// https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
//
// Postgres Neon extension is done the way, that db is de-registered
// in the control plane metadata only after it is dropped. So there is
// a chance that it still thinks that db should exist. This means
// that it will be re-created by `handle_databases()`. Yet, it's fine
// as user can just repeat drop (in vanilla Postgres they would need
// to do the same, btw).
let query = format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote());
info!("dropping invalid database {}", db.name);
client.execute(query.as_str(), &[])?;
}
}
Ok(())
}
/// Given a cluster spec json and open transaction it handles roles creation,
/// deletion and update.
#[instrument(skip_all)]
@@ -379,13 +411,13 @@ fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent
/// which together provide us idempotency.
#[instrument(skip_all)]
pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
let existing_dbs: Vec<Database> = get_existing_dbs(client)?;
let existing_dbs = get_existing_dbs(client)?;
// Print a list of existing Postgres databases (only in debug mode)
if span_enabled!(Level::INFO) {
info!("postgres databases:");
for r in &existing_dbs {
info!(" {}:{}", r.name, r.owner);
for (dbname, db) in &existing_dbs {
info!(" {}:{}", dbname, db.owner);
}
}
@@ -439,8 +471,7 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
"rename_db" => {
let new_name = op.new_name.as_ref().unwrap();
// XXX: with a limited number of roles it is fine, but consider making it a HashMap
if existing_dbs.iter().any(|r| r.name == op.name) {
if existing_dbs.get(&op.name).is_some() {
let query: String = format!(
"ALTER DATABASE {} RENAME TO {}",
op.name.pg_quote(),
@@ -457,14 +488,12 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
}
// Refresh Postgres databases info to handle possible renames
let existing_dbs: Vec<Database> = get_existing_dbs(client)?;
let existing_dbs = get_existing_dbs(client)?;
info!("cluster spec databases:");
for db in &spec.cluster.databases {
let name = &db.name;
// XXX: with a limited number of databases it is fine, but consider making it a HashMap
let pg_db = existing_dbs.iter().find(|r| r.name == *name);
let pg_db = existing_dbs.get(name);
enum DatabaseAction {
None,
@@ -530,13 +559,32 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
/// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
/// to allow users creating trusted extensions and re-creating `public` schema, for example.
#[instrument(skip_all)]
pub fn handle_grants(spec: &ComputeSpec, connstr: &str) -> Result<()> {
info!("cluster spec grants:");
pub fn handle_grants(spec: &ComputeSpec, client: &mut Client, connstr: &str) -> Result<()> {
info!("modifying database permissions");
let existing_dbs = get_existing_dbs(client)?;
// Do some per-database access adjustments. We'd better do this at db creation time,
// but CREATE DATABASE isn't transactional. So we cannot create db + do some grants
// atomically.
for db in &spec.cluster.databases {
match existing_dbs.get(&db.name) {
Some(pg_db) => {
if pg_db.restrict_conn || pg_db.invalid {
info!(
"skipping grants for db {} (invalid: {}, connections not allowed: {})",
db.name, pg_db.invalid, pg_db.restrict_conn
);
continue;
}
}
None => {
bail!(
"database {} doesn't exist in Postgres after handle_databases()",
db.name
);
}
}
let mut conf = Config::from_str(connstr)?;
conf.dbname(&db.name);
@@ -575,6 +623,11 @@ pub fn handle_grants(spec: &ComputeSpec, connstr: &str) -> Result<()> {
// Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user.
// This is needed because since postgres 15 this privilege is removed by default.
// TODO: web_access isn't created for almost 1 year. It could be that we have
// active users of 1 year old projects, but hopefully not, so check it and
// remove this code if possible. The worst thing that could happen is that
// user won't be able to use public schema in NEW databases created in the
// very OLD project.
let grant_query = "DO $$\n\
BEGIN\n\
IF EXISTS(\n\

View File

@@ -200,6 +200,12 @@ pub struct Database {
pub name: PgIdent,
pub owner: PgIdent,
pub options: GenericOptions,
// These are derived flags, not present in the spec file.
// They are never set by the control plane.
#[serde(skip_deserializing, default)]
pub restrict_conn: bool,
#[serde(skip_deserializing, default)]
pub invalid: bool,
}
/// Common type representing both SQL statement params with or without value,

View File

@@ -741,13 +741,6 @@ NeonProcessUtility(
break;
case T_DropdbStmt:
HandleDropDb(castNode(DropdbStmt, parseTree));
/*
* We do this here to hack around the fact that Postgres performs the drop
* INSIDE of standard_ProcessUtility, which means that if we try to
* abort the drop normally it'll be too late. DROP DATABASE can't be inside
* of a transaction block anyway, so this should be fine to do.
*/
NeonXactCallback(XACT_EVENT_PRE_COMMIT, NULL);
break;
case T_CreateRoleStmt:
HandleCreateRole(castNode(CreateRoleStmt, parseTree));

View File

@@ -4,7 +4,7 @@ from typing import Any, Dict, List, Optional, Tuple, Type
import psycopg2
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import VanillaPostgres
from fixtures.neon_fixtures import NeonEnv, VanillaPostgres
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
@@ -205,6 +205,10 @@ def test_ddl_forwarding(ddl: DdlForwardingContext):
ddl.wait()
assert ddl.dbs == {"stork": "cork"}
cur.execute("DROP DATABASE stork")
ddl.wait()
assert ddl.dbs == {}
with pytest.raises(psycopg2.InternalError):
ddl.failures(True)
cur.execute("CREATE DATABASE failure WITH OWNER=cork")
@@ -217,6 +221,94 @@ def test_ddl_forwarding(ddl: DdlForwardingContext):
ddl.failures(True)
cur.execute("DROP DATABASE failure")
ddl.wait()
ddl.pg.connect(dbname="failure") # Ensure we can connect after a failed drop
assert ddl.dbs == {"failure": "cork"}
ddl.failures(False)
# Check that db is still in the Postgres after failure
cur.execute("SELECT datconnlimit FROM pg_database WHERE datname = 'failure'")
result = cur.fetchone()
if not result:
raise AssertionError("Database 'failure' not found")
# -2 means invalid database
# It should be invalid because cplane request failed
assert result[0] == -2, "Database 'failure' is not invalid"
# Check that repeated drop succeeds
cur.execute("DROP DATABASE failure")
ddl.wait()
assert ddl.dbs == {}
# DB should be absent in the Postgres
cur.execute("SELECT count(*) FROM pg_database WHERE datname = 'failure'")
result = cur.fetchone()
if not result:
raise AssertionError("Could not count databases")
assert result[0] == 0, "Database 'failure' still exists after drop"
conn.close()
# Assert that specified database has a specific connlimit, throwing an AssertionError otherwise
# -2 means invalid database
# -1 means no specific per-db limit (default)
def assert_db_connlimit(endpoint: Any, db_name: str, connlimit: int, msg: str):
with endpoint.cursor() as cur:
cur.execute("SELECT datconnlimit FROM pg_database WHERE datname = %s", (db_name,))
result = cur.fetchone()
if not result:
raise AssertionError(f"Database '{db_name}' not found")
assert result[0] == connlimit, msg
# Test that compute_ctl can deal with invalid databases (drop them).
# If Postgres extension cannot reach cplane, then DROP will be aborted
# and database will be marked as invalid. Then there are two recovery
# flows:
# 1. User can just repeat DROP DATABASE command until it succeeds
# 2. User can ignore, then compute_ctl will drop invalid databases
# automatically during full configuration
# Here we test the latter. The first one is tested in test_ddl_forwarding
def test_ddl_forwarding_invalid_db(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_ddl_forwarding_invalid_db", "empty")
endpoint = env.endpoints.create_start(
"test_ddl_forwarding_invalid_db",
# Some non-existent url
config_lines=["neon.console_url=http://localhost:9999/unknown/api/v0/roles_and_databases"],
)
log.info("postgres is running on 'test_ddl_forwarding_invalid_db' branch")
with endpoint.cursor() as cur:
cur.execute("SET neon.forward_ddl = false")
cur.execute("CREATE DATABASE failure")
cur.execute("COMMIT")
assert_db_connlimit(
endpoint, "failure", -1, "Database 'failure' doesn't have a valid connlimit"
)
with pytest.raises(psycopg2.InternalError):
with endpoint.cursor() as cur:
cur.execute("DROP DATABASE failure")
cur.execute("COMMIT")
# Should be invalid after failed drop
assert_db_connlimit(endpoint, "failure", -2, "Database 'failure' ins't invalid")
endpoint.stop()
endpoint.start()
# Still invalid after restart without full configuration
assert_db_connlimit(endpoint, "failure", -2, "Database 'failure' ins't invalid")
endpoint.stop()
endpoint.respec(skip_pg_catalog_updates=False)
endpoint.start()
# Should be cleaned up by compute_ctl during full configuration
with endpoint.cursor() as cur:
cur.execute("SELECT count(*) FROM pg_database WHERE datname = 'failure'")
result = cur.fetchone()
if not result:
raise AssertionError("Could not count databases")
assert result[0] == 0, "Database 'failure' still exists after restart"