mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-19 14:10:37 +00:00
compute_ctl: Refactor, moving spec_apply functions to spec_apply.rs (#11006)
Seems nice to have the function and all its subroutines in the same source file.
This commit is contained in:
committed by
GitHub
parent
ab1f22b7d1
commit
a4b2009800
@@ -1,5 +1,4 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::iter::once;
|
||||
use std::collections::HashMap;
|
||||
use std::os::unix::fs::{PermissionsExt, symlink};
|
||||
use std::path::Path;
|
||||
use std::process::{Command, Stdio};
|
||||
@@ -13,9 +12,7 @@ use anyhow::{Context, Result};
|
||||
use chrono::{DateTime, Utc};
|
||||
use compute_api::privilege::Privilege;
|
||||
use compute_api::responses::{ComputeMetrics, ComputeStatus};
|
||||
use compute_api::spec::{
|
||||
ComputeFeature, ComputeMode, ComputeSpec, Database, ExtVersion, PgIdent, Role,
|
||||
};
|
||||
use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent};
|
||||
use futures::StreamExt;
|
||||
use futures::future::join_all;
|
||||
use futures::stream::FuturesUnordered;
|
||||
@@ -34,16 +31,6 @@ use utils::measured_stream::MeasuredReader;
|
||||
use crate::installed_extensions::get_installed_extensions;
|
||||
use crate::pg_helpers::*;
|
||||
use crate::spec::*;
|
||||
use crate::spec_apply::ApplySpecPhase::{
|
||||
CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateSchemaNeon,
|
||||
CreateSuperUser, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions,
|
||||
HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles,
|
||||
RunInEachDatabase,
|
||||
};
|
||||
use crate::spec_apply::PerDatabasePhase::{
|
||||
ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, HandleAnonExtension,
|
||||
};
|
||||
use crate::spec_apply::{DB, MutableApplyContext, PerDatabasePhase, apply_operations};
|
||||
use crate::sync_sk::{check_if_synced, ping_safekeeper};
|
||||
use crate::{config, extension_server, local_proxy};
|
||||
|
||||
@@ -928,388 +915,6 @@ impl ComputeNode {
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
/// Apply the spec to the running PostgreSQL instance.
|
||||
/// The caller can decide to run with multiple clients in parallel, or
|
||||
/// single mode. Either way, the commands executed will be the same, and
|
||||
/// only commands run in different databases are parallelized.
|
||||
#[instrument(skip_all)]
|
||||
pub fn apply_spec_sql(
|
||||
&self,
|
||||
spec: Arc<ComputeSpec>,
|
||||
conf: Arc<tokio_postgres::Config>,
|
||||
concurrency: usize,
|
||||
) -> Result<()> {
|
||||
info!("Applying config with max {} concurrency", concurrency);
|
||||
debug!("Config: {:?}", spec);
|
||||
|
||||
let rt = tokio::runtime::Handle::current();
|
||||
rt.block_on(async {
|
||||
// Proceed with post-startup configuration. Note, that order of operations is important.
|
||||
let client = Self::get_maintenance_client(&conf).await?;
|
||||
let spec = spec.clone();
|
||||
|
||||
let databases = get_existing_dbs_async(&client).await?;
|
||||
let roles = get_existing_roles_async(&client)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|role| (role.name.clone(), role))
|
||||
.collect::<HashMap<String, Role>>();
|
||||
|
||||
// Check if we need to drop subscriptions before starting the endpoint.
|
||||
//
|
||||
// It is important to do this operation exactly once when endpoint starts on a new branch.
|
||||
// Otherwise, we may drop not inherited, but newly created subscriptions.
|
||||
//
|
||||
// We cannot rely only on spec.drop_subscriptions_before_start flag,
|
||||
// because if for some reason compute restarts inside VM,
|
||||
// it will start again with the same spec and flag value.
|
||||
//
|
||||
// To handle this, we save the fact of the operation in the database
|
||||
// in the neon.drop_subscriptions_done table.
|
||||
// If the table does not exist, we assume that the operation was never performed, so we must do it.
|
||||
// If table exists, we check if the operation was performed on the current timelilne.
|
||||
//
|
||||
let mut drop_subscriptions_done = false;
|
||||
|
||||
if spec.drop_subscriptions_before_start {
|
||||
let timeline_id = self.get_timeline_id().context("timeline_id must be set")?;
|
||||
let query = format!("select 1 from neon.drop_subscriptions_done where timeline_id = '{}'", timeline_id);
|
||||
|
||||
info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id);
|
||||
|
||||
drop_subscriptions_done = match
|
||||
client.simple_query(&query).await {
|
||||
Ok(result) => {
|
||||
matches!(&result[0], postgres::SimpleQueryMessage::Row(_))
|
||||
},
|
||||
Err(e) =>
|
||||
{
|
||||
match e.code() {
|
||||
Some(&SqlState::UNDEFINED_TABLE) => false,
|
||||
_ => {
|
||||
// We don't expect any other error here, except for the schema/table not existing
|
||||
error!("Error checking if drop subscription operation was already performed: {}", e);
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
let jwks_roles = Arc::new(
|
||||
spec.as_ref()
|
||||
.local_proxy_config
|
||||
.iter()
|
||||
.flat_map(|it| &it.jwks)
|
||||
.flatten()
|
||||
.flat_map(|setting| &setting.role_names)
|
||||
.cloned()
|
||||
.collect::<HashSet<_>>(),
|
||||
);
|
||||
|
||||
let ctx = Arc::new(tokio::sync::RwLock::new(MutableApplyContext {
|
||||
roles,
|
||||
dbs: databases,
|
||||
}));
|
||||
|
||||
// Apply special pre drop database phase.
|
||||
// NOTE: we use the code of RunInEachDatabase phase for parallelism
|
||||
// and connection management, but we don't really run it in *each* database,
|
||||
// only in databases, we're about to drop.
|
||||
info!("Applying PerDatabase (pre-dropdb) phase");
|
||||
let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
|
||||
|
||||
// Run the phase for each database that we're about to drop.
|
||||
let db_processes = spec
|
||||
.delta_operations
|
||||
.iter()
|
||||
.flatten()
|
||||
.filter_map(move |op| {
|
||||
if op.action.as_str() == "delete_db" {
|
||||
Some(op.name.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.map(|dbname| {
|
||||
let spec = spec.clone();
|
||||
let ctx = ctx.clone();
|
||||
let jwks_roles = jwks_roles.clone();
|
||||
let mut conf = conf.as_ref().clone();
|
||||
let concurrency_token = concurrency_token.clone();
|
||||
// We only need dbname field for this phase, so set other fields to dummy values
|
||||
let db = DB::UserDB(Database {
|
||||
name: dbname.clone(),
|
||||
owner: "cloud_admin".to_string(),
|
||||
options: None,
|
||||
restrict_conn: false,
|
||||
invalid: false,
|
||||
});
|
||||
|
||||
debug!("Applying per-database phases for Database {:?}", &db);
|
||||
|
||||
match &db {
|
||||
DB::SystemDB => {}
|
||||
DB::UserDB(db) => {
|
||||
conf.dbname(db.name.as_str());
|
||||
}
|
||||
}
|
||||
|
||||
let conf = Arc::new(conf);
|
||||
let fut = Self::apply_spec_sql_db(
|
||||
spec.clone(),
|
||||
conf,
|
||||
ctx.clone(),
|
||||
jwks_roles.clone(),
|
||||
concurrency_token.clone(),
|
||||
db,
|
||||
[DropLogicalSubscriptions].to_vec(),
|
||||
);
|
||||
|
||||
Ok(spawn(fut))
|
||||
})
|
||||
.collect::<Vec<Result<_, anyhow::Error>>>();
|
||||
|
||||
for process in db_processes.into_iter() {
|
||||
let handle = process?;
|
||||
if let Err(e) = handle.await? {
|
||||
// Handle the error case where the database does not exist
|
||||
// We do not check whether the DB exists or not in the deletion phase,
|
||||
// so we shouldn't be strict about it in pre-deletion cleanup as well.
|
||||
if e.to_string().contains("does not exist") {
|
||||
warn!("Error dropping subscription: {}", e);
|
||||
} else {
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
for phase in [
|
||||
CreateSuperUser,
|
||||
DropInvalidDatabases,
|
||||
RenameRoles,
|
||||
CreateAndAlterRoles,
|
||||
RenameAndDeleteDatabases,
|
||||
CreateAndAlterDatabases,
|
||||
CreateSchemaNeon,
|
||||
] {
|
||||
info!("Applying phase {:?}", &phase);
|
||||
apply_operations(
|
||||
spec.clone(),
|
||||
ctx.clone(),
|
||||
jwks_roles.clone(),
|
||||
phase,
|
||||
|| async { Ok(&client) },
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
info!("Applying RunInEachDatabase2 phase");
|
||||
let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
|
||||
|
||||
let db_processes = spec
|
||||
.cluster
|
||||
.databases
|
||||
.iter()
|
||||
.map(|db| DB::new(db.clone()))
|
||||
// include
|
||||
.chain(once(DB::SystemDB))
|
||||
.map(|db| {
|
||||
let spec = spec.clone();
|
||||
let ctx = ctx.clone();
|
||||
let jwks_roles = jwks_roles.clone();
|
||||
let mut conf = conf.as_ref().clone();
|
||||
let concurrency_token = concurrency_token.clone();
|
||||
let db = db.clone();
|
||||
|
||||
debug!("Applying per-database phases for Database {:?}", &db);
|
||||
|
||||
match &db {
|
||||
DB::SystemDB => {}
|
||||
DB::UserDB(db) => {
|
||||
conf.dbname(db.name.as_str());
|
||||
}
|
||||
}
|
||||
|
||||
let conf = Arc::new(conf);
|
||||
let mut phases = vec![
|
||||
DeleteDBRoleReferences,
|
||||
ChangeSchemaPerms,
|
||||
HandleAnonExtension,
|
||||
];
|
||||
|
||||
if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
|
||||
info!("Adding DropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
|
||||
phases.push(DropLogicalSubscriptions);
|
||||
}
|
||||
|
||||
let fut = Self::apply_spec_sql_db(
|
||||
spec.clone(),
|
||||
conf,
|
||||
ctx.clone(),
|
||||
jwks_roles.clone(),
|
||||
concurrency_token.clone(),
|
||||
db,
|
||||
phases,
|
||||
);
|
||||
|
||||
Ok(spawn(fut))
|
||||
})
|
||||
.collect::<Vec<Result<_, anyhow::Error>>>();
|
||||
|
||||
for process in db_processes.into_iter() {
|
||||
let handle = process?;
|
||||
handle.await??;
|
||||
}
|
||||
|
||||
let mut phases = vec![
|
||||
HandleOtherExtensions,
|
||||
HandleNeonExtension, // This step depends on CreateSchemaNeon
|
||||
CreateAvailabilityCheck,
|
||||
DropRoles,
|
||||
];
|
||||
|
||||
// This step depends on CreateSchemaNeon
|
||||
if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
|
||||
info!("Adding FinalizeDropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
|
||||
phases.push(FinalizeDropLogicalSubscriptions);
|
||||
}
|
||||
|
||||
for phase in phases {
|
||||
debug!("Applying phase {:?}", &phase);
|
||||
apply_operations(
|
||||
spec.clone(),
|
||||
ctx.clone(),
|
||||
jwks_roles.clone(),
|
||||
phase,
|
||||
|| async { Ok(&client) },
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok::<(), anyhow::Error>(())
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Apply SQL migrations of the RunInEachDatabase phase.
|
||||
///
|
||||
/// May opt to not connect to databases that don't have any scheduled
|
||||
/// operations. The function is concurrency-controlled with the provided
|
||||
/// semaphore. The caller has to make sure the semaphore isn't exhausted.
|
||||
async fn apply_spec_sql_db(
|
||||
spec: Arc<ComputeSpec>,
|
||||
conf: Arc<tokio_postgres::Config>,
|
||||
ctx: Arc<tokio::sync::RwLock<MutableApplyContext>>,
|
||||
jwks_roles: Arc<HashSet<String>>,
|
||||
concurrency_token: Arc<tokio::sync::Semaphore>,
|
||||
db: DB,
|
||||
subphases: Vec<PerDatabasePhase>,
|
||||
) -> Result<()> {
|
||||
let _permit = concurrency_token.acquire().await?;
|
||||
|
||||
let mut client_conn = None;
|
||||
|
||||
for subphase in subphases {
|
||||
apply_operations(
|
||||
spec.clone(),
|
||||
ctx.clone(),
|
||||
jwks_roles.clone(),
|
||||
RunInEachDatabase {
|
||||
db: db.clone(),
|
||||
subphase,
|
||||
},
|
||||
// Only connect if apply_operation actually wants a connection.
|
||||
// It's quite possible this database doesn't need any queries,
|
||||
// so by not connecting we save time and effort connecting to
|
||||
// that database.
|
||||
|| async {
|
||||
if client_conn.is_none() {
|
||||
let db_client = Self::get_maintenance_client(&conf).await?;
|
||||
client_conn.replace(db_client);
|
||||
}
|
||||
let client = client_conn.as_ref().unwrap();
|
||||
Ok(client)
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
drop(client_conn);
|
||||
|
||||
Ok::<(), anyhow::Error>(())
|
||||
}
|
||||
|
||||
/// Choose how many concurrent connections to use for applying the spec changes.
|
||||
pub fn max_service_connections(
|
||||
&self,
|
||||
compute_state: &ComputeState,
|
||||
spec: &ComputeSpec,
|
||||
) -> usize {
|
||||
// If the cluster is in Init state we don't have to deal with user connections,
|
||||
// and can thus use all `max_connections` connection slots. However, that's generally not
|
||||
// very efficient, so we generally still limit it to a smaller number.
|
||||
if compute_state.status == ComputeStatus::Init {
|
||||
// If the settings contain 'max_connections', use that as template
|
||||
if let Some(config) = spec.cluster.settings.find("max_connections") {
|
||||
config.parse::<usize>().ok()
|
||||
} else {
|
||||
// Otherwise, try to find the setting in the postgresql_conf string
|
||||
spec.cluster
|
||||
.postgresql_conf
|
||||
.iter()
|
||||
.flat_map(|conf| conf.split("\n"))
|
||||
.filter_map(|line| {
|
||||
if !line.contains("max_connections") {
|
||||
return None;
|
||||
}
|
||||
|
||||
let (key, value) = line.split_once("=")?;
|
||||
let key = key
|
||||
.trim_start_matches(char::is_whitespace)
|
||||
.trim_end_matches(char::is_whitespace);
|
||||
|
||||
let value = value
|
||||
.trim_start_matches(char::is_whitespace)
|
||||
.trim_end_matches(char::is_whitespace);
|
||||
|
||||
if key != "max_connections" {
|
||||
return None;
|
||||
}
|
||||
|
||||
value.parse::<usize>().ok()
|
||||
})
|
||||
.next()
|
||||
}
|
||||
// If max_connections is present, use at most 1/3rd of that.
|
||||
// When max_connections is lower than 30, try to use at least 10 connections, but
|
||||
// never more than max_connections.
|
||||
.map(|limit| match limit {
|
||||
0..10 => limit,
|
||||
10..30 => 10,
|
||||
30.. => limit / 3,
|
||||
})
|
||||
// If we didn't find max_connections, default to 10 concurrent connections.
|
||||
.unwrap_or(10)
|
||||
} else {
|
||||
// state == Running
|
||||
// Because the cluster is already in the Running state, we should assume users are
|
||||
// already connected to the cluster, and high concurrency could negatively
|
||||
// impact user connectivity. Therefore, we can limit concurrency to the number of
|
||||
// reserved superuser connections, which users wouldn't be able to use anyway.
|
||||
spec.cluster
|
||||
.settings
|
||||
.find("superuser_reserved_connections")
|
||||
.iter()
|
||||
.filter_map(|val| val.parse::<usize>().ok())
|
||||
.map(|val| if val > 1 { val - 1 } else { 1 })
|
||||
.last()
|
||||
.unwrap_or(3)
|
||||
}
|
||||
}
|
||||
|
||||
/// Do initial configuration of the already started Postgres.
|
||||
#[instrument(skip_all)]
|
||||
pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
|
||||
@@ -4,15 +4,413 @@ use std::future::Future;
|
||||
use std::iter::{empty, once};
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use anyhow::{Context, Result};
|
||||
use compute_api::responses::ComputeStatus;
|
||||
use compute_api::spec::{ComputeFeature, ComputeSpec, Database, PgIdent, Role};
|
||||
use futures::future::join_all;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio_postgres::Client;
|
||||
use tracing::{Instrument, debug, info_span, warn};
|
||||
use tokio_postgres::error::SqlState;
|
||||
use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
|
||||
|
||||
use crate::compute::construct_superuser_query;
|
||||
use crate::pg_helpers::{DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, escape_literal};
|
||||
use crate::compute::{ComputeNode, ComputeState, construct_superuser_query};
|
||||
use crate::pg_helpers::{
|
||||
DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, escape_literal, get_existing_dbs_async,
|
||||
get_existing_roles_async,
|
||||
};
|
||||
use crate::spec_apply::ApplySpecPhase::{
|
||||
CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateSchemaNeon,
|
||||
CreateSuperUser, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions,
|
||||
HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles,
|
||||
RunInEachDatabase,
|
||||
};
|
||||
use crate::spec_apply::PerDatabasePhase::{
|
||||
ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, HandleAnonExtension,
|
||||
};
|
||||
|
||||
impl ComputeNode {
|
||||
/// Apply the spec to the running PostgreSQL instance.
|
||||
/// The caller can decide to run with multiple clients in parallel, or
|
||||
/// single mode. Either way, the commands executed will be the same, and
|
||||
/// only commands run in different databases are parallelized.
|
||||
#[instrument(skip_all)]
|
||||
pub fn apply_spec_sql(
|
||||
&self,
|
||||
spec: Arc<ComputeSpec>,
|
||||
conf: Arc<tokio_postgres::Config>,
|
||||
concurrency: usize,
|
||||
) -> Result<()> {
|
||||
info!("Applying config with max {} concurrency", concurrency);
|
||||
debug!("Config: {:?}", spec);
|
||||
|
||||
let rt = tokio::runtime::Handle::current();
|
||||
rt.block_on(async {
|
||||
// Proceed with post-startup configuration. Note, that order of operations is important.
|
||||
let client = Self::get_maintenance_client(&conf).await?;
|
||||
let spec = spec.clone();
|
||||
|
||||
let databases = get_existing_dbs_async(&client).await?;
|
||||
let roles = get_existing_roles_async(&client)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|role| (role.name.clone(), role))
|
||||
.collect::<HashMap<String, Role>>();
|
||||
|
||||
// Check if we need to drop subscriptions before starting the endpoint.
|
||||
//
|
||||
// It is important to do this operation exactly once when endpoint starts on a new branch.
|
||||
// Otherwise, we may drop not inherited, but newly created subscriptions.
|
||||
//
|
||||
// We cannot rely only on spec.drop_subscriptions_before_start flag,
|
||||
// because if for some reason compute restarts inside VM,
|
||||
// it will start again with the same spec and flag value.
|
||||
//
|
||||
// To handle this, we save the fact of the operation in the database
|
||||
// in the neon.drop_subscriptions_done table.
|
||||
// If the table does not exist, we assume that the operation was never performed, so we must do it.
|
||||
// If table exists, we check if the operation was performed on the current timelilne.
|
||||
//
|
||||
let mut drop_subscriptions_done = false;
|
||||
|
||||
if spec.drop_subscriptions_before_start {
|
||||
let timeline_id = self.get_timeline_id().context("timeline_id must be set")?;
|
||||
let query = format!("select 1 from neon.drop_subscriptions_done where timeline_id = '{}'", timeline_id);
|
||||
|
||||
info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id);
|
||||
|
||||
drop_subscriptions_done = match
|
||||
client.simple_query(&query).await {
|
||||
Ok(result) => {
|
||||
matches!(&result[0], postgres::SimpleQueryMessage::Row(_))
|
||||
},
|
||||
Err(e) =>
|
||||
{
|
||||
match e.code() {
|
||||
Some(&SqlState::UNDEFINED_TABLE) => false,
|
||||
_ => {
|
||||
// We don't expect any other error here, except for the schema/table not existing
|
||||
error!("Error checking if drop subscription operation was already performed: {}", e);
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
let jwks_roles = Arc::new(
|
||||
spec.as_ref()
|
||||
.local_proxy_config
|
||||
.iter()
|
||||
.flat_map(|it| &it.jwks)
|
||||
.flatten()
|
||||
.flat_map(|setting| &setting.role_names)
|
||||
.cloned()
|
||||
.collect::<HashSet<_>>(),
|
||||
);
|
||||
|
||||
let ctx = Arc::new(tokio::sync::RwLock::new(MutableApplyContext {
|
||||
roles,
|
||||
dbs: databases,
|
||||
}));
|
||||
|
||||
// Apply special pre drop database phase.
|
||||
// NOTE: we use the code of RunInEachDatabase phase for parallelism
|
||||
// and connection management, but we don't really run it in *each* database,
|
||||
// only in databases, we're about to drop.
|
||||
info!("Applying PerDatabase (pre-dropdb) phase");
|
||||
let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
|
||||
|
||||
// Run the phase for each database that we're about to drop.
|
||||
let db_processes = spec
|
||||
.delta_operations
|
||||
.iter()
|
||||
.flatten()
|
||||
.filter_map(move |op| {
|
||||
if op.action.as_str() == "delete_db" {
|
||||
Some(op.name.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.map(|dbname| {
|
||||
let spec = spec.clone();
|
||||
let ctx = ctx.clone();
|
||||
let jwks_roles = jwks_roles.clone();
|
||||
let mut conf = conf.as_ref().clone();
|
||||
let concurrency_token = concurrency_token.clone();
|
||||
// We only need dbname field for this phase, so set other fields to dummy values
|
||||
let db = DB::UserDB(Database {
|
||||
name: dbname.clone(),
|
||||
owner: "cloud_admin".to_string(),
|
||||
options: None,
|
||||
restrict_conn: false,
|
||||
invalid: false,
|
||||
});
|
||||
|
||||
debug!("Applying per-database phases for Database {:?}", &db);
|
||||
|
||||
match &db {
|
||||
DB::SystemDB => {}
|
||||
DB::UserDB(db) => {
|
||||
conf.dbname(db.name.as_str());
|
||||
}
|
||||
}
|
||||
|
||||
let conf = Arc::new(conf);
|
||||
let fut = Self::apply_spec_sql_db(
|
||||
spec.clone(),
|
||||
conf,
|
||||
ctx.clone(),
|
||||
jwks_roles.clone(),
|
||||
concurrency_token.clone(),
|
||||
db,
|
||||
[DropLogicalSubscriptions].to_vec(),
|
||||
);
|
||||
|
||||
Ok(tokio::spawn(fut))
|
||||
})
|
||||
.collect::<Vec<Result<_, anyhow::Error>>>();
|
||||
|
||||
for process in db_processes.into_iter() {
|
||||
let handle = process?;
|
||||
if let Err(e) = handle.await? {
|
||||
// Handle the error case where the database does not exist
|
||||
// We do not check whether the DB exists or not in the deletion phase,
|
||||
// so we shouldn't be strict about it in pre-deletion cleanup as well.
|
||||
if e.to_string().contains("does not exist") {
|
||||
warn!("Error dropping subscription: {}", e);
|
||||
} else {
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
for phase in [
|
||||
CreateSuperUser,
|
||||
DropInvalidDatabases,
|
||||
RenameRoles,
|
||||
CreateAndAlterRoles,
|
||||
RenameAndDeleteDatabases,
|
||||
CreateAndAlterDatabases,
|
||||
CreateSchemaNeon,
|
||||
] {
|
||||
info!("Applying phase {:?}", &phase);
|
||||
apply_operations(
|
||||
spec.clone(),
|
||||
ctx.clone(),
|
||||
jwks_roles.clone(),
|
||||
phase,
|
||||
|| async { Ok(&client) },
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
info!("Applying RunInEachDatabase2 phase");
|
||||
let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
|
||||
|
||||
let db_processes = spec
|
||||
.cluster
|
||||
.databases
|
||||
.iter()
|
||||
.map(|db| DB::new(db.clone()))
|
||||
// include
|
||||
.chain(once(DB::SystemDB))
|
||||
.map(|db| {
|
||||
let spec = spec.clone();
|
||||
let ctx = ctx.clone();
|
||||
let jwks_roles = jwks_roles.clone();
|
||||
let mut conf = conf.as_ref().clone();
|
||||
let concurrency_token = concurrency_token.clone();
|
||||
let db = db.clone();
|
||||
|
||||
debug!("Applying per-database phases for Database {:?}", &db);
|
||||
|
||||
match &db {
|
||||
DB::SystemDB => {}
|
||||
DB::UserDB(db) => {
|
||||
conf.dbname(db.name.as_str());
|
||||
}
|
||||
}
|
||||
|
||||
let conf = Arc::new(conf);
|
||||
let mut phases = vec![
|
||||
DeleteDBRoleReferences,
|
||||
ChangeSchemaPerms,
|
||||
HandleAnonExtension,
|
||||
];
|
||||
|
||||
if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
|
||||
info!("Adding DropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
|
||||
phases.push(DropLogicalSubscriptions);
|
||||
}
|
||||
|
||||
let fut = Self::apply_spec_sql_db(
|
||||
spec.clone(),
|
||||
conf,
|
||||
ctx.clone(),
|
||||
jwks_roles.clone(),
|
||||
concurrency_token.clone(),
|
||||
db,
|
||||
phases,
|
||||
);
|
||||
|
||||
Ok(tokio::spawn(fut))
|
||||
})
|
||||
.collect::<Vec<Result<_, anyhow::Error>>>();
|
||||
|
||||
for process in db_processes.into_iter() {
|
||||
let handle = process?;
|
||||
handle.await??;
|
||||
}
|
||||
|
||||
let mut phases = vec![
|
||||
HandleOtherExtensions,
|
||||
HandleNeonExtension, // This step depends on CreateSchemaNeon
|
||||
CreateAvailabilityCheck,
|
||||
DropRoles,
|
||||
];
|
||||
|
||||
// This step depends on CreateSchemaNeon
|
||||
if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
|
||||
info!("Adding FinalizeDropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
|
||||
phases.push(FinalizeDropLogicalSubscriptions);
|
||||
}
|
||||
|
||||
for phase in phases {
|
||||
debug!("Applying phase {:?}", &phase);
|
||||
apply_operations(
|
||||
spec.clone(),
|
||||
ctx.clone(),
|
||||
jwks_roles.clone(),
|
||||
phase,
|
||||
|| async { Ok(&client) },
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok::<(), anyhow::Error>(())
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Apply SQL migrations of the RunInEachDatabase phase.
|
||||
///
|
||||
/// May opt to not connect to databases that don't have any scheduled
|
||||
/// operations. The function is concurrency-controlled with the provided
|
||||
/// semaphore. The caller has to make sure the semaphore isn't exhausted.
|
||||
async fn apply_spec_sql_db(
|
||||
spec: Arc<ComputeSpec>,
|
||||
conf: Arc<tokio_postgres::Config>,
|
||||
ctx: Arc<tokio::sync::RwLock<MutableApplyContext>>,
|
||||
jwks_roles: Arc<HashSet<String>>,
|
||||
concurrency_token: Arc<tokio::sync::Semaphore>,
|
||||
db: DB,
|
||||
subphases: Vec<PerDatabasePhase>,
|
||||
) -> Result<()> {
|
||||
let _permit = concurrency_token.acquire().await?;
|
||||
|
||||
let mut client_conn = None;
|
||||
|
||||
for subphase in subphases {
|
||||
apply_operations(
|
||||
spec.clone(),
|
||||
ctx.clone(),
|
||||
jwks_roles.clone(),
|
||||
RunInEachDatabase {
|
||||
db: db.clone(),
|
||||
subphase,
|
||||
},
|
||||
// Only connect if apply_operation actually wants a connection.
|
||||
// It's quite possible this database doesn't need any queries,
|
||||
// so by not connecting we save time and effort connecting to
|
||||
// that database.
|
||||
|| async {
|
||||
if client_conn.is_none() {
|
||||
let db_client = Self::get_maintenance_client(&conf).await?;
|
||||
client_conn.replace(db_client);
|
||||
}
|
||||
let client = client_conn.as_ref().unwrap();
|
||||
Ok(client)
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
drop(client_conn);
|
||||
|
||||
Ok::<(), anyhow::Error>(())
|
||||
}
|
||||
|
||||
/// Choose how many concurrent connections to use for applying the spec changes.
|
||||
pub fn max_service_connections(
|
||||
&self,
|
||||
compute_state: &ComputeState,
|
||||
spec: &ComputeSpec,
|
||||
) -> usize {
|
||||
// If the cluster is in Init state we don't have to deal with user connections,
|
||||
// and can thus use all `max_connections` connection slots. However, that's generally not
|
||||
// very efficient, so we generally still limit it to a smaller number.
|
||||
if compute_state.status == ComputeStatus::Init {
|
||||
// If the settings contain 'max_connections', use that as template
|
||||
if let Some(config) = spec.cluster.settings.find("max_connections") {
|
||||
config.parse::<usize>().ok()
|
||||
} else {
|
||||
// Otherwise, try to find the setting in the postgresql_conf string
|
||||
spec.cluster
|
||||
.postgresql_conf
|
||||
.iter()
|
||||
.flat_map(|conf| conf.split("\n"))
|
||||
.filter_map(|line| {
|
||||
if !line.contains("max_connections") {
|
||||
return None;
|
||||
}
|
||||
|
||||
let (key, value) = line.split_once("=")?;
|
||||
let key = key
|
||||
.trim_start_matches(char::is_whitespace)
|
||||
.trim_end_matches(char::is_whitespace);
|
||||
|
||||
let value = value
|
||||
.trim_start_matches(char::is_whitespace)
|
||||
.trim_end_matches(char::is_whitespace);
|
||||
|
||||
if key != "max_connections" {
|
||||
return None;
|
||||
}
|
||||
|
||||
value.parse::<usize>().ok()
|
||||
})
|
||||
.next()
|
||||
}
|
||||
// If max_connections is present, use at most 1/3rd of that.
|
||||
// When max_connections is lower than 30, try to use at least 10 connections, but
|
||||
// never more than max_connections.
|
||||
.map(|limit| match limit {
|
||||
0..10 => limit,
|
||||
10..30 => 10,
|
||||
30.. => limit / 3,
|
||||
})
|
||||
// If we didn't find max_connections, default to 10 concurrent connections.
|
||||
.unwrap_or(10)
|
||||
} else {
|
||||
// state == Running
|
||||
// Because the cluster is already in the Running state, we should assume users are
|
||||
// already connected to the cluster, and high concurrency could negatively
|
||||
// impact user connectivity. Therefore, we can limit concurrency to the number of
|
||||
// reserved superuser connections, which users wouldn't be able to use anyway.
|
||||
spec.cluster
|
||||
.settings
|
||||
.find("superuser_reserved_connections")
|
||||
.iter()
|
||||
.filter_map(|val| val.parse::<usize>().ok())
|
||||
.map(|val| if val > 1 { val - 1 } else { 1 })
|
||||
.last()
|
||||
.unwrap_or(3)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum DB {
|
||||
|
||||
Reference in New Issue
Block a user