diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 3f09042d9d..179a756135 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -382,7 +382,7 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result { // Initialize pageserver, create initial tenant and timeline. for ps_conf in &env.pageservers { PageServerNode::from_env(&env, ps_conf) - .initialize(&pageserver_config) + .initialize(pageserver_config.clone()) .unwrap_or_else(|e| { eprintln!("pageserver init failed: {e:?}"); exit(1); diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 6437d04ec8..7abbbce95a 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -562,6 +562,10 @@ impl LocalEnv { fs::create_dir_all(SafekeeperNode::datadir_path_by_id(self, safekeeper.id))?; } + for ps in &self.pageservers { + fs::create_dir(self.pageserver_data_dir(ps.id))?; + } + self.persist_config(base_path) } diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 2179859023..6046c93bad 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -10,7 +10,7 @@ use std::io; use std::io::Write; use std::num::NonZeroU64; use std::path::PathBuf; -use std::process::Command; +use std::str::FromStr; use std::time::Duration; use anyhow::{bail, Context}; @@ -74,10 +74,12 @@ impl PageServerNode { } } - /// Merge overrides provided by the user on the command line with our default overides derived from neon_local configuration. - /// - /// These all end up on the command line of the `pageserver` binary. - fn neon_local_overrides(&self, cli_overrides: &toml_edit::Document) -> Vec { + fn pageserver_init_make_toml( + &self, + cli_overrides: toml_edit::Document, + ) -> anyhow::Result { + // TODO: this is legacy code, it should be refactored to use toml_edit directly. 
+ // FIXME: the paths should be shell-escaped to handle paths with spaces, quotes etc. let pg_distrib_dir_param = format!( "pg_distrib_dir='{}'", @@ -172,12 +174,21 @@ impl PageServerNode { // Apply the user-provided overrides overrides.push(cli_overrides.to_string()); - overrides + // Turn `overrides` into a toml document. + // TODO: above code is legacy code, it should be refactored to use toml_edit directly. + let mut config_toml = toml_edit::Document::new(); + for fragment_str in overrides { + let fragment = toml_edit::Document::from_str(&fragment_str) + .expect("all fragments in `overrides` are valid toml documents, this function controls that"); + for (key, item) in fragment.iter() { + config_toml.insert(key, item.clone()); + } + } + Ok(config_toml) } /// Initializes a pageserver node by creating its config with the overrides provided. - pub fn initialize(&self, config_overrides: &toml_edit::Document) -> anyhow::Result<()> { - // First, run `pageserver --init` and wait for it to write a config into FS and exit. 
+ pub fn initialize(&self, config_overrides: toml_edit::Document) -> anyhow::Result<()> { self.pageserver_init(config_overrides) .with_context(|| format!("Failed to run init for pageserver node {}", self.conf.id)) } @@ -198,7 +209,7 @@ impl PageServerNode { self.start_node().await } - fn pageserver_init(&self, config_overrides: &toml_edit::Document) -> anyhow::Result<()> { + fn pageserver_init(&self, cli_overrides: toml_edit::Document) -> anyhow::Result<()> { let datadir = self.repo_path(); let node_id = self.conf.id; println!( @@ -209,36 +220,20 @@ impl PageServerNode { ); io::stdout().flush()?; - if !datadir.exists() { - std::fs::create_dir(&datadir)?; - } - - let datadir_path_str = datadir.to_str().with_context(|| { - format!("Cannot start pageserver node {node_id} in path that has no string representation: {datadir:?}") - })?; - - // `pageserver --init` merges the `--config-override`s into a built-in default config, - // then writes out the merged product to `pageserver.toml`. - // TODO: just write the full `pageserver.toml` and get rid of `--config-override`. - let mut args = vec!["--init", "--workdir", datadir_path_str]; - let overrides = self.neon_local_overrides(config_overrides); - for piece in &overrides { - args.push("--config-override"); - args.push(piece); - } - let init_output = Command::new(self.env.pageserver_bin()) - .args(args) - .envs(self.pageserver_env_variables()?) 
- .output() - .with_context(|| format!("Failed to run pageserver init for node {node_id}"))?; - - anyhow::ensure!( - init_output.status.success(), - "Pageserver init for node {} did not finish successfully, stdout: {}, stderr: {}", - node_id, - String::from_utf8_lossy(&init_output.stdout), - String::from_utf8_lossy(&init_output.stderr), - ); + let config = self + .pageserver_init_make_toml(cli_overrides) + .context("make pageserver toml")?; + let config_file_path = datadir.join("pageserver.toml"); + let mut config_file = std::fs::OpenOptions::new() + .create_new(true) + .write(true) + .open(&config_file_path) + .with_context(|| format!("open pageserver toml for write: {config_file_path:?}"))?; + config_file + .write_all(config.to_string().as_bytes()) + .context("write pageserver toml")?; + drop(config_file); + // TODO: invoke a TBD config-check command to validate that pageserver will start with the written config // Write metadata file, used by pageserver on startup to register itself with // the storage controller diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index 23266ac4ef..4433b3c1c2 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -1,7 +1,7 @@ use crate::{ auth::parse_endpoint_param, cancellation::CancelClosure, - console::{errors::WakeComputeError, messages::MetricsAuxInfo}, + console::{errors::WakeComputeError, messages::MetricsAuxInfo, provider::ApiLockError}, context::RequestMonitoring, error::{ReportableError, UserFacingError}, metrics::{Metrics, NumDbConnectionsGuard}, @@ -34,6 +34,9 @@ pub enum ConnectionError { #[error("{COULD_NOT_CONNECT}: {0}")] WakeComputeError(#[from] WakeComputeError), + + #[error("error acquiring resource permit: {0}")] + TooManyConnectionAttempts(#[from] ApiLockError), } impl UserFacingError for ConnectionError { @@ -57,6 +60,9 @@ impl UserFacingError for ConnectionError { None => err.to_string(), }, WakeComputeError(err) => err.to_string_client(), + TooManyConnectionAttempts(_) => { + "Failed to 
acquire permit to connect to the database. Too many database connection attempts are currently ongoing.".to_owned() + } _ => COULD_NOT_CONNECT.to_owned(), } } @@ -72,6 +78,7 @@ impl ReportableError for ConnectionError { ConnectionError::CouldNotConnect(_) => crate::error::ErrorKind::Compute, ConnectionError::TlsError(_) => crate::error::ErrorKind::Compute, ConnectionError::WakeComputeError(e) => e.get_error_kind(), + ConnectionError::TooManyConnectionAttempts(e) => e.get_error_kind(), } } } diff --git a/proxy/src/console/provider.rs b/proxy/src/console/provider.rs index a05cf248f6..3b996cdbd1 100644 --- a/proxy/src/console/provider.rs +++ b/proxy/src/console/provider.rs @@ -12,6 +12,7 @@ use crate::{ compute, config::{CacheOptions, EndpointCacheConfig, ProjectInfoCacheOptions}, context::RequestMonitoring, + error::ReportableError, intern::ProjectIdInt, metrics::ApiLockMetrics, scram, EndpointCacheKey, @@ -30,6 +31,8 @@ pub mod errors { }; use thiserror::Error; + use super::ApiLockError; + /// A go-to error message which doesn't leak any detail. const REQUEST_FAILED: &str = "Console request failed"; @@ -211,8 +214,8 @@ pub mod errors { #[error("Too many connections attempts")] TooManyConnections, - #[error("Timeout waiting to acquire wake compute lock")] - TimeoutError, + #[error("error acquiring resource permit: {0}")] + TooManyConnectionAttempts(#[from] ApiLockError), } // This allows more useful interactions than `#[from]`. 
@@ -222,17 +225,6 @@ pub mod errors { } } - impl From for WakeComputeError { - fn from(_: tokio::sync::AcquireError) -> Self { - WakeComputeError::TimeoutError - } - } - impl From for WakeComputeError { - fn from(_: tokio::time::error::Elapsed) -> Self { - WakeComputeError::TimeoutError - } - } - impl UserFacingError for WakeComputeError { fn to_string_client(&self) -> String { use WakeComputeError::*; @@ -245,7 +237,9 @@ pub mod errors { TooManyConnections => self.to_string(), - TimeoutError => "timeout while acquiring the compute resource lock".to_owned(), + TooManyConnectionAttempts(_) => { + "Failed to acquire permit to connect to the database. Too many database connection attempts are currently ongoing.".to_owned() + } } } } @@ -256,7 +250,7 @@ pub mod errors { WakeComputeError::BadComputeAddress(_) => crate::error::ErrorKind::ControlPlane, WakeComputeError::ApiError(e) => e.get_error_kind(), WakeComputeError::TooManyConnections => crate::error::ErrorKind::RateLimit, - WakeComputeError::TimeoutError => crate::error::ErrorKind::ServiceRateLimit, + WakeComputeError::TooManyConnectionAttempts(e) => e.get_error_kind(), } } } @@ -456,6 +450,23 @@ pub struct ApiLocks { metrics: &'static ApiLockMetrics, } +#[derive(Debug, thiserror::Error)] +pub enum ApiLockError { + #[error("lock was closed")] + AcquireError(#[from] tokio::sync::AcquireError), + #[error("permit could not be acquired")] + TimeoutError(#[from] tokio::time::error::Elapsed), +} + +impl ReportableError for ApiLockError { + fn get_error_kind(&self) -> crate::error::ErrorKind { + match self { + ApiLockError::AcquireError(_) => crate::error::ErrorKind::Service, + ApiLockError::TimeoutError(_) => crate::error::ErrorKind::RateLimit, + } + } +} + impl ApiLocks { pub fn new( name: &'static str, @@ -475,7 +486,7 @@ impl ApiLocks { }) } - pub async fn get_permit(&self, key: &K) -> Result { + pub async fn get_permit(&self, key: &K) -> Result { if self.permits == 0 { return Ok(WakeComputePermit { permit: None }); } 
diff --git a/proxy/src/proxy/retry.rs b/proxy/src/proxy/retry.rs index 36a05ba190..8dec1f1137 100644 --- a/proxy/src/proxy/retry.rs +++ b/proxy/src/proxy/retry.rs @@ -86,6 +86,8 @@ impl ShouldRetry for compute::ConnectionError { match self { compute::ConnectionError::Postgres(err) => err.should_retry_database_address(), compute::ConnectionError::CouldNotConnect(err) => err.should_retry_database_address(), + // the cache entry was not checked for validity + compute::ConnectionError::TooManyConnectionAttempts(_) => false, _ => true, } } diff --git a/proxy/src/proxy/wake_compute.rs b/proxy/src/proxy/wake_compute.rs index 3d9e94dd72..94b03e1ccc 100644 --- a/proxy/src/proxy/wake_compute.rs +++ b/proxy/src/proxy/wake_compute.rs @@ -119,7 +119,7 @@ fn report_error(e: &WakeComputeError, retry: bool) { WakeupFailureKind::ApiConsoleOtherError } WakeComputeError::TooManyConnections => WakeupFailureKind::ApiConsoleLocked, - WakeComputeError::TimeoutError => WakeupFailureKind::TimeoutError, + WakeComputeError::TooManyConnectionAttempts(_) => WakeupFailureKind::TimeoutError, }; Metrics::get() .proxy diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index 963913a260..ce58f575e2 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -10,6 +10,7 @@ use crate::{ console::{ errors::{GetAuthInfoError, WakeComputeError}, locks::ApiLocks, + provider::ApiLockError, CachedNodeInfo, }, context::RequestMonitoring, @@ -131,6 +132,8 @@ pub enum HttpConnError { AuthError(#[from] AuthError), #[error("wake_compute returned error")] WakeCompute(#[from] WakeComputeError), + #[error("error acquiring resource permit: {0}")] + TooManyConnectionAttempts(#[from] ApiLockError), } impl ReportableError for HttpConnError { @@ -141,6 +144,7 @@ impl ReportableError for HttpConnError { HttpConnError::GetAuthInfo(a) => a.get_error_kind(), HttpConnError::AuthError(a) => a.get_error_kind(), HttpConnError::WakeCompute(w) => w.get_error_kind(), + 
HttpConnError::TooManyConnectionAttempts(w) => w.get_error_kind(), } } } @@ -153,6 +157,9 @@ impl UserFacingError for HttpConnError { HttpConnError::GetAuthInfo(c) => c.to_string_client(), HttpConnError::AuthError(c) => c.to_string_client(), HttpConnError::WakeCompute(c) => c.to_string_client(), + HttpConnError::TooManyConnectionAttempts(_) => { + "Failed to acquire permit to connect to the database. Too many database connection attempts are currently ongoing.".to_owned() + } } } } @@ -165,6 +172,15 @@ impl ShouldRetry for HttpConnError { HttpConnError::GetAuthInfo(_) => false, HttpConnError::AuthError(_) => false, HttpConnError::WakeCompute(_) => false, + HttpConnError::TooManyConnectionAttempts(_) => false, + } + } + fn should_retry_database_address(&self) -> bool { + match self { + HttpConnError::ConnectionError(e) => e.should_retry_database_address(), + // we never checked cache validity + HttpConnError::TooManyConnectionAttempts(_) => false, + _ => true, } } } diff --git a/test_runner/regress/test_timeline_detach_ancestor.py b/test_runner/regress/test_timeline_detach_ancestor.py index 5abb3e28e4..b8a88ca6df 100644 --- a/test_runner/regress/test_timeline_detach_ancestor.py +++ b/test_runner/regress/test_timeline_detach_ancestor.py @@ -48,6 +48,12 @@ class Branchpoint(str, enum.Enum): ] +SHUTDOWN_ALLOWED_ERRORS = [ + ".*initial size calculation failed: downloading failed, possibly for shutdown", + ".*failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited", +] + + @pytest.mark.parametrize("branchpoint", Branchpoint.all()) @pytest.mark.parametrize("restart_after", [True, False]) def test_ancestor_detach_branched_from( @@ -61,12 +67,7 @@ def test_ancestor_detach_branched_from( env = neon_env_builder.init_start() - env.pageserver.allowed_errors.extend( - [ - ".*initial size calculation failed: downloading failed, possibly for shutdown", - ".*failed to freeze and flush: cannot flush frozen layers when flush_loop is not 
running, state is Exited", - ] - ) + env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS) client = env.pageserver.http_client() @@ -208,13 +209,7 @@ def test_ancestor_detach_reparents_earlier(neon_env_builder: NeonEnvBuilder, res env = neon_env_builder.init_start() - env.pageserver.allowed_errors.extend( - [ - ".*initial size calculation failed: downloading failed, possibly for shutdown", - # after restart this is likely to happen if there is other load on the runner - ".*failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited", - ] - ) + env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS) client = env.pageserver.http_client() @@ -396,9 +391,7 @@ def test_detached_receives_flushes_while_being_detached( with env.endpoints.create_start("new main", tenant_id=env.initial_tenant) as ep: assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows - env.pageserver.allowed_errors.append( - "initial size calculation failed: downloading failed, possibly for shutdown" - ) + env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS) # TODO: