From 8de320ab9bf3c458682d6bdf70f3b61693735479 Mon Sep 17 00:00:00 2001 From: HaoyuHuang Date: Tue, 22 Jul 2025 09:22:18 -0700 Subject: [PATCH 1/4] Add a few compute_tool changes (#12677) ## Summary of changes All changes are no-op. --- compute_tools/src/bin/compute_ctl.rs | 8 +++ compute_tools/src/compute.rs | 15 +++++- compute_tools/src/migration.rs | 16 ++++-- compute_tools/src/monitor.rs | 41 ++++++++++++-- compute_tools/src/pg_helpers.rs | 40 +++++++++++++- compute_tools/src/spec.rs | 80 ++++++++++++++++++++++++++-- libs/compute_api/src/spec.rs | 26 +++++++++ 7 files changed, 214 insertions(+), 12 deletions(-) diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 3d07a2ece8..04723d6f3d 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -138,6 +138,12 @@ struct Cli { /// Run in development mode, skipping VM-specific operations like process termination #[arg(long, action = clap::ArgAction::SetTrue)] pub dev: bool, + + #[arg(long)] + pub pg_init_timeout: Option, + + #[arg(long, default_value_t = false, action = clap::ArgAction::Set)] + pub lakebase_mode: bool, } impl Cli { @@ -219,6 +225,8 @@ fn main() -> Result<()> { installed_extensions_collection_interval: Arc::new(AtomicU64::new( cli.installed_extensions_collection_interval, )), + pg_init_timeout: cli.pg_init_timeout.map(Duration::from_secs), + lakebase_mode: cli.lakebase_mode, }, config, )?; diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 3ae946c10e..b4d7a6fca9 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -113,6 +113,11 @@ pub struct ComputeNodeParams { /// Interval for installed extensions collection pub installed_extensions_collection_interval: Arc, + + /// Timeout of PG compute startup in the Init state. 
+ pub pg_init_timeout: Option, + + pub lakebase_mode: bool, } type TaskHandle = Mutex>>; @@ -154,6 +159,7 @@ pub struct RemoteExtensionMetrics { #[derive(Clone, Debug)] pub struct ComputeState { pub start_time: DateTime, + pub pg_start_time: Option>, pub status: ComputeStatus, /// Timestamp of the last Postgres activity. It could be `None` if /// compute wasn't used since start. @@ -191,6 +197,7 @@ impl ComputeState { pub fn new() -> Self { Self { start_time: Utc::now(), + pg_start_time: None, status: ComputeStatus::Empty, last_active: None, error: None, @@ -648,6 +655,9 @@ impl ComputeNode { }; _this_entered = start_compute_span.enter(); + // Hadron: Record postgres start time (used to enforce pg_init_timeout). + state_guard.pg_start_time.replace(Utc::now()); + state_guard.set_status(ComputeStatus::Init, &self.state_changed); compute_state = state_guard.clone() } @@ -1441,7 +1451,7 @@ impl ComputeNode { })?; // Update pg_hba.conf received with basebackup. - update_pg_hba(pgdata_path)?; + update_pg_hba(pgdata_path, None)?; // Place pg_dynshmem under /dev/shm. 
This allows us to use // 'dynamic_shared_memory_type = mmap' so that the files are placed in @@ -1746,6 +1756,7 @@ impl ComputeNode { } // Run migrations separately to not hold up cold starts + let lakebase_mode = self.params.lakebase_mode; let params = self.params.clone(); tokio::spawn(async move { let mut conf = conf.as_ref().clone(); @@ -1758,7 +1769,7 @@ impl ComputeNode { eprintln!("connection error: {e}"); } }); - if let Err(e) = handle_migrations(params, &mut client).await { + if let Err(e) = handle_migrations(params, &mut client, lakebase_mode).await { error!("Failed to run migrations: {}", e); } } diff --git a/compute_tools/src/migration.rs b/compute_tools/src/migration.rs index c5e05822c0..88d870df97 100644 --- a/compute_tools/src/migration.rs +++ b/compute_tools/src/migration.rs @@ -9,15 +9,20 @@ use crate::metrics::DB_MIGRATION_FAILED; pub(crate) struct MigrationRunner<'m> { client: &'m mut Client, migrations: &'m [&'m str], + lakebase_mode: bool, } impl<'m> MigrationRunner<'m> { /// Create a new migration runner - pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self { + pub fn new(client: &'m mut Client, migrations: &'m [&'m str], lakebase_mode: bool) -> Self { // The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64 assert!(migrations.len() + 1 < i64::MAX as usize); - Self { client, migrations } + Self { + client, + migrations, + lakebase_mode, + } } /// Get the current value neon_migration.migration_id @@ -130,8 +135,13 @@ impl<'m> MigrationRunner<'m> { // ID is also the next index let migration_id = (current_migration + 1) as i64; let migration = self.migrations[current_migration]; + let migration = if self.lakebase_mode { + migration.replace("neon_superuser", "databricks_superuser") + } else { + migration.to_string() + }; - match Self::run_migration(self.client, migration_id, migration).await { + match Self::run_migration(self.client, migration_id, &migration).await { Ok(_) => { info!("Finished 
migration id={}", migration_id);
                }
