diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index f53adbb1df..8350a4c059 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -6,7 +6,8 @@ use compute_api::responses::{ LfcPrewarmState, PromoteState, TlsConfig, }; use compute_api::spec::{ - ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PageserverProtocol, PgIdent, + ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, GenericOption, + PageserverProtocol, PgIdent, Role, }; use futures::StreamExt; use futures::future::join_all; @@ -413,6 +414,66 @@ struct StartVmMonitorResult { vm_monitor: Option>>, } +// BEGIN_HADRON +/// This function creates roles that are used by Databricks. +/// These roles are not needs to be botostrapped at PG Compute provisioning time. +/// The auth method for these roles are configured in databricks_pg_hba.conf in universe repository. +pub(crate) fn create_databricks_roles() -> Vec { + let roles = vec![ + // Role for prometheus_stats_exporter + Role { + name: "databricks_monitor".to_string(), + // This uses "local" connection and auth method for that is "trust", so no password is needed. + encrypted_password: None, + options: Some(vec![GenericOption { + name: "IN ROLE pg_monitor".to_string(), + value: None, + vartype: "string".to_string(), + }]), + }, + // Role for brickstore control plane + Role { + name: "databricks_control_plane".to_string(), + // Certificate user does not need password. + encrypted_password: None, + options: Some(vec![GenericOption { + name: "SUPERUSER".to_string(), + value: None, + vartype: "string".to_string(), + }]), + }, + // Role for brickstore httpgateway. + Role { + name: "databricks_gateway".to_string(), + // Certificate user does not need password. + encrypted_password: None, + options: None, + }, + ]; + + roles + .into_iter() + .map(|role| { + let query = format!( + r#" + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT FROM pg_catalog.pg_roles WHERE rolname = '{}') + THEN + CREATE ROLE {} {}; + END IF; + END + $$;"#, + role.name, + role.name.pg_quote(), + role.to_pg_options(), + ); + query + }) + .collect() +} + /// Databricks-specific environment variables to be passed to the `postgres` sub-process. pub struct DatabricksEnvVars { /// The Databricks "endpoint ID" of the compute instance. Used by `postgres` to check diff --git a/compute_tools/src/spec_apply.rs b/compute_tools/src/spec_apply.rs index 47bf61ae1b..2356078703 100644 --- a/compute_tools/src/spec_apply.rs +++ b/compute_tools/src/spec_apply.rs @@ -13,17 +13,19 @@ use tokio_postgres::Client; use tokio_postgres::error::SqlState; use tracing::{Instrument, debug, error, info, info_span, instrument, warn}; -use crate::compute::{ComputeNode, ComputeNodeParams, ComputeState}; +use crate::compute::{ComputeNode, ComputeNodeParams, ComputeState, create_databricks_roles}; +use crate::hadron_metrics::COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS; use crate::pg_helpers::{ DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, get_existing_dbs_async, get_existing_roles_async, }; use crate::spec_apply::ApplySpecPhase::{ - CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreatePgauditExtension, + AddDatabricksGrants, AlterDatabricksRoles, CreateAndAlterDatabases, CreateAndAlterRoles, + CreateAvailabilityCheck, CreateDatabricksMisc, CreateDatabricksRoles, CreatePgauditExtension, CreatePgauditlogtofileExtension, CreatePrivilegedRole, CreateSchemaNeon, DisablePostgresDBPgAudit, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions, - HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles, - RunInEachDatabase, + HandleDatabricksAuthExtension, HandleNeonExtension, HandleOtherExtensions, + RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase, }; use crate::spec_apply::PerDatabasePhase::{ ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, @@ -166,6 +168,7 @@ impl ComputeNode { concurrency_token.clone(), db, [DropLogicalSubscriptions].to_vec(), + self.params.lakebase_mode, ); Ok(tokio::spawn(fut)) @@ -186,15 +189,33 @@ impl ComputeNode { }; } - for phase in [ - CreatePrivilegedRole, + let phases = if self.params.lakebase_mode { + vec![ + CreatePrivilegedRole, + // BEGIN_HADRON + CreateDatabricksRoles, + AlterDatabricksRoles, + // END_HADRON DropInvalidDatabases, RenameRoles, CreateAndAlterRoles, RenameAndDeleteDatabases, CreateAndAlterDatabases, CreateSchemaNeon, - ] { + ] + } else { + vec![ + CreatePrivilegedRole, + DropInvalidDatabases, + RenameRoles, + CreateAndAlterRoles, + RenameAndDeleteDatabases, + CreateAndAlterDatabases, + CreateSchemaNeon, + ] + }; + + for phase in phases { info!("Applying phase {:?}", &phase); apply_operations( params.clone(), @@ -203,6 +224,7 @@ impl ComputeNode { jwks_roles.clone(), phase, || async { Ok(&client) }, + self.params.lakebase_mode, ) .await?; } @@ -254,6 +276,7 @@ impl ComputeNode { concurrency_token.clone(), db, phases, + self.params.lakebase_mode, ); Ok(tokio::spawn(fut)) @@ -265,12 +288,28 @@ impl ComputeNode { handle.await??; } - let mut phases = vec![ + let mut phases = if self.params.lakebase_mode { + vec![ + HandleOtherExtensions, + HandleNeonExtension, // This step depends on CreateSchemaNeon + // BEGIN_HADRON + HandleDatabricksAuthExtension, + // END_HADRON + CreateAvailabilityCheck, + DropRoles, + // BEGIN_HADRON + AddDatabricksGrants, + CreateDatabricksMisc, + // END_HADRON + ] + } else { + vec![ HandleOtherExtensions, HandleNeonExtension, // This step depends on CreateSchemaNeon CreateAvailabilityCheck, DropRoles, - ]; + ] + }; // This step depends on CreateSchemaNeon if spec.drop_subscriptions_before_start && !drop_subscriptions_done { @@ -303,6 +342,7 @@ impl ComputeNode { jwks_roles.clone(), phase, || async { Ok(&client) }, + self.params.lakebase_mode, ) .await?; } @@ -328,6 +368,7 @@ impl ComputeNode { concurrency_token: Arc, db: DB, subphases: Vec, + lakebase_mode: bool, ) -> Result<()> { let _permit = concurrency_token.acquire().await?; @@ -355,6 +396,7 @@ impl ComputeNode { let client = client_conn.as_ref().unwrap(); Ok(client) }, + lakebase_mode, ) .await?; } @@ -477,6 +519,10 @@ pub enum PerDatabasePhase { #[derive(Clone, Debug)] pub enum ApplySpecPhase { CreatePrivilegedRole, + // BEGIN_HADRON + CreateDatabricksRoles, + AlterDatabricksRoles, + // END_HADRON DropInvalidDatabases, RenameRoles, CreateAndAlterRoles, @@ -489,7 +535,14 @@ pub enum ApplySpecPhase { DisablePostgresDBPgAudit, HandleOtherExtensions, HandleNeonExtension, + // BEGIN_HADRON + HandleDatabricksAuthExtension, + // END_HADRON CreateAvailabilityCheck, + // BEGIN_HADRON + AddDatabricksGrants, + CreateDatabricksMisc, + // END_HADRON DropRoles, FinalizeDropLogicalSubscriptions, } @@ -525,6 +578,7 @@ pub async fn apply_operations<'a, Fut, F>( jwks_roles: Arc>, apply_spec_phase: ApplySpecPhase, client: F, + lakebase_mode: bool, ) -> Result<()> where F: FnOnce() -> Fut, @@ -571,6 +625,23 @@ where }, query ); + if !lakebase_mode { + return res; + } + // BEGIN HADRON + if let Err(e) = res.as_ref() { + if let Some(sql_state) = e.code() { + if sql_state.code() == "57014" { + // SQL State 57014 (ERRCODE_QUERY_CANCELED) is used for statement timeouts. + // Increment the counter whenever a statement timeout occurs. Timeouts on + // this configuration path can only occur due to PS connectivity problems that + // Postgres failed to recover from. + COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS.inc(); + } + } + } + // END HADRON + res } .instrument(inspan) @@ -612,6 +683,35 @@ async fn get_operations<'a>( ), comment: None, }))), + // BEGIN_HADRON + // New Hadron phase + ApplySpecPhase::CreateDatabricksRoles => { + let queries = create_databricks_roles(); + let operations = queries.into_iter().map(|query| Operation { + query, + comment: None, + }); + Ok(Box::new(operations)) + } + + // Backfill existing databricks_reader_* roles with statement timeout from GUC + ApplySpecPhase::AlterDatabricksRoles => { + let query = String::from(include_str!( + "sql/alter_databricks_reader_roles_timeout.sql" + )); + + let operations = once(Operation { + query, + comment: Some( + "Backfill existing databricks_reader_* roles with statement timeout" + .to_string(), + ), + }); + + Ok(Box::new(operations)) + } + // End of new Hadron Phase + // END_HADRON ApplySpecPhase::DropInvalidDatabases => { let mut ctx = ctx.write().await; let databases = &mut ctx.dbs; @@ -981,7 +1081,10 @@ async fn get_operations<'a>( // N.B. this has to be properly dollar-escaped with `pg_quote_dollar()` role_name = escaped_role, outer_tag = outer_tag, - ), + ) + // HADRON change: + .replace("neon_superuser", ¶ms.privileged_role_name), + // HADRON change end , comment: None, }, // This now will only drop privileges of the role @@ -1017,7 +1120,8 @@ async fn get_operations<'a>( comment: None, }, Operation { - query: String::from(include_str!("sql/default_grants.sql")), + query: String::from(include_str!("sql/default_grants.sql")) + .replace("neon_superuser", ¶ms.privileged_role_name), comment: None, }, ] @@ -1086,6 +1190,28 @@ async fn get_operations<'a>( Ok(Box::new(operations)) } + // BEGIN_HADRON + // Note: we may want to version the extension someday, but for now we just drop it and recreate it. + ApplySpecPhase::HandleDatabricksAuthExtension => { + let operations = vec![ + Operation { + query: String::from("DROP EXTENSION IF EXISTS databricks_auth"), + comment: Some(String::from("dropping existing databricks_auth extension")), + }, + Operation { + query: String::from("CREATE EXTENSION databricks_auth"), + comment: Some(String::from("creating databricks_auth extension")), + }, + Operation { + query: String::from("GRANT SELECT ON databricks_auth_metrics TO pg_monitor"), + comment: Some(String::from("grant select on databricks auth counters")), + }, + ] + .into_iter(); + + Ok(Box::new(operations)) + } + // END_HADRON ApplySpecPhase::CreateAvailabilityCheck => Ok(Box::new(once(Operation { query: String::from(include_str!("sql/add_availabilitycheck_tables.sql")), comment: None, @@ -1103,6 +1229,63 @@ async fn get_operations<'a>( Ok(Box::new(operations)) } + + // BEGIN_HADRON + // New Hadron phases + // + // Grants permissions to roles that are used by Databricks. + ApplySpecPhase::AddDatabricksGrants => { + let operations = vec![ + Operation { + query: String::from("GRANT USAGE ON SCHEMA neon TO databricks_monitor"), + comment: Some(String::from( + "Permissions needed to execute neon.* functions (in the postgres database)", + )), + }, + Operation { + query: String::from( + "GRANT SELECT, INSERT, UPDATE ON health_check TO databricks_monitor", + ), + comment: Some(String::from("Permissions needed for read and write probes")), + }, + Operation { + query: String::from( + "GRANT EXECUTE ON FUNCTION pg_ls_dir(text) TO databricks_monitor", + ), + comment: Some(String::from( + "Permissions needed to monitor .snap file counts", + )), + }, + Operation { + query: String::from( + "GRANT SELECT ON neon.neon_perf_counters TO databricks_monitor", + ), + comment: Some(String::from( + "Permissions needed to access neon performance counters view", + )), + }, + Operation { + query: String::from( + "GRANT EXECUTE ON FUNCTION neon.get_perf_counters() TO databricks_monitor", + ), + comment: Some(String::from( + "Permissions needed to execute the underlying performance counters function", + )), + }, + ] + .into_iter(); + + Ok(Box::new(operations)) + } + // Creates minor objects that are used by Databricks. + ApplySpecPhase::CreateDatabricksMisc => Ok(Box::new(once(Operation { + query: String::from(include_str!("sql/create_databricks_misc.sql")), + comment: Some(String::from( + "The function databricks_monitor uses to convert exception to 0 or 1", + )), + }))), + // End of new Hadron phases + // END_HADRON ApplySpecPhase::FinalizeDropLogicalSubscriptions => Ok(Box::new(once(Operation { query: String::from(include_str!("sql/finalize_drop_subscriptions.sql")), comment: None, diff --git a/compute_tools/src/sql/alter_databricks_reader_roles_timeout.sql b/compute_tools/src/sql/alter_databricks_reader_roles_timeout.sql new file mode 100644 index 0000000000..db16df3817 --- /dev/null +++ b/compute_tools/src/sql/alter_databricks_reader_roles_timeout.sql @@ -0,0 +1,25 @@ +DO $$ +DECLARE + reader_role RECORD; + timeout_value TEXT; +BEGIN + -- Get the current GUC setting for reader statement timeout + SELECT current_setting('databricks.reader_statement_timeout', true) INTO timeout_value; + + -- Only proceed if timeout_value is not null/empty and not '0' (disabled) + IF timeout_value IS NOT NULL AND timeout_value != '' AND timeout_value != '0' THEN + -- Find all databricks_reader_* roles and update their statement_timeout + FOR reader_role IN + SELECT r.rolname + FROM pg_roles r + WHERE r.rolname ~ '^databricks_reader_\d+$' + LOOP + -- Apply the timeout setting to the role (will overwrite existing setting) + EXECUTE format('ALTER ROLE %I SET statement_timeout = %L', + reader_role.rolname, timeout_value); + + RAISE LOG 'Updated statement_timeout = % for role %', timeout_value, reader_role.rolname; + END LOOP; + END IF; +END +$$; diff --git a/compute_tools/src/sql/create_databricks_misc.sql b/compute_tools/src/sql/create_databricks_misc.sql new file mode 100644 index 0000000000..a6dc379078 --- /dev/null +++ b/compute_tools/src/sql/create_databricks_misc.sql @@ -0,0 +1,15 @@ +ALTER ROLE databricks_monitor SET statement_timeout = '60s'; + +CREATE OR REPLACE FUNCTION health_check_write_succeeds() +RETURNS INTEGER AS $$ +BEGIN +INSERT INTO health_check VALUES (1, now()) +ON CONFLICT (id) DO UPDATE + SET updated_at = now(); + +RETURN 1; +EXCEPTION WHEN OTHERS THEN +RAISE EXCEPTION '[DATABRICKS_SMGR] health_check failed: [%] %', SQLSTATE, SQLERRM; +RETURN 0; +END; +$$ LANGUAGE plpgsql;