mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 13:10:38 +00:00
Compare commits
2 Commits
myrrc/clos
...
tristan957
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
77fb3a24bf | ||
|
|
a9c38ad137 |
@@ -1500,24 +1500,9 @@ impl ComputeNode {
|
||||
let mut conf = conf.as_ref().clone();
|
||||
conf.application_name("compute_ctl:migrations");
|
||||
|
||||
match conf.connect(NoTls).await {
|
||||
Ok((mut client, connection)) => {
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
eprintln!("connection error: {}", e);
|
||||
}
|
||||
});
|
||||
if let Err(e) = handle_migrations(&mut client).await {
|
||||
error!("Failed to run migrations: {}", e);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Failed to connect to the compute for running migrations: {}",
|
||||
e
|
||||
);
|
||||
}
|
||||
};
|
||||
if let Err(e) = handle_migrations(conf).await {
|
||||
error!("Failed to run migrations: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
Ok::<(), anyhow::Error>(())
|
||||
|
||||
@@ -1,29 +1,60 @@
|
||||
use anyhow::{Context, Result};
|
||||
use fail::fail_point;
|
||||
use tokio_postgres::{Client, Transaction};
|
||||
use tracing::{error, info};
|
||||
use postgres::NoTls;
|
||||
use tokio_postgres::{Client, Config, Transaction};
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use crate::metrics::DB_MIGRATION_FAILED;
|
||||
|
||||
/// Runs a series of migrations on a target database
|
||||
use compute_api::spec::{Database, PgIdent};
|
||||
|
||||
use crate::pg_helpers::{Escaping, get_existing_dbs_async};
|
||||
|
||||
pub(crate) enum Migration<'m> {
|
||||
/// Cluster migrations are things like catalog updates, where they can be
|
||||
/// run in the default Postgres database, but affect every database in the
|
||||
/// cluster.
|
||||
Cluster(&'m str),
|
||||
|
||||
/// Per-database migrations will be run in every database of the cluster.
|
||||
/// The migration will not be marked as completed until after it has been
|
||||
/// run in every database. We will save the `postgres` database for last so
|
||||
/// that we can commit the transaction as applied in the
|
||||
/// neon_migration.migration_id table.
|
||||
///
|
||||
/// Please be aware of the race condition that exists for this type of
|
||||
/// migration. At the beginning of running the series of migrations, we get
|
||||
/// the current list of databases. However, we run migrations in a separate
|
||||
/// thread in order to not block connections to the compute. If after the
|
||||
/// time we have gotten the list of databases in the cluster, a user creates
|
||||
/// a new database, that database will not receive the migration, but we
|
||||
/// will have marked the migration as completed successfully, assuming all
|
||||
/// previous databases ran the migration to completion.
|
||||
PerDatabase(&'m str),
|
||||
}
|
||||
|
||||
pub(crate) struct MigrationRunner<'m> {
|
||||
client: &'m mut Client,
|
||||
migrations: &'m [&'m str],
|
||||
/// Postgres client configuration.
|
||||
config: Config,
|
||||
|
||||
/// List of migrations to run.
|
||||
migrations: &'m [Migration<'m>],
|
||||
}
|
||||
|
||||
impl<'m> MigrationRunner<'m> {
|
||||
/// Create a new migration runner
|
||||
pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
|
||||
// The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
|
||||
assert!(migrations.len() + 1 < i64::MAX as usize);
|
||||
pub fn new(config: Config, migrations: &'m [Migration<'m>]) -> Result<Self> {
|
||||
// The neon_migration.migration_id::id column is a bigint, which is
|
||||
// equivalent to an i64
|
||||
debug_assert!(migrations.len() + 1 < i64::MAX as usize);
|
||||
|
||||
Self { client, migrations }
|
||||
Ok(Self { config, migrations })
|
||||
}
|
||||
|
||||
/// Get the current value neon_migration.migration_id
|
||||
async fn get_migration_id(&mut self) -> Result<i64> {
|
||||
let row = self
|
||||
.client
|
||||
async fn get_migration_id(client: &mut Client) -> Result<i64> {
|
||||
let row = client
|
||||
.query_one("SELECT id FROM neon_migration.migration_id", &[])
|
||||
.await?;
|
||||
|
||||
@@ -36,9 +67,8 @@ impl<'m> MigrationRunner<'m> {
|
||||
/// used if you would like to fail the application of a series of migrations
|
||||
/// at some point.
|
||||
async fn update_migration_id(txn: &mut Transaction<'_>, migration_id: i64) -> Result<()> {
|
||||
// We use this fail point in order to check that failing in the
|
||||
// middle of applying a series of migrations fails in an expected
|
||||
// manner
|
||||
// We use this fail point in order to check that failing in the middle
|
||||
// of applying a series of migrations fails in an expected manner
|
||||
if cfg!(feature = "testing") {
|
||||
let fail = (|| {
|
||||
fail_point!("compute-migration", |fail_migration_id| {
|
||||
@@ -67,48 +97,95 @@ impl<'m> MigrationRunner<'m> {
|
||||
}
|
||||
|
||||
/// Prepare the migrations the target database for handling migrations
|
||||
async fn prepare_database(&mut self) -> Result<()> {
|
||||
self.client
|
||||
async fn prepare_database(client: &mut Client) -> Result<()> {
|
||||
client
|
||||
.simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")
|
||||
.await?;
|
||||
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)").await?;
|
||||
self.client
|
||||
client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)").await?;
|
||||
client
|
||||
.simple_query(
|
||||
"INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
|
||||
)
|
||||
.await?;
|
||||
self.client
|
||||
client
|
||||
.simple_query("ALTER SCHEMA neon_migration OWNER TO cloud_admin")
|
||||
.await?;
|
||||
self.client
|
||||
client
|
||||
.simple_query("REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC")
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Run an individual migration in a separate transaction block.
|
||||
async fn run_migration(client: &mut Client, migration_id: i64, migration: &str) -> Result<()> {
|
||||
/// Helper function for allowing/disallowing connections to a Postgres
|
||||
/// database.
|
||||
async fn allow_connections_to_db(
|
||||
client: &mut Client,
|
||||
dbname: &PgIdent,
|
||||
allow: bool,
|
||||
) -> Result<()> {
|
||||
client
|
||||
.simple_query(
|
||||
format!(
|
||||
"ALTER DATABASE {} WITH ALLOW_CONNECTIONS {}",
|
||||
dbname.pg_quote(),
|
||||
allow
|
||||
)
|
||||
.as_str(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Connect to the configured Postgres database. Spawns a tokio task to
|
||||
/// handle the connection.
|
||||
async fn connect(config: &Config) -> Result<Client> {
|
||||
let (client, connection) = config.connect(NoTls).await?;
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
error!("connection error: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
async fn run_migration(
|
||||
client: &mut Client,
|
||||
db: &str,
|
||||
migration_id: i64,
|
||||
migration: &str,
|
||||
update_migration_id: bool,
|
||||
) -> Result<()> {
|
||||
let mut txn = client
|
||||
.transaction()
|
||||
.await
|
||||
.with_context(|| format!("begin transaction for migration {migration_id}"))?;
|
||||
|
||||
if migration.starts_with("-- SKIP") {
|
||||
info!("Skipping migration id={}", migration_id);
|
||||
|
||||
// Even though we are skipping the migration, updating the
|
||||
// migration ID should help keep logic easy to understand when
|
||||
// trying to understand the state of a cluster.
|
||||
Self::update_migration_id(&mut txn, migration_id).await?;
|
||||
info!("Skipping migration id={} db=\"{}\"", migration_id, db);
|
||||
} else {
|
||||
info!("Running migration id={}:\n{}\n", migration_id, migration);
|
||||
info!(
|
||||
"Running migration id={} db=\"{}\":\n{}\n",
|
||||
migration_id, db, migration
|
||||
);
|
||||
|
||||
txn.simple_query(migration)
|
||||
.await
|
||||
.with_context(|| format!("apply migration {migration_id}"))?;
|
||||
if let Err(e) = txn.simple_query(migration).await {
|
||||
error!("Failed to run the migration: {}", e);
|
||||
return Err(anyhow::anyhow!(e));
|
||||
}
|
||||
}
|
||||
|
||||
Self::update_migration_id(&mut txn, migration_id).await?;
|
||||
if update_migration_id {
|
||||
if let Err(e) = Self::update_migration_id(&mut txn, migration_id).await {
|
||||
error!(
|
||||
"Failed to update the migration id to {}: {}",
|
||||
migration_id, e
|
||||
);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
txn.commit()
|
||||
@@ -118,31 +195,200 @@ impl<'m> MigrationRunner<'m> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Run the configured set of migrations
|
||||
pub async fn run_migrations(mut self) -> Result<()> {
|
||||
self.prepare_database()
|
||||
/// Run the migration for the entire cluster. See [`Migration::Cluster`] for
|
||||
/// more information.
|
||||
async fn run_cluster_migration(
|
||||
client: &mut Client,
|
||||
db: &str,
|
||||
migration_id: i64,
|
||||
migration: &str,
|
||||
) -> Result<()> {
|
||||
Self::run_migration(client, db, migration_id, migration, true).await
|
||||
}
|
||||
|
||||
/// Run the migration in the specified database. See
|
||||
/// [`Migration::PerDatabase`] for more information.
|
||||
async fn run_database_migration(
|
||||
cluster_client: &mut Client,
|
||||
config: Config,
|
||||
db: &Database,
|
||||
migration_id: i64,
|
||||
migration: &str,
|
||||
) -> Result<()> {
|
||||
// There are 2 race conditions here. Migrations get ran in a separate
|
||||
// thread to not block the ability to connect to the compute. The race
|
||||
// conditions are as follow:
|
||||
//
|
||||
// 1. If between the time we have retrieved the list of databases in
|
||||
// the cluster and before we set ALLOW_CONNECTIONS back to false,
|
||||
// the user has changed allowed connections to the database, we
|
||||
// will have overwritten their change.
|
||||
//
|
||||
// This is not the end of the world, but an inconvenience,
|
||||
// nonetheless.
|
||||
//
|
||||
// 2. If between the time we have allowed connections to the database
|
||||
// and the time the migration is performed, the user disallows
|
||||
// connections to the database, we will fail to connect to the
|
||||
// database.
|
||||
//
|
||||
// This is not much of a problem since we will re-run the migration
|
||||
// the next time we run migrations.
|
||||
if db.restrict_conn {
|
||||
info!("Allowing connections to \"{}\" for migrations", db.name);
|
||||
|
||||
Self::allow_connections_to_db(cluster_client, &db.name, true)
|
||||
.await
|
||||
.context("Failed to allow connections to the database")?;
|
||||
}
|
||||
|
||||
let mut db_client = Self::connect(&config)
|
||||
.await
|
||||
.context("prepare database to handle migrations")?;
|
||||
.context("Failed to connect to the database")?;
|
||||
|
||||
let mut current_migration = self.get_migration_id().await? as usize;
|
||||
while current_migration < self.migrations.len() {
|
||||
// The index lags the migration ID by 1, so the current migration
|
||||
// ID is also the next index
|
||||
let migration_id = (current_migration + 1) as i64;
|
||||
let migration = self.migrations[current_migration];
|
||||
let result = Self::run_migration(&mut db_client, &db.name, migration_id, migration, false)
|
||||
.await
|
||||
.context("Failed to run the migration");
|
||||
|
||||
match Self::run_migration(self.client, migration_id, migration).await {
|
||||
Ok(_) => {
|
||||
info!("Finished migration id={}", migration_id);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to run migration id={}: {:?}", migration_id, e);
|
||||
DB_MIGRATION_FAILED
|
||||
.with_label_values(&[migration_id.to_string().as_str()])
|
||||
.inc();
|
||||
return Err(e);
|
||||
}
|
||||
// Reset the connection restriction
|
||||
if db.restrict_conn {
|
||||
info!(
|
||||
"Disallowing connections to \"{}\" because migration {} is done",
|
||||
db.name, migration_id
|
||||
);
|
||||
|
||||
// Failing here is not the end of the world
|
||||
if let Err(e) = Self::allow_connections_to_db(cluster_client, &db.name, false).await {
|
||||
warn!(
|
||||
"failed to reset ALLOW_CONNECTIONS on \"{}\": {}",
|
||||
db.name, e
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
/// Run the configured set of migrations.
|
||||
pub async fn run_migrations(self) -> Result<()> {
|
||||
// Owns the connection to the database containing the
|
||||
// neon_migration.migration_id table. In addition, all Cluster
|
||||
// migrations will be run on this connection.
|
||||
let mut cluster_client = Self::connect(&self.config)
|
||||
.await
|
||||
.context("failed to connect to cluster")?;
|
||||
|
||||
Self::prepare_database(&mut cluster_client)
|
||||
.await
|
||||
.context("failed to prepare database to handle migrations")?;
|
||||
|
||||
let mut current_migration = Self::get_migration_id(&mut cluster_client)
|
||||
.await
|
||||
.context("failed to get the current migration ID")?
|
||||
as usize;
|
||||
|
||||
// All databases within the cluster
|
||||
let dbs: Option<Vec<Database>> = {
|
||||
// Then check if we actually need to run any, and if so, check if
|
||||
// any need to run in each individual database
|
||||
if current_migration < self.migrations.len()
|
||||
&& self.migrations[current_migration..]
|
||||
.iter()
|
||||
.any(|m| matches!(m, Migration::PerDatabase(_)))
|
||||
{
|
||||
match get_existing_dbs_async(&cluster_client).await {
|
||||
Ok(dbs) => Some(
|
||||
// Filter out invalid database (datconnectivity = -2)
|
||||
dbs.into_values().filter(|d| !d.invalid).collect::<Vec<_>>(),
|
||||
),
|
||||
Err(e) => {
|
||||
error!("Failed to collect the existing databases: {}", e);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None::<Vec<_>>
|
||||
}
|
||||
};
|
||||
|
||||
let admin_db = self.config.get_dbname().unwrap();
|
||||
|
||||
while current_migration < self.migrations.len() {
|
||||
let migration_id = (current_migration + 1) as i64;
|
||||
|
||||
let result: Result<()> = match &self.migrations[current_migration] {
|
||||
Migration::Cluster(migration) => {
|
||||
Self::run_cluster_migration(
|
||||
&mut cluster_client,
|
||||
admin_db,
|
||||
migration_id,
|
||||
migration,
|
||||
)
|
||||
.await
|
||||
}
|
||||
Migration::PerDatabase(migration) => {
|
||||
let mut result: Result<()> = Ok(());
|
||||
for db in dbs.as_ref().unwrap() {
|
||||
// Once all the databases have run the migration, then we can run it in the
|
||||
// admin database to mark the migration as complete. See the run for the
|
||||
// admin database outside this loop.
|
||||
if db.name == admin_db {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut config = self.config.clone();
|
||||
config.dbname(&db.name);
|
||||
|
||||
// If we failed to run the migration in the current
|
||||
// database, stop trying to run this migration
|
||||
if let Err(e) = Self::run_database_migration(
|
||||
&mut cluster_client,
|
||||
config,
|
||||
db,
|
||||
migration_id,
|
||||
migration,
|
||||
)
|
||||
.await
|
||||
{
|
||||
result = Err(e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
match result {
|
||||
Ok(_) => {
|
||||
// Finally, run the migration for the admin database,
|
||||
// and update the migration ID
|
||||
Self::run_migration(
|
||||
&mut cluster_client,
|
||||
admin_db,
|
||||
migration_id,
|
||||
migration,
|
||||
true,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!("failed to commit the per-database migration: {}", e);
|
||||
e
|
||||
})
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// If failed, mark the metric and return
|
||||
if let Err(e) = result {
|
||||
DB_MIGRATION_FAILED
|
||||
.with_label_values(&[migration_id.to_string().as_str()])
|
||||
.inc();
|
||||
|
||||
return Err(anyhow::anyhow!(format!(
|
||||
"failed at migration {migration_id}: {e}"
|
||||
)));
|
||||
}
|
||||
|
||||
info!("Finished migration id={}", migration_id);
|
||||
|
||||
current_migration += 1;
|
||||
}
|
||||
|
||||
235
compute_tools/src/migrations/0012-fix-CVE-2024-4317.sql
Normal file
235
compute_tools/src/migrations/0012-fix-CVE-2024-4317.sql
Normal file
@@ -0,0 +1,235 @@
|
||||
/*
|
||||
* fix-CVE-2024-4317.sql
|
||||
*
|
||||
* Copyright (c) 2024, PostgreSQL Global Development Group
|
||||
*
|
||||
* src/backend/catalog/fix-CVE-2024-4317.sql
|
||||
*
|
||||
* This file should be run in every database in the cluster to address
|
||||
* CVE-2024-4317.
|
||||
*/
|
||||
|
||||
DO $$
|
||||
DECLARE
|
||||
server_version_num numeric;
|
||||
BEGIN
|
||||
SET search_path = pg_catalog;
|
||||
|
||||
SELECT setting::numeric FROM pg_settings INTO server_version_num WHERE name = 'server_version_num';
|
||||
|
||||
-- Everything after Postgres 17 will have the fix
|
||||
IF server_version_num >= 170000 THEN
|
||||
RETURN;
|
||||
END IF;
|
||||
|
||||
-- pg_statistic_ext_data doesn't have the stxdinherit column in 14 and below
|
||||
IF server_version_num < 150000 THEN
|
||||
CREATE OR REPLACE VIEW pg_stats_ext WITH (security_barrier) AS
|
||||
SELECT cn.nspname AS schemaname,
|
||||
c.relname AS tablename,
|
||||
sn.nspname AS statistics_schemaname,
|
||||
s.stxname AS statistics_name,
|
||||
pg_get_userbyid(s.stxowner) AS statistics_owner,
|
||||
( SELECT array_agg(a.attname ORDER BY a.attnum)
|
||||
FROM unnest(s.stxkeys) k
|
||||
JOIN pg_attribute a
|
||||
ON (a.attrelid = s.stxrelid AND a.attnum = k)
|
||||
) AS attnames,
|
||||
pg_get_statisticsobjdef_expressions(s.oid) as exprs,
|
||||
s.stxkind AS kinds,
|
||||
sd.stxdndistinct AS n_distinct,
|
||||
sd.stxddependencies AS dependencies,
|
||||
m.most_common_vals,
|
||||
m.most_common_val_nulls,
|
||||
m.most_common_freqs,
|
||||
m.most_common_base_freqs
|
||||
FROM pg_statistic_ext s JOIN pg_class c ON (c.oid = s.stxrelid)
|
||||
JOIN pg_statistic_ext_data sd ON (s.oid = sd.stxoid)
|
||||
LEFT JOIN pg_namespace cn ON (cn.oid = c.relnamespace)
|
||||
LEFT JOIN pg_namespace sn ON (sn.oid = s.stxnamespace)
|
||||
LEFT JOIN LATERAL
|
||||
( SELECT array_agg(values) AS most_common_vals,
|
||||
array_agg(nulls) AS most_common_val_nulls,
|
||||
array_agg(frequency) AS most_common_freqs,
|
||||
array_agg(base_frequency) AS most_common_base_freqs
|
||||
FROM pg_mcv_list_items(sd.stxdmcv)
|
||||
) m ON sd.stxdmcv IS NOT NULL
|
||||
WHERE pg_has_role(c.relowner, 'USAGE')
|
||||
AND (c.relrowsecurity = false OR NOT row_security_active(c.oid));
|
||||
|
||||
CREATE OR REPLACE VIEW pg_stats_ext_exprs WITH (security_barrier) AS
|
||||
SELECT cn.nspname AS schemaname,
|
||||
c.relname AS tablename,
|
||||
sn.nspname AS statistics_schemaname,
|
||||
s.stxname AS statistics_name,
|
||||
pg_get_userbyid(s.stxowner) AS statistics_owner,
|
||||
stat.expr,
|
||||
(stat.a).stanullfrac AS null_frac,
|
||||
(stat.a).stawidth AS avg_width,
|
||||
(stat.a).stadistinct AS n_distinct,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 1 THEN (stat.a).stavalues1
|
||||
WHEN (stat.a).stakind2 = 1 THEN (stat.a).stavalues2
|
||||
WHEN (stat.a).stakind3 = 1 THEN (stat.a).stavalues3
|
||||
WHEN (stat.a).stakind4 = 1 THEN (stat.a).stavalues4
|
||||
WHEN (stat.a).stakind5 = 1 THEN (stat.a).stavalues5
|
||||
END) AS most_common_vals,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 1 THEN (stat.a).stanumbers1
|
||||
WHEN (stat.a).stakind2 = 1 THEN (stat.a).stanumbers2
|
||||
WHEN (stat.a).stakind3 = 1 THEN (stat.a).stanumbers3
|
||||
WHEN (stat.a).stakind4 = 1 THEN (stat.a).stanumbers4
|
||||
WHEN (stat.a).stakind5 = 1 THEN (stat.a).stanumbers5
|
||||
END) AS most_common_freqs,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 2 THEN (stat.a).stavalues1
|
||||
WHEN (stat.a).stakind2 = 2 THEN (stat.a).stavalues2
|
||||
WHEN (stat.a).stakind3 = 2 THEN (stat.a).stavalues3
|
||||
WHEN (stat.a).stakind4 = 2 THEN (stat.a).stavalues4
|
||||
WHEN (stat.a).stakind5 = 2 THEN (stat.a).stavalues5
|
||||
END) AS histogram_bounds,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 3 THEN (stat.a).stanumbers1[1]
|
||||
WHEN (stat.a).stakind2 = 3 THEN (stat.a).stanumbers2[1]
|
||||
WHEN (stat.a).stakind3 = 3 THEN (stat.a).stanumbers3[1]
|
||||
WHEN (stat.a).stakind4 = 3 THEN (stat.a).stanumbers4[1]
|
||||
WHEN (stat.a).stakind5 = 3 THEN (stat.a).stanumbers5[1]
|
||||
END) correlation,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 4 THEN (stat.a).stavalues1
|
||||
WHEN (stat.a).stakind2 = 4 THEN (stat.a).stavalues2
|
||||
WHEN (stat.a).stakind3 = 4 THEN (stat.a).stavalues3
|
||||
WHEN (stat.a).stakind4 = 4 THEN (stat.a).stavalues4
|
||||
WHEN (stat.a).stakind5 = 4 THEN (stat.a).stavalues5
|
||||
END) AS most_common_elems,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 4 THEN (stat.a).stanumbers1
|
||||
WHEN (stat.a).stakind2 = 4 THEN (stat.a).stanumbers2
|
||||
WHEN (stat.a).stakind3 = 4 THEN (stat.a).stanumbers3
|
||||
WHEN (stat.a).stakind4 = 4 THEN (stat.a).stanumbers4
|
||||
WHEN (stat.a).stakind5 = 4 THEN (stat.a).stanumbers5
|
||||
END) AS most_common_elem_freqs,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 5 THEN (stat.a).stanumbers1
|
||||
WHEN (stat.a).stakind2 = 5 THEN (stat.a).stanumbers2
|
||||
WHEN (stat.a).stakind3 = 5 THEN (stat.a).stanumbers3
|
||||
WHEN (stat.a).stakind4 = 5 THEN (stat.a).stanumbers4
|
||||
WHEN (stat.a).stakind5 = 5 THEN (stat.a).stanumbers5
|
||||
END) AS elem_count_histogram
|
||||
FROM pg_statistic_ext s JOIN pg_class c ON (c.oid = s.stxrelid)
|
||||
LEFT JOIN pg_statistic_ext_data sd ON (s.oid = sd.stxoid)
|
||||
LEFT JOIN pg_namespace cn ON (cn.oid = c.relnamespace)
|
||||
LEFT JOIN pg_namespace sn ON (sn.oid = s.stxnamespace)
|
||||
JOIN LATERAL (
|
||||
SELECT unnest(pg_get_statisticsobjdef_expressions(s.oid)) AS expr,
|
||||
unnest(sd.stxdexpr)::pg_statistic AS a
|
||||
) stat ON (stat.expr IS NOT NULL)
|
||||
WHERE pg_has_role(c.relowner, 'USAGE')
|
||||
AND (c.relrowsecurity = false OR NOT row_security_active(c.oid));
|
||||
ELSE
|
||||
CREATE OR REPLACE VIEW pg_stats_ext WITH (security_barrier) AS
|
||||
SELECT cn.nspname AS schemaname,
|
||||
c.relname AS tablename,
|
||||
sn.nspname AS statistics_schemaname,
|
||||
s.stxname AS statistics_name,
|
||||
pg_get_userbyid(s.stxowner) AS statistics_owner,
|
||||
( SELECT array_agg(a.attname ORDER BY a.attnum)
|
||||
FROM unnest(s.stxkeys) k
|
||||
JOIN pg_attribute a
|
||||
ON (a.attrelid = s.stxrelid AND a.attnum = k)
|
||||
) AS attnames,
|
||||
pg_get_statisticsobjdef_expressions(s.oid) as exprs,
|
||||
s.stxkind AS kinds,
|
||||
sd.stxdinherit AS inherited,
|
||||
sd.stxdndistinct AS n_distinct,
|
||||
sd.stxddependencies AS dependencies,
|
||||
m.most_common_vals,
|
||||
m.most_common_val_nulls,
|
||||
m.most_common_freqs,
|
||||
m.most_common_base_freqs
|
||||
FROM pg_statistic_ext s JOIN pg_class c ON (c.oid = s.stxrelid)
|
||||
JOIN pg_statistic_ext_data sd ON (s.oid = sd.stxoid)
|
||||
LEFT JOIN pg_namespace cn ON (cn.oid = c.relnamespace)
|
||||
LEFT JOIN pg_namespace sn ON (sn.oid = s.stxnamespace)
|
||||
LEFT JOIN LATERAL
|
||||
( SELECT array_agg(values) AS most_common_vals,
|
||||
array_agg(nulls) AS most_common_val_nulls,
|
||||
array_agg(frequency) AS most_common_freqs,
|
||||
array_agg(base_frequency) AS most_common_base_freqs
|
||||
FROM pg_mcv_list_items(sd.stxdmcv)
|
||||
) m ON sd.stxdmcv IS NOT NULL
|
||||
WHERE pg_has_role(c.relowner, 'USAGE')
|
||||
AND (c.relrowsecurity = false OR NOT row_security_active(c.oid));
|
||||
|
||||
CREATE OR REPLACE VIEW pg_stats_ext_exprs WITH (security_barrier) AS
|
||||
SELECT cn.nspname AS schemaname,
|
||||
c.relname AS tablename,
|
||||
sn.nspname AS statistics_schemaname,
|
||||
s.stxname AS statistics_name,
|
||||
pg_get_userbyid(s.stxowner) AS statistics_owner,
|
||||
stat.expr,
|
||||
sd.stxdinherit AS inherited,
|
||||
(stat.a).stanullfrac AS null_frac,
|
||||
(stat.a).stawidth AS avg_width,
|
||||
(stat.a).stadistinct AS n_distinct,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 1 THEN (stat.a).stavalues1
|
||||
WHEN (stat.a).stakind2 = 1 THEN (stat.a).stavalues2
|
||||
WHEN (stat.a).stakind3 = 1 THEN (stat.a).stavalues3
|
||||
WHEN (stat.a).stakind4 = 1 THEN (stat.a).stavalues4
|
||||
WHEN (stat.a).stakind5 = 1 THEN (stat.a).stavalues5
|
||||
END) AS most_common_vals,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 1 THEN (stat.a).stanumbers1
|
||||
WHEN (stat.a).stakind2 = 1 THEN (stat.a).stanumbers2
|
||||
WHEN (stat.a).stakind3 = 1 THEN (stat.a).stanumbers3
|
||||
WHEN (stat.a).stakind4 = 1 THEN (stat.a).stanumbers4
|
||||
WHEN (stat.a).stakind5 = 1 THEN (stat.a).stanumbers5
|
||||
END) AS most_common_freqs,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 2 THEN (stat.a).stavalues1
|
||||
WHEN (stat.a).stakind2 = 2 THEN (stat.a).stavalues2
|
||||
WHEN (stat.a).stakind3 = 2 THEN (stat.a).stavalues3
|
||||
WHEN (stat.a).stakind4 = 2 THEN (stat.a).stavalues4
|
||||
WHEN (stat.a).stakind5 = 2 THEN (stat.a).stavalues5
|
||||
END) AS histogram_bounds,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 3 THEN (stat.a).stanumbers1[1]
|
||||
WHEN (stat.a).stakind2 = 3 THEN (stat.a).stanumbers2[1]
|
||||
WHEN (stat.a).stakind3 = 3 THEN (stat.a).stanumbers3[1]
|
||||
WHEN (stat.a).stakind4 = 3 THEN (stat.a).stanumbers4[1]
|
||||
WHEN (stat.a).stakind5 = 3 THEN (stat.a).stanumbers5[1]
|
||||
END) correlation,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 4 THEN (stat.a).stavalues1
|
||||
WHEN (stat.a).stakind2 = 4 THEN (stat.a).stavalues2
|
||||
WHEN (stat.a).stakind3 = 4 THEN (stat.a).stavalues3
|
||||
WHEN (stat.a).stakind4 = 4 THEN (stat.a).stavalues4
|
||||
WHEN (stat.a).stakind5 = 4 THEN (stat.a).stavalues5
|
||||
END) AS most_common_elems,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 4 THEN (stat.a).stanumbers1
|
||||
WHEN (stat.a).stakind2 = 4 THEN (stat.a).stanumbers2
|
||||
WHEN (stat.a).stakind3 = 4 THEN (stat.a).stanumbers3
|
||||
WHEN (stat.a).stakind4 = 4 THEN (stat.a).stanumbers4
|
||||
WHEN (stat.a).stakind5 = 4 THEN (stat.a).stanumbers5
|
||||
END) AS most_common_elem_freqs,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 5 THEN (stat.a).stanumbers1
|
||||
WHEN (stat.a).stakind2 = 5 THEN (stat.a).stanumbers2
|
||||
WHEN (stat.a).stakind3 = 5 THEN (stat.a).stanumbers3
|
||||
WHEN (stat.a).stakind4 = 5 THEN (stat.a).stanumbers4
|
||||
WHEN (stat.a).stakind5 = 5 THEN (stat.a).stanumbers5
|
||||
END) AS elem_count_histogram
|
||||
FROM pg_statistic_ext s JOIN pg_class c ON (c.oid = s.stxrelid)
|
||||
LEFT JOIN pg_statistic_ext_data sd ON (s.oid = sd.stxoid)
|
||||
LEFT JOIN pg_namespace cn ON (cn.oid = c.relnamespace)
|
||||
LEFT JOIN pg_namespace sn ON (sn.oid = s.stxnamespace)
|
||||
JOIN LATERAL (
|
||||
SELECT unnest(pg_get_statisticsobjdef_expressions(s.oid)) AS expr,
|
||||
unnest(sd.stxdexpr)::pg_statistic AS a
|
||||
) stat ON (stat.expr IS NOT NULL)
|
||||
WHERE pg_has_role(c.relowner, 'USAGE')
|
||||
AND (c.relrowsecurity = false OR NOT row_security_active(c.oid));
|
||||
END IF;
|
||||
END $$;
|
||||
@@ -0,0 +1,5 @@
|
||||
-- Testing that this migration actually works would require spinning up a
|
||||
-- Postgres instance running on a vulnerable version. Let's trust that the
|
||||
-- Postgres community created a SQL fix that actually works.
|
||||
|
||||
SELECT 1;
|
||||
@@ -6,12 +6,12 @@ use compute_api::responses::{
|
||||
ComputeConfig, ControlPlaneComputeStatus, ControlPlaneConfigResponse,
|
||||
};
|
||||
use reqwest::StatusCode;
|
||||
use tokio_postgres::Client;
|
||||
use tokio_postgres::{Client, Config};
|
||||
use tracing::{error, info, instrument};
|
||||
|
||||
use crate::config;
|
||||
use crate::metrics::{CPLANE_REQUESTS_TOTAL, CPlaneRequestRPC, UNKNOWN_HTTP_STATUS};
|
||||
use crate::migration::MigrationRunner;
|
||||
use crate::migration::{Migration, MigrationRunner};
|
||||
use crate::params::PG_HBA_ALL_MD5;
|
||||
|
||||
// Do control plane request and return response if any. In case of error it
|
||||
@@ -169,7 +169,7 @@ pub async fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub async fn handle_migrations(client: &mut Client) -> Result<()> {
|
||||
pub async fn handle_migrations(config: Config) -> Result<()> {
|
||||
info!("handle migrations");
|
||||
|
||||
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
|
||||
@@ -178,30 +178,50 @@ pub async fn handle_migrations(client: &mut Client) -> Result<()> {
|
||||
|
||||
// Add new migrations in numerical order.
|
||||
let migrations = [
|
||||
include_str!("./migrations/0001-neon_superuser_bypass_rls.sql"),
|
||||
include_str!("./migrations/0002-alter_roles.sql"),
|
||||
include_str!("./migrations/0003-grant_pg_create_subscription_to_neon_superuser.sql"),
|
||||
include_str!("./migrations/0004-grant_pg_monitor_to_neon_superuser.sql"),
|
||||
include_str!("./migrations/0005-grant_all_on_tables_to_neon_superuser.sql"),
|
||||
include_str!("./migrations/0006-grant_all_on_sequences_to_neon_superuser.sql"),
|
||||
include_str!(
|
||||
Migration::Cluster(include_str!(
|
||||
"./migrations/0001-neon_superuser_bypass_rls.sql"
|
||||
)),
|
||||
Migration::Cluster(include_str!("./migrations/0002-alter_roles.sql")),
|
||||
Migration::Cluster(include_str!(
|
||||
"./migrations/0003-grant_pg_create_subscription_to_neon_superuser.sql"
|
||||
)),
|
||||
Migration::Cluster(include_str!(
|
||||
"./migrations/0004-grant_pg_monitor_to_neon_superuser.sql"
|
||||
)),
|
||||
Migration::Cluster(include_str!(
|
||||
"./migrations/0005-grant_all_on_tables_to_neon_superuser.sql"
|
||||
)),
|
||||
Migration::Cluster(include_str!(
|
||||
"./migrations/0006-grant_all_on_sequences_to_neon_superuser.sql"
|
||||
)),
|
||||
Migration::Cluster(include_str!(
|
||||
"./migrations/0007-grant_all_on_tables_to_neon_superuser_with_grant_option.sql"
|
||||
),
|
||||
include_str!(
|
||||
)),
|
||||
Migration::Cluster(include_str!(
|
||||
"./migrations/0008-grant_all_on_sequences_to_neon_superuser_with_grant_option.sql"
|
||||
),
|
||||
include_str!("./migrations/0009-revoke_replication_for_previously_allowed_roles.sql"),
|
||||
include_str!(
|
||||
)),
|
||||
Migration::Cluster(include_str!(
|
||||
"./migrations/0009-revoke_replication_for_previously_allowed_roles.sql"
|
||||
)),
|
||||
Migration::Cluster(include_str!(
|
||||
"./migrations/0010-grant_snapshot_synchronization_funcs_to_neon_superuser.sql"
|
||||
),
|
||||
include_str!(
|
||||
)),
|
||||
Migration::Cluster(include_str!(
|
||||
"./migrations/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql"
|
||||
),
|
||||
)),
|
||||
Migration::PerDatabase(include_str!("./migrations/0012-fix-CVE-2024-4317.sql")),
|
||||
];
|
||||
|
||||
MigrationRunner::new(client, &migrations)
|
||||
.run_migrations()
|
||||
.await?;
|
||||
let runner = match MigrationRunner::new(config, &migrations) {
|
||||
Ok(runner) => runner,
|
||||
Err(e) => {
|
||||
error!("Failed to construct a migration runner: {}", e);
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
runner.run_migrations().await.map_err(|e| {
|
||||
error!("Failed to run the migrations: {}", e);
|
||||
e
|
||||
})
|
||||
}
|
||||
|
||||
@@ -75,6 +75,7 @@ def test_compute_migrations_retry(neon_simple_env: NeonEnv, compute_migrations_d
|
||||
pattern = rf"Running migration id={i}"
|
||||
|
||||
endpoint.log_contains(pattern)
|
||||
endpoint.log_contains(rf"Finished migration id={i}")
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
||||
Reference in New Issue
Block a user