diff --git a/compute_tools/src/monitor.rs b/compute_tools/src/monitor.rs
index fa01545856..e164f15dba 100644
--- a/compute_tools/src/monitor.rs
+++ b/compute_tools/src/monitor.rs
@@ -11,6 +11,7 @@ use tracing::{Level, error, info, instrument, span};
 use crate::compute::ComputeNode;
 use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
 
+const PG_DEFAULT_INIT_TIMEOUT: Duration = Duration::from_secs(60);
 const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
 
 /// Struct to store runtime state of the compute monitor thread.
@@ -352,13 +353,47 @@ impl ComputeMonitor {
 // Hang on condition variable waiting until the compute status is `Running`.
 fn wait_for_postgres_start(compute: &ComputeNode) {
     let mut state = compute.state.lock().unwrap();
+    let pg_init_timeout = compute
+        .params
+        .pg_init_timeout
+        .unwrap_or(PG_DEFAULT_INIT_TIMEOUT);
+
     while state.status != ComputeStatus::Running {
         info!("compute is not running, waiting before monitoring activity");
-        state = compute.state_changed.wait(state).unwrap();
+        if !compute.params.lakebase_mode {
+            state = compute.state_changed.wait(state).unwrap();
 
-        if state.status == ComputeStatus::Running {
-            break;
+            if state.status == ComputeStatus::Running {
+                break;
+            }
+            continue;
         }
+
+        if state.pg_start_time.is_some()
+            && Utc::now()
+                .signed_duration_since(state.pg_start_time.unwrap())
+                .to_std()
+                .unwrap_or_default()
+                > pg_init_timeout
+        {
+            // If Postgres isn't up and running with working PS/SK connections within pg_init_timeout, it is
+            // possible that we started Postgres with a wrong spec (so it is talking to the wrong PS/SK nodes). To prevent
+            // dead ends we simply exit (panic) the compute node so it can restart with the latest spec.
+            //
+            // NB: We skip this check if we have not attempted to start PG yet (indicated by state.pg_start_time == None).
+ // This is to make sure the more appropriate errors are surfaced if we encounter issues before we even attempt + // to start PG (e.g., if we can't pull the spec, can't sync safekeepers, or can't get the basebackup). + error!( + "compute did not enter Running state in {} seconds, exiting", + pg_init_timeout.as_secs() + ); + std::process::exit(1); + } + state = compute + .state_changed + .wait_timeout(state, Duration::from_secs(5)) + .unwrap() + .0; } } diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs index 0a3ceed2fa..09bbe89b41 100644 --- a/compute_tools/src/pg_helpers.rs +++ b/compute_tools/src/pg_helpers.rs @@ -11,7 +11,9 @@ use std::time::{Duration, Instant}; use anyhow::{Result, bail}; use compute_api::responses::TlsConfig; -use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role}; +use compute_api::spec::{ + Database, DatabricksSettings, GenericOption, GenericOptions, PgIdent, Role, +}; use futures::StreamExt; use indexmap::IndexMap; use ini::Ini; @@ -184,6 +186,42 @@ impl DatabaseExt for Database { } } +pub trait DatabricksSettingsExt { + fn as_pg_settings(&self) -> String; +} + +impl DatabricksSettingsExt for DatabricksSettings { + fn as_pg_settings(&self) -> String { + // Postgres GUCs rendered from DatabricksSettings + vec![ + // ssl_ca_file + Some(format!( + "ssl_ca_file = '{}'", + self.pg_compute_tls_settings.ca_file + )), + // [Optional] databricks.workspace_url + Some(format!( + "databricks.workspace_url = '{}'", + &self.databricks_workspace_host + )), + // todo(vikas.jain): these are not required anymore as they are moved to static + // conf but keeping these to avoid image mismatch between hcc and pg. + // Once hcc and pg are in sync, we can remove these. 
+            //
+            // databricks.enable_databricks_identity_login
+            Some("databricks.enable_databricks_identity_login = true".to_string()),
+            // databricks.enable_sql_restrictions
+            Some("databricks.enable_sql_restrictions = true".to_string()),
+        ]
+        .into_iter()
+        // Removes `None`s
+        .flatten()
+        .collect::<Vec<String>>()
+        .join("\n")
+            + "\n"
+    }
+}
+
 /// Generic trait used to provide quoting / encoding for strings used in the
 /// Postgres SQL queries and DATABASE_URL.
 pub trait Escaping {
diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs
index 4525a0e831..d00f86a2c0 100644
--- a/compute_tools/src/spec.rs
+++ b/compute_tools/src/spec.rs
@@ -1,4 +1,6 @@
 use std::fs::File;
+use std::fs::{self, Permissions};
+use std::os::unix::fs::PermissionsExt;
 use std::path::Path;
 
 use anyhow::{Result, anyhow, bail};
@@ -133,10 +135,25 @@ pub fn get_config_from_control_plane(base_uri: &str, compute_id: &str) -> Result
 }
 
 /// Check `pg_hba.conf` and update if needed to allow external connections.
-pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
+pub fn update_pg_hba(pgdata_path: &Path, databricks_pg_hba: Option<&String>) -> Result<()> {
     // XXX: consider making it a part of config.json
     let pghba_path = pgdata_path.join("pg_hba.conf");
 
+    // Update pg_hba to contain databricks-specific settings before adding neon settings.
+    // PG uses the first record that matches to perform authentication, so we need to have
+    // our rules before the default ones from neon.
+    // See https://www.postgresql.org/docs/16/auth-pg-hba-conf.html
+    if let Some(databricks_pg_hba) = databricks_pg_hba {
+        if config::line_in_file(
+            &pghba_path,
+            &format!("include_if_exists {}\n", *databricks_pg_hba),
+        )? {
+            info!("updated pg_hba.conf to include databricks_pg_hba.conf");
+        } else {
+            info!("pg_hba.conf already included databricks_pg_hba.conf");
+        }
+    }
+
     if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)?
{
         info!("updated pg_hba.conf to allow external connections");
     } else {
@@ -146,6 +163,59 @@ pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
     Ok(())
 }
 
+/// Check `pg_ident.conf` and update if needed to allow databricks config.
+pub fn update_pg_ident(pgdata_path: &Path, databricks_pg_ident: Option<&String>) -> Result<()> {
+    info!("checking pg_ident.conf");
+    let pghba_path = pgdata_path.join("pg_ident.conf");
+
+    // Update pg_ident to contain databricks-specific settings
+    if let Some(databricks_pg_ident) = databricks_pg_ident {
+        if config::line_in_file(
+            &pghba_path,
+            &format!("include_if_exists {}\n", *databricks_pg_ident),
+        )? {
+            info!("updated pg_ident.conf to include databricks_pg_ident.conf");
+        } else {
+            info!("pg_ident.conf already included databricks_pg_ident.conf");
+        }
+    }
+
+    Ok(())
+}
+
+/// Copy tls key_file and cert_file from k8s secret mount directory
+/// to pgdata and set private key file permissions as expected by Postgres.
+/// See the Postgres documentation for the expected key-file permissions.
+/// K8s secrets mount on dblet does not honor permission and ownership
+/// specified in the Volume or VolumeMount. So we need to explicitly copy the file and set the permissions.
+pub fn copy_tls_certificates(
+    key_file: &String,
+    cert_file: &String,
+    pgdata_path: &Path,
+) -> Result<()> {
+    let files = [cert_file, key_file];
+    for file in files.iter() {
+        let source = Path::new(file);
+        let dest = pgdata_path.join(source.file_name().unwrap());
+        if !dest.exists() {
+            std::fs::copy(source, &dest)?;
+            info!(
+                "Copying tls file: {} to {}",
+                &source.display(),
+                &dest.display()
+            );
+        }
+        if *file == key_file {
+            // Postgres requires private key to be readable only by the owner by having
+            // chmod 600 permissions.
+ let permissions = Permissions::from_mode(0o600); + fs::set_permissions(&dest, permissions)?; + info!("Setting permission on {}.", &dest.display()); + } + } + Ok(()) +} + /// Create a standby.signal file pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> { // XXX: consider making it a part of config.json @@ -170,7 +240,11 @@ pub async fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> { } #[instrument(skip_all)] -pub async fn handle_migrations(params: ComputeNodeParams, client: &mut Client) -> Result<()> { +pub async fn handle_migrations( + params: ComputeNodeParams, + client: &mut Client, + lakebase_mode: bool, +) -> Result<()> { info!("handle migrations"); // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! @@ -234,7 +308,7 @@ pub async fn handle_migrations(params: ComputeNodeParams, client: &mut Client) - ), ]; - MigrationRunner::new(client, &migrations) + MigrationRunner::new(client, &migrations, lakebase_mode) .run_migrations() .await?; diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index 0eeab2bebc..061ac3e66d 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -416,6 +416,32 @@ pub struct GenericOption { pub vartype: String, } +/// Postgres compute TLS settings. +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] +pub struct PgComputeTlsSettings { + // Absolute path to the certificate file for server-side TLS. + pub cert_file: String, + // Absolute path to the private key file for server-side TLS. + pub key_file: String, + // Absolute path to the certificate authority file for verifying client certificates. + pub ca_file: String, +} + +/// Databricks specific options for compute instance. +/// This is used to store any other settings that needs to be propagate to Compute +/// but should not be persisted to ComputeSpec in the database. 
+#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] +pub struct DatabricksSettings { + pub pg_compute_tls_settings: PgComputeTlsSettings, + // Absolute file path to databricks_pg_hba.conf file. + pub databricks_pg_hba: String, + // Absolute file path to databricks_pg_ident.conf file. + pub databricks_pg_ident: String, + // Hostname portion of the Databricks workspace URL of the endpoint, or empty string if not known. + // A valid hostname is required for the compute instance to support PAT logins. + pub databricks_workspace_host: String, +} + /// Optional collection of `GenericOption`'s. Type alias allows us to /// declare a `trait` on it. pub type GenericOptions = Option>; From 0fe07dec321b90f1e73617789fe5082859eec239 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 22 Jul 2025 18:43:35 +0200 Subject: [PATCH 2/4] test_runner: allow stuck reconciliation errors (#12682) This log message was added in #12589. During chaos tests, reconciles may not succeed for some time, triggering the log message. Resolves [LKB-2467](https://databricks.atlassian.net/browse/LKB-2467). --- test_runner/fixtures/pageserver/allowed_errors.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test_runner/fixtures/pageserver/allowed_errors.py b/test_runner/fixtures/pageserver/allowed_errors.py index 59249f31ad..007f80ee5e 100755 --- a/test_runner/fixtures/pageserver/allowed_errors.py +++ b/test_runner/fixtures/pageserver/allowed_errors.py @@ -152,6 +152,8 @@ DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS = [ ".*reconciler.*neon_local error.*", # Tenant rate limits may fire in tests that submit lots of API requests. ".*tenant \\S+ is rate limited.*", + # Reconciliations may get stuck/delayed e.g. in chaos tests. 
+ ".*background_reconcile: Shard reconciliation is stuck.*", ] From 51ffeef93f36ffb8a51da5ce34de6bbd33474e7a Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 22 Jul 2025 19:52:57 +0300 Subject: [PATCH 3/4] Fix postgres version compatibility macros (#12658) The argument to BufTagInit was called 'spcOid', and it was also setting a field called 'spcOid'. The field name would erroneously also be expanded with the macro arg. It happened to work so far, because all the users of the macro pass a variable called 'spcOid' for the 'spcOid' argument, but as soon as you try to pass anything else, it fails. And same story for 'dbOid' and 'relNumber'. Rename the arguments to avoid the name collision. Also while we're at it, add parens around the arguments in a few macros, to make them safer if you pass something non-trivial as the argument. --- pgxn/neon/neon_pgversioncompat.h | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/pgxn/neon/neon_pgversioncompat.h b/pgxn/neon/neon_pgversioncompat.h index c7574ef0f9..3ab8d3e5f5 100644 --- a/pgxn/neon/neon_pgversioncompat.h +++ b/pgxn/neon/neon_pgversioncompat.h @@ -72,22 +72,21 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode, (tag).rnode = (rinfo); \ } while (false) -#define BufTagGetNRelFileInfo(tag) tag.rnode +#define BufTagGetNRelFileInfo(tag) (tag).rnode #define BufTagGetRelNumber(tagp) ((tagp)->rnode.relNode) -#define BufTagInit(tag, relNumber, forknum, blkno, spcOid, dbOid) \ +#define BufTagInit(tag, rel_number, fork_number, block_number, spc_oid, db_oid) \ do { \ - RelFileNode rnode = { .spcNode = spcOid, .dbNode = dbOid, .relNode = relNumber}; \ - (tag).forkNum = forknum; \ - (tag).blockNum = blkno; \ - (tag).rnode = rnode; \ + RelFileNode rnode = { .spcNode = (spc_oid), .dbNode = (db_oid), .relNode = (rel_number)}; \ + (tag).forkNum = (fork_number); \ + (tag).blockNum = (block_number); \ + (tag).rnode = rnode; \ } while (false) #define InvalidRelFileNumber 
InvalidOid -#define SMgrRelGetRelInfo(reln) \ - (reln->smgr_rnode.node) +#define SMgrRelGetRelInfo(reln) ((reln)->smgr_rnode.node) #define DropRelationAllLocalBuffers DropRelFileNodeAllLocalBuffers @@ -133,17 +132,16 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode, .relNumber = (tag).relNumber, \ }) -#define BufTagInit(tag, relNumber, forknum, blkno, spcOid, dbOid) \ +#define BufTagInit(tag, rel_number, fork_number, block_number, spc_oid, db_oid) \ do { \ - (tag).forkNum = forknum; \ - (tag).blockNum = blkno; \ - (tag).spcOid = spcOid; \ - (tag).dbOid = dbOid; \ - (tag).relNumber = relNumber; \ + (tag).forkNum = (fork_number); \ + (tag).blockNum = (block_number); \ + (tag).spcOid = (spc_oid); \ + (tag).dbOid = (db_oid); \ + (tag).relNumber = (rel_number); \ } while (false) -#define SMgrRelGetRelInfo(reln) \ - ((reln)->smgr_rlocator) +#define SMgrRelGetRelInfo(reln) ((reln)->smgr_rlocator) #define DropRelationAllLocalBuffers DropRelationAllLocalBuffers #endif From 5b0972151c0079654b60cee795414618f67da705 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Tue, 22 Jul 2025 18:34:23 +0100 Subject: [PATCH 4/4] pageserver: silence shard resolution warning (#12685) ## Problem We drive the get page requests that have started processing to completion. So in the case when the compute received a reconfiguration request and the old connection has a request procesing on the pageserver, we are going to issue the warning. I spot checked a few instances of the warning and in all cases the compute was already connected to the correct pageserver. ## Summary of Changes Downgrade to INFO. It would be nice to somehow figure out if the connection has been terminated in the meantime, but the terminate libpq message is still in the pipe while we're doing the shard resolution. 
Closes LKB-2381 --- pageserver/src/tenant/timeline/handle.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pageserver/src/tenant/timeline/handle.rs b/pageserver/src/tenant/timeline/handle.rs index 0b118dd65d..3570cab301 100644 --- a/pageserver/src/tenant/timeline/handle.rs +++ b/pageserver/src/tenant/timeline/handle.rs @@ -362,7 +362,7 @@ impl Cache { tokio::time::sleep(RETRY_BACKOFF).await; continue; } else { - tracing::warn!( + tracing::info!( "Failed to resolve tenant shard after {} attempts: {:?}", GET_MAX_RETRIES, e