From ea1858e3b66fa058ce8ddfb6f37b364154dd20a6 Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Wed, 20 Nov 2024 02:14:58 +0100 Subject: [PATCH] compute_ctl: Streamline and Pipeline startup SQL (#9717) Before, compute_ctl didn't have a good registry for what command would run when, depending exclusively on sync code to apply changes. When users have many databases/roles to manage, this step can take a substantial amount of time, breaking assumptions about low (re)start times in other systems. This commit reduces the time compute_ctl takes to restart when changes must be applied, by making all commands more or less blind writes, and applying these commands in an asynchronous context, only waiting for completion once we know the commands have all been sent. Additionally, this reduces time spent by batching per-database operations where previously we would create a new SQL connection for every user-database operation we planned to execute. --- compute_tools/src/catalog.rs | 44 +- compute_tools/src/checker.rs | 28 - compute_tools/src/compute.rs | 397 ++++++++-- compute_tools/src/lib.rs | 1 + compute_tools/src/pg_helpers.rs | 39 +- compute_tools/src/spec.rs | 634 +--------------- compute_tools/src/spec_apply.rs | 680 ++++++++++++++++++ .../src/sql/add_availabilitycheck_tables.sql | 18 + .../src/sql/anon_ext_fn_reassign.sql | 12 + compute_tools/src/sql/default_grants.sql | 30 + .../src/sql/set_public_schema_owner.sql | 23 + .../src/sql/unset_template_for_drop_dbs.sql | 12 + 12 files changed, 1146 insertions(+), 772 deletions(-) create mode 100644 compute_tools/src/spec_apply.rs create mode 100644 compute_tools/src/sql/add_availabilitycheck_tables.sql create mode 100644 compute_tools/src/sql/anon_ext_fn_reassign.sql create mode 100644 compute_tools/src/sql/default_grants.sql create mode 100644 compute_tools/src/sql/set_public_schema_owner.sql create mode 100644 compute_tools/src/sql/unset_template_for_drop_dbs.sql diff --git a/compute_tools/src/catalog.rs b/compute_tools/src/catalog.rs index 4fefa831e0..2f6f82dd39 100644 --- a/compute_tools/src/catalog.rs +++ b/compute_tools/src/catalog.rs @@ -1,38 +1,40 @@ -use compute_api::{ - responses::CatalogObjects, - spec::{Database, Role}, -}; +use compute_api::responses::CatalogObjects; use futures::Stream; -use postgres::{Client, NoTls}; +use postgres::NoTls; use std::{path::Path, process::Stdio, result::Result, sync::Arc}; use tokio::{ io::{AsyncBufReadExt, BufReader}, process::Command, - task, + spawn, }; +use tokio_postgres::connect; use tokio_stream::{self as stream, StreamExt}; use tokio_util::codec::{BytesCodec, FramedRead}; use tracing::warn; -use crate::{ - compute::ComputeNode, - pg_helpers::{get_existing_dbs, get_existing_roles}, -}; +use crate::compute::ComputeNode; +use crate::pg_helpers::{get_existing_dbs_async, get_existing_roles_async}; pub async fn get_dbs_and_roles(compute: &Arc) -> anyhow::Result { let connstr = compute.connstr.clone(); - task::spawn_blocking(move || { - let mut client = Client::connect(connstr.as_str(), NoTls)?; - let roles: Vec; - { - let mut xact = client.transaction()?; - roles = get_existing_roles(&mut xact)?; - } - let databases: Vec = get_existing_dbs(&mut client)?.values().cloned().collect(); - Ok(CatalogObjects { roles, databases }) - }) - .await? + let (client, connection): (tokio_postgres::Client, _) = + connect(connstr.as_str(), NoTls).await?; + + spawn(async move { + if let Err(e) = connection.await { + eprintln!("connection error: {}", e); + } + }); + + let roles = get_existing_roles_async(&client).await?; + + let databases = get_existing_dbs_async(&client) + .await? + .into_values() + .collect(); + + Ok(CatalogObjects { roles, databases }) } #[derive(Debug, thiserror::Error)] diff --git a/compute_tools/src/checker.rs b/compute_tools/src/checker.rs index d76eaad0a0..cec2b1bed8 100644 --- a/compute_tools/src/checker.rs +++ b/compute_tools/src/checker.rs @@ -1,37 +1,9 @@ use anyhow::{anyhow, Ok, Result}; -use postgres::Client; use tokio_postgres::NoTls; use tracing::{error, instrument, warn}; use crate::compute::ComputeNode; -/// Create a special service table for availability checks -/// only if it does not exist already. -pub fn create_availability_check_data(client: &mut Client) -> Result<()> { - let query = " - DO $$ - BEGIN - IF NOT EXISTS( - SELECT 1 - FROM pg_catalog.pg_tables - WHERE tablename = 'health_check' - ) - THEN - CREATE TABLE health_check ( - id serial primary key, - updated_at timestamptz default now() - ); - INSERT INTO health_check VALUES (1, now()) - ON CONFLICT (id) DO UPDATE - SET updated_at = now(); - END IF; - END - $$;"; - client.execute(query, &[])?; - - Ok(()) -} - /// Update timestamp in a row in a special service table to check /// that we can actually write some data in this particular timeline. #[instrument(skip_all)] diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 0a8cb14058..4f67425ba8 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -1,20 +1,21 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::env; use std::fs; +use std::iter::once; use std::os::unix::fs::{symlink, PermissionsExt}; use std::path::Path; use std::process::{Command, Stdio}; use std::str::FromStr; use std::sync::atomic::AtomicU32; use std::sync::atomic::Ordering; -use std::sync::{Condvar, Mutex, RwLock}; +use std::sync::{Arc, Condvar, Mutex, RwLock}; use std::thread; use std::time::Duration; use std::time::Instant; use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; -use compute_api::spec::PgIdent; +use compute_api::spec::{PgIdent, Role}; use futures::future::join_all; use futures::stream::FuturesUnordered; use futures::StreamExt; @@ -31,15 +32,23 @@ use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec, ExtVersion}; use utils::measured_stream::MeasuredReader; use nix::sys::signal::{kill, Signal}; - use remote_storage::{DownloadError, RemotePath}; +use tokio::spawn; +use url::Url; -use crate::checker::create_availability_check_data; use crate::installed_extensions::get_installed_extensions_sync; use crate::local_proxy; -use crate::logger::inlinify; use crate::pg_helpers::*; use crate::spec::*; +use crate::spec_apply::ApplySpecPhase::{ + CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateSuperUser, + DropInvalidDatabases, DropRoles, HandleNeonExtension, HandleOtherExtensions, + RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase, +}; +use crate::spec_apply::PerDatabasePhase::{ + ChangeSchemaPerms, DeleteDBRoleReferences, HandleAnonExtension, +}; +use crate::spec_apply::{apply_operations, MutableApplyContext, DB}; use crate::sync_sk::{check_if_synced, ping_safekeeper}; use crate::{config, extension_server}; @@ -224,10 +233,7 @@ fn maybe_cgexec(cmd: &str) -> Command { } } -/// Create special neon_superuser role, that's a slightly nerfed version of a real superuser -/// that we give to customers -#[instrument(skip_all)] -fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()> { +pub(crate) fn construct_superuser_query(spec: &ComputeSpec) -> String { let roles = spec .cluster .roles @@ -296,11 +302,8 @@ fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()> $$;"#, roles_decl, database_decl, ); - info!("Neon superuser created: {}", inlinify(&query)); - client - .simple_query(&query) - .map_err(|e| anyhow::anyhow!(e).context(query))?; - Ok(()) + + query } impl ComputeNode { @@ -813,21 +816,14 @@ impl ComputeNode { Ok(()) } - /// Do initial configuration of the already started Postgres. - #[instrument(skip_all)] - pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> { - // If connection fails, - // it may be the old node with `zenith_admin` superuser. - // - // In this case we need to connect with old `zenith_admin` name - // and create new user. We cannot simply rename connected user, - // but we can create a new one and grant it all privileges. - let mut connstr = self.connstr.clone(); + async fn get_maintenance_client(url: &Url) -> Result { + let mut connstr = url.clone(); + connstr .query_pairs_mut() .append_pair("application_name", "apply_config"); - let mut client = match Client::connect(connstr.as_str(), NoTls) { + let (client, conn) = match tokio_postgres::connect(connstr.as_str(), NoTls).await { Err(e) => match e.code() { Some(&SqlState::INVALID_PASSWORD) | Some(&SqlState::INVALID_AUTHORIZATION_SPECIFICATION) => { @@ -845,8 +841,8 @@ impl ComputeNode { let mut client = Client::connect(zenith_admin_connstr.as_str(), NoTls) .context("broken cloud_admin credential: tried connecting with cloud_admin but could not authenticate, and zenith_admin does not work either")?; - // Disable forwarding so that users don't get a cloud_admin role + // Disable forwarding so that users don't get a cloud_admin role let mut func = || { client.simple_query("SET neon.forward_ddl = false")?; client.simple_query("CREATE USER cloud_admin WITH SUPERUSER")?; @@ -858,49 +854,309 @@ impl ComputeNode { drop(client); // reconnect with connstring with expected name - Client::connect(connstr.as_str(), NoTls)? + tokio_postgres::connect(connstr.as_str(), NoTls).await? } _ => return Err(e.into()), }, - Ok(client) => client, + Ok((client, conn)) => (client, conn), }; - // Disable DDL forwarding because control plane already knows about these roles/databases. + spawn(async move { + if let Err(e) = conn.await { + error!("maintenance client connection error: {}", e); + } + }); + + // Disable DDL forwarding because control plane already knows about the roles/databases + // we're about to modify. client .simple_query("SET neon.forward_ddl = false") + .await .context("apply_config SET neon.forward_ddl = false")?; - // Proceed with post-startup configuration. Note, that order of operations is important. - let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec; - create_neon_superuser(spec, &mut client).context("apply_config create_neon_superuser")?; - cleanup_instance(&mut client).context("apply_config cleanup_instance")?; - handle_roles(spec, &mut client).context("apply_config handle_roles")?; - handle_databases(spec, &mut client).context("apply_config handle_databases")?; - handle_role_deletions(spec, connstr.as_str(), &mut client) - .context("apply_config handle_role_deletions")?; - handle_grants( - spec, - &mut client, - connstr.as_str(), - self.has_feature(ComputeFeature::AnonExtension), - ) - .context("apply_config handle_grants")?; - handle_extensions(spec, &mut client).context("apply_config handle_extensions")?; - handle_extension_neon(&mut client).context("apply_config handle_extension_neon")?; - create_availability_check_data(&mut client) - .context("apply_config create_availability_check_data")?; + Ok(client) + } - // 'Close' connection - drop(client); + /// Apply the spec to the running PostgreSQL instance. + /// The caller can decide to run with multiple clients in parallel, or + /// single mode. Either way, the commands executed will be the same, and + /// only commands run in different databases are parallelized. + #[instrument(skip_all)] + pub fn apply_spec_sql( + &self, + spec: Arc, + url: Arc, + concurrency: usize, + ) -> Result<()> { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()?; - if let Some(ref local_proxy) = spec.local_proxy_config { + info!("Applying config with max {} concurrency", concurrency); + debug!("Config: {:?}", spec); + + rt.block_on(async { + // Proceed with post-startup configuration. Note, that order of operations is important. + let client = Self::get_maintenance_client(&url).await?; + let spec = spec.clone(); + + let databases = get_existing_dbs_async(&client).await?; + let roles = get_existing_roles_async(&client) + .await? + .into_iter() + .map(|role| (role.name.clone(), role)) + .collect::>(); + + let jwks_roles = Arc::new( + spec.as_ref() + .local_proxy_config + .iter() + .flat_map(|it| &it.jwks) + .flatten() + .flat_map(|setting| &setting.role_names) + .cloned() + .collect::>(), + ); + + let ctx = Arc::new(tokio::sync::RwLock::new(MutableApplyContext { + roles, + dbs: databases, + })); + + for phase in [ + CreateSuperUser, + DropInvalidDatabases, + RenameRoles, + CreateAndAlterRoles, + RenameAndDeleteDatabases, + CreateAndAlterDatabases, + ] { + debug!("Applying phase {:?}", &phase); + apply_operations( + spec.clone(), + ctx.clone(), + jwks_roles.clone(), + phase, + || async { Ok(&client) }, + ) + .await?; + } + + let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency)); + + let db_processes = spec + .cluster + .databases + .iter() + .map(|db| DB::new(db.clone())) + // include + .chain(once(DB::SystemDB)) + .map(|db| { + let spec = spec.clone(); + let ctx = ctx.clone(); + let jwks_roles = jwks_roles.clone(); + let mut url = url.as_ref().clone(); + let concurrency_token = concurrency_token.clone(); + let db = db.clone(); + + debug!("Applying per-database phases for Database {:?}", &db); + + match &db { + DB::SystemDB => {} + DB::UserDB(db) => { + url.set_path(db.name.as_str()); + } + } + + let url = Arc::new(url); + let fut = Self::apply_spec_sql_db( + spec.clone(), + url, + ctx.clone(), + jwks_roles.clone(), + concurrency_token.clone(), + db, + ); + + Ok(spawn(fut)) + }) + .collect::>>(); + + for process in db_processes.into_iter() { + let handle = process?; + handle.await??; + } + + for phase in vec![ + HandleOtherExtensions, + HandleNeonExtension, + CreateAvailabilityCheck, + DropRoles, + ] { + debug!("Applying phase {:?}", &phase); + apply_operations( + spec.clone(), + ctx.clone(), + jwks_roles.clone(), + phase, + || async { Ok(&client) }, + ) + .await?; + } + + Ok::<(), anyhow::Error>(()) + })?; + + Ok(()) + } + + /// Apply SQL migrations of the RunInEachDatabase phase. + /// + /// May opt to not connect to databases that don't have any scheduled + /// operations. The function is concurrency-controlled with the provided + /// semaphore. The caller has to make sure the semaphore isn't exhausted. + async fn apply_spec_sql_db( + spec: Arc, + url: Arc, + ctx: Arc>, + jwks_roles: Arc>, + concurrency_token: Arc, + db: DB, + ) -> Result<()> { + let _permit = concurrency_token.acquire().await?; + + let mut client_conn = None; + + for subphase in [ + DeleteDBRoleReferences, + ChangeSchemaPerms, + HandleAnonExtension, + ] { + apply_operations( + spec.clone(), + ctx.clone(), + jwks_roles.clone(), + RunInEachDatabase { + db: db.clone(), + subphase, + }, + // Only connect if apply_operation actually wants a connection. + // It's quite possible this database doesn't need any queries, + // so by not connecting we save time and effort connecting to + // that database. + || async { + if client_conn.is_none() { + let db_client = Self::get_maintenance_client(&url).await?; + client_conn.replace(db_client); + } + let client = client_conn.as_ref().unwrap(); + Ok(client) + }, + ) + .await?; + } + + drop(client_conn); + + Ok::<(), anyhow::Error>(()) + } + + /// Do initial configuration of the already started Postgres. + #[instrument(skip_all)] + pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> { + // If connection fails, + // it may be the old node with `zenith_admin` superuser. + // + // In this case we need to connect with old `zenith_admin` name + // and create new user. We cannot simply rename connected user, + // but we can create a new one and grant it all privileges. + let mut url = self.connstr.clone(); + url.query_pairs_mut() + .append_pair("application_name", "apply_config"); + + let url = Arc::new(url); + let spec = Arc::new( + compute_state + .pspec + .as_ref() + .expect("spec must be set") + .spec + .clone(), + ); + + // Choose how many concurrent connections to use for applying the spec changes. + // If the cluster is not currently Running we don't have to deal with user connections, + // and can thus use all `max_connections` connection slots. However, that's generally not + // very efficient, so we generally still limit it to a smaller number. + let max_concurrent_connections = if compute_state.status != ComputeStatus::Running { + // If the settings contain 'max_connections', use that as template + if let Some(config) = spec.cluster.settings.find("max_connections") { + config.parse::().ok() + } else { + // Otherwise, try to find the setting in the postgresql_conf string + spec.cluster + .postgresql_conf + .iter() + .flat_map(|conf| conf.split("\n")) + .filter_map(|line| { + if !line.contains("max_connections") { + return None; + } + + let (key, value) = line.split_once("=")?; + let key = key + .trim_start_matches(char::is_whitespace) + .trim_end_matches(char::is_whitespace); + + let value = value + .trim_start_matches(char::is_whitespace) + .trim_end_matches(char::is_whitespace); + + if key != "max_connections" { + return None; + } + + value.parse::().ok() + }) + .next() + } + // If max_connections is present, use at most 1/3rd of that. + // When max_connections is lower than 30, try to use at least 10 connections, but + // never more than max_connections. + .map(|limit| match limit { + 0..10 => limit, + 10..30 => 10, + 30.. => limit / 3, + }) + // If we didn't find max_connections, default to 10 concurrent connections. + .unwrap_or(10) + } else { + // state == Running + // Because the cluster is already in the Running state, we should assume users are + // already connected to the cluster, and high concurrency could negatively + // impact user connectivity. Therefore, we can limit concurrency to the number of + // reserved superuser connections, which users wouldn't be able to use anyway. + spec.cluster + .settings + .find("superuser_reserved_connections") + .iter() + .filter_map(|val| val.parse::().ok()) + .map(|val| if val > 1 { val - 1 } else { 1 }) + .last() + .unwrap_or(3) + }; + + // Merge-apply spec & changes to PostgreSQL state. + self.apply_spec_sql(spec.clone(), url.clone(), max_concurrent_connections)?; + + if let Some(ref local_proxy) = &spec.clone().local_proxy_config { info!("configuring local_proxy"); local_proxy::configure(local_proxy).context("apply_config local_proxy")?; } // Run migrations separately to not hold up cold starts thread::spawn(move || { - let mut connstr = connstr.clone(); + let mut connstr = url.as_ref().clone(); connstr .query_pairs_mut() .append_pair("application_name", "migrations"); @@ -908,7 +1164,8 @@ impl ComputeNode { let mut client = Client::connect(connstr.as_str(), NoTls)?; handle_migrations(&mut client).context("apply_config handle_migrations") }); - Ok(()) + + Ok::<(), anyhow::Error>(()) } // Wrapped this around `pg_ctl reload`, but right now we don't use @@ -971,32 +1228,16 @@ impl ComputeNode { config::with_compute_ctl_tmp_override(pgdata_path, "neon.max_cluster_size=-1", || { self.pg_reload_conf()?; - let mut client = Client::connect(self.connstr.as_str(), NoTls)?; - - // Proceed with post-startup configuration. Note, that order of operations is important. - // Disable DDL forwarding because control plane already knows about these roles/databases. if spec.mode == ComputeMode::Primary { - client.simple_query("SET neon.forward_ddl = false")?; - cleanup_instance(&mut client)?; - handle_roles(&spec, &mut client)?; - handle_databases(&spec, &mut client)?; - handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?; - handle_grants( - &spec, - &mut client, - self.connstr.as_str(), - self.has_feature(ComputeFeature::AnonExtension), - )?; - handle_extensions(&spec, &mut client)?; - handle_extension_neon(&mut client)?; - // We can skip handle_migrations here because a new migration can only appear - // if we have a new version of the compute_ctl binary, which can only happen - // if compute got restarted, in which case we'll end up inside of apply_config - // instead of reconfigure. - } + let mut url = self.connstr.clone(); + url.query_pairs_mut() + .append_pair("application_name", "apply_config"); + let url = Arc::new(url); - // 'Close' connection - drop(client); + let spec = Arc::new(spec.clone()); + + self.apply_spec_sql(spec, url, 1)?; + } Ok(()) })?; diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs index d27ae58fa2..ee4cf2dfa5 100644 --- a/compute_tools/src/lib.rs +++ b/compute_tools/src/lib.rs @@ -23,5 +23,6 @@ pub mod monitor; pub mod params; pub mod pg_helpers; pub mod spec; +mod spec_apply; pub mod swap; pub mod sync_sk; diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs index b2dc265864..4a1e5ee0e8 100644 --- a/compute_tools/src/pg_helpers.rs +++ b/compute_tools/src/pg_helpers.rs @@ -10,9 +10,9 @@ use std::thread::JoinHandle; use std::time::{Duration, Instant}; use anyhow::{bail, Result}; +use futures::StreamExt; use ini::Ini; use notify::{RecursiveMode, Watcher}; -use postgres::{Client, Transaction}; use tokio::io::AsyncBufReadExt; use tokio::time::timeout; use tokio_postgres::NoTls; @@ -197,27 +197,34 @@ impl Escaping for PgIdent { } /// Build a list of existing Postgres roles -pub fn get_existing_roles(xact: &mut Transaction<'_>) -> Result> { - let postgres_roles = xact - .query("SELECT rolname, rolpassword FROM pg_catalog.pg_authid", &[])? - .iter() +pub async fn get_existing_roles_async(client: &tokio_postgres::Client) -> Result> { + let postgres_roles = client + .query_raw::( + "SELECT rolname, rolpassword FROM pg_catalog.pg_authid", + &[], + ) + .await? + .filter_map(|row| async { row.ok() }) .map(|row| Role { name: row.get("rolname"), encrypted_password: row.get("rolpassword"), options: None, }) - .collect(); + .collect() + .await; Ok(postgres_roles) } /// Build a list of existing Postgres databases -pub fn get_existing_dbs(client: &mut Client) -> Result> { +pub async fn get_existing_dbs_async( + client: &tokio_postgres::Client, +) -> Result> { // `pg_database.datconnlimit = -2` means that the database is in the // invalid state. See: // https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9 - let postgres_dbs: Vec = client - .query( + let rowstream = client + .query_raw::( "SELECT datname AS name, datdba::regrole::text AS owner, @@ -226,8 +233,11 @@ pub fn get_existing_dbs(client: &mut Client) -> Result FROM pg_catalog.pg_database;", &[], - )? - .iter() + ) + .await?; + + let dbs_map = rowstream + .filter_map(|r| async { r.ok() }) .map(|row| Database { name: row.get("name"), owner: row.get("owner"), @@ -235,12 +245,9 @@ pub fn get_existing_dbs(client: &mut Client) -> Result invalid: row.get("invalid"), options: None, }) - .collect(); - - let dbs_map = postgres_dbs - .iter() .map(|db| (db.name.clone(), db.clone())) - .collect::>(); + .collect::>() + .await; Ok(dbs_map) } diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index 73f3d1006a..c7d2deb090 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -1,22 +1,17 @@ -use std::collections::HashSet; +use anyhow::{anyhow, bail, Result}; +use postgres::Client; +use reqwest::StatusCode; use std::fs::File; use std::path::Path; -use std::str::FromStr; - -use anyhow::{anyhow, bail, Context, Result}; -use postgres::config::Config; -use postgres::{Client, NoTls}; -use reqwest::StatusCode; -use tracing::{error, info, info_span, instrument, span_enabled, warn, Level}; +use tracing::{error, info, instrument, warn}; use crate::config; -use crate::logger::inlinify; use crate::migration::MigrationRunner; use crate::params::PG_HBA_ALL_MD5; use crate::pg_helpers::*; use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse}; -use compute_api::spec::{ComputeSpec, PgIdent, Role}; +use compute_api::spec::ComputeSpec; // Do control plane request and return response if any. In case of error it // returns a bool flag indicating whether it makes sense to retry the request @@ -151,625 +146,6 @@ pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> { Ok(()) } -/// Compute could be unexpectedly shut down, for example, during the -/// database dropping. This leaves the database in the invalid state, -/// which prevents new db creation with the same name. This function -/// will clean it up before proceeding with catalog updates. All -/// possible future cleanup operations may go here too. -#[instrument(skip_all)] -pub fn cleanup_instance(client: &mut Client) -> Result<()> { - let existing_dbs = get_existing_dbs(client)?; - - for (_, db) in existing_dbs { - if db.invalid { - // After recent commit in Postgres, interrupted DROP DATABASE - // leaves the database in the invalid state. According to the - // commit message, the only option for user is to drop it again. - // See: - // https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9 - // - // Postgres Neon extension is done the way, that db is de-registered - // in the control plane metadata only after it is dropped. So there is - // a chance that it still thinks that db should exist. This means - // that it will be re-created by `handle_databases()`. Yet, it's fine - // as user can just repeat drop (in vanilla Postgres they would need - // to do the same, btw). - let query = format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote()); - info!("dropping invalid database {}", db.name); - client.execute(query.as_str(), &[])?; - } - } - - Ok(()) -} - -/// Given a cluster spec json and open transaction it handles roles creation, -/// deletion and update. -#[instrument(skip_all)] -pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> { - let mut xact = client.transaction()?; - let existing_roles: Vec = get_existing_roles(&mut xact)?; - - let mut jwks_roles = HashSet::new(); - if let Some(local_proxy) = &spec.local_proxy_config { - for jwks_setting in local_proxy.jwks.iter().flatten() { - for role_name in &jwks_setting.role_names { - jwks_roles.insert(role_name.clone()); - } - } - } - - // Print a list of existing Postgres roles (only in debug mode) - if span_enabled!(Level::INFO) { - let mut vec = Vec::new(); - for r in &existing_roles { - vec.push(format!( - "{}:{}", - r.name, - if r.encrypted_password.is_some() { - "[FILTERED]" - } else { - "(null)" - } - )); - } - - info!("postgres roles (total {}): {:?}", vec.len(), vec); - } - - // Process delta operations first - if let Some(ops) = &spec.delta_operations { - info!("processing role renames"); - for op in ops { - match op.action.as_ref() { - "delete_role" => { - // no-op now, roles will be deleted at the end of configuration - } - // Renaming role drops its password, since role name is - // used as a salt there. It is important that this role - // is recorded with a new `name` in the `roles` list. - // Follow up roles update will set the new password. - "rename_role" => { - let new_name = op.new_name.as_ref().unwrap(); - - // XXX: with a limited number of roles it is fine, but consider making it a HashMap - if existing_roles.iter().any(|r| r.name == op.name) { - let query: String = format!( - "ALTER ROLE {} RENAME TO {}", - op.name.pg_quote(), - new_name.pg_quote() - ); - - warn!("renaming role '{}' to '{}'", op.name, new_name); - xact.execute(query.as_str(), &[])?; - } - } - _ => {} - } - } - } - - // Refresh Postgres roles info to handle possible roles renaming - let existing_roles: Vec = get_existing_roles(&mut xact)?; - - info!( - "handling cluster spec roles (total {})", - spec.cluster.roles.len() - ); - for role in &spec.cluster.roles { - let name = &role.name; - // XXX: with a limited number of roles it is fine, but consider making it a HashMap - let pg_role = existing_roles.iter().find(|r| r.name == *name); - - enum RoleAction { - None, - Update, - Create, - } - let action = if let Some(r) = pg_role { - if (r.encrypted_password.is_none() && role.encrypted_password.is_some()) - || (r.encrypted_password.is_some() && role.encrypted_password.is_none()) - { - RoleAction::Update - } else if let Some(pg_pwd) = &r.encrypted_password { - // Check whether password changed or not (trim 'md5' prefix first if any) - // - // This is a backward compatibility hack, which comes from the times when we were using - // md5 for everyone and hashes were stored in the console db without md5 prefix. So when - // role comes from the control-plane (json spec) `Role.encrypted_password` doesn't have md5 prefix, - // but when role comes from Postgres (`get_existing_roles` / `existing_roles`) it has this prefix. - // Here is the only place so far where we compare hashes, so it seems to be the best candidate - // to place this compatibility layer. - let pg_pwd = if let Some(stripped) = pg_pwd.strip_prefix("md5") { - stripped - } else { - pg_pwd - }; - if pg_pwd != *role.encrypted_password.as_ref().unwrap() { - RoleAction::Update - } else { - RoleAction::None - } - } else { - RoleAction::None - } - } else { - RoleAction::Create - }; - - match action { - RoleAction::None => {} - RoleAction::Update => { - // This can be run on /every/ role! Not just ones created through the console. - // This means that if you add some funny ALTER here that adds a permission, - // this will get run even on user-created roles! This will result in different - // behavior before and after a spec gets reapplied. The below ALTER as it stands - // now only grants LOGIN and changes the password. Please do not allow this branch - // to do anything silly. - let mut query: String = format!("ALTER ROLE {} ", name.pg_quote()); - query.push_str(&role.to_pg_options()); - xact.execute(query.as_str(), &[])?; - } - RoleAction::Create => { - // This branch only runs when roles are created through the console, so it is - // safe to add more permissions here. BYPASSRLS and REPLICATION are inherited - // from neon_superuser. - let mut query: String = format!( - "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser", - name.pg_quote() - ); - if jwks_roles.contains(name.as_str()) { - query = format!("CREATE ROLE {}", name.pg_quote()); - } - info!("running role create query: '{}'", &query); - query.push_str(&role.to_pg_options()); - xact.execute(query.as_str(), &[])?; - } - } - - if span_enabled!(Level::INFO) { - let pwd = if role.encrypted_password.is_some() { - "[FILTERED]" - } else { - "(null)" - }; - let action_str = match action { - RoleAction::None => "", - RoleAction::Create => " -> create", - RoleAction::Update => " -> update", - }; - info!(" - {}:{}{}", name, pwd, action_str); - } - } - - xact.commit()?; - - Ok(()) -} - -/// Reassign all dependent objects and delete requested roles. -#[instrument(skip_all)] -pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> { - if let Some(ops) = &spec.delta_operations { - // First, reassign all dependent objects to db owners. - info!("reassigning dependent objects of to-be-deleted roles"); - - // Fetch existing roles. We could've exported and used `existing_roles` from - // `handle_roles()`, but we only make this list there before creating new roles. - // Which is probably fine as we never create to-be-deleted roles, but that'd - // just look a bit untidy. Anyway, the entire `pg_roles` should be in shared - // buffers already, so this shouldn't be a big deal. - let mut xact = client.transaction()?; - let existing_roles: Vec = get_existing_roles(&mut xact)?; - xact.commit()?; - - for op in ops { - // Check that role is still present in Postgres, as this could be a - // restart with the same spec after role deletion. - if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) { - reassign_owned_objects(spec, connstr, &op.name)?; - } - } - - // Second, proceed with role deletions. - info!("processing role deletions"); - let mut xact = client.transaction()?; - for op in ops { - // We do not check either role exists or not, - // Postgres will take care of it for us - if op.action == "delete_role" { - let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.pg_quote()); - - warn!("deleting role '{}'", &op.name); - xact.execute(query.as_str(), &[])?; - } - } - xact.commit()?; - } - - Ok(()) -} - -fn reassign_owned_objects_in_one_db( - conf: Config, - role_name: &PgIdent, - db_owner: &PgIdent, -) -> Result<()> { - let mut client = conf.connect(NoTls)?; - - // This will reassign all dependent objects to the db owner - let reassign_query = format!( - "REASSIGN OWNED BY {} TO {}", - role_name.pg_quote(), - db_owner.pg_quote() - ); - info!( - "reassigning objects owned by '{}' in db '{}' to '{}'", - role_name, - conf.get_dbname().unwrap_or(""), - db_owner - ); - client.simple_query(&reassign_query)?; - - // This now will only drop privileges of the role - let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote()); - client.simple_query(&drop_query)?; - Ok(()) -} - -// Reassign all owned objects in all databases to the owner of the database. -fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> { - for db in &spec.cluster.databases { - if db.owner != *role_name { - let mut conf = Config::from_str(connstr)?; - conf.dbname(&db.name); - reassign_owned_objects_in_one_db(conf, role_name, &db.owner)?; - } - } - - // Also handle case when there are no databases in the spec. - // In this case we need to reassign objects in the default database. - let conf = Config::from_str(connstr)?; - let db_owner = PgIdent::from_str("cloud_admin")?; - reassign_owned_objects_in_one_db(conf, role_name, &db_owner)?; - - Ok(()) -} - -/// It follows mostly the same logic as `handle_roles()` excepting that we -/// does not use an explicit transactions block, since major database operations -/// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level -/// atomicity should be enough here due to the order of operations and various checks, -/// which together provide us idempotency. -#[instrument(skip_all)] -pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> { - let existing_dbs = get_existing_dbs(client)?; - - // Print a list of existing Postgres databases (only in debug mode) - if span_enabled!(Level::INFO) { - let mut vec = Vec::new(); - for (dbname, db) in &existing_dbs { - vec.push(format!("{}:{}", dbname, db.owner)); - } - info!("postgres databases (total {}): {:?}", vec.len(), vec); - } - - // Process delta operations first - if let Some(ops) = &spec.delta_operations { - info!("processing delta operations on databases"); - for op in ops { - match op.action.as_ref() { - // We do not check either DB exists or not, - // Postgres will take care of it for us - "delete_db" => { - // In Postgres we can't drop a database if it is a template. - // So we need to unset the template flag first, but it could - // be a retry, so we could've already dropped the database. - // Check that database exists first to make it idempotent. - let unset_template_query: String = format!( - " - DO $$ - BEGIN - IF EXISTS( - SELECT 1 - FROM pg_catalog.pg_database - WHERE datname = {} - ) - THEN - ALTER DATABASE {} is_template false; - END IF; - END - $$;", - escape_literal(&op.name), - &op.name.pg_quote() - ); - // Use FORCE to drop database even if there are active connections. - // We run this from `cloud_admin`, so it should have enough privileges. - // NB: there could be other db states, which prevent us from dropping - // the database. For example, if db is used by any active subscription - // or replication slot. - // TODO: deal with it once we allow logical replication. Proper fix should - // involve returning an error code to the control plane, so it could - // figure out that this is a non-retryable error, return it to the user - // and fail operation permanently. - let drop_db_query: String = format!( - "DROP DATABASE IF EXISTS {} WITH (FORCE)", - &op.name.pg_quote() - ); - - warn!("deleting database '{}'", &op.name); - client.execute(unset_template_query.as_str(), &[])?; - client.execute(drop_db_query.as_str(), &[])?; - } - "rename_db" => { - let new_name = op.new_name.as_ref().unwrap(); - - if existing_dbs.contains_key(&op.name) { - let query: String = format!( - "ALTER DATABASE {} RENAME TO {}", - op.name.pg_quote(), - new_name.pg_quote() - ); - - warn!("renaming database '{}' to '{}'", op.name, new_name); - client.execute(query.as_str(), &[])?; - } - } - _ => {} - } - } - } - - // Refresh Postgres databases info to handle possible renames - let existing_dbs = get_existing_dbs(client)?; - - info!( - "handling cluster spec databases (total {})", - spec.cluster.databases.len() - ); - for db in &spec.cluster.databases { - let name = &db.name; - let pg_db = existing_dbs.get(name); - - enum DatabaseAction { - None, - Update, - Create, - } - let action = if let Some(r) = pg_db { - // XXX: db owner name is returned as quoted string from Postgres, - // when quoting is needed. - let new_owner = if r.owner.starts_with('"') { - db.owner.pg_quote() - } else { - db.owner.clone() - }; - - if new_owner != r.owner { - // Update the owner - DatabaseAction::Update - } else { - DatabaseAction::None - } - } else { - DatabaseAction::Create - }; - - match action { - DatabaseAction::None => {} - DatabaseAction::Update => { - let query: String = format!( - "ALTER DATABASE {} OWNER TO {}", - name.pg_quote(), - db.owner.pg_quote() - ); - let _guard = info_span!("executing", query).entered(); - client.execute(query.as_str(), &[])?; - } - DatabaseAction::Create => { - let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote()); - query.push_str(&db.to_pg_options()); - let _guard = info_span!("executing", query).entered(); - client.execute(query.as_str(), &[])?; - let grant_query: String = format!( - "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser", - name.pg_quote() - ); - client.execute(grant_query.as_str(), &[])?; - } - }; - - if span_enabled!(Level::INFO) { - let action_str = match action { - DatabaseAction::None => "", - DatabaseAction::Create => " -> create", - DatabaseAction::Update => " -> update", - }; - info!(" - {}:{}{}", db.name, db.owner, action_str); - } - } - - Ok(()) -} - -/// Grant CREATE ON DATABASE to the database owner and do some other alters and grants -/// to allow users creating trusted extensions and re-creating `public` schema, for example. -#[instrument(skip_all)] -pub fn handle_grants( - spec: &ComputeSpec, - client: &mut Client, - connstr: &str, - enable_anon_extension: bool, -) -> Result<()> { - info!("modifying database permissions"); - let existing_dbs = get_existing_dbs(client)?; - - // Do some per-database access adjustments. We'd better do this at db creation time, - // but CREATE DATABASE isn't transactional. So we cannot create db + do some grants - // atomically. - for db in &spec.cluster.databases { - match existing_dbs.get(&db.name) { - Some(pg_db) => { - if pg_db.restrict_conn || pg_db.invalid { - info!( - "skipping grants for db {} (invalid: {}, connections not allowed: {})", - db.name, pg_db.invalid, pg_db.restrict_conn - ); - continue; - } - } - None => { - bail!( - "database {} doesn't exist in Postgres after handle_databases()", - db.name - ); - } - } - - let mut conf = Config::from_str(connstr)?; - conf.dbname(&db.name); - - let mut db_client = conf.connect(NoTls)?; - - // This will only change ownership on the schema itself, not the objects - // inside it. Without it owner of the `public` schema will be `cloud_admin` - // and database owner cannot do anything with it. SQL procedure ensures - // that it won't error out if schema `public` doesn't exist. - let alter_query = format!( - "DO $$\n\ - DECLARE\n\ - schema_owner TEXT;\n\ - BEGIN\n\ - IF EXISTS(\n\ - SELECT nspname\n\ - FROM pg_catalog.pg_namespace\n\ - WHERE nspname = 'public'\n\ - )\n\ - THEN\n\ - SELECT nspowner::regrole::text\n\ - FROM pg_catalog.pg_namespace\n\ - WHERE nspname = 'public'\n\ - INTO schema_owner;\n\ - \n\ - IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'\n\ - THEN\n\ - ALTER SCHEMA public OWNER TO {};\n\ - END IF;\n\ - END IF;\n\ - END\n\ - $$;", - db.owner.pg_quote() - ); - db_client.simple_query(&alter_query)?; - - // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user. - // This is needed because since postgres 15 this privilege is removed by default. - // TODO: web_access isn't created for almost 1 year. It could be that we have - // active users of 1 year old projects, but hopefully not, so check it and - // remove this code if possible. The worst thing that could happen is that - // user won't be able to use public schema in NEW databases created in the - // very OLD project. - // - // Also, alter default permissions so that relations created by extensions can be - // used by neon_superuser without permission issues. - let grant_query = "DO $$\n\ - BEGIN\n\ - IF EXISTS(\n\ - SELECT nspname\n\ - FROM pg_catalog.pg_namespace\n\ - WHERE nspname = 'public'\n\ - ) AND\n\ - current_setting('server_version_num')::int/10000 >= 15\n\ - THEN\n\ - IF EXISTS(\n\ - SELECT rolname\n\ - FROM pg_catalog.pg_roles\n\ - WHERE rolname = 'web_access'\n\ - )\n\ - THEN\n\ - GRANT CREATE ON SCHEMA public TO web_access;\n\ - END IF;\n\ - END IF;\n\ - IF EXISTS(\n\ - SELECT nspname\n\ - FROM pg_catalog.pg_namespace\n\ - WHERE nspname = 'public'\n\ - )\n\ - THEN\n\ - ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser WITH GRANT OPTION;\n\ - ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO neon_superuser WITH GRANT OPTION;\n\ - END IF;\n\ - END\n\ - $$;" - .to_string(); - - info!( - "grant query for db {} : {}", - &db.name, - inlinify(&grant_query) - ); - db_client.simple_query(&grant_query)?; - - // it is important to run this after all grants - if enable_anon_extension { - handle_extension_anon(spec, &db.owner, &mut db_client, false) - .context("handle_grants handle_extension_anon")?; - } - } - - Ok(()) -} - -/// Create required system extensions -#[instrument(skip_all)] -pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()> { - if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") { - if libs.contains("pg_stat_statements") { - // Create extension only if this compute really needs it - let query = "CREATE EXTENSION IF NOT EXISTS pg_stat_statements"; - info!("creating system extensions with query: {}", query); - client.simple_query(query)?; - } - } - - Ok(()) -} - -/// Run CREATE and ALTER EXTENSION neon UPDATE for postgres database -#[instrument(skip_all)] -pub fn handle_extension_neon(client: &mut Client) -> Result<()> { - info!("handle extension neon"); - - let mut query = "CREATE SCHEMA IF NOT EXISTS neon"; - client.simple_query(query)?; - - query = "CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon"; - info!("create neon extension with query: {}", query); - client.simple_query(query)?; - - query = "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'"; - client.simple_query(query)?; - - query = "ALTER EXTENSION neon SET SCHEMA neon"; - info!("alter neon extension schema with query: {}", query); - client.simple_query(query)?; - - // this will be a no-op if extension is already up to date, - // which may happen in two cases: - // - extension was just installed - // - extension was already installed and is up to date - let query = "ALTER EXTENSION neon UPDATE"; - info!("update neon extension version with query: {}", query); - if let Err(e) = client.simple_query(query) { - error!( - "failed to upgrade neon extension during `handle_extension_neon`: {}", - e - ); - } - - Ok(()) -} - #[instrument(skip_all)] pub fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> { info!("handle neon extension upgrade"); diff --git a/compute_tools/src/spec_apply.rs b/compute_tools/src/spec_apply.rs new file mode 100644 index 0000000000..7308d5d36e --- /dev/null +++ b/compute_tools/src/spec_apply.rs @@ -0,0 +1,680 @@ +use std::collections::{HashMap, HashSet}; +use std::fmt::{Debug, Formatter}; +use std::future::Future; +use std::iter::empty; +use std::iter::once; +use std::sync::Arc; + +use crate::compute::construct_superuser_query; +use crate::pg_helpers::{escape_literal, DatabaseExt, Escaping, GenericOptionsSearch, RoleExt}; +use anyhow::{bail, Result}; +use compute_api::spec::{ComputeFeature, ComputeSpec, Database, PgIdent, Role}; +use futures::future::join_all; +use tokio::sync::RwLock; +use tokio_postgres::Client; +use tracing::{debug, info_span, Instrument}; + +#[derive(Clone)] +pub enum DB { + SystemDB, + UserDB(Database), +} + +impl DB { + pub fn new(db: Database) -> DB { + Self::UserDB(db) + } + + pub fn is_owned_by(&self, role: &PgIdent) -> bool { + match self { + DB::SystemDB => false, + DB::UserDB(db) => &db.owner == role, + } + } +} + +impl Debug for DB { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + DB::SystemDB => f.debug_tuple("SystemDB").finish(), + DB::UserDB(db) => f.debug_tuple("UserDB").field(&db.name).finish(), + } + } +} + +#[derive(Copy, Clone, Debug)] +pub enum PerDatabasePhase { + DeleteDBRoleReferences, + ChangeSchemaPerms, + HandleAnonExtension, +} + +#[derive(Clone, Debug)] +pub enum ApplySpecPhase { + CreateSuperUser, + DropInvalidDatabases, + RenameRoles, + CreateAndAlterRoles, + RenameAndDeleteDatabases, + CreateAndAlterDatabases, + RunInEachDatabase { db: DB, subphase: PerDatabasePhase }, + HandleOtherExtensions, + HandleNeonExtension, + CreateAvailabilityCheck, + DropRoles, +} + +pub struct Operation { + pub query: String, + pub comment: Option, +} + +pub struct MutableApplyContext { + pub roles: HashMap, + pub dbs: HashMap, +} + +/// Appply the operations that belong to the given spec apply phase. +/// +/// Commands within a single phase are executed in order of Iterator yield. +/// Commands of ApplySpecPhase::RunInEachDatabase will execute in the database +/// indicated by its `db` field, and can share a single client for all changes +/// to that database. +/// +/// Notes: +/// - Commands are pipelined, and thus may cause incomplete apply if one +/// command of many fails. +/// - Failing commands will fail the phase's apply step once the return value +/// is processed. +/// - No timeouts have (yet) been implemented. +/// - The caller is responsible for limiting and/or applying concurrency. +pub async fn apply_operations<'a, Fut, F>( + spec: Arc, + ctx: Arc>, + jwks_roles: Arc>, + apply_spec_phase: ApplySpecPhase, + client: F, +) -> Result<()> +where + F: FnOnce() -> Fut, + Fut: Future>, +{ + debug!("Starting phase {:?}", &apply_spec_phase); + let span = info_span!("db_apply_changes", phase=?apply_spec_phase); + let span2 = span.clone(); + async move { + debug!("Processing phase {:?}", &apply_spec_phase); + let ctx = ctx; + + let mut ops = get_operations(&spec, &ctx, &jwks_roles, &apply_spec_phase) + .await? + .peekable(); + + // Return (and by doing so, skip requesting the PostgreSQL client) if + // we don't have any operations scheduled. + if ops.peek().is_none() { + return Ok(()); + } + + let client = client().await?; + + debug!("Applying phase {:?}", &apply_spec_phase); + + let active_queries = ops + .map(|op| { + let Operation { comment, query } = op; + let inspan = match comment { + None => span.clone(), + Some(comment) => info_span!("phase {}: {}", comment), + }; + + async { + let query = query; + let res = client.simple_query(&query).await; + debug!( + "{} {}", + if res.is_ok() { + "successfully executed" + } else { + "failed to execute" + }, + query + ); + res + } + .instrument(inspan) + }) + .collect::>(); + + drop(ctx); + + for it in join_all(active_queries).await { + drop(it?); + } + + debug!("Completed phase {:?}", &apply_spec_phase); + + Ok(()) + } + .instrument(span2) + .await +} + +/// Create a stream of operations to be executed for that phase of applying +/// changes. +/// +/// In the future we may generate a single stream of changes and then +/// sort/merge/batch execution, but for now this is a nice way to improve +/// batching behaviour of the commands. +async fn get_operations<'a>( + spec: &'a ComputeSpec, + ctx: &'a RwLock, + jwks_roles: &'a HashSet, + apply_spec_phase: &'a ApplySpecPhase, +) -> Result + 'a + Send>> { + match apply_spec_phase { + ApplySpecPhase::CreateSuperUser => { + let query = construct_superuser_query(spec); + + Ok(Box::new(once(Operation { + query, + comment: None, + }))) + } + ApplySpecPhase::DropInvalidDatabases => { + let mut ctx = ctx.write().await; + let databases = &mut ctx.dbs; + + let keys: Vec<_> = databases + .iter() + .filter(|(_, db)| db.invalid) + .map(|(dbname, _)| dbname.clone()) + .collect(); + + // After recent commit in Postgres, interrupted DROP DATABASE + // leaves the database in the invalid state. According to the + // commit message, the only option for user is to drop it again. + // See: + // https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9 + // + // Postgres Neon extension is done the way, that db is de-registered + // in the control plane metadata only after it is dropped. So there is + // a chance that it still thinks that the db should exist. This means + // that it will be re-created by the `CreateDatabases` phase. This + // is fine, as user can just drop the table again (in vanilla + // Postgres they would need to do the same). + let operations = keys + .into_iter() + .filter_map(move |dbname| ctx.dbs.remove(&dbname)) + .map(|db| Operation { + query: format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote()), + comment: Some(format!("Dropping invalid database {}", db.name)), + }); + + Ok(Box::new(operations)) + } + ApplySpecPhase::RenameRoles => { + let mut ctx = ctx.write().await; + + let operations = spec + .delta_operations + .iter() + .flatten() + .filter(|op| op.action == "rename_role") + .filter_map(move |op| { + let roles = &mut ctx.roles; + + if roles.contains_key(op.name.as_str()) { + None + } else { + let new_name = op.new_name.as_ref().unwrap(); + let mut role = roles.remove(op.name.as_str()).unwrap(); + + role.name = new_name.clone(); + role.encrypted_password = None; + roles.insert(role.name.clone(), role); + + Some(Operation { + query: format!( + "ALTER ROLE {} RENAME TO {}", + op.name.pg_quote(), + new_name.pg_quote() + ), + comment: Some(format!("renaming role '{}' to '{}'", op.name, new_name)), + }) + } + }); + + Ok(Box::new(operations)) + } + ApplySpecPhase::CreateAndAlterRoles => { + let mut ctx = ctx.write().await; + + let operations = spec.cluster.roles + .iter() + .filter_map(move |role| { + let roles = &mut ctx.roles; + let db_role = roles.get(&role.name); + + match db_role { + Some(db_role) => { + if db_role.encrypted_password != role.encrypted_password { + // This can be run on /every/ role! Not just ones created through the console. + // This means that if you add some funny ALTER here that adds a permission, + // this will get run even on user-created roles! This will result in different + // behavior before and after a spec gets reapplied. The below ALTER as it stands + // now only grants LOGIN and changes the password. Please do not allow this branch + // to do anything silly. + Some(Operation { + query: format!( + "ALTER ROLE {} {}", + role.name.pg_quote(), + role.to_pg_options(), + ), + comment: None, + }) + } else { + None + } + } + None => { + let query = if !jwks_roles.contains(role.name.as_str()) { + format!( + "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser {}", + role.name.pg_quote(), + role.to_pg_options(), + ) + } else { + format!( + "CREATE ROLE {} {}", + role.name.pg_quote(), + role.to_pg_options(), + ) + }; + Some(Operation { + query, + comment: Some(format!("creating role {}", role.name)), + }) + } + } + }); + + Ok(Box::new(operations)) + } + ApplySpecPhase::RenameAndDeleteDatabases => { + let mut ctx = ctx.write().await; + + let operations = spec + .delta_operations + .iter() + .flatten() + .filter_map(move |op| { + let databases = &mut ctx.dbs; + match op.action.as_str() { + // We do not check whether the DB exists or not, + // Postgres will take care of it for us + "delete_db" => { + // In Postgres we can't drop a database if it is a template. + // So we need to unset the template flag first, but it could + // be a retry, so we could've already dropped the database. + // Check that database exists first to make it idempotent. + let unset_template_query: String = format!( + include_str!("sql/unset_template_for_drop_dbs.sql"), + datname_str = escape_literal(&op.name), + datname = &op.name.pg_quote() + ); + + // Use FORCE to drop database even if there are active connections. + // We run this from `cloud_admin`, so it should have enough privileges. + // NB: there could be other db states, which prevent us from dropping + // the database. For example, if db is used by any active subscription + // or replication slot. + // TODO: deal with it once we allow logical replication. Proper fix should + // involve returning an error code to the control plane, so it could + // figure out that this is a non-retryable error, return it to the user + // and fail operation permanently. + let drop_db_query: String = format!( + "DROP DATABASE IF EXISTS {} WITH (FORCE)", + &op.name.pg_quote() + ); + + databases.remove(&op.name); + + Some(vec![ + Operation { + query: unset_template_query, + comment: Some(format!( + "optionally clearing template flags for DB {}", + op.name, + )), + }, + Operation { + query: drop_db_query, + comment: Some(format!("deleting database {}", op.name,)), + }, + ]) + } + "rename_db" => { + if let Some(mut db) = databases.remove(&op.name) { + // update state of known databases + let new_name = op.new_name.as_ref().unwrap(); + db.name = new_name.clone(); + databases.insert(db.name.clone(), db); + + Some(vec![Operation { + query: format!( + "ALTER DATABASE {} RENAME TO {}", + op.name.pg_quote(), + new_name.pg_quote(), + ), + comment: Some(format!( + "renaming database '{}' to '{}'", + op.name, new_name + )), + }]) + } else { + None + } + } + _ => None, + } + }) + .flatten(); + + Ok(Box::new(operations)) + } + ApplySpecPhase::CreateAndAlterDatabases => { + let mut ctx = ctx.write().await; + + let operations = spec + .cluster + .databases + .iter() + .filter_map(move |db| { + let databases = &mut ctx.dbs; + if let Some(edb) = databases.get_mut(&db.name) { + let change_owner = if edb.owner.starts_with('"') { + db.owner.pg_quote() != edb.owner + } else { + db.owner != edb.owner + }; + + edb.owner = db.owner.clone(); + + if change_owner { + Some(vec![Operation { + query: format!( + "ALTER DATABASE {} OWNER TO {}", + db.name.pg_quote(), + db.owner.pg_quote() + ), + comment: Some(format!( + "changing database owner of database {} to {}", + db.name, db.owner + )), + }]) + } else { + None + } + } else { + databases.insert(db.name.clone(), db.clone()); + + Some(vec![ + Operation { + query: format!( + "CREATE DATABASE {} {}", + db.name.pg_quote(), + db.to_pg_options(), + ), + comment: None, + }, + Operation { + query: format!( + "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser", + db.name.pg_quote() + ), + comment: None, + }, + ]) + } + }) + .flatten(); + + Ok(Box::new(operations)) + } + ApplySpecPhase::RunInEachDatabase { db, subphase } => { + match subphase { + PerDatabasePhase::DeleteDBRoleReferences => { + let ctx = ctx.read().await; + + let operations = + spec.delta_operations + .iter() + .flatten() + .filter(|op| op.action == "delete_role") + .filter_map(move |op| { + if db.is_owned_by(&op.name) { + return None; + } + if !ctx.roles.contains_key(&op.name) { + return None; + } + let quoted = op.name.pg_quote(); + let new_owner = match &db { + DB::SystemDB => PgIdent::from("cloud_admin").pg_quote(), + DB::UserDB(db) => db.owner.pg_quote(), + }; + + Some(vec![ + // This will reassign all dependent objects to the db owner + Operation { + query: format!( + "REASSIGN OWNED BY {} TO {}", + quoted, new_owner, + ), + comment: None, + }, + // This now will only drop privileges of the role + Operation { + query: format!("DROP OWNED BY {}", quoted), + comment: None, + }, + ]) + }) + .flatten(); + + Ok(Box::new(operations)) + } + PerDatabasePhase::ChangeSchemaPerms => { + let ctx = ctx.read().await; + let databases = &ctx.dbs; + + let db = match &db { + // ignore schema permissions on the system database + DB::SystemDB => return Ok(Box::new(empty())), + DB::UserDB(db) => db, + }; + + if databases.get(&db.name).is_none() { + bail!("database {} doesn't exist in PostgreSQL", db.name); + } + + let edb = databases.get(&db.name).unwrap(); + + if edb.restrict_conn || edb.invalid { + return Ok(Box::new(empty())); + } + + let operations = vec![ + Operation { + query: format!( + include_str!("sql/set_public_schema_owner.sql"), + db_owner = db.owner.pg_quote() + ), + comment: None, + }, + Operation { + query: String::from(include_str!("sql/default_grants.sql")), + comment: None, + }, + ] + .into_iter(); + + Ok(Box::new(operations)) + } + PerDatabasePhase::HandleAnonExtension => { + // Only install Anon into user databases + let db = match &db { + DB::SystemDB => return Ok(Box::new(empty())), + DB::UserDB(db) => db, + }; + // Never install Anon when it's not enabled as feature + if !spec.features.contains(&ComputeFeature::AnonExtension) { + return Ok(Box::new(empty())); + } + + // Only install Anon when it's added in preload libraries + let opt_libs = spec.cluster.settings.find("shared_preload_libraries"); + + let libs = match opt_libs { + Some(libs) => libs, + None => return Ok(Box::new(empty())), + }; + + if !libs.contains("anon") { + return Ok(Box::new(empty())); + } + + let db_owner = db.owner.pg_quote(); + + let operations = vec![ + // Create anon extension if this compute needs it + // Users cannot create it themselves, because superuser is required. + Operation { + query: String::from("CREATE EXTENSION IF NOT EXISTS anon CASCADE"), + comment: Some(String::from("creating anon extension")), + }, + // Initialize anon extension + // This also requires superuser privileges, so users cannot do it themselves. + Operation { + query: String::from("SELECT anon.init()"), + comment: Some(String::from("initializing anon extension data")), + }, + Operation { + query: format!("GRANT ALL ON SCHEMA anon TO {}", db_owner), + comment: Some(String::from( + "granting anon extension schema permissions", + )), + }, + Operation { + query: format!( + "GRANT ALL ON ALL FUNCTIONS IN SCHEMA anon TO {}", + db_owner + ), + comment: Some(String::from( + "granting anon extension schema functions permissions", + )), + }, + // We need this, because some functions are defined as SECURITY DEFINER. + // In Postgres SECURITY DEFINER functions are executed with the privileges + // of the owner. + // In anon extension this it is needed to access some GUCs, which are only accessible to + // superuser. But we've patched postgres to allow db_owner to access them as well. + // So we need to change owner of these functions to db_owner. + Operation { + query: format!( + include_str!("sql/anon_ext_fn_reassign.sql"), + db_owner = db_owner, + ), + comment: Some(String::from( + "change anon extension functions owner to database_owner", + )), + }, + Operation { + query: format!( + "GRANT ALL ON ALL TABLES IN SCHEMA anon TO {}", + db_owner, + ), + comment: Some(String::from( + "granting anon extension tables permissions", + )), + }, + Operation { + query: format!( + "GRANT ALL ON ALL SEQUENCES IN SCHEMA anon TO {}", + db_owner, + ), + comment: Some(String::from( + "granting anon extension sequences permissions", + )), + }, + ] + .into_iter(); + + Ok(Box::new(operations)) + } + } + } + // Interestingly, we only install p_s_s in the main database, even when + // it's preloaded. + ApplySpecPhase::HandleOtherExtensions => { + if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") { + if libs.contains("pg_stat_statements") { + return Ok(Box::new(once(Operation { + query: String::from("CREATE EXTENSION IF NOT EXISTS pg_stat_statements"), + comment: Some(String::from("create system extensions")), + }))); + } + } + Ok(Box::new(empty())) + } + ApplySpecPhase::HandleNeonExtension => { + let operations = vec![ + Operation { + query: String::from("CREATE SCHEMA IF NOT EXISTS neon"), + comment: Some(String::from("init: add schema for extension")), + }, + Operation { + query: String::from("CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon"), + comment: Some(String::from( + "init: install the extension if not already installed", + )), + }, + Operation { + query: String::from( + "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'", + ), + comment: Some(String::from("compat/fix: make neon relocatable")), + }, + Operation { + query: String::from("ALTER EXTENSION neon SET SCHEMA neon"), + comment: Some(String::from("compat/fix: alter neon extension schema")), + }, + Operation { + query: String::from("ALTER EXTENSION neon UPDATE"), + comment: Some(String::from("compat/update: update neon extension version")), + }, + ] + .into_iter(); + + Ok(Box::new(operations)) + } + ApplySpecPhase::CreateAvailabilityCheck => Ok(Box::new(once(Operation { + query: String::from(include_str!("sql/add_availabilitycheck_tables.sql")), + comment: None, + }))), + ApplySpecPhase::DropRoles => { + let operations = spec + .delta_operations + .iter() + .flatten() + .filter(|op| op.action == "delete_role") + .map(|op| Operation { + query: format!("DROP ROLE IF EXISTS {}", op.name.pg_quote()), + comment: None, + }); + + Ok(Box::new(operations)) + } + } +} diff --git a/compute_tools/src/sql/add_availabilitycheck_tables.sql b/compute_tools/src/sql/add_availabilitycheck_tables.sql new file mode 100644 index 0000000000..7c60690c78 --- /dev/null +++ b/compute_tools/src/sql/add_availabilitycheck_tables.sql @@ -0,0 +1,18 @@ +DO $$ +BEGIN + IF NOT EXISTS( + SELECT 1 + FROM pg_catalog.pg_tables + WHERE tablename = 'health_check' + ) + THEN + CREATE TABLE health_check ( + id serial primary key, + updated_at timestamptz default now() + ); + INSERT INTO health_check VALUES (1, now()) + ON CONFLICT (id) DO UPDATE + SET updated_at = now(); + END IF; +END +$$ \ No newline at end of file diff --git a/compute_tools/src/sql/anon_ext_fn_reassign.sql b/compute_tools/src/sql/anon_ext_fn_reassign.sql new file mode 100644 index 0000000000..3d7b15c590 --- /dev/null +++ b/compute_tools/src/sql/anon_ext_fn_reassign.sql @@ -0,0 +1,12 @@ +DO $$ +DECLARE + query varchar; +BEGIN + FOR query IN SELECT 'ALTER FUNCTION '||nsp.nspname||'.'||p.proname||'('||pg_get_function_identity_arguments(p.oid)||') OWNER TO {db_owner};' + FROM pg_proc p + JOIN pg_namespace nsp ON p.pronamespace = nsp.oid + WHERE nsp.nspname = 'anon' LOOP + EXECUTE query; + END LOOP; +END +$$; diff --git a/compute_tools/src/sql/default_grants.sql b/compute_tools/src/sql/default_grants.sql new file mode 100644 index 0000000000..58ebb0690b --- /dev/null +++ b/compute_tools/src/sql/default_grants.sql @@ -0,0 +1,30 @@ +DO +$$ + BEGIN + IF EXISTS( + SELECT nspname + FROM pg_catalog.pg_namespace + WHERE nspname = 'public' + ) AND + current_setting('server_version_num')::int / 10000 >= 15 + THEN + IF EXISTS( + SELECT rolname + FROM pg_catalog.pg_roles + WHERE rolname = 'web_access' + ) + THEN + GRANT CREATE ON SCHEMA public TO web_access; + END IF; + END IF; + IF EXISTS( + SELECT nspname + FROM pg_catalog.pg_namespace + WHERE nspname = 'public' + ) + THEN + ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser WITH GRANT OPTION; + ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO neon_superuser WITH GRANT OPTION; + END IF; + END +$$; \ No newline at end of file diff --git a/compute_tools/src/sql/set_public_schema_owner.sql b/compute_tools/src/sql/set_public_schema_owner.sql new file mode 100644 index 0000000000..fd061a713e --- /dev/null +++ b/compute_tools/src/sql/set_public_schema_owner.sql @@ -0,0 +1,23 @@ +DO +$$ + DECLARE + schema_owner TEXT; + BEGIN + IF EXISTS( + SELECT nspname + FROM pg_catalog.pg_namespace + WHERE nspname = 'public' + ) + THEN + SELECT nspowner::regrole::text + FROM pg_catalog.pg_namespace + WHERE nspname = 'public' + INTO schema_owner; + + IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin' + THEN + ALTER SCHEMA public OWNER TO {db_owner}; + END IF; + END IF; + END +$$; \ No newline at end of file diff --git a/compute_tools/src/sql/unset_template_for_drop_dbs.sql b/compute_tools/src/sql/unset_template_for_drop_dbs.sql new file mode 100644 index 0000000000..6c4343a589 --- /dev/null +++ b/compute_tools/src/sql/unset_template_for_drop_dbs.sql @@ -0,0 +1,12 @@ +DO $$ + BEGIN + IF EXISTS( + SELECT 1 + FROM pg_catalog.pg_database + WHERE datname = {datname_str} + ) + THEN + ALTER DATABASE {datname} is_template false; + END IF; + END +$$; \ No newline at end of file