fix(compute_ctl): Skip invalid DBs in PerDatabasePhase (#10910)

## Problem

After refactoring the configuration code to phases, it became a bit
fuzzy who filters out DBs that are not present in Postgres, are invalid,
or have `datallowconn = false`. The first 2 are important for the DB
dropping case, as we could be in operation retry, so DB could be already
absent in Postgres or invalid (interrupted `DROP DATABASE`).

Recent case:
https://neondb.slack.com/archives/C03H1K0PGKH/p1740053359712419

## Summary of changes

Add a common code that filters out inaccessible DBs inside
`ApplySpecPhase::RunInEachDatabase`.
This commit is contained in:
Alexey Kondratov
2025-02-21 22:50:50 +01:00
committed by GitHub
parent 4bbe75de8c
commit df264380b9

View File

@@ -7,12 +7,12 @@ use std::sync::Arc;
use crate::compute::construct_superuser_query;
use crate::pg_helpers::{escape_literal, DatabaseExt, Escaping, GenericOptionsSearch, RoleExt};
use anyhow::{bail, Result};
use anyhow::Result;
use compute_api::spec::{ComputeFeature, ComputeSpec, Database, PgIdent, Role};
use futures::future::join_all;
use tokio::sync::RwLock;
use tokio_postgres::Client;
use tracing::{debug, info_span, Instrument};
use tracing::{debug, info_span, warn, Instrument};
#[derive(Clone)]
pub enum DB {
@@ -47,6 +47,11 @@ pub enum PerDatabasePhase {
DeleteDBRoleReferences,
ChangeSchemaPerms,
HandleAnonExtension,
/// This is a shared phase, used for both i) dropping dangling LR subscriptions
/// before dropping the DB, and ii) dropping all subscriptions after creating
/// a fresh branch.
/// N.B. we will skip all DBs that are not present in Postgres, invalid, or
/// have `datallowconn = false` (`restrict_conn`).
DropLogicalSubscriptions,
}
@@ -168,7 +173,7 @@ where
///
/// In the future we may generate a single stream of changes and then
/// sort/merge/batch execution, but for now this is a nice way to improve
/// batching behaviour of the commands.
/// batching behavior of the commands.
async fn get_operations<'a>(
spec: &'a ComputeSpec,
ctx: &'a RwLock<MutableApplyContext>,
@@ -451,6 +456,38 @@ async fn get_operations<'a>(
)),
}))),
ApplySpecPhase::RunInEachDatabase { db, subphase } => {
// Do some checks that user DB exists and we can access it.
//
// During the phases like DropLogicalSubscriptions, DeleteDBRoleReferences,
// which happen before dropping the DB, the current run could be a retry,
// so it's a valid case when DB is absent already. The case of
// `pg_database.datallowconn = false`/`restrict_conn` is a bit tricky, as
// in theory user can have some dangling objects there, so we will fail at
// the actual drop later. Yet, to fix that in the current code we would need
// to ALTER DATABASE, and then check back, but that even more invasive, so
// that's not what we really want to do here.
//
// For ChangeSchemaPerms, skipping DBs we cannot access is totally fine.
if let DB::UserDB(db) = db {
let databases = &ctx.read().await.dbs;
let edb = match databases.get(&db.name) {
Some(edb) => edb,
None => {
warn!("skipping RunInEachDatabase phase {:?}, database {} doesn't exist in PostgreSQL", subphase, db.name);
return Ok(Box::new(empty()));
}
};
if edb.restrict_conn || edb.invalid {
warn!(
"skipping RunInEachDatabase phase {:?}, database {} is (restrict_conn={}, invalid={})",
subphase, db.name, edb.restrict_conn, edb.invalid
);
return Ok(Box::new(empty()));
}
}
match subphase {
PerDatabasePhase::DropLogicalSubscriptions => {
match &db {
@@ -530,25 +567,12 @@ async fn get_operations<'a>(
Ok(Box::new(operations))
}
PerDatabasePhase::ChangeSchemaPerms => {
let ctx = ctx.read().await;
let databases = &ctx.dbs;
let db = match &db {
// ignore schema permissions on the system database
DB::SystemDB => return Ok(Box::new(empty())),
DB::UserDB(db) => db,
};
if databases.get(&db.name).is_none() {
bail!("database {} doesn't exist in PostgreSQL", db.name);
}
let edb = databases.get(&db.name).unwrap();
if edb.restrict_conn || edb.invalid {
return Ok(Box::new(empty()));
}
let operations = vec![
Operation {
query: format!(
@@ -566,6 +590,7 @@ async fn get_operations<'a>(
Ok(Box::new(operations))
}
// TODO: remove this completely https://github.com/neondatabase/cloud/issues/22663
PerDatabasePhase::HandleAnonExtension => {
// Only install Anon into user databases
let db = match &db {