diff --git a/compute_tools/src/catalog.rs b/compute_tools/src/catalog.rs index 4fefa831e0..2f6f82dd39 100644 --- a/compute_tools/src/catalog.rs +++ b/compute_tools/src/catalog.rs @@ -1,38 +1,40 @@ -use compute_api::{ - responses::CatalogObjects, - spec::{Database, Role}, -}; +use compute_api::responses::CatalogObjects; use futures::Stream; -use postgres::{Client, NoTls}; +use postgres::NoTls; use std::{path::Path, process::Stdio, result::Result, sync::Arc}; use tokio::{ io::{AsyncBufReadExt, BufReader}, process::Command, - task, + spawn, }; +use tokio_postgres::connect; use tokio_stream::{self as stream, StreamExt}; use tokio_util::codec::{BytesCodec, FramedRead}; use tracing::warn; -use crate::{ - compute::ComputeNode, - pg_helpers::{get_existing_dbs, get_existing_roles}, -}; +use crate::compute::ComputeNode; +use crate::pg_helpers::{get_existing_dbs_async, get_existing_roles_async}; pub async fn get_dbs_and_roles(compute: &Arc) -> anyhow::Result { let connstr = compute.connstr.clone(); - task::spawn_blocking(move || { - let mut client = Client::connect(connstr.as_str(), NoTls)?; - let roles: Vec; - { - let mut xact = client.transaction()?; - roles = get_existing_roles(&mut xact)?; - } - let databases: Vec = get_existing_dbs(&mut client)?.values().cloned().collect(); - Ok(CatalogObjects { roles, databases }) - }) - .await? + let (client, connection): (tokio_postgres::Client, _) = + connect(connstr.as_str(), NoTls).await?; + + spawn(async move { + if let Err(e) = connection.await { + eprintln!("connection error: {}", e); + } + }); + + let roles = get_existing_roles_async(&client).await?; + + let databases = get_existing_dbs_async(&client) + .await? + .into_values() + .collect(); + + Ok(CatalogObjects { roles, databases }) } #[derive(Debug, thiserror::Error)] diff --git a/compute_tools/src/checker.rs b/compute_tools/src/checker.rs index d76eaad0a0..cec2b1bed8 100644 --- a/compute_tools/src/checker.rs +++ b/compute_tools/src/checker.rs @@ -1,37 +1,9 @@ use anyhow::{anyhow, Ok, Result}; -use postgres::Client; use tokio_postgres::NoTls; use tracing::{error, instrument, warn}; use crate::compute::ComputeNode; -/// Create a special service table for availability checks -/// only if it does not exist already. -pub fn create_availability_check_data(client: &mut Client) -> Result<()> { - let query = " - DO $$ - BEGIN - IF NOT EXISTS( - SELECT 1 - FROM pg_catalog.pg_tables - WHERE tablename = 'health_check' - ) - THEN - CREATE TABLE health_check ( - id serial primary key, - updated_at timestamptz default now() - ); - INSERT INTO health_check VALUES (1, now()) - ON CONFLICT (id) DO UPDATE - SET updated_at = now(); - END IF; - END - $$;"; - client.execute(query, &[])?; - - Ok(()) -} - /// Update timestamp in a row in a special service table to check /// that we can actually write some data in this particular timeline. #[instrument(skip_all)] diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 0a8cb14058..4f67425ba8 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -1,20 +1,21 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::env; use std::fs; +use std::iter::once; use std::os::unix::fs::{symlink, PermissionsExt}; use std::path::Path; use std::process::{Command, Stdio}; use std::str::FromStr; use std::sync::atomic::AtomicU32; use std::sync::atomic::Ordering; -use std::sync::{Condvar, Mutex, RwLock}; +use std::sync::{Arc, Condvar, Mutex, RwLock}; use std::thread; use std::time::Duration; use std::time::Instant; use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; -use compute_api::spec::PgIdent; +use compute_api::spec::{PgIdent, Role}; use futures::future::join_all; use futures::stream::FuturesUnordered; use futures::StreamExt; @@ -31,15 +32,23 @@ use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec, ExtVersion}; use utils::measured_stream::MeasuredReader; use nix::sys::signal::{kill, Signal}; - use remote_storage::{DownloadError, RemotePath}; +use tokio::spawn; +use url::Url; -use crate::checker::create_availability_check_data; use crate::installed_extensions::get_installed_extensions_sync; use crate::local_proxy; -use crate::logger::inlinify; use crate::pg_helpers::*; use crate::spec::*; +use crate::spec_apply::ApplySpecPhase::{ + CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateSuperUser, + DropInvalidDatabases, DropRoles, HandleNeonExtension, HandleOtherExtensions, + RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase, +}; +use crate::spec_apply::PerDatabasePhase::{ + ChangeSchemaPerms, DeleteDBRoleReferences, HandleAnonExtension, +}; +use crate::spec_apply::{apply_operations, MutableApplyContext, DB}; use crate::sync_sk::{check_if_synced, ping_safekeeper}; use crate::{config, extension_server}; @@ -224,10 +233,7 @@ fn maybe_cgexec(cmd: &str) -> Command { } } -/// Create special neon_superuser role, that's a slightly nerfed version of a real superuser -/// that we give to customers -#[instrument(skip_all)] -fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()> { +pub(crate) fn construct_superuser_query(spec: &ComputeSpec) -> String { let roles = spec .cluster .roles @@ -296,11 +302,8 @@ fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()> $$;"#, roles_decl, database_decl, ); - info!("Neon superuser created: {}", inlinify(&query)); - client - .simple_query(&query) - .map_err(|e| anyhow::anyhow!(e).context(query))?; - Ok(()) + + query } impl ComputeNode { @@ -813,21 +816,14 @@ impl ComputeNode { Ok(()) } - /// Do initial configuration of the already started Postgres. - #[instrument(skip_all)] - pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> { - // If connection fails, - // it may be the old node with `zenith_admin` superuser. - // - // In this case we need to connect with old `zenith_admin` name - // and create new user. We cannot simply rename connected user, - // but we can create a new one and grant it all privileges. - let mut connstr = self.connstr.clone(); + async fn get_maintenance_client(url: &Url) -> Result { + let mut connstr = url.clone(); + connstr .query_pairs_mut() .append_pair("application_name", "apply_config"); - let mut client = match Client::connect(connstr.as_str(), NoTls) { + let (client, conn) = match tokio_postgres::connect(connstr.as_str(), NoTls).await { Err(e) => match e.code() { Some(&SqlState::INVALID_PASSWORD) | Some(&SqlState::INVALID_AUTHORIZATION_SPECIFICATION) => { @@ -845,8 +841,8 @@ impl ComputeNode { let mut client = Client::connect(zenith_admin_connstr.as_str(), NoTls) .context("broken cloud_admin credential: tried connecting with cloud_admin but could not authenticate, and zenith_admin does not work either")?; - // Disable forwarding so that users don't get a cloud_admin role + // Disable forwarding so that users don't get a cloud_admin role let mut func = || { client.simple_query("SET neon.forward_ddl = false")?; client.simple_query("CREATE USER cloud_admin WITH SUPERUSER")?; @@ -858,49 +854,309 @@ impl ComputeNode { drop(client); // reconnect with connstring with expected name - Client::connect(connstr.as_str(), NoTls)? + tokio_postgres::connect(connstr.as_str(), NoTls).await? } _ => return Err(e.into()), }, - Ok(client) => client, + Ok((client, conn)) => (client, conn), }; - // Disable DDL forwarding because control plane already knows about these roles/databases. + spawn(async move { + if let Err(e) = conn.await { + error!("maintenance client connection error: {}", e); + } + }); + + // Disable DDL forwarding because control plane already knows about the roles/databases + // we're about to modify. client .simple_query("SET neon.forward_ddl = false") + .await .context("apply_config SET neon.forward_ddl = false")?; - // Proceed with post-startup configuration. Note, that order of operations is important. - let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec; - create_neon_superuser(spec, &mut client).context("apply_config create_neon_superuser")?; - cleanup_instance(&mut client).context("apply_config cleanup_instance")?; - handle_roles(spec, &mut client).context("apply_config handle_roles")?; - handle_databases(spec, &mut client).context("apply_config handle_databases")?; - handle_role_deletions(spec, connstr.as_str(), &mut client) - .context("apply_config handle_role_deletions")?; - handle_grants( - spec, - &mut client, - connstr.as_str(), - self.has_feature(ComputeFeature::AnonExtension), - ) - .context("apply_config handle_grants")?; - handle_extensions(spec, &mut client).context("apply_config handle_extensions")?; - handle_extension_neon(&mut client).context("apply_config handle_extension_neon")?; - create_availability_check_data(&mut client) - .context("apply_config create_availability_check_data")?; + Ok(client) + } - // 'Close' connection - drop(client); + /// Apply the spec to the running PostgreSQL instance. + /// The caller can decide to run with multiple clients in parallel, or + /// single mode. Either way, the commands executed will be the same, and + /// only commands run in different databases are parallelized. + #[instrument(skip_all)] + pub fn apply_spec_sql( + &self, + spec: Arc, + url: Arc, + concurrency: usize, + ) -> Result<()> { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()?; - if let Some(ref local_proxy) = spec.local_proxy_config { + info!("Applying config with max {} concurrency", concurrency); + debug!("Config: {:?}", spec); + + rt.block_on(async { + // Proceed with post-startup configuration. Note, that order of operations is important. + let client = Self::get_maintenance_client(&url).await?; + let spec = spec.clone(); + + let databases = get_existing_dbs_async(&client).await?; + let roles = get_existing_roles_async(&client) + .await? + .into_iter() + .map(|role| (role.name.clone(), role)) + .collect::>(); + + let jwks_roles = Arc::new( + spec.as_ref() + .local_proxy_config + .iter() + .flat_map(|it| &it.jwks) + .flatten() + .flat_map(|setting| &setting.role_names) + .cloned() + .collect::>(), + ); + + let ctx = Arc::new(tokio::sync::RwLock::new(MutableApplyContext { + roles, + dbs: databases, + })); + + for phase in [ + CreateSuperUser, + DropInvalidDatabases, + RenameRoles, + CreateAndAlterRoles, + RenameAndDeleteDatabases, + CreateAndAlterDatabases, + ] { + debug!("Applying phase {:?}", &phase); + apply_operations( + spec.clone(), + ctx.clone(), + jwks_roles.clone(), + phase, + || async { Ok(&client) }, + ) + .await?; + } + + let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency)); + + let db_processes = spec + .cluster + .databases + .iter() + .map(|db| DB::new(db.clone())) + // include + .chain(once(DB::SystemDB)) + .map(|db| { + let spec = spec.clone(); + let ctx = ctx.clone(); + let jwks_roles = jwks_roles.clone(); + let mut url = url.as_ref().clone(); + let concurrency_token = concurrency_token.clone(); + let db = db.clone(); + + debug!("Applying per-database phases for Database {:?}", &db); + + match &db { + DB::SystemDB => {} + DB::UserDB(db) => { + url.set_path(db.name.as_str()); + } + } + + let url = Arc::new(url); + let fut = Self::apply_spec_sql_db( + spec.clone(), + url, + ctx.clone(), + jwks_roles.clone(), + concurrency_token.clone(), + db, + ); + + Ok(spawn(fut)) + }) + .collect::>>(); + + for process in db_processes.into_iter() { + let handle = process?; + handle.await??; + } + + for phase in vec![ + HandleOtherExtensions, + HandleNeonExtension, + CreateAvailabilityCheck, + DropRoles, + ] { + debug!("Applying phase {:?}", &phase); + apply_operations( + spec.clone(), + ctx.clone(), + jwks_roles.clone(), + phase, + || async { Ok(&client) }, + ) + .await?; + } + + Ok::<(), anyhow::Error>(()) + })?; + + Ok(()) + } + + /// Apply SQL migrations of the RunInEachDatabase phase. + /// + /// May opt to not connect to databases that don't have any scheduled + /// operations. The function is concurrency-controlled with the provided + /// semaphore. The caller has to make sure the semaphore isn't exhausted. + async fn apply_spec_sql_db( + spec: Arc, + url: Arc, + ctx: Arc>, + jwks_roles: Arc>, + concurrency_token: Arc, + db: DB, + ) -> Result<()> { + let _permit = concurrency_token.acquire().await?; + + let mut client_conn = None; + + for subphase in [ + DeleteDBRoleReferences, + ChangeSchemaPerms, + HandleAnonExtension, + ] { + apply_operations( + spec.clone(), + ctx.clone(), + jwks_roles.clone(), + RunInEachDatabase { + db: db.clone(), + subphase, + }, + // Only connect if apply_operation actually wants a connection. + // It's quite possible this database doesn't need any queries, + // so by not connecting we save time and effort connecting to + // that database. + || async { + if client_conn.is_none() { + let db_client = Self::get_maintenance_client(&url).await?; + client_conn.replace(db_client); + } + let client = client_conn.as_ref().unwrap(); + Ok(client) + }, + ) + .await?; + } + + drop(client_conn); + + Ok::<(), anyhow::Error>(()) + } + + /// Do initial configuration of the already started Postgres. + #[instrument(skip_all)] + pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> { + // If connection fails, + // it may be the old node with `zenith_admin` superuser. + // + // In this case we need to connect with old `zenith_admin` name + // and create new user. We cannot simply rename connected user, + // but we can create a new one and grant it all privileges. + let mut url = self.connstr.clone(); + url.query_pairs_mut() + .append_pair("application_name", "apply_config"); + + let url = Arc::new(url); + let spec = Arc::new( + compute_state + .pspec + .as_ref() + .expect("spec must be set") + .spec + .clone(), + ); + + // Choose how many concurrent connections to use for applying the spec changes. + // If the cluster is not currently Running we don't have to deal with user connections, + // and can thus use all `max_connections` connection slots. However, that's generally not + // very efficient, so we generally still limit it to a smaller number. + let max_concurrent_connections = if compute_state.status != ComputeStatus::Running { + // If the settings contain 'max_connections', use that as template + if let Some(config) = spec.cluster.settings.find("max_connections") { + config.parse::().ok() + } else { + // Otherwise, try to find the setting in the postgresql_conf string + spec.cluster + .postgresql_conf + .iter() + .flat_map(|conf| conf.split("\n")) + .filter_map(|line| { + if !line.contains("max_connections") { + return None; + } + + let (key, value) = line.split_once("=")?; + let key = key + .trim_start_matches(char::is_whitespace) + .trim_end_matches(char::is_whitespace); + + let value = value + .trim_start_matches(char::is_whitespace) + .trim_end_matches(char::is_whitespace); + + if key != "max_connections" { + return None; + } + + value.parse::().ok() + }) + .next() + } + // If max_connections is present, use at most 1/3rd of that. + // When max_connections is lower than 30, try to use at least 10 connections, but + // never more than max_connections. + .map(|limit| match limit { + 0..10 => limit, + 10..30 => 10, + 30.. => limit / 3, + }) + // If we didn't find max_connections, default to 10 concurrent connections. + .unwrap_or(10) + } else { + // state == Running + // Because the cluster is already in the Running state, we should assume users are + // already connected to the cluster, and high concurrency could negatively + // impact user connectivity. Therefore, we can limit concurrency to the number of + // reserved superuser connections, which users wouldn't be able to use anyway. + spec.cluster + .settings + .find("superuser_reserved_connections") + .iter() + .filter_map(|val| val.parse::().ok()) + .map(|val| if val > 1 { val - 1 } else { 1 }) + .last() + .unwrap_or(3) + }; + + // Merge-apply spec & changes to PostgreSQL state. + self.apply_spec_sql(spec.clone(), url.clone(), max_concurrent_connections)?; + + if let Some(ref local_proxy) = &spec.clone().local_proxy_config { info!("configuring local_proxy"); local_proxy::configure(local_proxy).context("apply_config local_proxy")?; } // Run migrations separately to not hold up cold starts thread::spawn(move || { - let mut connstr = connstr.clone(); + let mut connstr = url.as_ref().clone(); connstr .query_pairs_mut() .append_pair("application_name", "migrations"); @@ -908,7 +1164,8 @@ impl ComputeNode { let mut client = Client::connect(connstr.as_str(), NoTls)?; handle_migrations(&mut client).context("apply_config handle_migrations") }); - Ok(()) + + Ok::<(), anyhow::Error>(()) } // Wrapped this around `pg_ctl reload`, but right now we don't use @@ -971,32 +1228,16 @@ impl ComputeNode { config::with_compute_ctl_tmp_override(pgdata_path, "neon.max_cluster_size=-1", || { self.pg_reload_conf()?; - let mut client = Client::connect(self.connstr.as_str(), NoTls)?; - - // Proceed with post-startup configuration. Note, that order of operations is important. - // Disable DDL forwarding because control plane already knows about these roles/databases. if spec.mode == ComputeMode::Primary { - client.simple_query("SET neon.forward_ddl = false")?; - cleanup_instance(&mut client)?; - handle_roles(&spec, &mut client)?; - handle_databases(&spec, &mut client)?; - handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?; - handle_grants( - &spec, - &mut client, - self.connstr.as_str(), - self.has_feature(ComputeFeature::AnonExtension), - )?; - handle_extensions(&spec, &mut client)?; - handle_extension_neon(&mut client)?; - // We can skip handle_migrations here because a new migration can only appear - // if we have a new version of the compute_ctl binary, which can only happen - // if compute got restarted, in which case we'll end up inside of apply_config - // instead of reconfigure. - } + let mut url = self.connstr.clone(); + url.query_pairs_mut() + .append_pair("application_name", "apply_config"); + let url = Arc::new(url); - // 'Close' connection - drop(client); + let spec = Arc::new(spec.clone()); + + self.apply_spec_sql(spec, url, 1)?; + } Ok(()) })?; diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs index d27ae58fa2..ee4cf2dfa5 100644 --- a/compute_tools/src/lib.rs +++ b/compute_tools/src/lib.rs @@ -23,5 +23,6 @@ pub mod monitor; pub mod params; pub mod pg_helpers; pub mod spec; +mod spec_apply; pub mod swap; pub mod sync_sk; diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs index b2dc265864..4a1e5ee0e8 100644 --- a/compute_tools/src/pg_helpers.rs +++ b/compute_tools/src/pg_helpers.rs @@ -10,9 +10,9 @@ use std::thread::JoinHandle; use std::time::{Duration, Instant}; use anyhow::{bail, Result}; +use futures::StreamExt; use ini::Ini; use notify::{RecursiveMode, Watcher}; -use postgres::{Client, Transaction}; use tokio::io::AsyncBufReadExt; use tokio::time::timeout; use tokio_postgres::NoTls; @@ -197,27 +197,34 @@ impl Escaping for PgIdent { } /// Build a list of existing Postgres roles -pub fn get_existing_roles(xact: &mut Transaction<'_>) -> Result> { - let postgres_roles = xact - .query("SELECT rolname, rolpassword FROM pg_catalog.pg_authid", &[])? - .iter() +pub async fn get_existing_roles_async(client: &tokio_postgres::Client) -> Result> { + let postgres_roles = client + .query_raw::( + "SELECT rolname, rolpassword FROM pg_catalog.pg_authid", + &[], + ) + .await? + .filter_map(|row| async { row.ok() }) .map(|row| Role { name: row.get("rolname"), encrypted_password: row.get("rolpassword"), options: None, }) - .collect(); + .collect() + .await; Ok(postgres_roles) } /// Build a list of existing Postgres databases -pub fn get_existing_dbs(client: &mut Client) -> Result> { +pub async fn get_existing_dbs_async( + client: &tokio_postgres::Client, +) -> Result> { // `pg_database.datconnlimit = -2` means that the database is in the // invalid state. See: // https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9 - let postgres_dbs: Vec = client - .query( + let rowstream = client + .query_raw::( "SELECT datname AS name, datdba::regrole::text AS owner, @@ -226,8 +233,11 @@ pub fn get_existing_dbs(client: &mut Client) -> Result FROM pg_catalog.pg_database;", &[], - )? - .iter() + ) + .await?; + + let dbs_map = rowstream + .filter_map(|r| async { r.ok() }) .map(|row| Database { name: row.get("name"), owner: row.get("owner"), @@ -235,12 +245,9 @@ pub fn get_existing_dbs(client: &mut Client) -> Result invalid: row.get("invalid"), options: None, }) - .collect(); - - let dbs_map = postgres_dbs - .iter() .map(|db| (db.name.clone(), db.clone())) - .collect::>(); + .collect::>() + .await; Ok(dbs_map) } diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index 73f3d1006a..c7d2deb090 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -1,22 +1,17 @@ -use std::collections::HashSet; +use anyhow::{anyhow, bail, Result}; +use postgres::Client; +use reqwest::StatusCode; use std::fs::File; use std::path::Path; -use std::str::FromStr; - -use anyhow::{anyhow, bail, Context, Result}; -use postgres::config::Config; -use postgres::{Client, NoTls}; -use reqwest::StatusCode; -use tracing::{error, info, info_span, instrument, span_enabled, warn, Level}; +use tracing::{error, info, instrument, warn}; use crate::config; -use crate::logger::inlinify; use crate::migration::MigrationRunner; use crate::params::PG_HBA_ALL_MD5; use crate::pg_helpers::*; use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse}; -use compute_api::spec::{ComputeSpec, PgIdent, Role}; +use compute_api::spec::ComputeSpec; // Do control plane request and return response if any. In case of error it // returns a bool flag indicating whether it makes sense to retry the request @@ -151,625 +146,6 @@ pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> { Ok(()) } -/// Compute could be unexpectedly shut down, for example, during the -/// database dropping. This leaves the database in the invalid state, -/// which prevents new db creation with the same name. This function -/// will clean it up before proceeding with catalog updates. All -/// possible future cleanup operations may go here too. -#[instrument(skip_all)] -pub fn cleanup_instance(client: &mut Client) -> Result<()> { - let existing_dbs = get_existing_dbs(client)?; - - for (_, db) in existing_dbs { - if db.invalid { - // After recent commit in Postgres, interrupted DROP DATABASE - // leaves the database in the invalid state. According to the - // commit message, the only option for user is to drop it again. - // See: - // https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9 - // - // Postgres Neon extension is done the way, that db is de-registered - // in the control plane metadata only after it is dropped. So there is - // a chance that it still thinks that db should exist. This means - // that it will be re-created by `handle_databases()`. Yet, it's fine - // as user can just repeat drop (in vanilla Postgres they would need - // to do the same, btw). - let query = format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote()); - info!("dropping invalid database {}", db.name); - client.execute(query.as_str(), &[])?; - } - } - - Ok(()) -} - -/// Given a cluster spec json and open transaction it handles roles creation, -/// deletion and update. -#[instrument(skip_all)] -pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> { - let mut xact = client.transaction()?; - let existing_roles: Vec = get_existing_roles(&mut xact)?; - - let mut jwks_roles = HashSet::new(); - if let Some(local_proxy) = &spec.local_proxy_config { - for jwks_setting in local_proxy.jwks.iter().flatten() { - for role_name in &jwks_setting.role_names { - jwks_roles.insert(role_name.clone()); - } - } - } - - // Print a list of existing Postgres roles (only in debug mode) - if span_enabled!(Level::INFO) { - let mut vec = Vec::new(); - for r in &existing_roles { - vec.push(format!( - "{}:{}", - r.name, - if r.encrypted_password.is_some() { - "[FILTERED]" - } else { - "(null)" - } - )); - } - - info!("postgres roles (total {}): {:?}", vec.len(), vec); - } - - // Process delta operations first - if let Some(ops) = &spec.delta_operations { - info!("processing role renames"); - for op in ops { - match op.action.as_ref() { - "delete_role" => { - // no-op now, roles will be deleted at the end of configuration - } - // Renaming role drops its password, since role name is - // used as a salt there. It is important that this role - // is recorded with a new `name` in the `roles` list. - // Follow up roles update will set the new password. - "rename_role" => { - let new_name = op.new_name.as_ref().unwrap(); - - // XXX: with a limited number of roles it is fine, but consider making it a HashMap - if existing_roles.iter().any(|r| r.name == op.name) { - let query: String = format!( - "ALTER ROLE {} RENAME TO {}", - op.name.pg_quote(), - new_name.pg_quote() - ); - - warn!("renaming role '{}' to '{}'", op.name, new_name); - xact.execute(query.as_str(), &[])?; - } - } - _ => {} - } - } - } - - // Refresh Postgres roles info to handle possible roles renaming - let existing_roles: Vec = get_existing_roles(&mut xact)?; - - info!( - "handling cluster spec roles (total {})", - spec.cluster.roles.len() - ); - for role in &spec.cluster.roles { - let name = &role.name; - // XXX: with a limited number of roles it is fine, but consider making it a HashMap - let pg_role = existing_roles.iter().find(|r| r.name == *name); - - enum RoleAction { - None, - Update, - Create, - } - let action = if let Some(r) = pg_role { - if (r.encrypted_password.is_none() && role.encrypted_password.is_some()) - || (r.encrypted_password.is_some() && role.encrypted_password.is_none()) - { - RoleAction::Update - } else if let Some(pg_pwd) = &r.encrypted_password { - // Check whether password changed or not (trim 'md5' prefix first if any) - // - // This is a backward compatibility hack, which comes from the times when we were using - // md5 for everyone and hashes were stored in the console db without md5 prefix. So when - // role comes from the control-plane (json spec) `Role.encrypted_password` doesn't have md5 prefix, - // but when role comes from Postgres (`get_existing_roles` / `existing_roles`) it has this prefix. - // Here is the only place so far where we compare hashes, so it seems to be the best candidate - // to place this compatibility layer. - let pg_pwd = if let Some(stripped) = pg_pwd.strip_prefix("md5") { - stripped - } else { - pg_pwd - }; - if pg_pwd != *role.encrypted_password.as_ref().unwrap() { - RoleAction::Update - } else { - RoleAction::None - } - } else { - RoleAction::None - } - } else { - RoleAction::Create - }; - - match action { - RoleAction::None => {} - RoleAction::Update => { - // This can be run on /every/ role! Not just ones created through the console. - // This means that if you add some funny ALTER here that adds a permission, - // this will get run even on user-created roles! This will result in different - // behavior before and after a spec gets reapplied. The below ALTER as it stands - // now only grants LOGIN and changes the password. Please do not allow this branch - // to do anything silly. - let mut query: String = format!("ALTER ROLE {} ", name.pg_quote()); - query.push_str(&role.to_pg_options()); - xact.execute(query.as_str(), &[])?; - } - RoleAction::Create => { - // This branch only runs when roles are created through the console, so it is - // safe to add more permissions here. BYPASSRLS and REPLICATION are inherited - // from neon_superuser. - let mut query: String = format!( - "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser", - name.pg_quote() - ); - if jwks_roles.contains(name.as_str()) { - query = format!("CREATE ROLE {}", name.pg_quote()); - } - info!("running role create query: '{}'", &query); - query.push_str(&role.to_pg_options()); - xact.execute(query.as_str(), &[])?; - } - } - - if span_enabled!(Level::INFO) { - let pwd = if role.encrypted_password.is_some() { - "[FILTERED]" - } else { - "(null)" - }; - let action_str = match action { - RoleAction::None => "", - RoleAction::Create => " -> create", - RoleAction::Update => " -> update", - }; - info!(" - {}:{}{}", name, pwd, action_str); - } - } - - xact.commit()?; - - Ok(()) -} - -/// Reassign all dependent objects and delete requested roles. -#[instrument(skip_all)] -pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> { - if let Some(ops) = &spec.delta_operations { - // First, reassign all dependent objects to db owners. - info!("reassigning dependent objects of to-be-deleted roles"); - - // Fetch existing roles. We could've exported and used `existing_roles` from - // `handle_roles()`, but we only make this list there before creating new roles. - // Which is probably fine as we never create to-be-deleted roles, but that'd - // just look a bit untidy. Anyway, the entire `pg_roles` should be in shared - // buffers already, so this shouldn't be a big deal. - let mut xact = client.transaction()?; - let existing_roles: Vec = get_existing_roles(&mut xact)?; - xact.commit()?; - - for op in ops { - // Check that role is still present in Postgres, as this could be a - // restart with the same spec after role deletion. - if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) { - reassign_owned_objects(spec, connstr, &op.name)?; - } - } - - // Second, proceed with role deletions. - info!("processing role deletions"); - let mut xact = client.transaction()?; - for op in ops { - // We do not check either role exists or not, - // Postgres will take care of it for us - if op.action == "delete_role" { - let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.pg_quote()); - - warn!("deleting role '{}'", &op.name); - xact.execute(query.as_str(), &[])?; - } - } - xact.commit()?; - } - - Ok(()) -} - -fn reassign_owned_objects_in_one_db( - conf: Config, - role_name: &PgIdent, - db_owner: &PgIdent, -) -> Result<()> { - let mut client = conf.connect(NoTls)?; - - // This will reassign all dependent objects to the db owner - let reassign_query = format!( - "REASSIGN OWNED BY {} TO {}", - role_name.pg_quote(), - db_owner.pg_quote() - ); - info!( - "reassigning objects owned by '{}' in db '{}' to '{}'", - role_name, - conf.get_dbname().unwrap_or(""), - db_owner - ); - client.simple_query(&reassign_query)?; - - // This now will only drop privileges of the role - let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote()); - client.simple_query(&drop_query)?; - Ok(()) -} - -// Reassign all owned objects in all databases to the owner of the database. -fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> { - for db in &spec.cluster.databases { - if db.owner != *role_name { - let mut conf = Config::from_str(connstr)?; - conf.dbname(&db.name); - reassign_owned_objects_in_one_db(conf, role_name, &db.owner)?; - } - } - - // Also handle case when there are no databases in the spec. - // In this case we need to reassign objects in the default database. - let conf = Config::from_str(connstr)?; - let db_owner = PgIdent::from_str("cloud_admin")?; - reassign_owned_objects_in_one_db(conf, role_name, &db_owner)?; - - Ok(()) -} - -/// It follows mostly the same logic as `handle_roles()` excepting that we -/// does not use an explicit transactions block, since major database operations -/// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level -/// atomicity should be enough here due to the order of operations and various checks, -/// which together provide us idempotency. -#[instrument(skip_all)] -pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> { - let existing_dbs = get_existing_dbs(client)?; - - // Print a list of existing Postgres databases (only in debug mode) - if span_enabled!(Level::INFO) { - let mut vec = Vec::new(); - for (dbname, db) in &existing_dbs { - vec.push(format!("{}:{}", dbname, db.owner)); - } - info!("postgres databases (total {}): {:?}", vec.len(), vec); - } - - // Process delta operations first - if let Some(ops) = &spec.delta_operations { - info!("processing delta operations on databases"); - for op in ops { - match op.action.as_ref() { - // We do not check either DB exists or not, - // Postgres will take care of it for us - "delete_db" => { - // In Postgres we can't drop a database if it is a template. - // So we need to unset the template flag first, but it could - // be a retry, so we could've already dropped the database. - // Check that database exists first to make it idempotent. - let unset_template_query: String = format!( - " - DO $$ - BEGIN - IF EXISTS( - SELECT 1 - FROM pg_catalog.pg_database - WHERE datname = {} - ) - THEN - ALTER DATABASE {} is_template false; - END IF; - END - $$;", - escape_literal(&op.name), - &op.name.pg_quote() - ); - // Use FORCE to drop database even if there are active connections. - // We run this from `cloud_admin`, so it should have enough privileges. - // NB: there could be other db states, which prevent us from dropping - // the database. For example, if db is used by any active subscription - // or replication slot. - // TODO: deal with it once we allow logical replication. Proper fix should - // involve returning an error code to the control plane, so it could - // figure out that this is a non-retryable error, return it to the user - // and fail operation permanently. - let drop_db_query: String = format!( - "DROP DATABASE IF EXISTS {} WITH (FORCE)", - &op.name.pg_quote() - ); - - warn!("deleting database '{}'", &op.name); - client.execute(unset_template_query.as_str(), &[])?; - client.execute(drop_db_query.as_str(), &[])?; - } - "rename_db" => { - let new_name = op.new_name.as_ref().unwrap(); - - if existing_dbs.contains_key(&op.name) { - let query: String = format!( - "ALTER DATABASE {} RENAME TO {}", - op.name.pg_quote(), - new_name.pg_quote() - ); - - warn!("renaming database '{}' to '{}'", op.name, new_name); - client.execute(query.as_str(), &[])?; - } - } - _ => {} - } - } - } - - // Refresh Postgres databases info to handle possible renames - let existing_dbs = get_existing_dbs(client)?; - - info!( - "handling cluster spec databases (total {})", - spec.cluster.databases.len() - ); - for db in &spec.cluster.databases { - let name = &db.name; - let pg_db = existing_dbs.get(name); - - enum DatabaseAction { - None, - Update, - Create, - } - let action = if let Some(r) = pg_db { - // XXX: db owner name is returned as quoted string from Postgres, - // when quoting is needed. - let new_owner = if r.owner.starts_with('"') { - db.owner.pg_quote() - } else { - db.owner.clone() - }; - - if new_owner != r.owner { - // Update the owner - DatabaseAction::Update - } else { - DatabaseAction::None - } - } else { - DatabaseAction::Create - }; - - match action { - DatabaseAction::None => {} - DatabaseAction::Update => { - let query: String = format!( - "ALTER DATABASE {} OWNER TO {}", - name.pg_quote(), - db.owner.pg_quote() - ); - let _guard = info_span!("executing", query).entered(); - client.execute(query.as_str(), &[])?; - } - DatabaseAction::Create => { - let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote()); - query.push_str(&db.to_pg_options()); - let _guard = info_span!("executing", query).entered(); - client.execute(query.as_str(), &[])?; - let grant_query: String = format!( - "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser", - name.pg_quote() - ); - client.execute(grant_query.as_str(), &[])?; - } - }; - - if span_enabled!(Level::INFO) { - let action_str = match action { - DatabaseAction::None => "", - DatabaseAction::Create => " -> create", - DatabaseAction::Update => " -> update", - }; - info!(" - {}:{}{}", db.name, db.owner, action_str); - } - } - - Ok(()) -} - -/// Grant CREATE ON DATABASE to the database owner and do some other alters and grants -/// to allow users creating trusted extensions and re-creating `public` schema, for example. -#[instrument(skip_all)] -pub fn handle_grants( - spec: &ComputeSpec, - client: &mut Client, - connstr: &str, - enable_anon_extension: bool, -) -> Result<()> { - info!("modifying database permissions"); - let existing_dbs = get_existing_dbs(client)?; - - // Do some per-database access adjustments. We'd better do this at db creation time, - // but CREATE DATABASE isn't transactional. So we cannot create db + do some grants - // atomically. - for db in &spec.cluster.databases { - match existing_dbs.get(&db.name) { - Some(pg_db) => { - if pg_db.restrict_conn || pg_db.invalid { - info!( - "skipping grants for db {} (invalid: {}, connections not allowed: {})", - db.name, pg_db.invalid, pg_db.restrict_conn - ); - continue; - } - } - None => { - bail!( - "database {} doesn't exist in Postgres after handle_databases()", - db.name - ); - } - } - - let mut conf = Config::from_str(connstr)?; - conf.dbname(&db.name); - - let mut db_client = conf.connect(NoTls)?; - - // This will only change ownership on the schema itself, not the objects - // inside it. Without it owner of the `public` schema will be `cloud_admin` - // and database owner cannot do anything with it. SQL procedure ensures - // that it won't error out if schema `public` doesn't exist. - let alter_query = format!( - "DO $$\n\ - DECLARE\n\ - schema_owner TEXT;\n\ - BEGIN\n\ - IF EXISTS(\n\ - SELECT nspname\n\ - FROM pg_catalog.pg_namespace\n\ - WHERE nspname = 'public'\n\ - )\n\ - THEN\n\ - SELECT nspowner::regrole::text\n\ - FROM pg_catalog.pg_namespace\n\ - WHERE nspname = 'public'\n\ - INTO schema_owner;\n\ - \n\ - IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'\n\ - THEN\n\ - ALTER SCHEMA public OWNER TO {};\n\ - END IF;\n\ - END IF;\n\ - END\n\ - $$;", - db.owner.pg_quote() - ); - db_client.simple_query(&alter_query)?; - - // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user. - // This is needed because since postgres 15 this privilege is removed by default. - // TODO: web_access isn't created for almost 1 year. It could be that we have - // active users of 1 year old projects, but hopefully not, so check it and - // remove this code if possible. The worst thing that could happen is that - // user won't be able to use public schema in NEW databases created in the - // very OLD project. - // - // Also, alter default permissions so that relations created by extensions can be - // used by neon_superuser without permission issues. - let grant_query = "DO $$\n\ - BEGIN\n\ - IF EXISTS(\n\ - SELECT nspname\n\ - FROM pg_catalog.pg_namespace\n\ - WHERE nspname = 'public'\n\ - ) AND\n\ - current_setting('server_version_num')::int/10000 >= 15\n\ - THEN\n\ - IF EXISTS(\n\ - SELECT rolname\n\ - FROM pg_catalog.pg_roles\n\ - WHERE rolname = 'web_access'\n\ - )\n\ - THEN\n\ - GRANT CREATE ON SCHEMA public TO web_access;\n\ - END IF;\n\ - END IF;\n\ - IF EXISTS(\n\ - SELECT nspname\n\ - FROM pg_catalog.pg_namespace\n\ - WHERE nspname = 'public'\n\ - )\n\ - THEN\n\ - ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser WITH GRANT OPTION;\n\ - ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO neon_superuser WITH GRANT OPTION;\n\ - END IF;\n\ - END\n\ - $$;" - .to_string(); - - info!( - "grant query for db {} : {}", - &db.name, - inlinify(&grant_query) - ); - db_client.simple_query(&grant_query)?; - - // it is important to run this after all grants - if enable_anon_extension { - handle_extension_anon(spec, &db.owner, &mut db_client, false) - .context("handle_grants handle_extension_anon")?; - } - } - - Ok(()) -} - -/// Create required system extensions -#[instrument(skip_all)] -pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()> { - if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") { - if libs.contains("pg_stat_statements") { - // Create extension only if this compute really needs it - let query = "CREATE EXTENSION IF NOT EXISTS pg_stat_statements"; - info!("creating system extensions with query: {}", query); - client.simple_query(query)?; - } - } - - Ok(()) -} - -/// Run CREATE and ALTER EXTENSION neon UPDATE for postgres database -#[instrument(skip_all)] -pub fn handle_extension_neon(client: &mut Client) -> Result<()> { - info!("handle extension neon"); - - let mut query = "CREATE SCHEMA IF NOT EXISTS neon"; - client.simple_query(query)?; - - query = "CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon"; - info!("create neon extension with query: {}", query); - client.simple_query(query)?; - - query = "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'"; - client.simple_query(query)?; - - query = "ALTER EXTENSION neon SET SCHEMA neon"; - info!("alter neon extension schema with query: {}", query); - client.simple_query(query)?; - - // this will be a no-op if extension is already up to date, - // which may happen in two cases: - // - extension was just installed - // - extension was already installed and is up to date - let query = "ALTER EXTENSION neon UPDATE"; - info!("update neon extension version with query: {}", query); - if let Err(e) = client.simple_query(query) { - error!( - "failed to upgrade neon extension during `handle_extension_neon`: {}", - e - ); - } - - Ok(()) -} - #[instrument(skip_all)] pub fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> { info!("handle neon extension upgrade"); diff --git a/compute_tools/src/spec_apply.rs b/compute_tools/src/spec_apply.rs new file mode 100644 index 0000000000..7308d5d36e --- /dev/null +++ b/compute_tools/src/spec_apply.rs @@ -0,0 +1,680 @@ +use std::collections::{HashMap, HashSet}; +use std::fmt::{Debug, Formatter}; +use std::future::Future; +use std::iter::empty; +use std::iter::once; +use std::sync::Arc; + +use crate::compute::construct_superuser_query; +use crate::pg_helpers::{escape_literal, DatabaseExt, Escaping, GenericOptionsSearch, RoleExt}; +use anyhow::{bail, Result}; +use compute_api::spec::{ComputeFeature, ComputeSpec, Database, PgIdent, Role}; +use futures::future::join_all; +use tokio::sync::RwLock; +use tokio_postgres::Client; +use tracing::{debug, info_span, Instrument}; + +#[derive(Clone)] +pub enum DB { + SystemDB, + UserDB(Database), +} + +impl DB { + pub fn new(db: Database) -> DB { + Self::UserDB(db) + } + + pub fn is_owned_by(&self, role: &PgIdent) -> bool { + match self { + DB::SystemDB => false, + DB::UserDB(db) => &db.owner == role, + } + } +} + +impl Debug for DB { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + DB::SystemDB => f.debug_tuple("SystemDB").finish(), + DB::UserDB(db) => f.debug_tuple("UserDB").field(&db.name).finish(), + } + } +} + +#[derive(Copy, Clone, Debug)] +pub enum PerDatabasePhase { + DeleteDBRoleReferences, + ChangeSchemaPerms, + HandleAnonExtension, +} + +#[derive(Clone, Debug)] +pub enum ApplySpecPhase { + CreateSuperUser, + DropInvalidDatabases, + RenameRoles, + CreateAndAlterRoles, + RenameAndDeleteDatabases, + CreateAndAlterDatabases, + RunInEachDatabase { db: DB, subphase: PerDatabasePhase }, + HandleOtherExtensions, + HandleNeonExtension, + CreateAvailabilityCheck, + DropRoles, +} + +pub struct Operation { + pub query: String, + pub comment: Option, +} + +pub struct MutableApplyContext { + pub roles: HashMap, + pub dbs: HashMap, +} + +/// Appply the operations that belong to the given spec apply phase. +/// +/// Commands within a single phase are executed in order of Iterator yield. +/// Commands of ApplySpecPhase::RunInEachDatabase will execute in the database +/// indicated by its `db` field, and can share a single client for all changes +/// to that database. +/// +/// Notes: +/// - Commands are pipelined, and thus may cause incomplete apply if one +/// command of many fails. +/// - Failing commands will fail the phase's apply step once the return value +/// is processed. +/// - No timeouts have (yet) been implemented. +/// - The caller is responsible for limiting and/or applying concurrency. +pub async fn apply_operations<'a, Fut, F>( + spec: Arc, + ctx: Arc>, + jwks_roles: Arc>, + apply_spec_phase: ApplySpecPhase, + client: F, +) -> Result<()> +where + F: FnOnce() -> Fut, + Fut: Future>, +{ + debug!("Starting phase {:?}", &apply_spec_phase); + let span = info_span!("db_apply_changes", phase=?apply_spec_phase); + let span2 = span.clone(); + async move { + debug!("Processing phase {:?}", &apply_spec_phase); + let ctx = ctx; + + let mut ops = get_operations(&spec, &ctx, &jwks_roles, &apply_spec_phase) + .await? + .peekable(); + + // Return (and by doing so, skip requesting the PostgreSQL client) if + // we don't have any operations scheduled. + if ops.peek().is_none() { + return Ok(()); + } + + let client = client().await?; + + debug!("Applying phase {:?}", &apply_spec_phase); + + let active_queries = ops + .map(|op| { + let Operation { comment, query } = op; + let inspan = match comment { + None => span.clone(), + Some(comment) => info_span!("phase {}: {}", comment), + }; + + async { + let query = query; + let res = client.simple_query(&query).await; + debug!( + "{} {}", + if res.is_ok() { + "successfully executed" + } else { + "failed to execute" + }, + query + ); + res + } + .instrument(inspan) + }) + .collect::>(); + + drop(ctx); + + for it in join_all(active_queries).await { + drop(it?); + } + + debug!("Completed phase {:?}", &apply_spec_phase); + + Ok(()) + } + .instrument(span2) + .await +} + +/// Create a stream of operations to be executed for that phase of applying +/// changes. +/// +/// In the future we may generate a single stream of changes and then +/// sort/merge/batch execution, but for now this is a nice way to improve +/// batching behaviour of the commands. +async fn get_operations<'a>( + spec: &'a ComputeSpec, + ctx: &'a RwLock, + jwks_roles: &'a HashSet, + apply_spec_phase: &'a ApplySpecPhase, +) -> Result + 'a + Send>> { + match apply_spec_phase { + ApplySpecPhase::CreateSuperUser => { + let query = construct_superuser_query(spec); + + Ok(Box::new(once(Operation { + query, + comment: None, + }))) + } + ApplySpecPhase::DropInvalidDatabases => { + let mut ctx = ctx.write().await; + let databases = &mut ctx.dbs; + + let keys: Vec<_> = databases + .iter() + .filter(|(_, db)| db.invalid) + .map(|(dbname, _)| dbname.clone()) + .collect(); + + // After recent commit in Postgres, interrupted DROP DATABASE + // leaves the database in the invalid state. According to the + // commit message, the only option for user is to drop it again. + // See: + // https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9 + // + // Postgres Neon extension is done the way, that db is de-registered + // in the control plane metadata only after it is dropped. So there is + // a chance that it still thinks that the db should exist. This means + // that it will be re-created by the `CreateDatabases` phase. This + // is fine, as user can just drop the table again (in vanilla + // Postgres they would need to do the same). + let operations = keys + .into_iter() + .filter_map(move |dbname| ctx.dbs.remove(&dbname)) + .map(|db| Operation { + query: format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote()), + comment: Some(format!("Dropping invalid database {}", db.name)), + }); + + Ok(Box::new(operations)) + } + ApplySpecPhase::RenameRoles => { + let mut ctx = ctx.write().await; + + let operations = spec + .delta_operations + .iter() + .flatten() + .filter(|op| op.action == "rename_role") + .filter_map(move |op| { + let roles = &mut ctx.roles; + + if roles.contains_key(op.name.as_str()) { + None + } else { + let new_name = op.new_name.as_ref().unwrap(); + let mut role = roles.remove(op.name.as_str()).unwrap(); + + role.name = new_name.clone(); + role.encrypted_password = None; + roles.insert(role.name.clone(), role); + + Some(Operation { + query: format!( + "ALTER ROLE {} RENAME TO {}", + op.name.pg_quote(), + new_name.pg_quote() + ), + comment: Some(format!("renaming role '{}' to '{}'", op.name, new_name)), + }) + } + }); + + Ok(Box::new(operations)) + } + ApplySpecPhase::CreateAndAlterRoles => { + let mut ctx = ctx.write().await; + + let operations = spec.cluster.roles + .iter() + .filter_map(move |role| { + let roles = &mut ctx.roles; + let db_role = roles.get(&role.name); + + match db_role { + Some(db_role) => { + if db_role.encrypted_password != role.encrypted_password { + // This can be run on /every/ role! Not just ones created through the console. + // This means that if you add some funny ALTER here that adds a permission, + // this will get run even on user-created roles! This will result in different + // behavior before and after a spec gets reapplied. The below ALTER as it stands + // now only grants LOGIN and changes the password. Please do not allow this branch + // to do anything silly. + Some(Operation { + query: format!( + "ALTER ROLE {} {}", + role.name.pg_quote(), + role.to_pg_options(), + ), + comment: None, + }) + } else { + None + } + } + None => { + let query = if !jwks_roles.contains(role.name.as_str()) { + format!( + "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser {}", + role.name.pg_quote(), + role.to_pg_options(), + ) + } else { + format!( + "CREATE ROLE {} {}", + role.name.pg_quote(), + role.to_pg_options(), + ) + }; + Some(Operation { + query, + comment: Some(format!("creating role {}", role.name)), + }) + } + } + }); + + Ok(Box::new(operations)) + } + ApplySpecPhase::RenameAndDeleteDatabases => { + let mut ctx = ctx.write().await; + + let operations = spec + .delta_operations + .iter() + .flatten() + .filter_map(move |op| { + let databases = &mut ctx.dbs; + match op.action.as_str() { + // We do not check whether the DB exists or not, + // Postgres will take care of it for us + "delete_db" => { + // In Postgres we can't drop a database if it is a template. + // So we need to unset the template flag first, but it could + // be a retry, so we could've already dropped the database. + // Check that database exists first to make it idempotent. + let unset_template_query: String = format!( + include_str!("sql/unset_template_for_drop_dbs.sql"), + datname_str = escape_literal(&op.name), + datname = &op.name.pg_quote() + ); + + // Use FORCE to drop database even if there are active connections. + // We run this from `cloud_admin`, so it should have enough privileges. + // NB: there could be other db states, which prevent us from dropping + // the database. For example, if db is used by any active subscription + // or replication slot. + // TODO: deal with it once we allow logical replication. Proper fix should + // involve returning an error code to the control plane, so it could + // figure out that this is a non-retryable error, return it to the user + // and fail operation permanently. + let drop_db_query: String = format!( + "DROP DATABASE IF EXISTS {} WITH (FORCE)", + &op.name.pg_quote() + ); + + databases.remove(&op.name); + + Some(vec![ + Operation { + query: unset_template_query, + comment: Some(format!( + "optionally clearing template flags for DB {}", + op.name, + )), + }, + Operation { + query: drop_db_query, + comment: Some(format!("deleting database {}", op.name,)), + }, + ]) + } + "rename_db" => { + if let Some(mut db) = databases.remove(&op.name) { + // update state of known databases + let new_name = op.new_name.as_ref().unwrap(); + db.name = new_name.clone(); + databases.insert(db.name.clone(), db); + + Some(vec![Operation { + query: format!( + "ALTER DATABASE {} RENAME TO {}", + op.name.pg_quote(), + new_name.pg_quote(), + ), + comment: Some(format!( + "renaming database '{}' to '{}'", + op.name, new_name + )), + }]) + } else { + None + } + } + _ => None, + } + }) + .flatten(); + + Ok(Box::new(operations)) + } + ApplySpecPhase::CreateAndAlterDatabases => { + let mut ctx = ctx.write().await; + + let operations = spec + .cluster + .databases + .iter() + .filter_map(move |db| { + let databases = &mut ctx.dbs; + if let Some(edb) = databases.get_mut(&db.name) { + let change_owner = if edb.owner.starts_with('"') { + db.owner.pg_quote() != edb.owner + } else { + db.owner != edb.owner + }; + + edb.owner = db.owner.clone(); + + if change_owner { + Some(vec![Operation { + query: format!( + "ALTER DATABASE {} OWNER TO {}", + db.name.pg_quote(), + db.owner.pg_quote() + ), + comment: Some(format!( + "changing database owner of database {} to {}", + db.name, db.owner + )), + }]) + } else { + None + } + } else { + databases.insert(db.name.clone(), db.clone()); + + Some(vec![ + Operation { + query: format!( + "CREATE DATABASE {} {}", + db.name.pg_quote(), + db.to_pg_options(), + ), + comment: None, + }, + Operation { + query: format!( + "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser", + db.name.pg_quote() + ), + comment: None, + }, + ]) + } + }) + .flatten(); + + Ok(Box::new(operations)) + } + ApplySpecPhase::RunInEachDatabase { db, subphase } => { + match subphase { + PerDatabasePhase::DeleteDBRoleReferences => { + let ctx = ctx.read().await; + + let operations = + spec.delta_operations + .iter() + .flatten() + .filter(|op| op.action == "delete_role") + .filter_map(move |op| { + if db.is_owned_by(&op.name) { + return None; + } + if !ctx.roles.contains_key(&op.name) { + return None; + } + let quoted = op.name.pg_quote(); + let new_owner = match &db { + DB::SystemDB => PgIdent::from("cloud_admin").pg_quote(), + DB::UserDB(db) => db.owner.pg_quote(), + }; + + Some(vec![ + // This will reassign all dependent objects to the db owner + Operation { + query: format!( + "REASSIGN OWNED BY {} TO {}", + quoted, new_owner, + ), + comment: None, + }, + // This now will only drop privileges of the role + Operation { + query: format!("DROP OWNED BY {}", quoted), + comment: None, + }, + ]) + }) + .flatten(); + + Ok(Box::new(operations)) + } + PerDatabasePhase::ChangeSchemaPerms => { + let ctx = ctx.read().await; + let databases = &ctx.dbs; + + let db = match &db { + // ignore schema permissions on the system database + DB::SystemDB => return Ok(Box::new(empty())), + DB::UserDB(db) => db, + }; + + if databases.get(&db.name).is_none() { + bail!("database {} doesn't exist in PostgreSQL", db.name); + } + + let edb = databases.get(&db.name).unwrap(); + + if edb.restrict_conn || edb.invalid { + return Ok(Box::new(empty())); + } + + let operations = vec![ + Operation { + query: format!( + include_str!("sql/set_public_schema_owner.sql"), + db_owner = db.owner.pg_quote() + ), + comment: None, + }, + Operation { + query: String::from(include_str!("sql/default_grants.sql")), + comment: None, + }, + ] + .into_iter(); + + Ok(Box::new(operations)) + } + PerDatabasePhase::HandleAnonExtension => { + // Only install Anon into user databases + let db = match &db { + DB::SystemDB => return Ok(Box::new(empty())), + DB::UserDB(db) => db, + }; + // Never install Anon when it's not enabled as feature + if !spec.features.contains(&ComputeFeature::AnonExtension) { + return Ok(Box::new(empty())); + } + + // Only install Anon when it's added in preload libraries + let opt_libs = spec.cluster.settings.find("shared_preload_libraries"); + + let libs = match opt_libs { + Some(libs) => libs, + None => return Ok(Box::new(empty())), + }; + + if !libs.contains("anon") { + return Ok(Box::new(empty())); + } + + let db_owner = db.owner.pg_quote(); + + let operations = vec![ + // Create anon extension if this compute needs it + // Users cannot create it themselves, because superuser is required. + Operation { + query: String::from("CREATE EXTENSION IF NOT EXISTS anon CASCADE"), + comment: Some(String::from("creating anon extension")), + }, + // Initialize anon extension + // This also requires superuser privileges, so users cannot do it themselves. + Operation { + query: String::from("SELECT anon.init()"), + comment: Some(String::from("initializing anon extension data")), + }, + Operation { + query: format!("GRANT ALL ON SCHEMA anon TO {}", db_owner), + comment: Some(String::from( + "granting anon extension schema permissions", + )), + }, + Operation { + query: format!( + "GRANT ALL ON ALL FUNCTIONS IN SCHEMA anon TO {}", + db_owner + ), + comment: Some(String::from( + "granting anon extension schema functions permissions", + )), + }, + // We need this, because some functions are defined as SECURITY DEFINER. + // In Postgres SECURITY DEFINER functions are executed with the privileges + // of the owner. + // In anon extension this it is needed to access some GUCs, which are only accessible to + // superuser. But we've patched postgres to allow db_owner to access them as well. + // So we need to change owner of these functions to db_owner. + Operation { + query: format!( + include_str!("sql/anon_ext_fn_reassign.sql"), + db_owner = db_owner, + ), + comment: Some(String::from( + "change anon extension functions owner to database_owner", + )), + }, + Operation { + query: format!( + "GRANT ALL ON ALL TABLES IN SCHEMA anon TO {}", + db_owner, + ), + comment: Some(String::from( + "granting anon extension tables permissions", + )), + }, + Operation { + query: format!( + "GRANT ALL ON ALL SEQUENCES IN SCHEMA anon TO {}", + db_owner, + ), + comment: Some(String::from( + "granting anon extension sequences permissions", + )), + }, + ] + .into_iter(); + + Ok(Box::new(operations)) + } + } + } + // Interestingly, we only install p_s_s in the main database, even when + // it's preloaded. + ApplySpecPhase::HandleOtherExtensions => { + if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") { + if libs.contains("pg_stat_statements") { + return Ok(Box::new(once(Operation { + query: String::from("CREATE EXTENSION IF NOT EXISTS pg_stat_statements"), + comment: Some(String::from("create system extensions")), + }))); + } + } + Ok(Box::new(empty())) + } + ApplySpecPhase::HandleNeonExtension => { + let operations = vec![ + Operation { + query: String::from("CREATE SCHEMA IF NOT EXISTS neon"), + comment: Some(String::from("init: add schema for extension")), + }, + Operation { + query: String::from("CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon"), + comment: Some(String::from( + "init: install the extension if not already installed", + )), + }, + Operation { + query: String::from( + "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'", + ), + comment: Some(String::from("compat/fix: make neon relocatable")), + }, + Operation { + query: String::from("ALTER EXTENSION neon SET SCHEMA neon"), + comment: Some(String::from("compat/fix: alter neon extension schema")), + }, + Operation { + query: String::from("ALTER EXTENSION neon UPDATE"), + comment: Some(String::from("compat/update: update neon extension version")), + }, + ] + .into_iter(); + + Ok(Box::new(operations)) + } + ApplySpecPhase::CreateAvailabilityCheck => Ok(Box::new(once(Operation { + query: String::from(include_str!("sql/add_availabilitycheck_tables.sql")), + comment: None, + }))), + ApplySpecPhase::DropRoles => { + let operations = spec + .delta_operations + .iter() + .flatten() + .filter(|op| op.action == "delete_role") + .map(|op| Operation { + query: format!("DROP ROLE IF EXISTS {}", op.name.pg_quote()), + comment: None, + }); + + Ok(Box::new(operations)) + } + } +} diff --git a/compute_tools/src/sql/add_availabilitycheck_tables.sql b/compute_tools/src/sql/add_availabilitycheck_tables.sql new file mode 100644 index 0000000000..7c60690c78 --- /dev/null +++ b/compute_tools/src/sql/add_availabilitycheck_tables.sql @@ -0,0 +1,18 @@ +DO $$ +BEGIN + IF NOT EXISTS( + SELECT 1 + FROM pg_catalog.pg_tables + WHERE tablename = 'health_check' + ) + THEN + CREATE TABLE health_check ( + id serial primary key, + updated_at timestamptz default now() + ); + INSERT INTO health_check VALUES (1, now()) + ON CONFLICT (id) DO UPDATE + SET updated_at = now(); + END IF; +END +$$ \ No newline at end of file diff --git a/compute_tools/src/sql/anon_ext_fn_reassign.sql b/compute_tools/src/sql/anon_ext_fn_reassign.sql new file mode 100644 index 0000000000..3d7b15c590 --- /dev/null +++ b/compute_tools/src/sql/anon_ext_fn_reassign.sql @@ -0,0 +1,12 @@ +DO $$ +DECLARE + query varchar; +BEGIN + FOR query IN SELECT 'ALTER FUNCTION '||nsp.nspname||'.'||p.proname||'('||pg_get_function_identity_arguments(p.oid)||') OWNER TO {db_owner};' + FROM pg_proc p + JOIN pg_namespace nsp ON p.pronamespace = nsp.oid + WHERE nsp.nspname = 'anon' LOOP + EXECUTE query; + END LOOP; +END +$$; diff --git a/compute_tools/src/sql/default_grants.sql b/compute_tools/src/sql/default_grants.sql new file mode 100644 index 0000000000..58ebb0690b --- /dev/null +++ b/compute_tools/src/sql/default_grants.sql @@ -0,0 +1,30 @@ +DO +$$ + BEGIN + IF EXISTS( + SELECT nspname + FROM pg_catalog.pg_namespace + WHERE nspname = 'public' + ) AND + current_setting('server_version_num')::int / 10000 >= 15 + THEN + IF EXISTS( + SELECT rolname + FROM pg_catalog.pg_roles + WHERE rolname = 'web_access' + ) + THEN + GRANT CREATE ON SCHEMA public TO web_access; + END IF; + END IF; + IF EXISTS( + SELECT nspname + FROM pg_catalog.pg_namespace + WHERE nspname = 'public' + ) + THEN + ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser WITH GRANT OPTION; + ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO neon_superuser WITH GRANT OPTION; + END IF; + END +$$; \ No newline at end of file diff --git a/compute_tools/src/sql/set_public_schema_owner.sql b/compute_tools/src/sql/set_public_schema_owner.sql new file mode 100644 index 0000000000..fd061a713e --- /dev/null +++ b/compute_tools/src/sql/set_public_schema_owner.sql @@ -0,0 +1,23 @@ +DO +$$ + DECLARE + schema_owner TEXT; + BEGIN + IF EXISTS( + SELECT nspname + FROM pg_catalog.pg_namespace + WHERE nspname = 'public' + ) + THEN + SELECT nspowner::regrole::text + FROM pg_catalog.pg_namespace + WHERE nspname = 'public' + INTO schema_owner; + + IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin' + THEN + ALTER SCHEMA public OWNER TO {db_owner}; + END IF; + END IF; + END +$$; \ No newline at end of file diff --git a/compute_tools/src/sql/unset_template_for_drop_dbs.sql b/compute_tools/src/sql/unset_template_for_drop_dbs.sql new file mode 100644 index 0000000000..6c4343a589 --- /dev/null +++ b/compute_tools/src/sql/unset_template_for_drop_dbs.sql @@ -0,0 +1,12 @@ +DO $$ + BEGIN + IF EXISTS( + SELECT 1 + FROM pg_catalog.pg_database + WHERE datname = {datname_str} + ) + THEN + ALTER DATABASE {datname} is_template false; + END IF; + END +$$; \ No newline at end of file