Merge pull request #7654 from neondatabase/rc/proxy/2024-05-08

Proxy release 2024-05-08
This commit is contained in:
Conrad Ludgate
2024-05-08 11:56:20 +01:00
committed by GitHub
9 changed files with 102 additions and 74 deletions

View File

@@ -382,7 +382,7 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
// Initialize pageserver, create initial tenant and timeline.
for ps_conf in &env.pageservers {
PageServerNode::from_env(&env, ps_conf)
.initialize(&pageserver_config)
.initialize(pageserver_config.clone())
.unwrap_or_else(|e| {
eprintln!("pageserver init failed: {e:?}");
exit(1);

View File

@@ -562,6 +562,10 @@ impl LocalEnv {
fs::create_dir_all(SafekeeperNode::datadir_path_by_id(self, safekeeper.id))?;
}
for ps in &self.pageservers {
fs::create_dir(self.pageserver_data_dir(ps.id))?;
}
self.persist_config(base_path)
}

View File

@@ -10,7 +10,7 @@ use std::io;
use std::io::Write;
use std::num::NonZeroU64;
use std::path::PathBuf;
use std::process::Command;
use std::str::FromStr;
use std::time::Duration;
use anyhow::{bail, Context};
@@ -74,10 +74,12 @@ impl PageServerNode {
}
}
/// Merge overrides provided by the user on the command line with our default overides derived from neon_local configuration.
///
/// These all end up on the command line of the `pageserver` binary.
fn neon_local_overrides(&self, cli_overrides: &toml_edit::Document) -> Vec<String> {
fn pageserver_init_make_toml(
&self,
cli_overrides: toml_edit::Document,
) -> anyhow::Result<toml_edit::Document> {
// TODO: this is a legacy code, it should be refactored to use toml_edit directly.
// FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc.
let pg_distrib_dir_param = format!(
"pg_distrib_dir='{}'",
@@ -172,12 +174,21 @@ impl PageServerNode {
// Apply the user-provided overrides
overrides.push(cli_overrides.to_string());
overrides
// Turn `overrides` into a toml document.
// TODO: above code is legacy code, it should be refactored to use toml_edit directly.
let mut config_toml = toml_edit::Document::new();
for fragment_str in overrides {
let fragment = toml_edit::Document::from_str(&fragment_str)
.expect("all fragments in `overrides` are valid toml documents, this function controls that");
for (key, item) in fragment.iter() {
config_toml.insert(key, item.clone());
}
}
Ok(config_toml)
}
/// Initializes a pageserver node by creating its config with the overrides provided.
pub fn initialize(&self, config_overrides: &toml_edit::Document) -> anyhow::Result<()> {
// First, run `pageserver --init` and wait for it to write a config into FS and exit.
pub fn initialize(&self, config_overrides: toml_edit::Document) -> anyhow::Result<()> {
self.pageserver_init(config_overrides)
.with_context(|| format!("Failed to run init for pageserver node {}", self.conf.id))
}
@@ -198,7 +209,7 @@ impl PageServerNode {
self.start_node().await
}
fn pageserver_init(&self, config_overrides: &toml_edit::Document) -> anyhow::Result<()> {
fn pageserver_init(&self, cli_overrides: toml_edit::Document) -> anyhow::Result<()> {
let datadir = self.repo_path();
let node_id = self.conf.id;
println!(
@@ -209,36 +220,20 @@ impl PageServerNode {
);
io::stdout().flush()?;
if !datadir.exists() {
std::fs::create_dir(&datadir)?;
}
let datadir_path_str = datadir.to_str().with_context(|| {
format!("Cannot start pageserver node {node_id} in path that has no string representation: {datadir:?}")
})?;
// `pageserver --init` merges the `--config-override`s into a built-in default config,
// then writes out the merged product to `pageserver.toml`.
// TODO: just write the full `pageserver.toml` and get rid of `--config-override`.
let mut args = vec!["--init", "--workdir", datadir_path_str];
let overrides = self.neon_local_overrides(config_overrides);
for piece in &overrides {
args.push("--config-override");
args.push(piece);
}
let init_output = Command::new(self.env.pageserver_bin())
.args(args)
.envs(self.pageserver_env_variables()?)
.output()
.with_context(|| format!("Failed to run pageserver init for node {node_id}"))?;
anyhow::ensure!(
init_output.status.success(),
"Pageserver init for node {} did not finish successfully, stdout: {}, stderr: {}",
node_id,
String::from_utf8_lossy(&init_output.stdout),
String::from_utf8_lossy(&init_output.stderr),
);
let config = self
.pageserver_init_make_toml(cli_overrides)
.context("make pageserver toml")?;
let config_file_path = datadir.join("pageserver.toml");
let mut config_file = std::fs::OpenOptions::new()
.create_new(true)
.write(true)
.open(&config_file_path)
.with_context(|| format!("open pageserver toml for write: {config_file_path:?}"))?;
config_file
.write_all(config.to_string().as_bytes())
.context("write pageserver toml")?;
drop(config_file);
// TODO: invoke a TBD config-check command to validate that pageserver will start with the written config
// Write metadata file, used by pageserver on startup to register itself with
// the storage controller

View File

@@ -1,7 +1,7 @@
use crate::{
auth::parse_endpoint_param,
cancellation::CancelClosure,
console::{errors::WakeComputeError, messages::MetricsAuxInfo},
console::{errors::WakeComputeError, messages::MetricsAuxInfo, provider::ApiLockError},
context::RequestMonitoring,
error::{ReportableError, UserFacingError},
metrics::{Metrics, NumDbConnectionsGuard},
@@ -34,6 +34,9 @@ pub enum ConnectionError {
#[error("{COULD_NOT_CONNECT}: {0}")]
WakeComputeError(#[from] WakeComputeError),
#[error("error acquiring resource permit: {0}")]
TooManyConnectionAttempts(#[from] ApiLockError),
}
impl UserFacingError for ConnectionError {
@@ -57,6 +60,9 @@ impl UserFacingError for ConnectionError {
None => err.to_string(),
},
WakeComputeError(err) => err.to_string_client(),
TooManyConnectionAttempts(_) => {
"Failed to acquire permit to connect to the database. Too many database connection attempts are currently ongoing.".to_owned()
}
_ => COULD_NOT_CONNECT.to_owned(),
}
}
@@ -72,6 +78,7 @@ impl ReportableError for ConnectionError {
ConnectionError::CouldNotConnect(_) => crate::error::ErrorKind::Compute,
ConnectionError::TlsError(_) => crate::error::ErrorKind::Compute,
ConnectionError::WakeComputeError(e) => e.get_error_kind(),
ConnectionError::TooManyConnectionAttempts(e) => e.get_error_kind(),
}
}
}

View File

@@ -12,6 +12,7 @@ use crate::{
compute,
config::{CacheOptions, EndpointCacheConfig, ProjectInfoCacheOptions},
context::RequestMonitoring,
error::ReportableError,
intern::ProjectIdInt,
metrics::ApiLockMetrics,
scram, EndpointCacheKey,
@@ -30,6 +31,8 @@ pub mod errors {
};
use thiserror::Error;
use super::ApiLockError;
/// A go-to error message which doesn't leak any detail.
const REQUEST_FAILED: &str = "Console request failed";
@@ -211,8 +214,8 @@ pub mod errors {
#[error("Too many connections attempts")]
TooManyConnections,
#[error("Timeout waiting to acquire wake compute lock")]
TimeoutError,
#[error("error acquiring resource permit: {0}")]
TooManyConnectionAttempts(#[from] ApiLockError),
}
// This allows more useful interactions than `#[from]`.
@@ -222,17 +225,6 @@ pub mod errors {
}
}
impl From<tokio::sync::AcquireError> for WakeComputeError {
fn from(_: tokio::sync::AcquireError) -> Self {
WakeComputeError::TimeoutError
}
}
impl From<tokio::time::error::Elapsed> for WakeComputeError {
fn from(_: tokio::time::error::Elapsed) -> Self {
WakeComputeError::TimeoutError
}
}
impl UserFacingError for WakeComputeError {
fn to_string_client(&self) -> String {
use WakeComputeError::*;
@@ -245,7 +237,9 @@ pub mod errors {
TooManyConnections => self.to_string(),
TimeoutError => "timeout while acquiring the compute resource lock".to_owned(),
TooManyConnectionAttempts(_) => {
"Failed to acquire permit to connect to the database. Too many database connection attempts are currently ongoing.".to_owned()
}
}
}
}
@@ -256,7 +250,7 @@ pub mod errors {
WakeComputeError::BadComputeAddress(_) => crate::error::ErrorKind::ControlPlane,
WakeComputeError::ApiError(e) => e.get_error_kind(),
WakeComputeError::TooManyConnections => crate::error::ErrorKind::RateLimit,
WakeComputeError::TimeoutError => crate::error::ErrorKind::ServiceRateLimit,
WakeComputeError::TooManyConnectionAttempts(e) => e.get_error_kind(),
}
}
}
@@ -456,6 +450,23 @@ pub struct ApiLocks<K> {
metrics: &'static ApiLockMetrics,
}
#[derive(Debug, thiserror::Error)]
pub enum ApiLockError {
#[error("lock was closed")]
AcquireError(#[from] tokio::sync::AcquireError),
#[error("permit could not be acquired")]
TimeoutError(#[from] tokio::time::error::Elapsed),
}
impl ReportableError for ApiLockError {
fn get_error_kind(&self) -> crate::error::ErrorKind {
match self {
ApiLockError::AcquireError(_) => crate::error::ErrorKind::Service,
ApiLockError::TimeoutError(_) => crate::error::ErrorKind::RateLimit,
}
}
}
impl<K: Hash + Eq + Clone> ApiLocks<K> {
pub fn new(
name: &'static str,
@@ -475,7 +486,7 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
})
}
pub async fn get_permit(&self, key: &K) -> Result<WakeComputePermit, errors::WakeComputeError> {
pub async fn get_permit(&self, key: &K) -> Result<WakeComputePermit, ApiLockError> {
if self.permits == 0 {
return Ok(WakeComputePermit { permit: None });
}

View File

@@ -86,6 +86,8 @@ impl ShouldRetry for compute::ConnectionError {
match self {
compute::ConnectionError::Postgres(err) => err.should_retry_database_address(),
compute::ConnectionError::CouldNotConnect(err) => err.should_retry_database_address(),
// the cache entry was not checked for validity
compute::ConnectionError::TooManyConnectionAttempts(_) => false,
_ => true,
}
}

View File

@@ -119,7 +119,7 @@ fn report_error(e: &WakeComputeError, retry: bool) {
WakeupFailureKind::ApiConsoleOtherError
}
WakeComputeError::TooManyConnections => WakeupFailureKind::ApiConsoleLocked,
WakeComputeError::TimeoutError => WakeupFailureKind::TimeoutError,
WakeComputeError::TooManyConnectionAttempts(_) => WakeupFailureKind::TimeoutError,
};
Metrics::get()
.proxy

View File

@@ -10,6 +10,7 @@ use crate::{
console::{
errors::{GetAuthInfoError, WakeComputeError},
locks::ApiLocks,
provider::ApiLockError,
CachedNodeInfo,
},
context::RequestMonitoring,
@@ -131,6 +132,8 @@ pub enum HttpConnError {
AuthError(#[from] AuthError),
#[error("wake_compute returned error")]
WakeCompute(#[from] WakeComputeError),
#[error("error acquiring resource permit: {0}")]
TooManyConnectionAttempts(#[from] ApiLockError),
}
impl ReportableError for HttpConnError {
@@ -141,6 +144,7 @@ impl ReportableError for HttpConnError {
HttpConnError::GetAuthInfo(a) => a.get_error_kind(),
HttpConnError::AuthError(a) => a.get_error_kind(),
HttpConnError::WakeCompute(w) => w.get_error_kind(),
HttpConnError::TooManyConnectionAttempts(w) => w.get_error_kind(),
}
}
}
@@ -153,6 +157,9 @@ impl UserFacingError for HttpConnError {
HttpConnError::GetAuthInfo(c) => c.to_string_client(),
HttpConnError::AuthError(c) => c.to_string_client(),
HttpConnError::WakeCompute(c) => c.to_string_client(),
HttpConnError::TooManyConnectionAttempts(_) => {
"Failed to acquire permit to connect to the database. Too many database connection attempts are currently ongoing.".to_owned()
}
}
}
}
@@ -165,6 +172,15 @@ impl ShouldRetry for HttpConnError {
HttpConnError::GetAuthInfo(_) => false,
HttpConnError::AuthError(_) => false,
HttpConnError::WakeCompute(_) => false,
HttpConnError::TooManyConnectionAttempts(_) => false,
}
}
fn should_retry_database_address(&self) -> bool {
match self {
HttpConnError::ConnectionError(e) => e.should_retry_database_address(),
// we never checked cache validity
HttpConnError::TooManyConnectionAttempts(_) => false,
_ => true,
}
}
}

View File

@@ -48,6 +48,12 @@ class Branchpoint(str, enum.Enum):
]
SHUTDOWN_ALLOWED_ERRORS = [
".*initial size calculation failed: downloading failed, possibly for shutdown",
".*failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited",
]
@pytest.mark.parametrize("branchpoint", Branchpoint.all())
@pytest.mark.parametrize("restart_after", [True, False])
def test_ancestor_detach_branched_from(
@@ -61,12 +67,7 @@ def test_ancestor_detach_branched_from(
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(
[
".*initial size calculation failed: downloading failed, possibly for shutdown",
".*failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited",
]
)
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
client = env.pageserver.http_client()
@@ -208,13 +209,7 @@ def test_ancestor_detach_reparents_earlier(neon_env_builder: NeonEnvBuilder, res
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(
[
".*initial size calculation failed: downloading failed, possibly for shutdown",
# after restart this is likely to happen if there is other load on the runner
".*failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited",
]
)
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
client = env.pageserver.http_client()
@@ -396,9 +391,7 @@ def test_detached_receives_flushes_while_being_detached(
with env.endpoints.create_start("new main", tenant_id=env.initial_tenant) as ep:
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
env.pageserver.allowed_errors.append(
"initial size calculation failed: downloading failed, possibly for shutdown"
)
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
# TODO: