diff --git a/compute/patches/pg_repack.patch b/compute/patches/pg_repack.patch index 10ed1054ff..b8a057e222 100644 --- a/compute/patches/pg_repack.patch +++ b/compute/patches/pg_repack.patch @@ -1,5 +1,11 @@ +commit 5eb393810cf7c7bafa4e394dad2e349e2a8cb2cb +Author: Alexey Masterov +Date: Mon Jul 28 18:11:02 2025 +0200 + + Patch for pg_repack + diff --git a/regress/Makefile b/regress/Makefile -index bf6edcb..89b4c7f 100644 +index bf6edcb..110e734 100644 --- a/regress/Makefile +++ b/regress/Makefile @@ -17,7 +17,7 @@ INTVERSION := $(shell echo $$(($$(echo $(VERSION).0 | sed 's/\([[:digit:]]\{1,\} @@ -7,18 +13,36 @@ index bf6edcb..89b4c7f 100644 # -REGRESS := init-extension repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper tablespace get_order_by trigger -+REGRESS := init-extension repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper get_order_by trigger ++REGRESS := init-extension noautovacuum repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper get_order_by trigger autovacuum USE_PGXS = 1 # use pgxs if not in contrib directory PGXS := $(shell $(PG_CONFIG) --pgxs) -diff --git a/regress/expected/init-extension.out b/regress/expected/init-extension.out -index 9f2e171..f6e4f8d 100644 ---- a/regress/expected/init-extension.out -+++ b/regress/expected/init-extension.out -@@ -1,3 +1,2 @@ - SET client_min_messages = warning; - CREATE EXTENSION pg_repack; --RESET client_min_messages; +diff --git a/regress/expected/autovacuum.out b/regress/expected/autovacuum.out +new file mode 100644 +index 0000000..e7f2363 +--- /dev/null ++++ b/regress/expected/autovacuum.out +@@ -0,0 +1,7 @@ ++ALTER SYSTEM SET autovacuum='on'; ++SELECT pg_reload_conf(); ++ pg_reload_conf ++---------------- ++ t ++(1 row) ++ +diff --git a/regress/expected/noautovacuum.out b/regress/expected/noautovacuum.out +new file mode 100644 +index 0000000..fc7978e +--- /dev/null ++++ b/regress/expected/noautovacuum.out +@@ -0,0 +1,7 @@ ++ALTER SYSTEM SET autovacuum='off'; ++SELECT pg_reload_conf(); ++ pg_reload_conf ++---------------- ++ t ++(1 row) ++ diff --git a/regress/expected/nosuper.out b/regress/expected/nosuper.out index 8d0a94e..63b68bf 100644 --- a/regress/expected/nosuper.out @@ -50,14 +74,22 @@ index 8d0a94e..63b68bf 100644 INFO: repacking table "public.tbl_cluster" ERROR: query failed: ERROR: current transaction is aborted, commands ignored until end of transaction block DETAIL: query was: RESET lock_timeout -diff --git a/regress/sql/init-extension.sql b/regress/sql/init-extension.sql -index 9f2e171..f6e4f8d 100644 ---- a/regress/sql/init-extension.sql -+++ b/regress/sql/init-extension.sql -@@ -1,3 +1,2 @@ - SET client_min_messages = warning; - CREATE EXTENSION pg_repack; --RESET client_min_messages; +diff --git a/regress/sql/autovacuum.sql b/regress/sql/autovacuum.sql +new file mode 100644 +index 0000000..a8eda63 +--- /dev/null ++++ b/regress/sql/autovacuum.sql +@@ -0,0 +1,2 @@ ++ALTER SYSTEM SET autovacuum='on'; ++SELECT pg_reload_conf(); +diff --git a/regress/sql/noautovacuum.sql b/regress/sql/noautovacuum.sql +new file mode 100644 +index 0000000..13d4836 +--- /dev/null ++++ b/regress/sql/noautovacuum.sql +@@ -0,0 +1,2 @@ ++ALTER SYSTEM SET autovacuum='off'; ++SELECT pg_reload_conf(); diff --git a/regress/sql/nosuper.sql b/regress/sql/nosuper.sql index 072f0fa..dbe60f8 100644 --- a/regress/sql/nosuper.sql diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 9c86aba531..2b4802f309 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -82,6 +82,15 @@ struct Cli { #[arg(long, default_value_t = 3081)] pub internal_http_port: u16, + /// Backwards-compatible --http-port for Hadron deployments. Functionally the + /// same as --external-http-port. + #[arg( + long, + conflicts_with = "external_http_port", + conflicts_with = "internal_http_port" + )] + pub http_port: Option, + #[arg(short = 'D', long, value_name = "DATADIR")] pub pgdata: String, @@ -181,6 +190,26 @@ impl Cli { } } +// Hadron helpers to get compatible compute_ctl http ports from Cli. The old `--http-port` +// arg is used and acts the same as `--external-http-port`. The internal http port is defined +// to be http_port + 1. Hadron runs in the dblet environment which uses the host network, so +// we need to be careful with the ports to choose. +fn get_external_http_port(cli: &Cli) -> u16 { + if cli.lakebase_mode { + return cli.http_port.unwrap_or(cli.external_http_port); + } + cli.external_http_port +} +fn get_internal_http_port(cli: &Cli) -> u16 { + if cli.lakebase_mode { + return cli + .http_port + .map(|p| p + 1) + .unwrap_or(cli.internal_http_port); + } + cli.internal_http_port +} + fn main() -> Result<()> { let cli = Cli::parse(); @@ -205,13 +234,18 @@ fn main() -> Result<()> { // enable core dumping for all child processes setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?; - installed_extensions::initialize_metrics(); - hadron_metrics::initialize_metrics(); + if cli.lakebase_mode { + installed_extensions::initialize_metrics(); + hadron_metrics::initialize_metrics(); + } let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?; let config = get_config(&cli)?; + let external_http_port = get_external_http_port(&cli); + let internal_http_port = get_internal_http_port(&cli); + let compute_node = ComputeNode::new( ComputeNodeParams { compute_id: cli.compute_id, @@ -220,8 +254,8 @@ fn main() -> Result<()> { pgdata: cli.pgdata.clone(), pgbin: cli.pgbin.clone(), pgversion: get_pg_version_string(&cli.pgbin), - external_http_port: cli.external_http_port, - internal_http_port: cli.internal_http_port, + external_http_port, + internal_http_port, remote_ext_base_url: cli.remote_ext_base_url.clone(), resize_swap_on_bind: cli.resize_swap_on_bind, set_disk_quota_for_fs: cli.set_disk_quota_for_fs, diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index ba8df1d9ab..1033670e2b 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -6,8 +6,8 @@ use compute_api::responses::{ LfcPrewarmState, PromoteState, TlsConfig, }; use compute_api::spec::{ - ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PageserverConnectionInfo, - PageserverProtocol, PgIdent, + ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, GenericOption, + PageserverConnectionInfo, PageserverProtocol, PgIdent, Role, }; use futures::StreamExt; use futures::future::join_all; @@ -436,6 +436,66 @@ struct StartVmMonitorResult { vm_monitor: Option>>, } +// BEGIN_HADRON +/// This function creates roles that are used by Databricks. +/// These roles are not needs to be botostrapped at PG Compute provisioning time. +/// The auth method for these roles are configured in databricks_pg_hba.conf in universe repository. +pub(crate) fn create_databricks_roles() -> Vec { + let roles = vec![ + // Role for prometheus_stats_exporter + Role { + name: "databricks_monitor".to_string(), + // This uses "local" connection and auth method for that is "trust", so no password is needed. + encrypted_password: None, + options: Some(vec![GenericOption { + name: "IN ROLE pg_monitor".to_string(), + value: None, + vartype: "string".to_string(), + }]), + }, + // Role for brickstore control plane + Role { + name: "databricks_control_plane".to_string(), + // Certificate user does not need password. + encrypted_password: None, + options: Some(vec![GenericOption { + name: "SUPERUSER".to_string(), + value: None, + vartype: "string".to_string(), + }]), + }, + // Role for brickstore httpgateway. + Role { + name: "databricks_gateway".to_string(), + // Certificate user does not need password. + encrypted_password: None, + options: None, + }, + ]; + + roles + .into_iter() + .map(|role| { + let query = format!( + r#" + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT FROM pg_catalog.pg_roles WHERE rolname = '{}') + THEN + CREATE ROLE {} {}; + END IF; + END + $$;"#, + role.name, + role.name.pg_quote(), + role.to_pg_options(), + ); + query + }) + .collect() +} + /// Databricks-specific environment variables to be passed to the `postgres` sub-process. pub struct DatabricksEnvVars { /// The Databricks "endpoint ID" of the compute instance. Used by `postgres` to check @@ -444,14 +504,27 @@ pub struct DatabricksEnvVars { /// Hostname of the Databricks workspace URL this compute instance belongs to. /// Used by postgres to verify Databricks PAT tokens. pub workspace_host: String, + + pub lakebase_mode: bool, } impl DatabricksEnvVars { - pub fn new(compute_spec: &ComputeSpec, compute_id: Option<&String>) -> Self { - // compute_id is a string format of "{endpoint_id}/{compute_idx}" - // endpoint_id is a uuid. We only need to pass down endpoint_id to postgres. - // Panics if compute_id is not set or not in the expected format. - let endpoint_id = compute_id.unwrap().split('/').next().unwrap().to_string(); + pub fn new( + compute_spec: &ComputeSpec, + compute_id: Option<&String>, + instance_id: Option, + lakebase_mode: bool, + ) -> Self { + let endpoint_id = if let Some(instance_id) = instance_id { + // Use instance_id as endpoint_id if it is set. This code path is for PuPr model. + instance_id + } else { + // Use compute_id as endpoint_id if instance_id is not set. The code path is for PrPr model. + // compute_id is a string format of "{endpoint_id}/{compute_idx}" + // endpoint_id is a uuid. We only need to pass down endpoint_id to postgres. + // Panics if compute_id is not set or not in the expected format. + compute_id.unwrap().split('/').next().unwrap().to_string() + }; let workspace_host = compute_spec .databricks_settings .as_ref() @@ -460,6 +533,7 @@ impl DatabricksEnvVars { Self { endpoint_id, workspace_host, + lakebase_mode, } } @@ -469,6 +543,10 @@ impl DatabricksEnvVars { /// Convert DatabricksEnvVars to a list of string pairs that can be passed as env vars. Consumes `self`. pub fn to_env_var_list(self) -> Vec<(String, String)> { + if !self.lakebase_mode { + // In neon env, we don't need to pass down the env vars to postgres. + return vec![]; + } vec![ ( Self::DATABRICKS_ENDPOINT_ID_ENVVAR.to_string(), @@ -518,7 +596,11 @@ impl ComputeNode { let mut new_state = ComputeState::new(); if let Some(spec) = config.spec { let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?; - new_state.pspec = Some(pspec); + if params.lakebase_mode { + ComputeNode::set_spec(¶ms, &mut new_state, pspec); + } else { + new_state.pspec = Some(pspec); + } } Ok(ComputeNode { @@ -1116,7 +1198,14 @@ impl ComputeNode { // If it is something different then create_dir() will error out anyway. let pgdata = &self.params.pgdata; let _ok = fs::remove_dir_all(pgdata); - fs::create_dir(pgdata)?; + if self.params.lakebase_mode { + // Ignore creation errors if the directory already exists (e.g. mounting it ahead of time). + // If it is something different then PG startup will error out anyway. + let _ok = fs::create_dir(pgdata); + } else { + fs::create_dir(pgdata)?; + } + fs::set_permissions(pgdata, fs::Permissions::from_mode(0o700))?; Ok(()) @@ -1581,7 +1670,7 @@ impl ComputeNode { // symlink doesn't affect anything. // // See https://github.com/neondatabase/autoscaling/issues/800 - std::fs::remove_dir(pgdata_path.join("pg_dynshmem"))?; + std::fs::remove_dir_all(pgdata_path.join("pg_dynshmem"))?; symlink("/dev/shm/", pgdata_path.join("pg_dynshmem"))?; match spec.mode { @@ -1596,6 +1685,12 @@ impl ComputeNode { /// Start and stop a postgres process to warm up the VM for startup. pub fn prewarm_postgres_vm_memory(&self) -> Result<()> { + if self.params.lakebase_mode { + // We are running in Hadron mode. Disabling this prewarming step for now as it could run + // into dblet port conflicts and also doesn't add much value with our current infra. + info!("Skipping postgres prewarming in Hadron mode"); + return Ok(()); + } info!("prewarming VM memory"); // Create pgdata @@ -1657,7 +1752,12 @@ impl ComputeNode { let databricks_env_vars = { let state = self.state.lock().unwrap(); let spec = &state.pspec.as_ref().unwrap().spec; - DatabricksEnvVars::new(spec, Some(&self.params.compute_id)) + DatabricksEnvVars::new( + spec, + Some(&self.params.compute_id), + self.params.instance_id.clone(), + self.params.lakebase_mode, + ) }; info!( @@ -1829,7 +1929,15 @@ impl ComputeNode { /// Do initial configuration of the already started Postgres. #[instrument(skip_all)] pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> { - let conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config")); + let mut conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config")); + + if self.params.lakebase_mode { + // Set a 2-minute statement_timeout for the session applying config. The individual SQL statements + // used in apply_spec_sql() should not take long (they are just creating users and installing + // extensions). If any of them are stuck for an extended period of time it usually indicates a + // pageserver connectivity problem and we should bail out. + conf.options("-c statement_timeout=2min"); + } let conf = Arc::new(conf); let spec = Arc::new( @@ -2147,7 +2255,17 @@ impl ComputeNode { pub fn check_for_core_dumps(&self) -> Result<()> { let core_dump_dir = match std::env::consts::OS { "macos" => Path::new("/cores/"), - _ => Path::new(&self.params.pgdata), + // BEGIN HADRON + // NB: Read core dump files from a fixed location outside of + // the data directory since `compute_ctl` wipes the data directory + // across container restarts. + _ => { + if self.params.lakebase_mode { + Path::new("/databricks/logs/brickstore") + } else { + Path::new(&self.params.pgdata) + } + } // END HADRON }; // Collect core dump paths if any @@ -2460,7 +2578,7 @@ LIMIT 100", if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") { libs_vec = libs .split(&[',', '\'', ' ']) - .filter(|s| *s != "neon" && !s.is_empty()) + .filter(|s| *s != "neon" && *s != "databricks_auth" && !s.is_empty()) .map(str::to_string) .collect(); } @@ -2479,7 +2597,7 @@ LIMIT 100", if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) { preload_libs_vec = libs .split(&[',', '\'', ' ']) - .filter(|s| *s != "neon" && !s.is_empty()) + .filter(|s| *s != "neon" && *s != "databricks_auth" && !s.is_empty()) .map(str::to_string) .collect(); } diff --git a/compute_tools/src/spec_apply.rs b/compute_tools/src/spec_apply.rs index 47bf61ae1b..2356078703 100644 --- a/compute_tools/src/spec_apply.rs +++ b/compute_tools/src/spec_apply.rs @@ -13,17 +13,19 @@ use tokio_postgres::Client; use tokio_postgres::error::SqlState; use tracing::{Instrument, debug, error, info, info_span, instrument, warn}; -use crate::compute::{ComputeNode, ComputeNodeParams, ComputeState}; +use crate::compute::{ComputeNode, ComputeNodeParams, ComputeState, create_databricks_roles}; +use crate::hadron_metrics::COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS; use crate::pg_helpers::{ DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, get_existing_dbs_async, get_existing_roles_async, }; use crate::spec_apply::ApplySpecPhase::{ - CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreatePgauditExtension, + AddDatabricksGrants, AlterDatabricksRoles, CreateAndAlterDatabases, CreateAndAlterRoles, + CreateAvailabilityCheck, CreateDatabricksMisc, CreateDatabricksRoles, CreatePgauditExtension, CreatePgauditlogtofileExtension, CreatePrivilegedRole, CreateSchemaNeon, DisablePostgresDBPgAudit, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions, - HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles, - RunInEachDatabase, + HandleDatabricksAuthExtension, HandleNeonExtension, HandleOtherExtensions, + RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase, }; use crate::spec_apply::PerDatabasePhase::{ ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, @@ -166,6 +168,7 @@ impl ComputeNode { concurrency_token.clone(), db, [DropLogicalSubscriptions].to_vec(), + self.params.lakebase_mode, ); Ok(tokio::spawn(fut)) @@ -186,15 +189,33 @@ impl ComputeNode { }; } - for phase in [ - CreatePrivilegedRole, + let phases = if self.params.lakebase_mode { + vec![ + CreatePrivilegedRole, + // BEGIN_HADRON + CreateDatabricksRoles, + AlterDatabricksRoles, + // END_HADRON DropInvalidDatabases, RenameRoles, CreateAndAlterRoles, RenameAndDeleteDatabases, CreateAndAlterDatabases, CreateSchemaNeon, - ] { + ] + } else { + vec![ + CreatePrivilegedRole, + DropInvalidDatabases, + RenameRoles, + CreateAndAlterRoles, + RenameAndDeleteDatabases, + CreateAndAlterDatabases, + CreateSchemaNeon, + ] + }; + + for phase in phases { info!("Applying phase {:?}", &phase); apply_operations( params.clone(), @@ -203,6 +224,7 @@ impl ComputeNode { jwks_roles.clone(), phase, || async { Ok(&client) }, + self.params.lakebase_mode, ) .await?; } @@ -254,6 +276,7 @@ impl ComputeNode { concurrency_token.clone(), db, phases, + self.params.lakebase_mode, ); Ok(tokio::spawn(fut)) @@ -265,12 +288,28 @@ impl ComputeNode { handle.await??; } - let mut phases = vec![ + let mut phases = if self.params.lakebase_mode { + vec![ + HandleOtherExtensions, + HandleNeonExtension, // This step depends on CreateSchemaNeon + // BEGIN_HADRON + HandleDatabricksAuthExtension, + // END_HADRON + CreateAvailabilityCheck, + DropRoles, + // BEGIN_HADRON + AddDatabricksGrants, + CreateDatabricksMisc, + // END_HADRON + ] + } else { + vec![ HandleOtherExtensions, HandleNeonExtension, // This step depends on CreateSchemaNeon CreateAvailabilityCheck, DropRoles, - ]; + ] + }; // This step depends on CreateSchemaNeon if spec.drop_subscriptions_before_start && !drop_subscriptions_done { @@ -303,6 +342,7 @@ impl ComputeNode { jwks_roles.clone(), phase, || async { Ok(&client) }, + self.params.lakebase_mode, ) .await?; } @@ -328,6 +368,7 @@ impl ComputeNode { concurrency_token: Arc, db: DB, subphases: Vec, + lakebase_mode: bool, ) -> Result<()> { let _permit = concurrency_token.acquire().await?; @@ -355,6 +396,7 @@ impl ComputeNode { let client = client_conn.as_ref().unwrap(); Ok(client) }, + lakebase_mode, ) .await?; } @@ -477,6 +519,10 @@ pub enum PerDatabasePhase { #[derive(Clone, Debug)] pub enum ApplySpecPhase { CreatePrivilegedRole, + // BEGIN_HADRON + CreateDatabricksRoles, + AlterDatabricksRoles, + // END_HADRON DropInvalidDatabases, RenameRoles, CreateAndAlterRoles, @@ -489,7 +535,14 @@ pub enum ApplySpecPhase { DisablePostgresDBPgAudit, HandleOtherExtensions, HandleNeonExtension, + // BEGIN_HADRON + HandleDatabricksAuthExtension, + // END_HADRON CreateAvailabilityCheck, + // BEGIN_HADRON + AddDatabricksGrants, + CreateDatabricksMisc, + // END_HADRON DropRoles, FinalizeDropLogicalSubscriptions, } @@ -525,6 +578,7 @@ pub async fn apply_operations<'a, Fut, F>( jwks_roles: Arc>, apply_spec_phase: ApplySpecPhase, client: F, + lakebase_mode: bool, ) -> Result<()> where F: FnOnce() -> Fut, @@ -571,6 +625,23 @@ where }, query ); + if !lakebase_mode { + return res; + } + // BEGIN HADRON + if let Err(e) = res.as_ref() { + if let Some(sql_state) = e.code() { + if sql_state.code() == "57014" { + // SQL State 57014 (ERRCODE_QUERY_CANCELED) is used for statement timeouts. + // Increment the counter whenever a statement timeout occurs. Timeouts on + // this configuration path can only occur due to PS connectivity problems that + // Postgres failed to recover from. + COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS.inc(); + } + } + } + // END HADRON + res } .instrument(inspan) @@ -612,6 +683,35 @@ async fn get_operations<'a>( ), comment: None, }))), + // BEGIN_HADRON + // New Hadron phase + ApplySpecPhase::CreateDatabricksRoles => { + let queries = create_databricks_roles(); + let operations = queries.into_iter().map(|query| Operation { + query, + comment: None, + }); + Ok(Box::new(operations)) + } + + // Backfill existing databricks_reader_* roles with statement timeout from GUC + ApplySpecPhase::AlterDatabricksRoles => { + let query = String::from(include_str!( + "sql/alter_databricks_reader_roles_timeout.sql" + )); + + let operations = once(Operation { + query, + comment: Some( + "Backfill existing databricks_reader_* roles with statement timeout" + .to_string(), + ), + }); + + Ok(Box::new(operations)) + } + // End of new Hadron Phase + // END_HADRON ApplySpecPhase::DropInvalidDatabases => { let mut ctx = ctx.write().await; let databases = &mut ctx.dbs; @@ -981,7 +1081,10 @@ async fn get_operations<'a>( // N.B. this has to be properly dollar-escaped with `pg_quote_dollar()` role_name = escaped_role, outer_tag = outer_tag, - ), + ) + // HADRON change: + .replace("neon_superuser", ¶ms.privileged_role_name), + // HADRON change end , comment: None, }, // This now will only drop privileges of the role @@ -1017,7 +1120,8 @@ async fn get_operations<'a>( comment: None, }, Operation { - query: String::from(include_str!("sql/default_grants.sql")), + query: String::from(include_str!("sql/default_grants.sql")) + .replace("neon_superuser", ¶ms.privileged_role_name), comment: None, }, ] @@ -1086,6 +1190,28 @@ async fn get_operations<'a>( Ok(Box::new(operations)) } + // BEGIN_HADRON + // Note: we may want to version the extension someday, but for now we just drop it and recreate it. + ApplySpecPhase::HandleDatabricksAuthExtension => { + let operations = vec![ + Operation { + query: String::from("DROP EXTENSION IF EXISTS databricks_auth"), + comment: Some(String::from("dropping existing databricks_auth extension")), + }, + Operation { + query: String::from("CREATE EXTENSION databricks_auth"), + comment: Some(String::from("creating databricks_auth extension")), + }, + Operation { + query: String::from("GRANT SELECT ON databricks_auth_metrics TO pg_monitor"), + comment: Some(String::from("grant select on databricks auth counters")), + }, + ] + .into_iter(); + + Ok(Box::new(operations)) + } + // END_HADRON ApplySpecPhase::CreateAvailabilityCheck => Ok(Box::new(once(Operation { query: String::from(include_str!("sql/add_availabilitycheck_tables.sql")), comment: None, @@ -1103,6 +1229,63 @@ async fn get_operations<'a>( Ok(Box::new(operations)) } + + // BEGIN_HADRON + // New Hadron phases + // + // Grants permissions to roles that are used by Databricks. + ApplySpecPhase::AddDatabricksGrants => { + let operations = vec![ + Operation { + query: String::from("GRANT USAGE ON SCHEMA neon TO databricks_monitor"), + comment: Some(String::from( + "Permissions needed to execute neon.* functions (in the postgres database)", + )), + }, + Operation { + query: String::from( + "GRANT SELECT, INSERT, UPDATE ON health_check TO databricks_monitor", + ), + comment: Some(String::from("Permissions needed for read and write probes")), + }, + Operation { + query: String::from( + "GRANT EXECUTE ON FUNCTION pg_ls_dir(text) TO databricks_monitor", + ), + comment: Some(String::from( + "Permissions needed to monitor .snap file counts", + )), + }, + Operation { + query: String::from( + "GRANT SELECT ON neon.neon_perf_counters TO databricks_monitor", + ), + comment: Some(String::from( + "Permissions needed to access neon performance counters view", + )), + }, + Operation { + query: String::from( + "GRANT EXECUTE ON FUNCTION neon.get_perf_counters() TO databricks_monitor", + ), + comment: Some(String::from( + "Permissions needed to execute the underlying performance counters function", + )), + }, + ] + .into_iter(); + + Ok(Box::new(operations)) + } + // Creates minor objects that are used by Databricks. + ApplySpecPhase::CreateDatabricksMisc => Ok(Box::new(once(Operation { + query: String::from(include_str!("sql/create_databricks_misc.sql")), + comment: Some(String::from( + "The function databricks_monitor uses to convert exception to 0 or 1", + )), + }))), + // End of new Hadron phases + // END_HADRON ApplySpecPhase::FinalizeDropLogicalSubscriptions => Ok(Box::new(once(Operation { query: String::from(include_str!("sql/finalize_drop_subscriptions.sql")), comment: None, diff --git a/compute_tools/src/sql/alter_databricks_reader_roles_timeout.sql b/compute_tools/src/sql/alter_databricks_reader_roles_timeout.sql new file mode 100644 index 0000000000..db16df3817 --- /dev/null +++ b/compute_tools/src/sql/alter_databricks_reader_roles_timeout.sql @@ -0,0 +1,25 @@ +DO $$ +DECLARE + reader_role RECORD; + timeout_value TEXT; +BEGIN + -- Get the current GUC setting for reader statement timeout + SELECT current_setting('databricks.reader_statement_timeout', true) INTO timeout_value; + + -- Only proceed if timeout_value is not null/empty and not '0' (disabled) + IF timeout_value IS NOT NULL AND timeout_value != '' AND timeout_value != '0' THEN + -- Find all databricks_reader_* roles and update their statement_timeout + FOR reader_role IN + SELECT r.rolname + FROM pg_roles r + WHERE r.rolname ~ '^databricks_reader_\d+$' + LOOP + -- Apply the timeout setting to the role (will overwrite existing setting) + EXECUTE format('ALTER ROLE %I SET statement_timeout = %L', + reader_role.rolname, timeout_value); + + RAISE LOG 'Updated statement_timeout = % for role %', timeout_value, reader_role.rolname; + END LOOP; + END IF; +END +$$; diff --git a/compute_tools/src/sql/create_databricks_misc.sql b/compute_tools/src/sql/create_databricks_misc.sql new file mode 100644 index 0000000000..a6dc379078 --- /dev/null +++ b/compute_tools/src/sql/create_databricks_misc.sql @@ -0,0 +1,15 @@ +ALTER ROLE databricks_monitor SET statement_timeout = '60s'; + +CREATE OR REPLACE FUNCTION health_check_write_succeeds() +RETURNS INTEGER AS $$ +BEGIN +INSERT INTO health_check VALUES (1, now()) +ON CONFLICT (id) DO UPDATE + SET updated_at = now(); + +RETURN 1; +EXCEPTION WHEN OTHERS THEN +RAISE EXCEPTION '[DATABRICKS_SMGR] health_check failed: [%] %', SQLSTATE, SQLERRM; +RETURN 0; +END; +$$ LANGUAGE plpgsql; diff --git a/libs/postgres_ffi/Cargo.toml b/libs/postgres_ffi/Cargo.toml index 5adf38f457..23fabeccd2 100644 --- a/libs/postgres_ffi/Cargo.toml +++ b/libs/postgres_ffi/Cargo.toml @@ -9,7 +9,6 @@ regex.workspace = true bytes.workspace = true anyhow.workspace = true crc32c.workspace = true -criterion.workspace = true once_cell.workspace = true pprof.workspace = true thiserror.workspace = true @@ -20,6 +19,7 @@ tracing.workspace = true postgres_versioninfo.workspace = true [dev-dependencies] +criterion.workspace = true env_logger.workspace = true postgres.workspace = true diff --git a/libs/proxy/tokio-postgres2/src/error/mod.rs b/libs/proxy/tokio-postgres2/src/error/mod.rs index 6e68b1e595..3fbb97f9bb 100644 --- a/libs/proxy/tokio-postgres2/src/error/mod.rs +++ b/libs/proxy/tokio-postgres2/src/error/mod.rs @@ -9,7 +9,7 @@ use postgres_protocol2::message::backend::{ErrorFields, ErrorResponseBody}; pub use self::sqlstate::*; #[allow(clippy::unreadable_literal)] -mod sqlstate; +pub mod sqlstate; /// The severity of a Postgres error or notice. #[derive(Debug, Copy, Clone, PartialEq, Eq)] diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index b8ee57bf9f..dad37ebe74 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -230,16 +230,14 @@ impl PageserverClient { ) -> tonic::Result { // Fast path: request is for a single shard. if let Some(shard_id) = - GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size) - .map_err(|err| tonic::Status::internal(err.to_string()))? + GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size)? { return Self::get_page_with_shard(req, shards.get(shard_id)?).await; } // Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and // reassemble the responses. - let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size) - .map_err(|err| tonic::Status::internal(err.to_string()))?; + let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size)?; let mut shard_requests = FuturesUnordered::new(); for (shard_id, shard_req) in splitter.drain_requests() { @@ -249,14 +247,10 @@ impl PageserverClient { } while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? { - splitter - .add_response(shard_id, shard_response) - .map_err(|err| tonic::Status::internal(err.to_string()))?; + splitter.add_response(shard_id, shard_response)?; } - splitter - .get_response() - .map_err(|err| tonic::Status::internal(err.to_string())) + Ok(splitter.collect_response()?) } /// Fetches pages on the given shard. Does not retry internally. diff --git a/pageserver/page_api/src/lib.rs b/pageserver/page_api/src/lib.rs index b44df6337f..b9be6b8b91 100644 --- a/pageserver/page_api/src/lib.rs +++ b/pageserver/page_api/src/lib.rs @@ -24,4 +24,4 @@ mod split; pub use client::Client; pub use model::*; -pub use split::GetPageSplitter; +pub use split::{GetPageSplitter, SplitError}; diff --git a/pageserver/page_api/src/split.rs b/pageserver/page_api/src/split.rs index 5ecc90a166..27c1c995e0 100644 --- a/pageserver/page_api/src/split.rs +++ b/pageserver/page_api/src/split.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; -use anyhow::anyhow; use bytes::Bytes; use crate::model::*; @@ -27,19 +26,19 @@ impl GetPageSplitter { req: &GetPageRequest, count: ShardCount, stripe_size: Option, - ) -> anyhow::Result> { + ) -> Result, SplitError> { // Fast path: unsharded tenant. if count.is_unsharded() { return Ok(Some(ShardIndex::unsharded())); } let Some(stripe_size) = stripe_size else { - return Err(anyhow!("stripe size must be given for sharded tenants")); + return Err("stripe size must be given for sharded tenants".into()); }; // Find the first page's shard, for comparison. let Some(&first_page) = req.block_numbers.first() else { - return Err(anyhow!("no block numbers in request")); + return Err("no block numbers in request".into()); }; let key = rel_block_to_key(req.rel, first_page); let shard_number = key_to_shard_number(count, stripe_size, &key); @@ -60,7 +59,7 @@ impl GetPageSplitter { req: GetPageRequest, count: ShardCount, stripe_size: Option, - ) -> anyhow::Result { + ) -> Result { // The caller should make sure we don't split requests unnecessarily. debug_assert!( Self::for_single_shard(&req, count, stripe_size)?.is_none(), @@ -68,10 +67,10 @@ impl GetPageSplitter { ); if count.is_unsharded() { - return Err(anyhow!("unsharded tenant, no point in splitting request")); + return Err("unsharded tenant, no point in splitting request".into()); } let Some(stripe_size) = stripe_size else { - return Err(anyhow!("stripe size must be given for sharded tenants")); + return Err("stripe size must be given for sharded tenants".into()); }; // Split the requests by shard index. @@ -129,35 +128,32 @@ impl GetPageSplitter { /// Adds a response from the given shard. The response must match the request ID and have an OK /// status code. A response must not already exist for the given shard ID. - #[allow(clippy::result_large_err)] pub fn add_response( &mut self, shard_id: ShardIndex, response: GetPageResponse, - ) -> anyhow::Result<()> { + ) -> Result<(), SplitError> { // The caller should already have converted status codes into tonic::Status. if response.status_code != GetPageStatusCode::Ok { - return Err(anyhow!( + return Err(SplitError(format!( "unexpected non-OK response for shard {shard_id}: {} {}", response.status_code, response.reason.unwrap_or_default() - )); + ))); } if response.request_id != self.response.request_id { - return Err(anyhow!( + return Err(SplitError(format!( "response ID mismatch for shard {shard_id}: expected {}, got {}", - self.response.request_id, - response.request_id - )); + self.response.request_id, response.request_id + ))); } if response.request_id != self.response.request_id { - return Err(anyhow!( + return Err(SplitError(format!( "response ID mismatch for shard {shard_id}: expected {}, got {}", - self.response.request_id, - response.request_id - )); + self.response.request_id, response.request_id + ))); } // Place the shard response pages into the assembled response, in request order. @@ -169,26 +165,27 @@ impl GetPageSplitter { } let Some(slot) = self.response.pages.get_mut(i) else { - return Err(anyhow!("no block_shards slot {i} for shard {shard_id}")); + return Err(SplitError(format!( + "no block_shards slot {i} for shard {shard_id}" + ))); }; let Some(page) = pages.next() else { - return Err(anyhow!( + return Err(SplitError(format!( "missing page {} in shard {shard_id} response", slot.block_number - )); + ))); }; if page.block_number != slot.block_number { - return Err(anyhow!( + return Err(SplitError(format!( "shard {shard_id} returned wrong page at index {i}, expected {} got {}", - slot.block_number, - page.block_number - )); + slot.block_number, page.block_number + ))); } if !slot.image.is_empty() { - return Err(anyhow!( + return Err(SplitError(format!( "shard {shard_id} returned duplicate page {} at index {i}", slot.block_number - )); + ))); } *slot = page; @@ -196,32 +193,54 @@ impl GetPageSplitter { // Make sure we've consumed all pages from the shard response. if let Some(extra_page) = pages.next() { - return Err(anyhow!( + return Err(SplitError(format!( "shard {shard_id} returned extra page: {}", extra_page.block_number - )); + ))); } Ok(()) } - /// Fetches the final, assembled response. - #[allow(clippy::result_large_err)] - pub fn get_response(self) -> anyhow::Result { + /// Collects the final, assembled response. + pub fn collect_response(self) -> Result { // Check that the response is complete. for (i, page) in self.response.pages.iter().enumerate() { if page.image.is_empty() { - return Err(anyhow!( + return Err(SplitError(format!( "missing page {} for shard {}", page.block_number, self.block_shards .get(i) .map(|s| s.to_string()) .unwrap_or_else(|| "?".to_string()) - )); + ))); } } Ok(self.response) } } + +/// A GetPageSplitter error. +#[derive(Debug, thiserror::Error)] +#[error("{0}")] +pub struct SplitError(String); + +impl From<&str> for SplitError { + fn from(err: &str) -> Self { + SplitError(err.to_string()) + } +} + +impl From for SplitError { + fn from(err: String) -> Self { + SplitError(err) + } +} + +impl From for tonic::Status { + fn from(err: SplitError) -> Self { + tonic::Status::internal(err.0) + } +} diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index d4b8e3bcaa..61cf2954c1 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -473,13 +473,6 @@ impl TimelineHandles { fn tenant_id(&self) -> Option { self.wrapper.tenant_id.get().copied() } - - /// Returns whether a child shard exists locally for the given shard. - fn has_child_shard(&self, tenant_id: TenantId, shard_index: ShardIndex) -> bool { - self.wrapper - .tenant_manager - .has_child_shard(tenant_id, shard_index) - } } pub(crate) struct TenantManagerWrapper { @@ -3432,18 +3425,6 @@ impl GrpcPageServiceHandler { Ok(CancellableTask { task, cancel }) } - /// Errors if the request is executed on a non-zero shard. Only shard 0 has a complete view of - /// relations and their sizes, as well as SLRU segments and similar data. - #[allow(clippy::result_large_err)] - fn ensure_shard_zero(timeline: &Handle) -> Result<(), tonic::Status> { - match timeline.get_shard_index().shard_number.0 { - 0 => Ok(()), - shard => Err(tonic::Status::invalid_argument(format!( - "request must execute on shard zero (is shard {shard})", - ))), - } - } - /// Generates a PagestreamRequest header from a ReadLsn and request ID. fn make_hdr( read_lsn: page_api::ReadLsn, @@ -3465,56 +3446,55 @@ impl GrpcPageServiceHandler { &self, req: &tonic::Request, ) -> Result, GetActiveTimelineError> { - let ttid = *extract::(req); + let TenantTimelineId { + tenant_id, + timeline_id, + } = *extract::(req); let shard_index = *extract::(req); - let shard_selector = ShardSelector::Known(shard_index); // TODO: untangle acquisition from TenantManagerWrapper::resolve() and Cache::get(), to // avoid the unnecessary overhead. TimelineHandles::new(self.tenant_manager.clone()) - .get(ttid.tenant_id, ttid.timeline_id, shard_selector) + .get(tenant_id, timeline_id, ShardSelector::Known(shard_index)) .await } - /// Acquires a timeline handle for the given request, which must be for shard zero. + /// Acquires a timeline handle for the given request, which must be for shard zero. Most + /// metadata requests are only valid on shard zero. /// /// NB: during an ongoing shard split, the compute will keep talking to the parent shard until /// the split is committed, but the parent shard may have been removed in the meanwhile. In that /// case, we reroute the request to the new child shard. See [`Self::maybe_split_get_page`]. /// /// TODO: revamp the split protocol to avoid this child routing. - async fn get_shard_zero_request_timeline( + async fn get_request_timeline_shard_zero( &self, req: &tonic::Request, ) -> Result, tonic::Status> { - let ttid = *extract::(req); + let TenantTimelineId { + tenant_id, + timeline_id, + } = *extract::(req); let shard_index = *extract::(req); if shard_index.shard_number.0 != 0 { return Err(tonic::Status::invalid_argument(format!( - "request must use shard zero (requested shard {shard_index})", + "request only valid on shard zero (requested shard {shard_index})", ))); } // TODO: untangle acquisition from TenantManagerWrapper::resolve() and Cache::get(), to // avoid the unnecessary overhead. - // - // TODO: this does internal retries, which will delay requests during shard splits (we won't - // look for the child until the parent's retries are exhausted). Don't do that. let mut handles = TimelineHandles::new(self.tenant_manager.clone()); match handles - .get( - ttid.tenant_id, - ttid.timeline_id, - ShardSelector::Known(shard_index), - ) + .get(tenant_id, timeline_id, ShardSelector::Known(shard_index)) .await { Ok(timeline) => Ok(timeline), Err(err) => { // We may be in the middle of a shard split. Try to find a child shard 0. if let Ok(timeline) = handles - .get(ttid.tenant_id, ttid.timeline_id, ShardSelector::Zero) + .get(tenant_id, timeline_id, ShardSelector::Zero) .await && timeline.get_shard_index().shard_count > shard_index.shard_count { @@ -3661,12 +3641,13 @@ impl GrpcPageServiceHandler { } /// Processes a GetPage request when there is a potential shard split in progress. We have to - /// reroute the request any local child shards, and split batch requests that straddle multiple - /// child shards. + /// reroute the request to any local child shards, and split batch requests that straddle + /// multiple child shards. /// - /// Parent shards are split and removed incrementally, but the compute is only notified once the - /// entire split commits, which can take several minutes. In the meanwhile, the compute will be - /// sending requests to the parent shard. + /// Parent shards are split and removed incrementally (there may be many parent shards when + /// splitting an already-sharded tenant), but the compute is only notified once the overall + /// split commits, which can take several minutes. In the meanwhile, the compute will be sending + /// requests to the parent shards. /// /// TODO: add test infrastructure to provoke this situation frequently and for long periods of /// time, to properly exercise it. @@ -3676,10 +3657,12 @@ impl GrpcPageServiceHandler { /// * Notify the compute about each subsplit. /// * Return an error that updates the compute's shard map. #[instrument(skip_all)] + #[allow(clippy::too_many_arguments)] async fn maybe_split_get_page( ctx: &RequestContext, handles: &mut TimelineHandles, - ttid: TenantTimelineId, + tenant_id: TenantId, + timeline_id: TimelineId, parent: ShardIndex, req: page_api::GetPageRequest, io_concurrency: IoConcurrency, @@ -3690,8 +3673,8 @@ impl GrpcPageServiceHandler { // the page must have a higher shard count. let timeline = handles .get( - ttid.tenant_id, - ttid.timeline_id, + tenant_id, + timeline_id, ShardSelector::Page(rel_block_to_key(req.rel, req.block_numbers[0])), ) .await?; @@ -3703,8 +3686,7 @@ impl GrpcPageServiceHandler { // Fast path: the request fits in a single shard. if let Some(shard_index) = - GetPageSplitter::for_single_shard(&req, shard_id.count, Some(shard_id.stripe_size)) - .map_err(|err| tonic::Status::internal(err.to_string()))? + GetPageSplitter::for_single_shard(&req, shard_id.count, Some(shard_id.stripe_size))? { // We got the shard ID from the first page, so these must be equal. assert_eq!(shard_index.shard_number, shard_id.number); @@ -3715,17 +3697,12 @@ impl GrpcPageServiceHandler { // The request spans multiple shards; split it and dispatch parallel requests. All pages // were originally in the parent shard, and during a split all children are local, so we // expect to find local shards for all pages. - let mut splitter = GetPageSplitter::split(req, shard_id.count, Some(shard_id.stripe_size)) - .map_err(|err| tonic::Status::internal(err.to_string()))?; + let mut splitter = GetPageSplitter::split(req, shard_id.count, Some(shard_id.stripe_size))?; let mut shard_requests = FuturesUnordered::new(); for (shard_index, shard_req) in splitter.drain_requests() { let timeline = handles - .get( - ttid.tenant_id, - ttid.timeline_id, - ShardSelector::Known(shard_index), - ) + .get(tenant_id, timeline_id, ShardSelector::Known(shard_index)) .await?; let future = Self::get_page( ctx, @@ -3739,14 +3716,10 @@ impl GrpcPageServiceHandler { } while let Some((shard_index, shard_response)) = shard_requests.next().await.transpose()? { - splitter - .add_response(shard_index, shard_response) - .map_err(|err| tonic::Status::internal(err.to_string()))?; + splitter.add_response(shard_index, shard_response)?; } - splitter - .get_response() - .map_err(|err| tonic::Status::internal(err.to_string())) + Ok(splitter.collect_response()?) } } @@ -3775,7 +3748,7 @@ impl proto::PageService for GrpcPageServiceHandler { // to be the sweet spot where throughput is saturated. const CHUNK_SIZE: usize = 256 * 1024; - let timeline = self.get_shard_zero_request_timeline(&req).await?; + let timeline = self.get_request_timeline_shard_zero(&req).await?; let ctx = self.ctx.with_scope_timeline(&timeline); // Validate the request and decorate the span. @@ -3894,7 +3867,7 @@ impl proto::PageService for GrpcPageServiceHandler { req: tonic::Request, ) -> Result, tonic::Status> { let received_at = extract::(&req).0; - let timeline = self.get_shard_zero_request_timeline(&req).await?; + let timeline = self.get_request_timeline_shard_zero(&req).await?; let ctx = self.ctx.with_scope_page_service_pagestream(&timeline); // Validate the request, decorate the span, and convert it to a Pagestream request. @@ -3931,25 +3904,21 @@ impl proto::PageService for GrpcPageServiceHandler { // reroute requests to the child shards below, but we also detect the common cases here // where either the shard exists or no shards exist at all. If we have a child shard, we // can't acquire a weak handle because we don't know which child shard to use yet. - // - // TODO: TimelineHandles.get() does internal retries, which will delay requests during shard - // splits. It shouldn't. - let ttid = *extract::(&req); + let TenantTimelineId { + tenant_id, + timeline_id, + } = *extract::(&req); let shard_index = *extract::(&req); let mut handles = TimelineHandles::new(self.tenant_manager.clone()); let timeline = match handles - .get( - ttid.tenant_id, - ttid.timeline_id, - ShardSelector::Known(shard_index), - ) + .get(tenant_id, timeline_id, ShardSelector::Known(shard_index)) .await { // The timeline shard exists. Keep a weak handle to reuse for each request. Ok(timeline) => Some(timeline.downgrade()), // The shard doesn't exist, but a child shard does. We'll reroute requests later. - Err(_) if handles.has_child_shard(ttid.tenant_id, shard_index) => None, + Err(_) if self.tenant_manager.has_child_shard(tenant_id, shard_index) => None, // Failed to fetch the timeline, and no child shard exists. Error out. Err(err) => return Err(err.into()), }; @@ -4005,7 +3974,8 @@ impl proto::PageService for GrpcPageServiceHandler { Self::maybe_split_get_page( &ctx, &mut handles, - ttid, + tenant_id, + timeline_id, shard_index, req, io_concurrency.clone(), @@ -4040,7 +4010,7 @@ impl proto::PageService for GrpcPageServiceHandler { req: tonic::Request, ) -> Result, tonic::Status> { let received_at = extract::(&req).0; - let timeline = self.get_shard_zero_request_timeline(&req).await?; + let timeline = self.get_request_timeline_shard_zero(&req).await?; let ctx = self.ctx.with_scope_page_service_pagestream(&timeline); // Validate the request, decorate the span, and convert it to a Pagestream request. @@ -4076,11 +4046,10 @@ impl proto::PageService for GrpcPageServiceHandler { req: tonic::Request, ) -> Result, tonic::Status> { let received_at = extract::(&req).0; - let timeline = self.get_shard_zero_request_timeline(&req).await?; + let timeline = self.get_request_timeline_shard_zero(&req).await?; let ctx = self.ctx.with_scope_page_service_pagestream(&timeline); // Validate the request, decorate the span, and convert it to a Pagestream request. - Self::ensure_shard_zero(&timeline)?; let req: page_api::GetSlruSegmentRequest = req.into_inner().try_into()?; span_record!(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn); diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index d03e6a1e05..0feba5e9c8 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -826,9 +826,9 @@ impl TenantManager { peek_slot.is_some() } - /// Returns whether a local slot exists for a child shard of the given tenant and shard count. - /// Note that this just checks for a shard with a larger shard count, and it may not be a - /// direct child of the given shard. + /// Returns whether a local shard exists that's a child of the given tenant shard. Note that + /// this just checks for any shard with a larger shard count, and it may not be a direct child + /// of the given shard (their keyspace may not overlap). pub(crate) fn has_child_shard(&self, tenant_id: TenantId, shard_index: ShardIndex) -> bool { match &*self.tenants.read().unwrap() { TenantsMap::Initializing => false, @@ -1536,9 +1536,10 @@ impl TenantManager { // Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant // // TODO: keeping the parent as InProgress while spawning the children causes read - // unavailability, as we can't acquire a timeline handle for it. The parent should be - // available for reads until the children are ready -- potentially until *all* subsplits - // across all parent shards are complete and the compute has been notified. See: + // unavailability, as we can't acquire a new timeline handle for it (existing handles appear + // to still work though, even downgraded ones). The parent should be available for reads + // until the children are ready -- potentially until *all* subsplits across all parent + // shards are complete and the compute has been notified. See: // . drop(tenant); let mut parent_slot_guard = diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index d5cc468253..8528243037 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -1461,7 +1461,6 @@ check_neon_id(char **newval, void **extra, GucSource source) return **newval == '\0' || HexDecodeString(id, *newval, 16); } - void PagestoreShmemInit(void) { diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 7798e112ef..cacccf9123 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -53,6 +53,7 @@ void _PG_init(void); bool lakebase_mode = false; static int running_xacts_overflow_policy; +static emit_log_hook_type prev_emit_log_hook; static bool monitor_query_exec_time = false; static ExecutorStart_hook_type prev_ExecutorStart = NULL; @@ -83,6 +84,8 @@ uint32 WAIT_EVENT_NEON_PS_READ; uint32 WAIT_EVENT_NEON_WAL_DL; #endif +int databricks_test_hook = 0; + enum RunningXactsOverflowPolicies { OP_IGNORE, OP_SKIP, @@ -447,6 +450,20 @@ ReportSearchPath(void) static int neon_pgstat_file_size_limit; #endif +static void DatabricksSqlErrorHookImpl(ErrorData *edata) { + if (prev_emit_log_hook != NULL) { + prev_emit_log_hook(edata); + } + + if (edata->sqlerrcode == ERRCODE_DATA_CORRUPTED) { + pg_atomic_fetch_add_u32(&databricks_metrics_shared->data_corruption_count, 1); + } else if (edata->sqlerrcode == ERRCODE_INDEX_CORRUPTED) { + pg_atomic_fetch_add_u32(&databricks_metrics_shared->index_corruption_count, 1); + } else if (edata->sqlerrcode == ERRCODE_INTERNAL_ERROR) { + pg_atomic_fetch_add_u32(&databricks_metrics_shared->internal_error_count, 1); + } +} + void _PG_init(void) { @@ -468,6 +485,11 @@ _PG_init(void) 0, NULL, NULL, NULL); + if (lakebase_mode) { + prev_emit_log_hook = emit_log_hook; + emit_log_hook = DatabricksSqlErrorHookImpl; + } + /* * Initializing a pre-loaded Postgres extension happens in three stages: * @@ -608,6 +630,19 @@ _PG_init(void) 0, NULL, NULL, NULL); + // A test hook used in sql regress to trigger specific behaviors + // to test features easily. + DefineCustomIntVariable( + "databricks.test_hook", + "The test hook used in sql regress tests only", + NULL, + &databricks_test_hook, + 0, + 0, INT32_MAX, + PGC_SUSET, + 0, + NULL, NULL, NULL); + /* * Important: This must happen after other parts of the extension are * loaded, otherwise any settings to GUCs that were set before the @@ -842,6 +877,9 @@ neon_shmem_startup_hook(void) LfcShmemInit(); PrewarmShmemInit(); NeonPerfCountersShmemInit(); + if (lakebase_mode) { + DatabricksMetricsShmemInit(); + } PagestoreShmemInit(); RelsizeCacheShmemInit(); WalproposerShmemInit(); diff --git a/pgxn/neon/neon_perf_counters.c b/pgxn/neon/neon_perf_counters.c index dd576e4e73..a38f876a0c 100644 --- a/pgxn/neon/neon_perf_counters.c +++ b/pgxn/neon/neon_perf_counters.c @@ -19,7 +19,35 @@ #include "neon.h" #include "neon_perf_counters.h" -#include "neon_pgversioncompat.h" +#include "walproposer.h" + +/* BEGIN_HADRON */ +databricks_metrics *databricks_metrics_shared; + +Size +DatabricksMetricsShmemSize(void) +{ + return sizeof(databricks_metrics); +} + +void +DatabricksMetricsShmemInit(void) +{ + bool found; + + databricks_metrics_shared = + ShmemInitStruct("Databricks counters", + DatabricksMetricsShmemSize(), + &found); + Assert(found == IsUnderPostmaster); + if (!found) + { + pg_atomic_init_u32(&databricks_metrics_shared->index_corruption_count, 0); + pg_atomic_init_u32(&databricks_metrics_shared->data_corruption_count, 0); + pg_atomic_init_u32(&databricks_metrics_shared->internal_error_count, 0); + } +} +/* END_HADRON */ neon_per_backend_counters *neon_per_backend_counters_shared; @@ -38,11 +66,12 @@ NeonPerfCountersShmemRequest(void) #else size = mul_size(NUM_NEON_PERF_COUNTER_SLOTS, sizeof(neon_per_backend_counters)); #endif + if (lakebase_mode) { + size = add_size(size, DatabricksMetricsShmemSize()); + } RequestAddinShmemSpace(size); } - - void NeonPerfCountersShmemInit(void) { @@ -395,6 +424,33 @@ neon_get_perf_counters(PG_FUNCTION_ARGS) metric_to_datums(&metrics[i], &values[0], &nulls[0]); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); } + + if (lakebase_mode) { + + if (databricks_test_hook == TestHookCorruption) { + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("test corruption"))); + } + + // Not ideal but piggyback our databricks counters into the neon perf counters view + // so that we don't need to introduce neon--1.x+1.sql to add a new view. + { + metric_t databricks_metrics[] = { + {"sql_index_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->index_corruption_count)}, + {"sql_data_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->data_corruption_count)}, + {"sql_internal_error_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->internal_error_count)}, + {NULL, false, 0, 0}, + }; + for (int i = 0; databricks_metrics[i].name != NULL; i++) + { + metric_to_datums(&databricks_metrics[i], &values[0], &nulls[0]); + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); + } + } + /* END_HADRON */ + } + pfree(metrics); return (Datum) 0; diff --git a/pgxn/neon/neon_perf_counters.h b/pgxn/neon/neon_perf_counters.h index bc4efddee5..0196559806 100644 --- a/pgxn/neon/neon_perf_counters.h +++ b/pgxn/neon/neon_perf_counters.h @@ -177,5 +177,23 @@ extern void inc_query_time(uint64 elapsed); extern Size NeonPerfCountersShmemSize(void); extern void NeonPerfCountersShmemInit(void); +/* BEGIN_HADRON */ +typedef struct +{ + pg_atomic_uint32 index_corruption_count; + pg_atomic_uint32 data_corruption_count; + pg_atomic_uint32 internal_error_count; +} databricks_metrics; + +extern databricks_metrics *databricks_metrics_shared; + +extern Size DatabricksMetricsShmemSize(void); +extern void DatabricksMetricsShmemInit(void); + +extern int databricks_test_hook; + +static const int TestHookCorruption = 1; +/* END_HADRON */ + #endif /* NEON_PERF_COUNTERS_H */ diff --git a/proxy/src/compute/mod.rs b/proxy/src/compute/mod.rs index ca784423ee..43cfe70206 100644 --- a/proxy/src/compute/mod.rs +++ b/proxy/src/compute/mod.rs @@ -8,6 +8,7 @@ use futures::{FutureExt, TryFutureExt}; use itertools::Itertools; use postgres_client::config::{AuthKeys, ChannelBinding, SslMode}; use postgres_client::connect_raw::StartupStream; +use postgres_client::error::SqlState; use postgres_client::maybe_tls_stream::MaybeTlsStream; use postgres_client::tls::MakeTlsConnect; use thiserror::Error; @@ -22,7 +23,7 @@ use crate::context::RequestContext; use crate::control_plane::client::ApiLockError; use crate::control_plane::errors::WakeComputeError; use crate::control_plane::messages::MetricsAuxInfo; -use crate::error::{ReportableError, UserFacingError}; +use crate::error::{ErrorKind, ReportableError, UserFacingError}; use crate::metrics::{Metrics, NumDbConnectionsGuard}; use crate::pqproto::StartupMessageParams; use crate::proxy::connect_compute::TlsNegotiation; @@ -65,12 +66,13 @@ impl UserFacingError for PostgresError { } impl ReportableError for PostgresError { - fn get_error_kind(&self) -> crate::error::ErrorKind { + fn get_error_kind(&self) -> ErrorKind { match self { - PostgresError::Postgres(e) if e.as_db_error().is_some() => { - crate::error::ErrorKind::Postgres - } - PostgresError::Postgres(_) => crate::error::ErrorKind::Compute, + PostgresError::Postgres(err) => match err.as_db_error() { + Some(err) if err.code() == &SqlState::INVALID_CATALOG_NAME => ErrorKind::User, + Some(_) => ErrorKind::Postgres, + None => ErrorKind::Compute, + }, } } } @@ -110,9 +112,9 @@ impl UserFacingError for ConnectionError { } impl ReportableError for ConnectionError { - fn get_error_kind(&self) -> crate::error::ErrorKind { + fn get_error_kind(&self) -> ErrorKind { match self { - ConnectionError::TlsError(_) => crate::error::ErrorKind::Compute, + ConnectionError::TlsError(_) => ErrorKind::Compute, ConnectionError::WakeComputeError(e) => e.get_error_kind(), ConnectionError::TooManyConnectionAttempts(e) => e.get_error_kind(), #[cfg(test)] diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index eb879f98e7..511bdc4e42 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -4,6 +4,7 @@ use std::time::Duration; use ed25519_dalek::SigningKey; use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer}; use jose_jwk::jose_b64; +use postgres_client::error::SqlState; use postgres_client::maybe_tls_stream::MaybeTlsStream; use rand_core::OsRng; use tracing::field::display; @@ -459,15 +460,14 @@ impl ReportableError for HttpConnError { match self { HttpConnError::ConnectError(_) => ErrorKind::Compute, HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute, - HttpConnError::PostgresConnectionError(p) => { - if p.as_db_error().is_some() { - // postgres rejected the connection - ErrorKind::Postgres - } else { - // couldn't even reach postgres - ErrorKind::Compute - } - } + HttpConnError::PostgresConnectionError(p) => match p.as_db_error() { + // user provided a wrong database name + Some(err) if err.code() == &SqlState::INVALID_CATALOG_NAME => ErrorKind::User, + // postgres rejected the connection + Some(_) => ErrorKind::Postgres, + // couldn't even reach postgres + None => ErrorKind::Compute, + }, HttpConnError::LocalProxyConnectionError(_) => ErrorKind::Compute, HttpConnError::ComputeCtl(_) => ErrorKind::Service, HttpConnError::JwtPayloadError(_) => ErrorKind::User, diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 26f65379e7..c334e820d7 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -192,34 +192,29 @@ pub(crate) async fn handle( let line = get(db_error, |db| db.line().map(|l| l.to_string())); let routine = get(db_error, |db| db.routine()); - match &e { - SqlOverHttpError::Postgres(e) - if e.as_db_error().is_some() && error_kind == ErrorKind::User => - { - // this error contains too much info, and it's not an error we care about. - if tracing::enabled!(Level::DEBUG) { - tracing::debug!( - kind=error_kind.to_metric_label(), - error=%e, - msg=message, - "forwarding error to user" - ); - } else { - tracing::info!( - kind = error_kind.to_metric_label(), - error = "bad query", - "forwarding error to user" - ); - } - } - _ => { - tracing::info!( + if db_error.is_some() && error_kind == ErrorKind::User { + // this error contains too much info, and it's not an error we care about. + if tracing::enabled!(Level::DEBUG) { + debug!( kind=error_kind.to_metric_label(), error=%e, msg=message, "forwarding error to user" ); + } else { + info!( + kind = error_kind.to_metric_label(), + error = "bad query", + "forwarding error to user" + ); } + } else { + info!( + kind=error_kind.to_metric_label(), + error=%e, + msg=message, + "forwarding error to user" + ); } json_response( diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 03c8f7e84a..191f8aacf1 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -12,7 +12,7 @@ use futures::stream::{self, FuturesOrdered}; use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr; use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo}; use remote_storage::{ - DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath, StorageMetadata, + DownloadError, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath, StorageMetadata, }; use safekeeper_api::models::PeerInfo; use tokio::fs::File; @@ -607,6 +607,9 @@ pub(crate) async fn copy_partial_segment( storage.copy_object(source, destination, &cancel).await } +const WAL_READ_WARN_THRESHOLD: u32 = 2; +const WAL_READ_MAX_RETRIES: u32 = 3; + pub async fn read_object( storage: &GenericRemoteStorage, file_path: &RemotePath, @@ -620,12 +623,23 @@ pub async fn read_object( byte_start: std::ops::Bound::Included(offset), ..Default::default() }; - let download = storage - .download(file_path, &opts, &cancel) - .await - .with_context(|| { - format!("Failed to open WAL segment download stream for remote path {file_path:?}") - })?; + + // This retry only solves the connect errors: subsequent reads can still fail as this function returns + // a stream. + let download = backoff::retry( + || async { storage.download(file_path, &opts, &cancel).await }, + DownloadError::is_permanent, + WAL_READ_WARN_THRESHOLD, + WAL_READ_MAX_RETRIES, + "download WAL segment", + &cancel, + ) + .await + .ok_or_else(|| DownloadError::Cancelled) + .and_then(|x| x) + .with_context(|| { + format!("Failed to open WAL segment download stream for remote path {file_path:?}") + })?; let reader = tokio_util::io::StreamReader::new(download.download_stream); diff --git a/test_runner/regress/test_readonly_node.py b/test_runner/regress/test_readonly_node.py index 5612236250..e151b0ba13 100644 --- a/test_runner/regress/test_readonly_node.py +++ b/test_runner/regress/test_readonly_node.py @@ -129,7 +129,10 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder): Test static endpoint is protected from GC by acquiring and renewing lsn leases. """ - LSN_LEASE_LENGTH = 8 + LSN_LEASE_LENGTH = ( + 14 # This value needs to be large enough for compute_ctl to send two lease requests. + ) + neon_env_builder.num_pageservers = 2 # GC is manual triggered. env = neon_env_builder.init_start( @@ -230,6 +233,15 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder): log.info(f"`SELECT` query succeed after GC, {ctx=}") return offset + # It's not reliable to let the compute renew the lease in this test case as we have a very tight + # lease timeout. Therefore, the test case itself will renew the lease. + # + # This is a workaround to make the test case more deterministic. + def renew_lease(env: NeonEnv, lease_lsn: Lsn): + env.storage_controller.pageserver_api().timeline_lsn_lease( + env.initial_tenant, env.initial_timeline, lease_lsn + ) + # Insert some records on main branch with env.endpoints.create_start("main", config_lines=["shared_buffers=1MB"]) as ep_main: with ep_main.cursor() as cur: @@ -242,6 +254,9 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder): XLOG_BLCKSZ = 8192 lsn = Lsn((int(lsn) // XLOG_BLCKSZ) * XLOG_BLCKSZ) + # We need to mock the way cplane works: it gets a lease for a branch before starting the compute. + renew_lease(env, lsn) + with env.endpoints.create_start( branch_name="main", endpoint_id="static", @@ -251,9 +266,6 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder): cur.execute("SELECT count(*) FROM t0") assert cur.fetchone() == (ROW_COUNT,) - # Wait for static compute to renew lease at least once. - time.sleep(LSN_LEASE_LENGTH / 2) - generate_updates_on_main(env, ep_main, 3, end=100) offset = trigger_gc_and_select( @@ -263,10 +275,10 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder): # Trigger Pageserver restarts for ps in env.pageservers: ps.stop() - # Static compute should have at least one lease request failure due to connection. - time.sleep(LSN_LEASE_LENGTH / 2) ps.start() + renew_lease(env, lsn) + trigger_gc_and_select( env, ep_static, @@ -282,6 +294,9 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder): ) env.storage_controller.reconcile_until_idle() + # Wait for static compute to renew lease on the new pageserver. + time.sleep(LSN_LEASE_LENGTH + 3) + trigger_gc_and_select( env, ep_static, @@ -292,7 +307,6 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder): # Do some update so we can increment gc_cutoff generate_updates_on_main(env, ep_main, i, end=100) - # Wait for the existing lease to expire. time.sleep(LSN_LEASE_LENGTH + 1) # Now trigger GC again, layers should be removed.