Merge commit '5b0972151' into problame/standby-horizon-leases

This commit is contained in:
Christian Schwarz
2025-08-06 17:56:07 +02:00
10 changed files with 231 additions and 29 deletions

View File

@@ -138,6 +138,12 @@ struct Cli {
/// Run in development mode, skipping VM-specific operations like process termination
#[arg(long, action = clap::ArgAction::SetTrue)]
pub dev: bool,
#[arg(long)]
pub pg_init_timeout: Option<u64>,
#[arg(long, default_value_t = false, action = clap::ArgAction::Set)]
pub lakebase_mode: bool,
}
impl Cli {
@@ -219,6 +225,8 @@ fn main() -> Result<()> {
installed_extensions_collection_interval: Arc::new(AtomicU64::new(
cli.installed_extensions_collection_interval,
)),
pg_init_timeout: cli.pg_init_timeout.map(Duration::from_secs),
lakebase_mode: cli.lakebase_mode,
},
config,
)?;

View File

@@ -113,6 +113,11 @@ pub struct ComputeNodeParams {
/// Interval for installed extensions collection
pub installed_extensions_collection_interval: Arc<AtomicU64>,
/// Timeout of PG compute startup in the Init state.
pub pg_init_timeout: Option<Duration>,
pub lakebase_mode: bool,
}
type TaskHandle = Mutex<Option<JoinHandle<()>>>;
@@ -156,6 +161,7 @@ pub struct RemoteExtensionMetrics {
#[derive(Clone, Debug)]
pub struct ComputeState {
pub start_time: DateTime<Utc>,
pub pg_start_time: Option<DateTime<Utc>>,
pub status: ComputeStatus,
/// Timestamp of the last Postgres activity. It could be `None` if
/// compute wasn't used since start.
@@ -193,6 +199,7 @@ impl ComputeState {
pub fn new() -> Self {
Self {
start_time: Utc::now(),
pg_start_time: None,
status: ComputeStatus::Empty,
last_active: None,
error: None,
@@ -653,6 +660,9 @@ impl ComputeNode {
};
_this_entered = start_compute_span.enter();
// Hadron: Record postgres start time (used to enforce pg_init_timeout).
state_guard.pg_start_time.replace(Utc::now());
state_guard.set_status(ComputeStatus::Init, &self.state_changed);
compute_state = state_guard.clone()
}
@@ -1446,7 +1456,7 @@ impl ComputeNode {
})?;
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path)?;
update_pg_hba(pgdata_path, None)?;
// Place pg_dynshmem under /dev/shm. This allows us to use
// 'dynamic_shared_memory_type = mmap' so that the files are placed in
@@ -1751,6 +1761,7 @@ impl ComputeNode {
}
// Run migrations separately to not hold up cold starts
let lakebase_mode = self.params.lakebase_mode;
let params = self.params.clone();
tokio::spawn(async move {
let mut conf = conf.as_ref().clone();
@@ -1763,7 +1774,7 @@ impl ComputeNode {
eprintln!("connection error: {e}");
}
});
if let Err(e) = handle_migrations(params, &mut client).await {
if let Err(e) = handle_migrations(params, &mut client, lakebase_mode).await {
error!("Failed to run migrations: {}", e);
}
}

View File

@@ -9,15 +9,20 @@ use crate::metrics::DB_MIGRATION_FAILED;
pub(crate) struct MigrationRunner<'m> {
client: &'m mut Client,
migrations: &'m [&'m str],
lakebase_mode: bool,
}
impl<'m> MigrationRunner<'m> {
/// Create a new migration runner
pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
pub fn new(client: &'m mut Client, migrations: &'m [&'m str], lakebase_mode: bool) -> Self {
// The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
assert!(migrations.len() + 1 < i64::MAX as usize);
Self { client, migrations }
Self {
client,
migrations,
lakebase_mode,
}
}
/// Get the current value neon_migration.migration_id
@@ -130,8 +135,13 @@ impl<'m> MigrationRunner<'m> {
// ID is also the next index
let migration_id = (current_migration + 1) as i64;
let migration = self.migrations[current_migration];
let migration = if self.lakebase_mode {
migration.replace("neon_superuser", "databricks_superuser")
} else {
migration.to_string()
};
match Self::run_migration(self.client, migration_id, migration).await {
match Self::run_migration(self.client, migration_id, &migration).await {
Ok(_) => {
info!("Finished migration id={}", migration_id);
}

View File

@@ -12,6 +12,7 @@ use utils::lsn::Lsn;
use crate::compute::ComputeNode;
use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
const PG_DEFAULT_INIT_TIMEOUIT: Duration = Duration::from_secs(60);
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
/// Struct to store runtime state of the compute monitor thread.
@@ -394,13 +395,47 @@ impl ComputeMonitor {
// Hang on condition variable waiting until the compute status is `Running`.
fn wait_for_postgres_start(compute: &ComputeNode) {
let mut state = compute.state.lock().unwrap();
let pg_init_timeout = compute
.params
.pg_init_timeout
.unwrap_or(PG_DEFAULT_INIT_TIMEOUIT);
while state.status != ComputeStatus::Running {
info!("compute is not running, waiting before monitoring activity");
state = compute.state_changed.wait(state).unwrap();
if !compute.params.lakebase_mode {
state = compute.state_changed.wait(state).unwrap();
if state.status == ComputeStatus::Running {
break;
if state.status == ComputeStatus::Running {
break;
}
continue;
}
if state.pg_start_time.is_some()
&& Utc::now()
.signed_duration_since(state.pg_start_time.unwrap())
.to_std()
.unwrap_or_default()
> pg_init_timeout
{
// If Postgres isn't up and running with working PS/SK connections within POSTGRES_STARTUP_TIMEOUT, it is
// possible that we started Postgres with a wrong spec (so it is talking to the wrong PS/SK nodes). To prevent
// deadends we simply exit (panic) the compute node so it can restart with the latest spec.
//
// NB: We skip this check if we have not attempted to start PG yet (indicated by state.pg_start_up == None).
// This is to make sure the more appropriate errors are surfaced if we encounter issues before we even attempt
// to start PG (e.g., if we can't pull the spec, can't sync safekeepers, or can't get the basebackup).
error!(
"compute did not enter Running state in {} seconds, exiting",
pg_init_timeout.as_secs()
);
std::process::exit(1);
}
state = compute
.state_changed
.wait_timeout(state, Duration::from_secs(5))
.unwrap()
.0;
}
}

View File

@@ -11,7 +11,9 @@ use std::time::{Duration, Instant};
use anyhow::{Result, bail};
use compute_api::responses::TlsConfig;
use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role};
use compute_api::spec::{
Database, DatabricksSettings, GenericOption, GenericOptions, PgIdent, Role,
};
use futures::StreamExt;
use indexmap::IndexMap;
use ini::Ini;
@@ -184,6 +186,42 @@ impl DatabaseExt for Database {
}
}
pub trait DatabricksSettingsExt {
fn as_pg_settings(&self) -> String;
}
impl DatabricksSettingsExt for DatabricksSettings {
fn as_pg_settings(&self) -> String {
// Postgres GUCs rendered from DatabricksSettings
vec![
// ssl_ca_file
Some(format!(
"ssl_ca_file = '{}'",
self.pg_compute_tls_settings.ca_file
)),
// [Optional] databricks.workspace_url
Some(format!(
"databricks.workspace_url = '{}'",
&self.databricks_workspace_host
)),
// todo(vikas.jain): these are not required anymore as they are moved to static
// conf but keeping these to avoid image mismatch between hcc and pg.
// Once hcc and pg are in sync, we can remove these.
//
// databricks.enable_databricks_identity_login
Some("databricks.enable_databricks_identity_login = true".to_string()),
// databricks.enable_sql_restrictions
Some("databricks.enable_sql_restrictions = true".to_string()),
]
.into_iter()
// Removes `None`s
.flatten()
.collect::<Vec<String>>()
.join("\n")
+ "\n"
}
}
/// Generic trait used to provide quoting / encoding for strings used in the
/// Postgres SQL queries and DATABASE_URL.
pub trait Escaping {

View File

@@ -1,4 +1,6 @@
use std::fs::File;
use std::fs::{self, Permissions};
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use anyhow::{Result, anyhow, bail};
@@ -133,10 +135,25 @@ pub fn get_config_from_control_plane(base_uri: &str, compute_id: &str) -> Result
}
/// Check `pg_hba.conf` and update if needed to allow external connections.
pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
pub fn update_pg_hba(pgdata_path: &Path, databricks_pg_hba: Option<&String>) -> Result<()> {
// XXX: consider making it a part of config.json
let pghba_path = pgdata_path.join("pg_hba.conf");
// Update pg_hba to contains databricks specfic settings before adding neon settings
// PG uses the first record that matches to perform authentication, so we need to have
// our rules before the default ones from neon.
// See https://www.postgresql.org/docs/16/auth-pg-hba-conf.html
if let Some(databricks_pg_hba) = databricks_pg_hba {
if config::line_in_file(
&pghba_path,
&format!("include_if_exists {}\n", *databricks_pg_hba),
)? {
info!("updated pg_hba.conf to include databricks_pg_hba.conf");
} else {
info!("pg_hba.conf already included databricks_pg_hba.conf");
}
}
if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
info!("updated pg_hba.conf to allow external connections");
} else {
@@ -146,6 +163,59 @@ pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
Ok(())
}
/// Check `pg_ident.conf` and update if needed to allow databricks config.
pub fn update_pg_ident(pgdata_path: &Path, databricks_pg_ident: Option<&String>) -> Result<()> {
info!("checking pg_ident.conf");
let pghba_path = pgdata_path.join("pg_ident.conf");
// Update pg_ident to contains databricks specfic settings
if let Some(databricks_pg_ident) = databricks_pg_ident {
if config::line_in_file(
&pghba_path,
&format!("include_if_exists {}\n", *databricks_pg_ident),
)? {
info!("updated pg_ident.conf to include databricks_pg_ident.conf");
} else {
info!("pg_ident.conf already included databricks_pg_ident.conf");
}
}
Ok(())
}
/// Copy tls key_file and cert_file from k8s secret mount directory
/// to pgdata and set private key file permissions as expected by Postgres.
/// See this doc for expected permission <https://www.postgresql.org/docs/current/ssl-tcp.html>
/// K8s secrets mount on dblet does not honor permission and ownership
/// specified in the Volume or VolumeMount. So we need to explicitly copy the file and set the permissions.
pub fn copy_tls_certificates(
key_file: &String,
cert_file: &String,
pgdata_path: &Path,
) -> Result<()> {
let files = [cert_file, key_file];
for file in files.iter() {
let source = Path::new(file);
let dest = pgdata_path.join(source.file_name().unwrap());
if !dest.exists() {
std::fs::copy(source, &dest)?;
info!(
"Copying tls file: {} to {}",
&source.display(),
&dest.display()
);
}
if *file == key_file {
// Postgres requires private key to be readable only by the owner by having
// chmod 600 permissions.
let permissions = Permissions::from_mode(0o600);
fs::set_permissions(&dest, permissions)?;
info!("Setting permission on {}.", &dest.display());
}
}
Ok(())
}
/// Create a standby.signal file
pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
// XXX: consider making it a part of config.json
@@ -170,7 +240,11 @@ pub async fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
}
#[instrument(skip_all)]
pub async fn handle_migrations(params: ComputeNodeParams, client: &mut Client) -> Result<()> {
pub async fn handle_migrations(
params: ComputeNodeParams,
client: &mut Client,
lakebase_mode: bool,
) -> Result<()> {
info!("handle migrations");
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
@@ -234,7 +308,7 @@ pub async fn handle_migrations(params: ComputeNodeParams, client: &mut Client) -
),
];
MigrationRunner::new(client, &migrations)
MigrationRunner::new(client, &migrations, lakebase_mode)
.run_migrations()
.await?;

View File

@@ -418,6 +418,32 @@ pub struct GenericOption {
pub vartype: String,
}
/// Postgres compute TLS settings.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct PgComputeTlsSettings {
// Absolute path to the certificate file for server-side TLS.
pub cert_file: String,
// Absolute path to the private key file for server-side TLS.
pub key_file: String,
// Absolute path to the certificate authority file for verifying client certificates.
pub ca_file: String,
}
/// Databricks specific options for compute instance.
/// This is used to store any other settings that needs to be propagate to Compute
/// but should not be persisted to ComputeSpec in the database.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct DatabricksSettings {
pub pg_compute_tls_settings: PgComputeTlsSettings,
// Absolute file path to databricks_pg_hba.conf file.
pub databricks_pg_hba: String,
// Absolute file path to databricks_pg_ident.conf file.
pub databricks_pg_ident: String,
// Hostname portion of the Databricks workspace URL of the endpoint, or empty string if not known.
// A valid hostname is required for the compute instance to support PAT logins.
pub databricks_workspace_host: String,
}
/// Optional collection of `GenericOption`'s. Type alias allows us to
/// declare a `trait` on it.
pub type GenericOptions = Option<Vec<GenericOption>>;

View File

@@ -362,7 +362,7 @@ impl<T: Types> Cache<T> {
tokio::time::sleep(RETRY_BACKOFF).await;
continue;
} else {
tracing::warn!(
tracing::info!(
"Failed to resolve tenant shard after {} attempts: {:?}",
GET_MAX_RETRIES,
e

View File

@@ -72,22 +72,21 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
(tag).rnode = (rinfo); \
} while (false)
#define BufTagGetNRelFileInfo(tag) tag.rnode
#define BufTagGetNRelFileInfo(tag) (tag).rnode
#define BufTagGetRelNumber(tagp) ((tagp)->rnode.relNode)
#define BufTagInit(tag, relNumber, forknum, blkno, spcOid, dbOid) \
#define BufTagInit(tag, rel_number, fork_number, block_number, spc_oid, db_oid) \
do { \
RelFileNode rnode = { .spcNode = spcOid, .dbNode = dbOid, .relNode = relNumber}; \
(tag).forkNum = forknum; \
(tag).blockNum = blkno; \
(tag).rnode = rnode; \
RelFileNode rnode = { .spcNode = (spc_oid), .dbNode = (db_oid), .relNode = (rel_number)}; \
(tag).forkNum = (fork_number); \
(tag).blockNum = (block_number); \
(tag).rnode = rnode; \
} while (false)
#define InvalidRelFileNumber InvalidOid
#define SMgrRelGetRelInfo(reln) \
(reln->smgr_rnode.node)
#define SMgrRelGetRelInfo(reln) ((reln)->smgr_rnode.node)
#define DropRelationAllLocalBuffers DropRelFileNodeAllLocalBuffers
@@ -133,17 +132,16 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
.relNumber = (tag).relNumber, \
})
#define BufTagInit(tag, relNumber, forknum, blkno, spcOid, dbOid) \
#define BufTagInit(tag, rel_number, fork_number, block_number, spc_oid, db_oid) \
do { \
(tag).forkNum = forknum; \
(tag).blockNum = blkno; \
(tag).spcOid = spcOid; \
(tag).dbOid = dbOid; \
(tag).relNumber = relNumber; \
(tag).forkNum = (fork_number); \
(tag).blockNum = (block_number); \
(tag).spcOid = (spc_oid); \
(tag).dbOid = (db_oid); \
(tag).relNumber = (rel_number); \
} while (false)
#define SMgrRelGetRelInfo(reln) \
((reln)->smgr_rlocator)
#define SMgrRelGetRelInfo(reln) ((reln)->smgr_rlocator)
#define DropRelationAllLocalBuffers DropRelationAllLocalBuffers
#endif

View File

@@ -152,6 +152,8 @@ DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS = [
".*reconciler.*neon_local error.*",
# Tenant rate limits may fire in tests that submit lots of API requests.
".*tenant \\S+ is rate limited.*",
# Reconciliations may get stuck/delayed e.g. in chaos tests.
".*background_reconcile: Shard reconciliation is stuck.*",
]