Compare commits


8 Commits

Author SHA1 Message Date
Folke Behrens
a7946dffec neon-image: Add debugging tools to image 2025-03-24 19:52:27 +01:00
Anastasia Lubennikova
3e5884ff01 Revert "feat(compute_ctl): allow to change audit_log_level for existing (#11308)" (#11343)

This reverts commit e5aef3747c.

The logic of this commit was incorrect:
enabling audit requires a restart of the compute,
because audit extensions use shared_preload_libraries.
So it cannot be done in the configuration phase;
an endpoint restart is required instead.
2025-03-21 18:09:34 +00:00
Vlad Lazar
9fc7c22cc9 storcon: add use_local_compute_notifications flag (#11333)
## Problem

While working on bulk import, I want to use the `control-plane-url` flag
for a different request.
Currently, the local compute hook is used whenever no control plane is
specified in the config.
My test requires local compute notifications and a configured
`control-plane-url`, which isn't supported.

## Summary of changes

Add a `use-local-compute-notifications` flag. When this is set, we use
the local flow regardless of other config values.
It's enabled by default in neon_local and disabled by default in all
other envs. I had to turn the flag off in tests
that wish to bypass the local flow, but that's expected.

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2025-03-21 15:31:06 +00:00
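As a hedged illustration of the routing decision the flag in the commit above introduces (the `ComputeHookConfig` struct and `notify_target` function are illustrative stand-ins, not storcon's actual types): when the flag is set, the local flow wins regardless of any configured control plane URL.

```rust
// Illustrative sketch, not the storage controller's real code.
struct ComputeHookConfig {
    use_local_compute_notifications: bool,
    control_plane_url: Option<String>,
    compute_hook_url: Option<String>,
}

fn notify_target(cfg: &ComputeHookConfig) -> String {
    // The flag short-circuits everything else.
    if cfg.use_local_compute_notifications {
        return "local".to_string();
    }
    // Otherwise fall back to an explicitly configured endpoint; the real
    // service validates at startup that one of these is present.
    cfg.control_plane_url
        .as_ref()
        .map(|u| format!("{}/notify-attach", u.trim_end_matches('/')))
        .or_else(|| cfg.compute_hook_url.clone())
        .expect("validated at startup")
}

fn main() {
    let cfg = ComputeHookConfig {
        use_local_compute_notifications: true,
        control_plane_url: Some("https://control-plane.example".to_string()),
        compute_hook_url: None,
    };
    assert_eq!(notify_target(&cfg), "local");
}
```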
Folke Behrens
23ad228310 pgxn: Increase the pageserver response timeout a bit (#11339)
Increase the PS response timeout slightly but noticeably,
so it does not coincide with the default TCP_RTO_MAX.
2025-03-21 14:21:53 +00:00
Dmitrii Kovalkov
aeb53fea94 storage: support multiple SSL CA certificates (#11341)
## Problem
- We need to support multiple SSL CA certificates for graceful root CA
certificate rotation.
- Closes: https://github.com/neondatabase/cloud/issues/25971

## Summary of changes
- Parses `ssl_ca_file` as a pem bundle, which may contain multiple
certificates. Single pem cert is a valid pem bundle, so the change is
backward compatible.
2025-03-21 13:43:38 +00:00
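A minimal sketch of the backward-compatible bundle parsing described in the commit above, using reqwest's `Certificate::from_pem_bundle` as the diff does (the `build_client` helper, path handling, and error handling here are illustrative):

```rust
use reqwest::Certificate;

// Load an ssl_ca_file that may contain one or more PEM certificates and
// register each of them as a trusted root. A single PEM certificate is a
// valid one-element bundle, so old-style files keep working.
fn build_client(ssl_ca_file: Option<&std::path::Path>) -> reqwest::Result<reqwest::Client> {
    let mut builder = reqwest::Client::builder();
    if let Some(path) = ssl_ca_file {
        let buf = std::fs::read(path).expect("SSL CA file should exist");
        for cert in Certificate::from_pem_bundle(&buf)? {
            builder = builder.add_root_certificate(cert);
        }
    }
    builder.build()
}

fn main() -> reqwest::Result<()> {
    let _client = build_client(None)?;
    Ok(())
}
```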
Dmitrii Kovalkov
0f367cb665 storcon: reuse reqwest http client (#11327)
## Problem

- Part of https://github.com/neondatabase/neon/issues/11113
- Building a new `reqwest::Client` for every request is expensive
because it parses CA certs under the hood. It's noticeable in storcon's
flamegraph.

## Summary of changes
- Reuse one `reqwest::Client` for all API calls to avoid parsing CA
certificates every time.
2025-03-21 11:48:22 +00:00
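A minimal sketch of the pattern the commit above adopts: build one `reqwest::Client` and clone it for every caller, since cloning is cheap (the client is internally reference counted) while construction re-parses CA certificates (the `PageserverApi` struct and URLs are illustrative stand-ins):

```rust
// Illustrative "build once, clone everywhere" sketch.
struct PageserverApi {
    http_client: reqwest::Client, // clone of the shared client, not a rebuild
    base_url: String,
}

fn main() -> reqwest::Result<()> {
    // Expensive: parses TLS config and CA certificates. Do it once.
    let shared = reqwest::Client::builder().build()?;

    // Cheap: every API caller gets a clone of the same client.
    let apis: Vec<PageserverApi> = (0..4)
        .map(|i| PageserverApi {
            http_client: shared.clone(),
            base_url: format!("http://pageserver-{i}.example:9898"),
        })
        .collect();

    assert_eq!(apis.len(), 4);
    let _ = apis[0].base_url.clone();
    Ok(())
}
```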
John Spray
76088c16d2 storcon: reproduce shard split issue (#11290)
## Problem

Issue https://github.com/neondatabase/neon/issues/11254 describes a case
where restart during a shard split can result in a bad end state in the
database.

## Summary of changes

- Add a reproducer for the issue
- Tighten an existing safety check around updated row counts in
complete_shard_split
2025-03-21 08:48:56 +00:00
John Spray
0d99609870 docs: storage controller retro-RFC (#11218)
## Problem

Various aspects of the controller's job are already described in RFCs,
but the overall service didn't have an RFC that records design tradeoffs
and the top level structure.

## Summary of changes

- Add a retrospective RFC that should be useful for anyone seeking to understand
storage controller functionality
2025-03-21 08:32:11 +00:00
45 changed files with 666 additions and 342 deletions

View File

@@ -103,12 +103,17 @@ RUN set -e \
&& echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries \
&& apt update \
&& apt install -y \
bpftrace \
ca-certificates \
libreadline-dev \
libseccomp-dev \
ca-certificates \
# System postgres for use with client libraries (e.g. in storage controller)
postgresql-15 \
iproute2 \
lsof \
openssl \
# System postgres for use with client libraries (e.g. in storage controller)
postgresql-15 \
screen \
tcpdump \
&& rm -f /etc/apt/apt.conf.d/80-retries \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \
&& useradd -d /data neon \

View File

@@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::os::unix::fs::{PermissionsExt, symlink};
use std::path::{Path, PathBuf};
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicU32, Ordering};
@@ -153,11 +153,6 @@ pub struct ComputeState {
pub startup_span: Option<tracing::span::Span>,
pub metrics: ComputeMetrics,
/// current audit log level
/// to know if it is already configured, or we need to set up audit
/// when compute receives a new spec
pub audit_log_level: ComputeAudit,
}
impl ComputeState {
@@ -170,7 +165,6 @@ impl ComputeState {
pspec: None,
startup_span: None,
metrics: ComputeMetrics::default(),
audit_log_level: ComputeAudit::default(),
}
}
@@ -626,10 +620,16 @@ impl ComputeNode {
});
}
// If extended compute audit is enabled configure and start rsyslog
if pspec.spec.audit_log_level == ComputeAudit::Hipaa {
let log_directory_path = self.get_audit_log_dir().to_string_lossy().to_string();
configure_audit_rsyslog(&log_directory_path, pspec.spec.audit_log_level.as_str())?;
// Configure and start rsyslog for HIPAA if necessary
if let ComputeAudit::Hipaa = pspec.spec.audit_log_level {
let remote_endpoint = std::env::var("AUDIT_LOGGING_ENDPOINT").unwrap_or("".to_string());
if remote_endpoint.is_empty() {
anyhow::bail!("AUDIT_LOGGING_ENDPOINT is empty");
}
let log_directory_path = Path::new(&self.params.pgdata).join("log");
let log_directory_path = log_directory_path.to_string_lossy().to_string();
configure_audit_rsyslog(log_directory_path.clone(), "hipaa", &remote_endpoint)?;
// Launch a background task to clean up the audit logs
launch_pgaudit_gc(log_directory_path);
@@ -684,11 +684,6 @@ impl ComputeNode {
});
}
// after all the configuration is done
// preserve the information about the current audit log level
// so that we don't relaunch rsyslog on every spec change
self.set_audit_log_level(pspec.spec.audit_log_level);
// All done!
let startup_end_time = Utc::now();
let metrics = {
@@ -843,19 +838,6 @@ impl ComputeNode {
self.state.lock().unwrap().status
}
pub fn set_audit_log_level(&self, audit_log_level: ComputeAudit) {
let mut state = self.state.lock().unwrap();
state.audit_log_level = audit_log_level;
}
pub fn get_audit_log_level(&self) -> ComputeAudit {
self.state.lock().unwrap().audit_log_level
}
pub fn get_audit_log_dir(&self) -> PathBuf {
Path::new(&self.params.pgdata).join("log")
}
pub fn get_timeline_id(&self) -> Option<TimelineId> {
self.state
.lock()
@@ -1566,29 +1548,6 @@ impl ComputeNode {
});
}
// If extended compute audit is enabled configure and start rsyslog
// We check that the audit_log_level changed compared to the previous spec and skip this step if not.
let audit_log_level = self.get_audit_log_level();
if spec.audit_log_level == ComputeAudit::Hipaa && audit_log_level != spec.audit_log_level {
info!(
"Configuring audit logging because audit_log_level changed from {:?} to {:?}",
audit_log_level, spec.audit_log_level
);
let log_directory_path = self.get_audit_log_dir().to_string_lossy().to_string();
configure_audit_rsyslog(&log_directory_path, spec.audit_log_level.as_str())?;
// Launch a background task to clean up the audit logs
// If rsyslog was already configured, we don't need to start this process again.
match audit_log_level {
ComputeAudit::Disabled | ComputeAudit::Log => {
launch_pgaudit_gc(log_directory_path);
}
_ => {}
}
}
// Write new config
let pgdata_path = Path::new(&self.params.pgdata);
config::write_postgres_conf(
@@ -1598,14 +1557,7 @@ impl ComputeNode {
&self.compute_ctl_config.tls,
)?;
// Override the skip_catalog_updates flag
// if we need to install new extensions
//
// Check that audit_log_level changed compared to the previous spec and skip this step if not.
// All operations are idempotent, so this is just a performance optimization.
let force_catalog_updates = audit_log_level != spec.audit_log_level;
if !spec.skip_pg_catalog_updates || force_catalog_updates {
if !spec.skip_pg_catalog_updates {
let max_concurrent_connections = spec.reconfigure_concurrency;
// Temporarily reset max_cluster_size in config
// to avoid the possibility of hitting the limit, while we are reconfiguring:
@@ -1630,11 +1582,6 @@ impl ComputeNode {
self.pg_reload_conf()?;
// after all the configuration is done
// preserve the information about the current audit log level
// so that we don't relaunch rsyslog on every spec change
self.set_audit_log_level(spec.audit_log_level);
let unknown_op = "unknown".to_string();
let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
info!(

View File

@@ -200,9 +200,8 @@ pub fn write_postgres_conf(
)?;
// This log level is very verbose
// but this is necessary for HIPAA compliance.
// Exclude 'misc' category, because it doesn't contain anything relevant.
// Exclude 'misc' category, because it doesn't contain anythig relevant.
writeln!(file, "pgaudit.log='all, -misc'")?;
// Log parameters for all queries
writeln!(file, "pgaudit.log_parameter=on")?;
// Disable logging of catalog queries
// The catalog doesn't contain sensitive data, so we don't need to audit it.

View File

@@ -9,7 +9,6 @@ use anyhow::{Context, Result, anyhow};
use tracing::{error, info, instrument, warn};
const POSTGRES_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/postgres_logs.conf";
const AUDIT_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/compute_audit_rsyslog.conf";
fn get_rsyslog_pid() -> Option<String> {
let output = Command::new("pgrep")
@@ -49,43 +48,32 @@ fn restart_rsyslog() -> Result<()> {
Ok(())
}
pub fn configure_audit_rsyslog(log_directory: &str, audit_log_level: &str) -> Result<()> {
let remote_endpoint = std::env::var("AUDIT_LOGGING_ENDPOINT")?;
if remote_endpoint.is_empty() {
return Err(anyhow!("AUDIT_LOGGING_ENDPOINT is not set"));
}
let old_config_content = match std::fs::read_to_string(AUDIT_LOGS_CONF_PATH) {
Ok(c) => c,
Err(err) if err.kind() == ErrorKind::NotFound => String::new(),
Err(err) => return Err(err.into()),
};
pub fn configure_audit_rsyslog(
log_directory: String,
tag: &str,
remote_endpoint: &str,
) -> Result<()> {
let config_content: String = format!(
include_str!("config_template/compute_audit_rsyslog_template.conf"),
log_directory = log_directory,
tag = audit_log_level,
tag = tag,
remote_endpoint = remote_endpoint
);
if old_config_content == config_content {
info!("rsyslog configuration is up-to-date");
return Ok(());
}
info!("rsyslog config_content: {}", config_content);
let rsyslog_conf_path = "/etc/rsyslog.d/compute_audit_rsyslog.conf";
let mut file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(AUDIT_LOGS_CONF_PATH)?;
.open(rsyslog_conf_path)?;
file.write_all(config_content.as_bytes())?;
info!(
"rsyslog configuration file {} added successfully. Starting rsyslogd",
AUDIT_LOGS_CONF_PATH
rsyslog_conf_path
);
// start the service, using the configuration

View File

@@ -184,6 +184,8 @@ pub struct NeonStorageControllerConf {
pub timelines_onto_safekeepers: bool,
pub use_https_safekeeper_api: bool,
pub use_local_compute_notifications: bool,
}
impl NeonStorageControllerConf {
@@ -213,6 +215,7 @@ impl Default for NeonStorageControllerConf {
use_https_pageserver_api: false,
timelines_onto_safekeepers: false,
use_https_safekeeper_api: false,
use_local_compute_notifications: true,
}
}
}

View File

@@ -51,11 +51,19 @@ impl PageServerNode {
parse_host_port(&conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
let port = port.unwrap_or(5432);
let ssl_ca_cert = env.ssl_ca_cert_path().map(|ssl_ca_file| {
let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| {
let buf = std::fs::read(ssl_ca_file).expect("SSL root CA file should exist");
Certificate::from_pem(&buf).expect("CA certificate should be valid")
Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid")
});
let mut http_client = reqwest::Client::builder();
for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client
.build()
.expect("Client constructs with no errors");
let endpoint = if env.storage_controller.use_https_pageserver_api {
format!(
"https://{}",
@@ -72,6 +80,7 @@ impl PageServerNode {
conf: conf.clone(),
env: env.clone(),
http_client: mgmt_api::Client::new(
http_client,
endpoint,
{
match conf.http_auth_type {
@@ -83,9 +92,7 @@ impl PageServerNode {
}
}
.as_deref(),
ssl_ca_cert,
)
.expect("Client constructs with no errors"),
),
}
}

View File

@@ -17,7 +17,7 @@ use pageserver_api::models::{TenantConfigRequest, TimelineCreateRequest, Timelin
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
use reqwest::Method;
use reqwest::{Certificate, Method};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::process::Command;
@@ -143,11 +143,14 @@ impl StorageController {
}
};
let mut http_client = reqwest::Client::builder();
if let Some(ssl_ca_file) = env.ssl_ca_cert_path() {
let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| {
let buf = std::fs::read(ssl_ca_file).expect("SSL CA file should exist");
let cert = reqwest::Certificate::from_pem(&buf).expect("SSL CA file should be valid");
http_client = http_client.add_root_certificate(cert);
Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid")
});
let mut http_client = reqwest::Client::builder();
for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client
.build()
@@ -552,6 +555,10 @@ impl StorageController {
args.push("--use-https-safekeeper-api".to_string());
}
if self.config.use_local_compute_notifications {
args.push("--use-local-compute-notifications".to_string());
}
if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap()));
}

View File

@@ -20,7 +20,7 @@ use pageserver_api::models::{
};
use pageserver_api::shard::{ShardStripeSize, TenantShardId};
use pageserver_client::mgmt_api::{self};
use reqwest::{Method, StatusCode, Url};
use reqwest::{Certificate, Method, StatusCode, Url};
use storage_controller_client::control_api::Client;
use utils::id::{NodeId, TenantId, TimelineId};
@@ -274,7 +274,7 @@ struct Cli {
jwt: Option<String>,
#[arg(long)]
/// Trusted root CA certificate to use in https APIs.
/// Trusted root CA certificates to use in https APIs.
ssl_ca_file: Option<PathBuf>,
#[command(subcommand)]
@@ -387,17 +387,23 @@ async fn main() -> anyhow::Result<()> {
let storcon_client = Client::new(cli.api.clone(), cli.jwt.clone());
let ssl_ca_cert = match &cli.ssl_ca_file {
let ssl_ca_certs = match &cli.ssl_ca_file {
Some(ssl_ca_file) => {
let buf = tokio::fs::read(ssl_ca_file).await?;
Some(reqwest::Certificate::from_pem(&buf)?)
Certificate::from_pem_bundle(&buf)?
}
None => None,
None => Vec::new(),
};
let mut http_client = reqwest::Client::builder();
for ssl_ca_cert in ssl_ca_certs {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client.build()?;
let mut trimmed = cli.api.to_string();
trimmed.pop();
let vps_client = mgmt_api::Client::new(trimmed, cli.jwt.as_deref(), ssl_ca_cert)?;
let vps_client = mgmt_api::Client::new(http_client, trimmed, cli.jwt.as_deref());
match cli.command {
Command::NodeRegister {

View File

@@ -0,0 +1,196 @@
## Summary
This is a retrospective RFC to document the design of the `storage-controller` service.
This service manages the physical mapping of Tenants and Timelines to Pageservers and Safekeepers. It
acts as the API for "storage" as an abstract concept: enabling other parts of the system to reason
about things like creating/deleting tenants and timelines without having to understand exactly which
pageserver and safekeeper to communicate with, or any subtle rules about how to orchestrate these things.
The storage controller was implemented in the first half of 2024 as an essential part
of storage sharding, especially [shard splitting](032-shard-splitting.md).
It initially managed only pageservers, but was extended in 2025 to also manage safekeepers. In
some places you may see unqualified references to 'nodes' -- those are pageservers.
## Design Choices
### Durability
We rely on an external postgres for all durable state. No local storage is used.
We avoid any unnecessary I/O to durable storage. For example:
- Most tracking of in-flight changes to the system is done in memory rather than by recording progress/steps in a database.
- When migrating tenant shards between pageservers we only touch the database to increment generation numbers;
we do not persist the total state of a tenant shard.
Being frugal with database I/O has two benefits:
- It avoids the database becoming a practical scaling bottleneck (we expect in-memory scale issues to be hit
before we hit e.g. transactions-per-second issues)
- It reduces cost when using a cloud database service to run the controller's postgres database.
The trade-off is that there is a "bootstrapping" problem: a controller can't be deployed in isolation; one
must first have some existing database system. In practice, we expect that Neon is deployed in one of the
following ways:
- into a cloud which has a postgres service that can be used to run the controller
- into a mature on-prem environment that has existing facilities for running databases
- into a test/dev environment where a simple one-node vanilla postgres installation is sufficient
### Consensus
The controller does _not_ implement any strong consensus mechanism of its own. Instead:
- Where strong consistency is required (for example, for pageserver generation numbers), this
responsibility is delegated to a transaction in our postgres database.
- Highly available deploys are done using a simple in-database record of what controller instances
are available, distinguished by timestamps, rather than having controllers directly negotiate a leader.
Avoiding strong consensus among controller processes is a cost saving (we avoid running three controllers
all the time), and simplifies implementation (we do not have to phrase all configuration changes as e.g. Raft
transactions).
The trade-off is that under some circumstances a controller with partial network isolation can cause availability
issues in the cluster, by making changes to pageserver state that might disagree with what the "true" active
controller is trying to do. The impact of this is bounded by our `controllers` database table, which enables
a rogue node to eventually realise that it is not the leader and step down. If a rogue node can't reach
the database, then it implicitly stops making progress. A rogue controller cannot durably damage the system
because pageserver data and safekeeper configs are protected by generation numbers that are only updated
via postgres transactions (i.e. no controller "trusts itself" to independently make decisions about generations).
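A hedged sketch of what delegating consistency to postgres looks like in practice (table and column names are illustrative, not the controller's actual schema): the generation increment is a single transactional statement, and the controller only trusts the value the database hands back, never an in-memory copy.

```rust
// Illustrative sketch: the SQL statement is the consensus mechanism. Two
// racing controllers both run it, but postgres serialises them and each
// sees a distinct returned generation.
const INCREMENT_GENERATION: &str = "\
    UPDATE tenant_shards \
    SET generation = generation + 1, generation_pageserver = $2 \
    WHERE tenant_shard_id = $1 \
    RETURNING generation";

/// What a caller does with the returned row: attach to the pageserver using
/// exactly the generation the database returned.
fn attach_with(generation_from_db: u32) -> String {
    format!("location_config {{ generation: {generation_from_db}, .. }}")
}

fn main() {
    println!("{INCREMENT_GENERATION}");
    println!("{}", attach_with(42));
}
```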
### Scale
We design for high but not unlimited scale. The memory footprint of each tenant shard is small (~8kB), so
it is realistic to scale up to a million attached shards on a server with modest resources. Tenants in
a detached state (i.e. not active on pageservers) do not need to be managed by the storage controller, and can
be relegated from memory to the database.
Typically, a tenant shard is updated about once a week, when we do a deploy. During deploys, we relocate
a few thousand tenants from each pageserver while it is restarted, so it is extremely rare for the controller
to have to do O(N) work (on all shards at once).
There are places where we do O(N) work:
- On normal startup, when loading from the database into memory
- On unclean startup (with no handover of observed state from a previous controller), where we will
scan all shards on all pageservers.
It is important that these code paths are written efficiently. At high scale we should still expect runtimes
on the order of tens of seconds to complete a storage controller start.
When the practical scale limit of a single storage controller is reached, just deploy another one with its
own pageservers & safekeepers: each controller plus its storage servers should be thought of as a logical cluster
or "cell" of storage.
# High Level Design
The storage controller is an in-memory system (i.e. state for all attached
tenants is held in memory _as well as_ being represented in durable postgres storage).
## Infrastructure
The storage controller is an async rust binary using tokio.
The storage controller is built around the `Service` type. This implements
all the entry points for the outside world's interaction with the controller (HTTP handlers are mostly thin wrappers of service functions),
and holds most in-memory state (e.g. the list of tenant shards).
The state is held in a `ServiceInner` wrapped in an RwLock. This monolithic
lock is used to simplify reasoning about code that mutates state: each function that takes a write lock may be thought of as a serializable transaction on the in-memory state. This lock is clearly a bottleneck, but
nevertheless is scalable to managing millions of tenants.
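A minimal sketch of this layout, using a std `RwLock` for brevity (field names simplified; not the controller's actual types): handlers call methods on `Service`, and any method that takes the write lock can be reasoned about as a serializable transaction over the in-memory state.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

struct TenantShard {
    generation: u32,
}

struct ServiceInner {
    tenant_shards: HashMap<String, TenantShard>,
}

struct Service {
    inner: RwLock<ServiceInner>,
}

impl Service {
    fn shard_count(&self) -> usize {
        self.inner.read().unwrap().tenant_shards.len()
    }

    fn upsert_shard(&self, id: &str, generation: u32) {
        // Holding the write lock makes this update atomic with respect to
        // every other state mutation on the controller.
        let mut inner = self.inner.write().unwrap();
        inner
            .tenant_shards
            .insert(id.to_string(), TenantShard { generation });
    }
}

fn main() {
    let svc = Service {
        inner: RwLock::new(ServiceInner { tenant_shards: HashMap::new() }),
    };
    svc.upsert_shard("tenant-0-shard-0", 1);
    assert_eq!(svc.shard_count(), 1);
}
```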
Persistent state is held in a postgres database, and we use the `diesel` crate to provide database client functionality. All database access is wrapped in the `Persistence` type -- this makes it easy to understand which
code is touching the database. The database is only used when necessary, i.e. for state that cannot be recovered another way. For example, we do not store the secondary pageserver locations of tenant shards in the database, rather we learn these at startup from running pageservers, and/or make scheduling decisions to fill in the gaps. This adds some complexity, but massively reduces the load on the database, and enables running the storage controller with a very cheap postgres instance.
## Pageserver tenant scheduling & reconciliation
### Intent & observed state
Each tenant shard is represented by type `TenantShard`, which has an 'intent' and 'observed' state. Setting the
intent state is called _scheduling_, and doing remote I/O to make observed
state match intent state is called _reconciliation_.
The `Scheduler` type is responsible for making choices about the intent
state, such as choosing a pageserver for a new tenant shard, or assigning
a replacement pageserver when the original one fails.
The observed state is updated after tenant reconciliation (see below), and
has the concept of a `None` state for a pageserver, indicating unknown state. This is used to ensure that we can safely clean up after we start
but do not finish a remote call to a pageserver, or if a pageserver restarts and we are uncertain of its state.
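A hedged sketch of the intent/observed split described above (types and fields simplified for illustration): the scheduler writes intent, reconciliation writes observed, and a `None` observed entry means the state on that pageserver is unknown and must be cleaned up or re-learned.

```rust
use std::collections::HashMap;

type NodeId = u64;

struct LocationConf {
    attached: bool,
}

struct IntentState {
    attached: Option<NodeId>,
    secondary: Vec<NodeId>,
}

struct ObservedState {
    // None = unknown state on that node (e.g. after an interrupted call).
    locations: HashMap<NodeId, Option<LocationConf>>,
}

struct TenantShard {
    intent: IntentState,
    observed: ObservedState,
}

impl TenantShard {
    /// Reconciliation is needed whenever observed does not match intent.
    fn needs_reconcile(&self) -> bool {
        match self.intent.attached {
            Some(node) => !matches!(
                self.observed.locations.get(&node),
                Some(Some(conf)) if conf.attached
            ),
            None => self
                .observed
                .locations
                .values()
                .any(|l| l.as_ref().map(|c| c.attached).unwrap_or(true)),
        }
    }
}

fn main() {
    let shard = TenantShard {
        intent: IntentState { attached: Some(1), secondary: vec![2] },
        observed: ObservedState { locations: HashMap::new() },
    };
    assert!(shard.intent.secondary.contains(&2));
    // Nothing observed yet, so the shard needs reconciliation.
    assert!(shard.needs_reconcile());
}
```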
### Tenant Reconciliation
The `Reconciler` type is responsible for updating pageservers to achieve
the intent state. It is instantiated when `Service` determines that a shard requires reconciliation, and is owned by a background tokio task that
runs it to completion. The Reconciler does not have access to the `Service` state: it is populated with a snapshot of relevant information when constructed, and submits its results to a channel that `Service` consumes
to update the tenant shard's observed state.
The Reconciler does have access to the database, but only uses it for
a single purpose: updating shards' generation numbers immediately before
attaching them to a pageserver.
Operations that change a tenant's scheduling will spawn a reconciler if
necessary, and there is also a background loop which checks every shard
for the need to reconcile -- this background loop ensures eventual progress
if some earlier reconciliations failed for some reason.
The reconciler has a general purpose code path which will attach/detach from pageservers as necessary, and a special case path for live migrations. The live migration case is more common in practice, and is taken whenever the current observed state indicates that we have a healthy attached location to migrate from. This implements live migration as described in the earlier [live migration RFC](028-pageserver-migration.md).
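A hedged sketch of this ownership model, assuming a tokio unbounded channel for results (types are illustrative): the reconciler gets a snapshot of what it needs, runs in its own task to completion, and reports back through the channel without touching the service's in-memory state directly.

```rust
use tokio::sync::mpsc;

#[derive(Debug)]
struct ReconcileResult {
    shard: String,
    success: bool,
}

struct Reconciler {
    shard: String,    // snapshot of the shard identity
    target_node: u64, // snapshot of the intended pageserver
    result_tx: mpsc::UnboundedSender<ReconcileResult>,
}

impl Reconciler {
    async fn run(self) {
        // Real reconcilers do remote pageserver calls here (attach/detach,
        // live migration); this sketch just reports success.
        let _ = self.target_node;
        let _ = self
            .result_tx
            .send(ReconcileResult { shard: self.shard, success: true });
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    let reconciler = Reconciler {
        shard: "tenant-0-shard-0".into(),
        target_node: 1,
        result_tx: tx,
    };

    // Owned by a background task that runs it to completion.
    tokio::spawn(reconciler.run());

    // The service consumes results and updates observed state.
    let result = rx.recv().await.expect("result");
    assert!(result.success);
}
```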
### Scheduling optimisation
During the periodic background reconciliation loop, the controller also
performs _scheduling optimisation_. This is the process of looking for
shards that are in sub-optimal locations, and moving them.
Typically, this means:
- Shards attached outside their preferred AZ (e.g. after a node failure), to migrate them back to their preferred AZ
- Shards attached on the same pageserver as some other shards in the same
tenant, to migrate them elsewhere (e.g. after a shard split)
Scheduling optimisation is a multi-step process to ensure graceful cutovers, e.g. by creating a new secondary location, waiting for it to
warm up, then cutting over. This is not done as an explicit queue
of operations, but rather by iteratively calling the optimisation
function, which will recognise each intervening state as something
that can generate the next optimisation.
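A hedged sketch of that iterative approach (enum variants and state fields are illustrative): there is no queue of pending steps; the optimiser is called repeatedly and derives the next step from whatever state it currently observes.

```rust
#[derive(Debug, PartialEq)]
enum Optimization {
    CreateSecondary,
    WaitForWarmup,
    CutOver,
    Done,
}

struct ShardView {
    attached_in_preferred_az: bool,
    has_secondary_in_preferred_az: bool,
    has_warm_secondary_in_preferred_az: bool,
}

// Each call recognises the intervening state and yields the next step.
fn next_optimization(shard: &ShardView) -> Optimization {
    if shard.attached_in_preferred_az {
        Optimization::Done
    } else if !shard.has_secondary_in_preferred_az {
        Optimization::CreateSecondary
    } else if !shard.has_warm_secondary_in_preferred_az {
        Optimization::WaitForWarmup
    } else {
        Optimization::CutOver
    }
}

fn main() {
    let shard = ShardView {
        attached_in_preferred_az: false,
        has_secondary_in_preferred_az: true,
        has_warm_secondary_in_preferred_az: true,
    };
    assert_eq!(next_optimization(&shard), Optimization::CutOver);
}
```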
### Pageserver heartbeats and failure
The `Heartbeater` type is responsible for detecting when a pageserver
becomes unavailable. This is fed back into `Service` for action: when
a pageserver is marked unavailable, tenant shards on that pageserver are
rescheduled and Reconcilers are spawned to cut them over to their new location.
## Pageserver timeline CRUD operations
By CRUD operations, we mean creating and deleting timelines. The authoritative storage for which timelines exist on the pageserver
is in S3, and is governed by the pageserver's system of generation
numbers. Because a shard can be attached to multiple pageservers
concurrently, we need to handle this when doing timeline CRUD operations:
- A timeline operation is only persistent if, _after_ the ack from a pageserver, that pageserver's generation is still the latest.
- For deletions in particular, they are only persistent if _all_ attached
locations have acked the deletion operation, since if only the latest one
has acked then the timeline could still return from the dead if some old-generation attachment writes an index for it.
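A hedged sketch of these persistence rules (types simplified; not the pageserver's actual generation machinery): a create is durable only if the acking pageserver's generation is still the latest afterwards, and a delete is durable only once every attached location has acked it.

```rust
#[derive(Clone, Copy, PartialEq)]
struct Generation(u32);

struct AttachedLocation {
    generation: Generation,
    acked: bool,
}

fn create_is_persistent(acking_generation: Generation, latest_generation: Generation) -> bool {
    // Re-check the generation *after* the ack: an older attachment's write
    // can be superseded by a newer generation's index.
    acking_generation == latest_generation
}

fn delete_is_persistent(locations: &[AttachedLocation]) -> bool {
    // Any un-acked attachment could still write an index that resurrects
    // the timeline, so all of them must ack.
    locations.iter().all(|l| l.acked)
}

fn main() {
    assert!(create_is_persistent(Generation(7), Generation(7)));
    assert!(!delete_is_persistent(&[
        AttachedLocation { generation: Generation(7), acked: true },
        AttachedLocation { generation: Generation(6), acked: false },
    ]));
}
```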
## Zero-downtime controller deployments
When two storage controllers run at the same time, they coordinate via
the database to establish one leader, and the other controller may proxy
requests to this leader.
See [Storage controller restarts RFC](037-storage-controller-restarts.md).
Note that this is not a strong consensus mechanism: the controller must also survive split-brain situations. This is handled by code that
e.g. increments version numbers, which uses database transactions that
check the expected value before modifying it. A split-brain situation can
impact availability (e.g. if two controllers are fighting over where to
attach a shard), but it should never impact durability and data integrity.
## Graceful drain & fill of pageservers during deploys
The storage controller has functionality for draining + filling pageservers
while deploying new pageserver binaries, so that clients are not actively
using a pageserver while it restarts.
See [Graceful restarts RFC](033-storage-controller-drain-and-fill.md).
## Safekeeper timeline scheduling
This is currently under development; see [Safekeeper dynamic membership change RFC](035-safekeeper-dynamic-membership-change.md).

View File

@@ -160,6 +160,12 @@ pub struct ComputeSpec {
pub drop_subscriptions_before_start: bool,
/// Log level for audit logging:
///
/// Disabled - no audit logging. This is the default.
/// log - log masked statements to the postgres log using pgaudit extension
/// hipaa - log unmasked statements to the file using pgaudit and pgauditlogtofile extension
///
/// Extensions should be present in shared_preload_libraries
#[serde(default)]
pub audit_log_level: ComputeAudit,
}
@@ -282,27 +288,16 @@ impl ComputeMode {
}
/// Log level for audit logging
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
/// Disabled, log, hipaa
/// Default is Disabled
#[derive(Clone, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
pub enum ComputeAudit {
#[default]
/// no audit logging. This is the default.
Disabled,
/// write masked audit log statements to the postgres log using pgaudit extension
Log,
/// log unmasked statements to the file using pgaudit and pgauditlogtofile extensions
Hipaa,
}
impl ComputeAudit {
pub fn as_str(&self) -> &str {
match self {
ComputeAudit::Disabled => "disabled",
ComputeAudit::Log => "log",
ComputeAudit::Hipaa => "hipaa",
}
}
}
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
pub struct Cluster {
pub cluster_id: Option<String>,

View File

@@ -7,7 +7,7 @@ use http_utils::error::HttpErrorBody;
use pageserver_api::models::*;
use pageserver_api::shard::TenantShardId;
pub use reqwest::Body as ReqwestBody;
use reqwest::{Certificate, IntoUrl, Method, StatusCode, Url};
use reqwest::{IntoUrl, Method, StatusCode, Url};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -39,8 +39,8 @@ pub enum Error {
#[error("Cancelled")]
Cancelled,
#[error("create client: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
CreateClient(reqwest::Error),
#[error("request timed out: {0}")]
Timeout(String),
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -72,24 +72,7 @@ pub enum ForceAwaitLogicalSize {
}
impl Client {
pub fn new(
mgmt_api_endpoint: String,
jwt: Option<&str>,
ssl_ca_cert: Option<Certificate>,
) -> Result<Self> {
let mut http_client = reqwest::Client::builder();
if let Some(ssl_ca_cert) = ssl_ca_cert {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client.build().map_err(Error::CreateClient)?;
Ok(Self::from_client(http_client, mgmt_api_endpoint, jwt))
}
pub fn from_client(
client: reqwest::Client,
mgmt_api_endpoint: String,
jwt: Option<&str>,
) -> Self {
pub fn new(client: reqwest::Client, mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
Self {
mgmt_api_endpoint,
authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),

View File

@@ -34,10 +34,10 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
));
// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(

View File

@@ -75,10 +75,10 @@ async fn main_impl(
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
));
// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(

View File

@@ -123,10 +123,10 @@ async fn main_impl(
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
));
if let Some(engine_str) = &args.set_io_engine {
mgmt_api_client.put_io_engine(engine_str).await?;

View File

@@ -81,10 +81,10 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
));
if let Some(engine_str) = &args.set_io_engine {
mgmt_api_client.put_io_engine(engine_str).await?;

View File

@@ -38,10 +38,10 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
));
// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(

View File

@@ -65,8 +65,8 @@ pub struct PageServerConf {
/// Period to reload certificate and private key from files.
/// Default: 60s.
pub ssl_cert_reload_period: Duration,
/// Trusted root CA certificate to use in https APIs.
pub ssl_ca_cert: Option<Certificate>,
/// Trusted root CA certificates to use in https APIs.
pub ssl_ca_certs: Vec<Certificate>,
/// Current availability zone. Used for traffic metrics.
pub availability_zone: Option<String>,
@@ -481,12 +481,12 @@ impl PageServerConf {
validate_wal_contiguity: validate_wal_contiguity.unwrap_or(false),
load_previous_heatmap: load_previous_heatmap.unwrap_or(true),
generate_unarchival_heatmap: generate_unarchival_heatmap.unwrap_or(true),
ssl_ca_cert: match ssl_ca_file {
ssl_ca_certs: match ssl_ca_file {
Some(ssl_ca_file) => {
let buf = std::fs::read(ssl_ca_file)?;
Some(Certificate::from_pem(&buf)?)
Certificate::from_pem_bundle(&buf)?
}
None => None,
None => Vec::new(),
},
};

View File

@@ -76,7 +76,7 @@ impl StorageControllerUpcallClient {
client = client.default_headers(headers);
}
if let Some(ssl_ca_cert) = &conf.ssl_ca_cert {
for ssl_ca_cert in &conf.ssl_ca_certs {
client = client.add_root_certificate(ssl_ca_cert.clone());
}

View File

@@ -82,7 +82,8 @@ static int max_reconnect_attempts = 60;
static int stripe_size;
static int pageserver_response_log_timeout = 10000;
static int pageserver_response_disconnect_timeout = 120000; /* 2 minutes */
/* 2.5 minutes. A bit higher than highest default TCP retransmission timeout */
static int pageserver_response_disconnect_timeout = 150000;
typedef struct
{
@@ -1450,7 +1451,7 @@ pg_init_libpagestore(void)
"If the pageserver doesn't respond to a request within this timeout, "
"disconnect and reconnect.",
&pageserver_response_disconnect_timeout,
120000, 100, INT_MAX,
150000, 100, INT_MAX,
PGC_SUSET,
GUC_UNIT_MS,
NULL, NULL, NULL);

View File

@@ -38,9 +38,8 @@ pub enum Error {
#[error("Cancelled")]
Cancelled,
/// Failed to create client.
#[error("create client: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
CreateClient(reqwest::Error),
#[error("request timed out: {0}")]
Timeout(String),
}
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -217,7 +217,7 @@ struct Args {
/// Period to reload certificate and private key from files.
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)]
pub ssl_cert_reload_period: Duration,
/// Trusted root CA certificate to use in https APIs.
/// Trusted root CA certificates to use in https APIs.
#[arg(long)]
ssl_ca_file: Option<Utf8PathBuf>,
}
@@ -353,13 +353,13 @@ async fn main() -> anyhow::Result<()> {
}
};
let ssl_ca_cert = match args.ssl_ca_file.as_ref() {
let ssl_ca_certs = match args.ssl_ca_file.as_ref() {
Some(ssl_ca_file) => {
tracing::info!("Using ssl root CA file: {ssl_ca_file:?}");
let buf = tokio::fs::read(ssl_ca_file).await?;
Some(Certificate::from_pem(&buf)?)
Certificate::from_pem_bundle(&buf)?
}
None => None,
None => Vec::new(),
};
let conf = Arc::new(SafeKeeperConf {
@@ -398,7 +398,7 @@ async fn main() -> anyhow::Result<()> {
ssl_key_file: args.ssl_key_file,
ssl_cert_file: args.ssl_cert_file,
ssl_cert_reload_period: args.ssl_cert_reload_period,
ssl_ca_cert,
ssl_ca_certs,
});
// initialize sentry if SENTRY_DSN is provided

View File

@@ -235,7 +235,7 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
let resp = pull_timeline::handle_request(
data,
conf.sk_auth_token.clone(),
conf.ssl_ca_cert.clone(),
conf.ssl_ca_certs.clone(),
global_timelines,
)
.await

View File

@@ -120,7 +120,7 @@ pub struct SafeKeeperConf {
pub ssl_key_file: Utf8PathBuf,
pub ssl_cert_file: Utf8PathBuf,
pub ssl_cert_reload_period: Duration,
pub ssl_ca_cert: Option<Certificate>,
pub ssl_ca_certs: Vec<Certificate>,
}
impl SafeKeeperConf {
@@ -169,7 +169,7 @@ impl SafeKeeperConf {
ssl_key_file: Utf8PathBuf::from(defaults::DEFAULT_SSL_KEY_FILE),
ssl_cert_file: Utf8PathBuf::from(defaults::DEFAULT_SSL_CERT_FILE),
ssl_cert_reload_period: Duration::from_secs(60),
ssl_ca_cert: None,
ssl_ca_certs: Vec::new(),
}
}
}

View File

@@ -393,7 +393,7 @@ pub struct DebugDumpResponse {
pub async fn handle_request(
request: PullTimelineRequest,
sk_auth_token: Option<SecretString>,
ssl_ca_cert: Option<Certificate>,
ssl_ca_certs: Vec<Certificate>,
global_timelines: Arc<GlobalTimelines>,
) -> Result<PullTimelineResponse> {
let existing_tli = global_timelines.get(TenantTimelineId::new(
@@ -405,7 +405,7 @@ pub async fn handle_request(
}
let mut http_client = reqwest::Client::builder();
if let Some(ssl_ca_cert) = ssl_ca_cert {
for ssl_ca_cert in ssl_ca_certs {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client.build()?;

View File

@@ -183,7 +183,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
ssl_key_file: Utf8PathBuf::from(""),
ssl_cert_file: Utf8PathBuf::from(""),
ssl_cert_reload_period: Duration::ZERO,
ssl_ca_cert: None,
ssl_ca_certs: Vec::new(),
};
let mut global = GlobalMap::new(disk, conf.clone())?;

View File

@@ -624,16 +624,19 @@ impl ComputeHook {
MaybeSendResult::Transmit((request, lock)) => (request, lock),
};
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
Some(if control_plane_url.ends_with('/') {
format!("{control_plane_url}notify-attach")
let result = if !self.config.use_local_compute_notifications {
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
Some(if control_plane_url.ends_with('/') {
format!("{control_plane_url}notify-attach")
} else {
format!("{control_plane_url}/notify-attach")
})
} else {
format!("{control_plane_url}/notify-attach")
})
} else {
self.config.compute_hook_url.clone()
};
let result = if let Some(notify_url) = &compute_hook_url {
self.config.compute_hook_url.clone()
};
// We validate this at startup
let notify_url = compute_hook_url.as_ref().unwrap();
self.do_notify(notify_url, &request, cancel).await
} else {
self.do_notify_local(&request).await.map_err(|e| {

View File

@@ -8,7 +8,6 @@ use futures::StreamExt;
use futures::stream::FuturesUnordered;
use pageserver_api::controller_api::{NodeAvailability, SkSchedulingPolicy};
use pageserver_api::models::PageserverUtilization;
use reqwest::Certificate;
use safekeeper_api::models::SafekeeperUtilization;
use safekeeper_client::mgmt_api;
use thiserror::Error;
@@ -27,8 +26,8 @@ struct HeartbeaterTask<Server, State> {
max_offline_interval: Duration,
max_warming_up_interval: Duration,
http_client: reqwest::Client,
jwt_token: Option<String>,
ssl_ca_cert: Option<Certificate>,
}
#[derive(Debug, Clone)]
@@ -76,8 +75,8 @@ where
HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
{
pub(crate) fn new(
http_client: reqwest::Client,
jwt_token: Option<String>,
ssl_ca_cert: Option<Certificate>,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
cancel: CancellationToken,
@@ -86,8 +85,8 @@ where
tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest<Server, State>>();
let mut heartbeater = HeartbeaterTask::new(
receiver,
http_client,
jwt_token,
ssl_ca_cert,
max_offline_interval,
max_warming_up_interval,
cancel,
@@ -122,8 +121,8 @@ where
{
fn new(
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
http_client: reqwest::Client,
jwt_token: Option<String>,
ssl_ca_cert: Option<Certificate>,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
cancel: CancellationToken,
@@ -134,8 +133,8 @@ where
state: HashMap::new(),
max_offline_interval,
max_warming_up_interval,
http_client,
jwt_token,
ssl_ca_cert,
}
}
async fn run(&mut self) {
@@ -178,7 +177,7 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
let mut heartbeat_futs = FuturesUnordered::new();
for (node_id, node) in &*pageservers {
heartbeat_futs.push({
let ssl_ca_cert = self.ssl_ca_cert.clone();
let http_client = self.http_client.clone();
let jwt_token = self.jwt_token.clone();
let cancel = self.cancel.clone();
@@ -193,8 +192,8 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
let response = node_clone
.with_client_retries(
|client| async move { client.get_utilization().await },
&http_client,
&jwt_token,
&ssl_ca_cert,
3,
3,
Duration::from_secs(1),
@@ -329,19 +328,19 @@ impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, Safe
continue;
}
heartbeat_futs.push({
let http_client = self.http_client.clone();
let jwt_token = self
.jwt_token
.as_ref()
.map(|t| SecretString::from(t.to_owned()));
let ssl_ca_cert = self.ssl_ca_cert.clone();
let cancel = self.cancel.clone();
async move {
let response = sk
.with_client_retries(
|client| async move { client.get_utilization().await },
&http_client,
&jwt_token,
&ssl_ca_cert,
3,
3,
Duration::from_secs(1),

View File

@@ -656,11 +656,10 @@ async fn handle_tenant_timeline_passthrough(
let _timer = latency.start_timer(labels.clone());
let client = mgmt_api::Client::new(
service.get_http_client().clone(),
node.base_url(),
service.get_config().pageserver_jwt_token.as_deref(),
service.get_config().ssl_ca_cert.clone(),
)
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
);
let resp = client.get_raw(path).await.map_err(|e|
// We return 503 here because if we can't successfully send a request to the pageserver,
// either we aren't available or the pageserver is unavailable.

View File

@@ -200,9 +200,14 @@ struct Cli {
/// Period to reload certificate and private key from files.
#[arg(long, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)]
ssl_cert_reload_period: humantime::Duration,
/// Trusted root CA certificate to use in https APIs.
/// Trusted root CA certificates to use in https APIs.
#[arg(long)]
ssl_ca_file: Option<PathBuf>,
/// Neon local specific flag. When set, ignore [`Cli::control_plane_url`] and deliver
/// the compute notification directly (instead of via control plane).
#[arg(long, default_value = "false")]
use_local_compute_notifications: bool,
}
enum StrictMode {
@@ -368,6 +373,9 @@ async fn async_main() -> anyhow::Result<()> {
"neither `--compute-hook-url` nor `--control-plane-url` are set: this is only permitted in `--dev` mode"
);
}
StrictMode::Strict if args.use_local_compute_notifications => {
anyhow::bail!("`--use-local-compute-notifications` is only permitted in `--dev` mode");
}
StrictMode::Strict => {
tracing::info!("Starting in strict mode: configuration is OK.")
}
@@ -376,13 +384,13 @@ async fn async_main() -> anyhow::Result<()> {
}
}
let ssl_ca_cert = match args.ssl_ca_file.as_ref() {
let ssl_ca_certs = match args.ssl_ca_file.as_ref() {
Some(ssl_ca_file) => {
tracing::info!("Using ssl root CA file: {ssl_ca_file:?}");
let buf = tokio::fs::read(ssl_ca_file).await?;
Some(Certificate::from_pem(&buf)?)
Certificate::from_pem_bundle(&buf)?
}
None => None,
None => Vec::new(),
};
let config = Config {
@@ -425,8 +433,9 @@ async fn async_main() -> anyhow::Result<()> {
start_as_candidate: args.start_as_candidate,
use_https_pageserver_api: args.use_https_pageserver_api,
use_https_safekeeper_api: args.use_https_safekeeper_api,
ssl_ca_cert,
ssl_ca_certs,
timelines_onto_safekeepers: args.timelines_onto_safekeepers,
use_local_compute_notifications: args.use_local_compute_notifications,
};
// Validate that we can connect to the database

View File

@@ -7,7 +7,7 @@ use pageserver_api::controller_api::{
};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use reqwest::{Certificate, StatusCode};
use reqwest::StatusCode;
use serde::Serialize;
use tokio_util::sync::CancellationToken;
use utils::backoff;
@@ -280,8 +280,8 @@ impl Node {
pub(crate) async fn with_client_retries<T, O, F>(
&self,
mut op: O,
http_client: &reqwest::Client,
jwt: &Option<String>,
ssl_ca_cert: &Option<Certificate>,
warn_threshold: u32,
max_retries: u32,
timeout: Duration,
@@ -300,24 +300,13 @@ impl Node {
| ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
ApiError(_, _) => true,
Cancelled => true,
CreateClient(_) => true,
Timeout(_) => false,
}
}
// TODO: refactor PageserverClient and with_client_retires (#11113).
let mut http_client = reqwest::ClientBuilder::new().timeout(timeout);
if let Some(ssl_ca_cert) = ssl_ca_cert.as_ref() {
http_client = http_client.add_root_certificate(ssl_ca_cert.clone())
}
let http_client = match http_client.build() {
Ok(http_client) => http_client,
Err(err) => return Some(Err(mgmt_api::Error::CreateClient(err))),
};
backoff::retry(
|| {
let client = PageserverClient::from_client(
let client = PageserverClient::new(
self.get_id(),
http_client.clone(),
self.base_url(),
@@ -326,11 +315,14 @@ impl Node {
let node_cancel_fut = self.cancel.cancelled();
let op_fut = op(client);
let op_fut = tokio::time::timeout(timeout, op(client));
async {
tokio::select! {
r = op_fut=> {r},
r = op_fut => match r {
Ok(r) => r,
Err(e) => Err(mgmt_api::Error::Timeout(format!("{e}"))),
},
_ = node_cancel_fut => {
Err(mgmt_api::Error::Cancelled)
}}

View File

@@ -8,7 +8,7 @@ use pageserver_api::models::{
use pageserver_api::shard::TenantShardId;
use pageserver_client::BlockUnblock;
use pageserver_client::mgmt_api::{Client, Result};
use reqwest::{Certificate, StatusCode};
use reqwest::StatusCode;
use utils::id::{NodeId, TenantId, TimelineId};
/// Thin wrapper around [`pageserver_client::mgmt_api::Client`]. It allows the storage
@@ -47,25 +47,13 @@ macro_rules! measured_request {
impl PageserverClient {
pub(crate) fn new(
node_id: NodeId,
mgmt_api_endpoint: String,
jwt: Option<&str>,
ssl_ca_cert: Option<Certificate>,
) -> Result<Self> {
Ok(Self {
inner: Client::new(mgmt_api_endpoint, jwt, ssl_ca_cert)?,
node_id_label: node_id.0.to_string(),
})
}
pub(crate) fn from_client(
node_id: NodeId,
raw_client: reqwest::Client,
mgmt_api_endpoint: String,
jwt: Option<&str>,
) -> Self {
Self {
inner: Client::from_client(raw_client, mgmt_api_endpoint, jwt),
inner: Client::new(raw_client, mgmt_api_endpoint, jwt),
node_id_label: node_id.0.to_string(),
}
}

View File

@@ -997,10 +997,11 @@ impl Persistence {
// Clear sharding flag
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.eq(new_shard_count.literal() as i32))
.set((splitting.eq(0),))
.execute(conn)
.await?;
debug_assert!(updated > 0);
assert!(updated == new_shard_count.count() as usize);
Ok(())
})

View File

@@ -86,6 +86,9 @@ pub(super) struct Reconciler {
/// Access to persistent storage for updating generation numbers
pub(crate) persistence: Arc<Persistence>,
/// HTTP client with proper CA certs.
pub(crate) http_client: reqwest::Client,
}
pub(crate) struct ReconcilerConfigBuilder {
@@ -298,8 +301,8 @@ impl Reconciler {
.location_config(tenant_shard_id, config.clone(), flush_ms, lazy)
.await
},
&self.http_client,
&self.service_config.pageserver_jwt_token,
&self.service_config.ssl_ca_cert,
1,
3,
timeout,
@@ -419,10 +422,10 @@ impl Reconciler {
let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
node.base_url(),
self.service_config.pageserver_jwt_token.as_deref(),
self.service_config.ssl_ca_cert.clone(),
)?;
);
client
.wait_lsn(
@@ -443,10 +446,10 @@ impl Reconciler {
) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
node.base_url(),
self.service_config.pageserver_jwt_token.as_deref(),
self.service_config.ssl_ca_cert.clone(),
)?;
);
let timelines = client.timeline_list(&tenant_shard_id).await?;
Ok(timelines
@@ -483,8 +486,8 @@ impl Reconciler {
)
.await
},
&self.http_client,
&self.service_config.pageserver_jwt_token,
&self.service_config.ssl_ca_cert,
1,
3,
request_download_timeout * 2,
@@ -778,8 +781,8 @@ impl Reconciler {
let observed_conf = match attached_node
.with_client_retries(
|client| async move { client.get_location_config(tenant_shard_id).await },
&self.http_client,
&self.service_config.pageserver_jwt_token,
&self.service_config.ssl_ca_cert,
1,
1,
Duration::from_secs(5),
@@ -1127,8 +1130,8 @@ impl Reconciler {
match origin
.with_client_retries(
|client| async move { client.get_location_config(tenant_shard_id).await },
&self.http_client,
&self.service_config.pageserver_jwt_token,
&self.service_config.ssl_ca_cert,
1,
3,
Duration::from_secs(5),

View File

@@ -1,7 +1,7 @@
use std::time::Duration;
use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
use reqwest::{Certificate, StatusCode};
use reqwest::StatusCode;
use safekeeper_client::mgmt_api;
use tokio_util::sync::CancellationToken;
use utils::backoff;
@@ -94,8 +94,8 @@ impl Safekeeper {
pub(crate) async fn with_client_retries<T, O, F>(
&self,
mut op: O,
http_client: &reqwest::Client,
jwt: &Option<SecretString>,
ssl_ca_cert: &Option<Certificate>,
warn_threshold: u32,
max_retries: u32,
timeout: Duration,
@@ -114,17 +114,10 @@ impl Safekeeper {
| ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
ApiError(_, _) => true,
Cancelled => true,
CreateClient(_) => true,
Timeout(_) => false,
}
}
// TODO: refactor SafekeeperClient and with_client_retires (#11113).
let mut http_client = reqwest::Client::builder().timeout(timeout);
if let Some(ssl_ca_cert) = ssl_ca_cert.as_ref() {
http_client = http_client.add_root_certificate(ssl_ca_cert.clone());
}
let http_client = http_client.build().map_err(mgmt_api::Error::CreateClient)?;
backoff::retry(
|| {
let client = SafekeeperClient::new(
@@ -136,11 +129,14 @@ impl Safekeeper {
let node_cancel_fut = self.cancel.cancelled();
let op_fut = op(client);
let op_fut = tokio::time::timeout(timeout, op(client));
async {
tokio::select! {
r = op_fut=> {r},
r = op_fut => match r {
Ok(r) => r,
Err(e) => Err(mgmt_api::Error::Timeout(format!("{e}"))),
},
_ = node_cancel_fut => {
Err(mgmt_api::Error::Cancelled)
}}

View File

@@ -267,7 +267,7 @@ fn passthrough_api_error(node: &Node, e: mgmt_api::Error) -> ApiError {
ApiError::Conflict(format!("{node} {status}: {status} {msg}"))
}
mgmt_api::Error::Cancelled => ApiError::ShuttingDown,
mgmt_api::Error::CreateClient(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
mgmt_api::Error::Timeout(e) => ApiError::Timeout(e.into()),
}
}
@@ -445,9 +445,11 @@ pub struct Config {
pub use_https_safekeeper_api: bool,
pub ssl_ca_cert: Option<Certificate>,
pub ssl_ca_certs: Vec<Certificate>,
pub timelines_onto_safekeepers: bool,
pub use_local_compute_notifications: bool,
}
impl From<DatabaseError> for ApiError {
@@ -524,6 +526,9 @@ pub struct Service {
/// This waits for initial reconciliation with pageservers to complete. Until this barrier
/// passes, it isn't safe to do any actions that mutate tenants.
pub(crate) startup_complete: Barrier,
/// HTTP client with proper CA certs.
http_client: reqwest::Client,
}
impl From<ReconcileWaitError> for ApiError {
@@ -667,6 +672,10 @@ impl Service {
&self.config
}
pub fn get_http_client(&self) -> &reqwest::Client {
&self.http_client
}
/// Called once on startup, this function attempts to contact all pageservers to build an up-to-date
/// view of the world, and determine which pageservers are responsive.
#[instrument(skip_all)]
@@ -965,8 +974,8 @@ impl Service {
let response = node
.with_client_retries(
|client| async move { client.list_location_config().await },
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
1,
5,
timeout,
@@ -1064,20 +1073,12 @@ impl Service {
break;
}
let client = match PageserverClient::new(
let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
node.base_url(),
self.config.pageserver_jwt_token.as_deref(),
self.config.ssl_ca_cert.clone(),
) {
Ok(client) => client,
Err(e) => {
tracing::error!(
"Failed to create client to detach unknown shard {tenant_shard_id} on pageserver {node_id}: {e}"
);
continue;
}
};
);
match client
.location_config(
tenant_shard_id,
@@ -1655,17 +1656,36 @@ impl Service {
let cancel = CancellationToken::new();
let reconcilers_cancel = cancel.child_token();
let mut http_client = reqwest::Client::builder();
// We intentionally disable the connection pool, so every request will create its own TCP connection.
// It's especially important for heartbeaters to notice more network problems.
//
// TODO: It makes sense to use this client only in heartbeaters and create a second one with
// connection pooling for everything else. But reqwest::Client may create a connection without
// ever using it (it uses hyper's Client under the hood):
// https://github.com/hyperium/hyper-util/blob/d51318df3461d40e5f5e5ca163cb3905ac960209/src/client/legacy/client.rs#L415
//
// Because of a bug in hyper0::Connection::graceful_shutdown such connections hang during
// graceful server shutdown: https://github.com/hyperium/hyper/issues/2730
//
// The bug has been fixed in hyper v1, so keep alive may be enabled only after we migrate to hyper1.
http_client = http_client.pool_max_idle_per_host(0);
for ssl_ca_cert in &config.ssl_ca_certs {
http_client = http_client.add_root_certificate(ssl_ca_cert.clone());
}
let http_client = http_client.build()?;
let heartbeater_ps = Heartbeater::new(
http_client.clone(),
config.pageserver_jwt_token.clone(),
config.ssl_ca_cert.clone(),
config.max_offline_interval,
config.max_warming_up_interval,
cancel.clone(),
);
let heartbeater_sk = Heartbeater::new(
http_client.clone(),
config.safekeeper_jwt_token.clone(),
config.ssl_ca_cert.clone(),
config.max_offline_interval,
config.max_warming_up_interval,
cancel.clone(),
@@ -1708,6 +1728,7 @@ impl Service {
reconcilers_gate: Gate::default(),
tenant_op_locks: Default::default(),
node_op_locks: Default::default(),
http_client,
});
let result_task_this = this.clone();
@@ -2013,8 +2034,8 @@ impl Service {
let configs = match node
.with_client_retries(
|client| async move { client.list_location_config().await },
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
1,
5,
SHORT_RECONCILE_TIMEOUT,
@@ -2092,8 +2113,8 @@ impl Service {
.location_config(tenant_shard_id, config, None, false)
.await
},
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
1,
5,
SHORT_RECONCILE_TIMEOUT,
@@ -3235,11 +3256,10 @@ impl Service {
for tenant_shard_id in shard_ids {
let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
node.base_url(),
self.config.pageserver_jwt_token.as_deref(),
self.config.ssl_ca_cert.clone(),
)
.map_err(|e| passthrough_api_error(&node, e))?;
);
tracing::info!("Doing time travel recovery for shard {tenant_shard_id}",);
@@ -3298,11 +3318,10 @@ impl Service {
for (tenant_shard_id, node) in targets {
let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
node.base_url(),
self.config.pageserver_jwt_token.as_deref(),
self.config.ssl_ca_cert.clone(),
)
.map_err(|e| passthrough_api_error(&node, e))?;
);
futs.push(async move {
let result = client
.tenant_secondary_download(tenant_shard_id, wait)
@@ -3427,8 +3446,8 @@ impl Service {
.tenant_delete(TenantShardId::unsharded(tenant_id))
.await
},
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
1,
3,
RECONCILE_TIMEOUT,
@@ -3580,8 +3599,8 @@ impl Service {
async fn create_one(
tenant_shard_id: TenantShardId,
locations: ShardMutationLocations,
http_client: reqwest::Client,
jwt: Option<String>,
ssl_ca_cert: Option<Certificate>,
create_req: TimelineCreateRequest,
) -> Result<TimelineInfo, ApiError> {
let latest = locations.latest.node;
@@ -3594,8 +3613,7 @@ impl Service {
);
let client =
PageserverClient::new(latest.get_id(), latest.base_url(), jwt.as_deref(), ssl_ca_cert.clone())
.map_err(|e| passthrough_api_error(&latest, e))?;
PageserverClient::new(latest.get_id(), http_client.clone(), latest.base_url(), jwt.as_deref());
let timeline_info = client
.timeline_create(tenant_shard_id, &create_req)
@@ -3616,11 +3634,10 @@ impl Service {
let client = PageserverClient::new(
location.node.get_id(),
http_client.clone(),
location.node.base_url(),
jwt.as_deref(),
ssl_ca_cert.clone(),
)
.map_err(|e| passthrough_api_error(&location.node, e))?;
);
let res = client
.timeline_create(tenant_shard_id, &create_req)
@@ -3648,8 +3665,8 @@ impl Service {
let timeline_info = create_one(
shard_zero_tid,
shard_zero_locations,
self.http_client.clone(),
self.config.pageserver_jwt_token.clone(),
self.config.ssl_ca_cert.clone(),
create_req.clone(),
)
.await?;
@@ -3678,8 +3695,8 @@ impl Service {
Box::pin(create_one(
tenant_shard_id,
mutation_locations,
self.http_client.clone(),
jwt.clone(),
self.config.ssl_ca_cert.clone(),
create_req,
))
},
@@ -3762,16 +3779,15 @@ impl Service {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
node: Node,
http_client: reqwest::Client,
jwt: Option<String>,
ssl_ca_cert: Option<Certificate>,
req: TimelineArchivalConfigRequest,
) -> Result<(), ApiError> {
tracing::info!(
"Setting archival config of timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
);
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref(), ssl_ca_cert)
.map_err(|e| passthrough_api_error(&node, e))?;
let client = PageserverClient::new(node.get_id(), http_client, node.base_url(), jwt.as_deref());
client
.timeline_archival_config(tenant_shard_id, timeline_id, &req)
@@ -3793,8 +3809,8 @@ impl Service {
tenant_shard_id,
timeline_id,
node,
self.http_client.clone(),
self.config.pageserver_jwt_token.clone(),
self.config.ssl_ca_cert.clone(),
req.clone(),
))
})
@@ -3831,16 +3847,15 @@ impl Service {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
node: Node,
http_client: reqwest::Client,
jwt: Option<String>,
ssl_ca_cert: Option<Certificate>,
behavior: Option<DetachBehavior>,
) -> Result<(ShardNumber, models::detach_ancestor::AncestorDetached), ApiError> {
tracing::info!(
"Detaching timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
);
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref(), ssl_ca_cert)
.map_err(|e| passthrough_api_error(&node, e))?;
let client = PageserverClient::new(node.get_id(), http_client, node.base_url(), jwt.as_deref());
client
.timeline_detach_ancestor(tenant_shard_id, timeline_id, behavior)
@@ -3879,8 +3894,8 @@ impl Service {
tenant_shard_id,
timeline_id,
node,
self.http_client.clone(),
self.config.pageserver_jwt_token.clone(),
self.config.ssl_ca_cert.clone(),
behavior,
))
})
@@ -3933,17 +3948,16 @@ impl Service {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
node: Node,
http_client: reqwest::Client,
jwt: Option<String>,
ssl_ca_cert: Option<Certificate>,
dir: BlockUnblock,
) -> Result<(), ApiError> {
let client = PageserverClient::new(
node.get_id(),
http_client,
node.base_url(),
jwt.as_deref(),
ssl_ca_cert,
)
.map_err(|e| passthrough_api_error(&node, e))?;
);
client
.timeline_block_unblock_gc(tenant_shard_id, timeline_id, dir)
@@ -3962,8 +3976,8 @@ impl Service {
tenant_shard_id,
timeline_id,
node,
self.http_client.clone(),
self.config.pageserver_jwt_token.clone(),
self.config.ssl_ca_cert.clone(),
dir,
))
})
@@ -4091,8 +4105,8 @@ impl Service {
let r = node
.with_client_retries(
|client| op(tenant_shard_id, client),
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
warn_threshold,
max_retries,
timeout,
@@ -4316,15 +4330,14 @@ impl Service {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
node: Node,
http_client: reqwest::Client,
jwt: Option<String>,
ssl_ca_cert: Option<Certificate>,
) -> Result<StatusCode, ApiError> {
tracing::info!(
"Deleting timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
);
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref(), ssl_ca_cert)
.map_err(|e| passthrough_api_error(&node, e))?;
let client = PageserverClient::new(node.get_id(), http_client, node.base_url(), jwt.as_deref());
let res = client
.timeline_delete(tenant_shard_id, timeline_id)
.await;
@@ -4350,8 +4363,8 @@ impl Service {
tenant_shard_id,
timeline_id,
node,
self.http_client.clone(),
self.config.pageserver_jwt_token.clone(),
self.config.ssl_ca_cert.clone(),
))
})
.await?;
@@ -4373,8 +4386,8 @@ impl Service {
shard_zero_tid,
timeline_id,
shard_zero_locations.latest.node,
self.http_client.clone(),
self.config.pageserver_jwt_token.clone(),
self.config.ssl_ca_cert.clone(),
)
.await?;
Ok(shard_zero_status)
@@ -4809,8 +4822,8 @@ impl Service {
client.location_config(child_id, config, None, false).await
},
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
1,
10,
Duration::from_secs(5),
@@ -5412,11 +5425,10 @@ impl Service {
} = target;
let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
node.base_url(),
self.config.pageserver_jwt_token.as_deref(),
self.config.ssl_ca_cert.clone(),
)
.map_err(|e| passthrough_api_error(node, e))?;
);
let response = client
.tenant_shard_split(
*parent_id,
@@ -5456,6 +5468,8 @@ impl Service {
}
}
pausable_failpoint!("shard-split-pre-complete");
// TODO: if the pageserver restarted concurrently with our split API call,
// the actual generation of the child shard might differ from the generation
// we expect it to have. In order for our in-database generation to end up
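The `pausable_failpoint!("shard-split-pre-complete")` added above is what the new reproducer test pauses on (it is set to `pause` and later `off` via the controller's `/debug/v1/failpoints` API further down in this diff). A minimal sketch of that behaviour using the `fail` crate directly; the repo's `pausable_failpoint!` macro wraps similar machinery and may differ in detail, e.g. by moving the blocking wait off the async executor:

```rust
// Sketch only, with the `fail` crate's "failpoints" feature enabled.
fn shard_split_pre_complete_step() {
    // While the failpoint's action is "pause", this call blocks until the
    // action is changed (typically to "off").
    fail::fail_point!("shard-split-pre-complete");
}

fn release() {
    // Reconfiguring the failpoint unblocks anything parked inside it.
    fail::cfg("shard-split-pre-complete", "off").unwrap();
}
```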
@@ -5898,11 +5912,10 @@ impl Service {
let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
node.base_url(),
self.config.pageserver_jwt_token.as_deref(),
self.config.ssl_ca_cert.clone(),
)
.map_err(|e| passthrough_api_error(&node, e))?;
);
let scan_result = client
.tenant_scan_remote_storage(tenant_id)
@@ -7136,6 +7149,7 @@ impl Service {
units,
gate_guard,
&self.reconcilers_cancel,
self.http_client.clone(),
)
}
@@ -7543,8 +7557,8 @@ impl Service {
match attached_node
.with_client_retries(
|client| async move { client.tenant_heatmap_upload(tenant_shard_id).await },
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
3,
10,
SHORT_RECONCILE_TIMEOUT,
@@ -7580,8 +7594,8 @@ impl Service {
)
.await
},
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
3,
10,
SHORT_RECONCILE_TIMEOUT,
@@ -7854,8 +7868,8 @@ impl Service {
futures.push(async move {
node.with_client_retries(
|client| async move { client.top_tenant_shards(request.clone()).await },
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
3,
3,
Duration::from_secs(5),
@@ -7974,8 +7988,8 @@ impl Service {
match node
.with_client_retries(
|client| async move { client.tenant_secondary_status(tenant_shard_id).await },
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
1,
3,
Duration::from_millis(250),

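The hunks above replace per-call `ssl_ca_cert` arguments with a single `http_client` that is built once at service startup (`add_root_certificate(...)` in a loop, then `build()`) and handed out by cloning. A standalone sketch of that pattern, assuming the CA bundle has already been parsed into a `Vec<reqwest::Certificate>`; the helper name is illustrative, not the storcon code:

```rust
use reqwest::{Certificate, Client};

/// Build one client up front; certificate parsing and TLS setup happen here, once.
fn build_shared_client(ca_certs: Vec<Certificate>) -> reqwest::Result<Client> {
    let mut builder = Client::builder();
    for cert in ca_certs {
        // Each trusted root from the bundle goes into the same builder.
        builder = builder.add_root_certificate(cert);
    }
    builder.build()
}

fn main() -> reqwest::Result<()> {
    let shared = build_shared_client(Vec::new())?;
    // `reqwest::Client` is reference-counted internally, so clones are cheap
    // and share the connection pool and TLS configuration.
    let handle = shared.clone();
    drop(handle);
    Ok(())
}
```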
View File

@@ -338,7 +338,6 @@ impl SafekeeperReconciler {
.safekeeper_jwt_token
.clone()
.map(SecretString::from);
let ssl_ca_cert = self.service.config.ssl_ca_cert.clone();
loop {
let res = req
.safekeeper
@@ -347,8 +346,8 @@ impl SafekeeperReconciler {
let closure = &closure;
async move { closure(client).await }
},
self.service.get_http_client(),
&jwt,
&ssl_ca_cert,
3,
10,
Duration::from_secs(10),

View File

@@ -78,8 +78,8 @@ impl Service {
for sk in timeline_persistence.sk_set.iter() {
let sk_id = NodeId(*sk as u64);
let safekeepers = safekeepers.clone();
let http_client = self.http_client.clone();
let jwt = jwt.clone();
let ssl_ca_cert = self.config.ssl_ca_cert.clone();
let req = req.clone();
joinset.spawn(async move {
// Unwrap is fine as we already would have returned error above
@@ -90,8 +90,8 @@ impl Service {
let req = req.clone();
async move { client.create_timeline(&req).await }
},
&http_client,
&jwt,
&ssl_ca_cert,
3,
3,
SK_CREATE_TIMELINE_RECONCILE_TIMEOUT,

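Above, the shared client is cloned into each spawned safekeeper task (`let http_client = self.http_client.clone();` before `joinset.spawn(...)`). A generic sketch of that fan-out pattern; the URL path and names are placeholders, not the Neon API:

```rust
use tokio::task::JoinSet;

async fn fan_out(http_client: reqwest::Client, base_urls: Vec<String>) {
    let mut joinset = JoinSet::new();
    for url in base_urls {
        // Give each task its own handle; all handles share the same pool.
        let http_client = http_client.clone();
        joinset.spawn(async move {
            http_client
                .get(format!("{url}/v1/status"))
                .send()
                .await
                .map(|resp| resp.status())
        });
    }
    while let Some(joined) = joinset.join_next().await {
        // Each entry is Result<reqwest::Result<StatusCode>, JoinError>.
        if let Ok(Ok(status)) = joined {
            println!("status: {status}");
        }
    }
}
```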
View File

@@ -1588,6 +1588,7 @@ impl TenantShard {
units: ReconcileUnits,
gate_guard: GateGuard,
cancel: &CancellationToken,
http_client: reqwest::Client,
) -> Option<ReconcilerWaiter> {
// Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
// doing our sequence's work.
@@ -1633,6 +1634,7 @@ impl TenantShard {
cancel: reconciler_cancel.clone(),
persistence: persistence.clone(),
compute_notify_failure: false,
http_client,
};
let reconcile_seq = self.sequence;

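The hunk above threads the shared `http_client` into the reconciler as a plain function argument and struct field. In general the handle can simply live on any long-lived worker; a short sketch with made-up names (not the Neon types):

```rust
// Illustrative only; storing a clone is just another handle to the same client.
struct Worker {
    base_url: String,
    http_client: reqwest::Client,
}

impl Worker {
    fn new(base_url: String, http_client: reqwest::Client) -> Self {
        Self { base_url, http_client }
    }

    async fn ping(&self) -> reqwest::Result<reqwest::StatusCode> {
        let resp = self.http_client
            .get(format!("{}/v1/status", self.base_url))
            .send()
            .await?;
        Ok(resp.status())
    }
}
```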
View File

@@ -1169,6 +1169,12 @@ class NeonEnv:
if storage_controller_config is not None:
cfg["storage_controller"] = storage_controller_config
if config.test_may_use_compatibility_snapshot_binaries:
if "storage_controller" in cfg:
cfg["storage_controller"]["use_local_compute_notifications"] = False
else:
cfg["storage_controller"] = {"use_local_compute_notifications": False}
# Create config for pageserver
http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
@@ -1725,6 +1731,8 @@ class LogUtils:
log.warning(f"Skipping log check: {logfile} does not exist")
return None
log.info(f"Checking log {logfile} for pattern '{pattern}'")
contains_re = re.compile(pattern)
# XXX: Our rust logging machinery buffers the messages, so if you
@@ -2618,10 +2626,13 @@ class NeonProxiedStorageController(NeonStorageController):
self.running = False
return self
def instance_log_path(self, instance_id: int) -> Path:
return self.env.repo_dir / f"storage_controller_{instance_id}" / "storage_controller.log"
def assert_no_errors(self):
for instance_id in self.instances.keys():
assert_no_errors(
self.env.repo_dir / f"storage_controller_{instance_id}" / "storage_controller.log",
self.instance_log_path(instance_id),
"storage_controller",
self.allowed_errors,
)
@@ -2629,7 +2640,14 @@ class NeonProxiedStorageController(NeonStorageController):
def log_contains(
self, pattern: str, offset: None | LogCursor = None
) -> tuple[str, LogCursor] | None:
raise NotImplementedError()
for instance_id in self.instances.keys():
log_path = self.instance_log_path(instance_id)
checker = LogUtils(log_path)
found = checker.log_contains(pattern, offset)
if found is not None:
return found
return None
@dataclass

View File

@@ -82,6 +82,7 @@ def test_storage_controller_many_tenants(
# guard against regressions in restart time.
"max_offline": "30s",
"max_warming_up": "300s",
"use_local_compute_notifications": False,
}
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api

View File

@@ -5,11 +5,9 @@ import asyncio
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.remote_storage import RemoteStorageKind
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
def test_change_pageserver(neon_env_builder: NeonEnvBuilder, make_httpserver):
def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
"""
A relatively low level test of reconfiguring a compute's pageserver at runtime. Usually this
is all done via the storage controller, but this test will disable the storage controller's compute
@@ -23,19 +21,6 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder, make_httpserver):
)
env = neon_env_builder.init_start()
neon_env_builder.control_plane_hooks_api = (
f"http://{make_httpserver.host}:{make_httpserver.port}/"
)
def ignore_notify(request: Request):
# This test does direct updates to compute configuration: disable the storage controller's notification
log.info(f"Ignoring storage controller compute notification: {request.json}")
return Response(status=200)
make_httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(
ignore_notify
)
env.create_branch("test_change_pageserver")
endpoint = env.endpoints.create_start("test_change_pageserver")

View File

@@ -12,6 +12,7 @@ import fixtures.utils
import pytest
import toml
from fixtures.common_types import TenantId, TimelineId
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
@@ -592,17 +593,22 @@ def test_historic_storage_formats(
@check_ondisk_data_compatibility_if_enabled
@pytest.mark.xdist_group("compatibility")
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
@pytest.mark.parametrize(
**fixtures.utils.allpairs_versions(),
)
def test_versions_mismatch(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
pg_version: PgVersion,
compatibility_snapshot_dir,
compute_reconfigure_listener: ComputeReconfigure,
combination,
):
"""
Checks compatibility of different combinations of versions of the components
"""
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.from_repo_dir(
compatibility_snapshot_dir / "repo",

View File

@@ -91,6 +91,8 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver,
f"http://{make_httpserver.host}:{make_httpserver.port}/"
)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
def ignore_notify(request: Request):
# This test does all its own compute configuration (by passing an explicit pageserver ID to Workload functions),
# so we send controller notifications to /dev/null to prevent it fighting the test for control of the compute.

View File

@@ -808,6 +808,8 @@ def test_sharding_split_stripe_size(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
env = neon_env_builder.init_start(
initial_tenant_shard_count=1, initial_tenant_shard_stripe_size=initial_stripe_size
)
@@ -1316,6 +1318,11 @@ def test_sharding_split_failures(
initial_shard_count = 2
split_shard_count = 4
neon_env_builder.storage_controller_config = {
# Route to `compute_reconfigure_listener` instead
"use_local_compute_notifications": False,
}
env = neon_env_builder.init_configs()
env.start()

View File

@@ -73,7 +73,9 @@ def get_node_shard_counts(env: NeonEnv, tenant_ids):
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
def test_storage_controller_smoke(neon_env_builder: NeonEnvBuilder, combination):
def test_storage_controller_smoke(
neon_env_builder: NeonEnvBuilder, compute_reconfigure_listener: ComputeReconfigure, combination
):
"""
Test the basic lifecycle of a storage controller:
- Restarting
@@ -83,6 +85,7 @@ def test_storage_controller_smoke(neon_env_builder: NeonEnvBuilder, combination)
"""
neon_env_builder.num_pageservers = 3
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
env = neon_env_builder.init_configs()
# Start services by hand so that we can skip a pageserver (this will start + register later)
@@ -620,6 +623,8 @@ def test_storage_controller_compute_hook(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
@@ -738,6 +743,8 @@ def test_storage_controller_stuck_compute_hook(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
@@ -885,6 +892,8 @@ def test_storage_controller_compute_hook_retry(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
# Start running
env = neon_env_builder.init_configs()
env.start()
@@ -1008,6 +1017,8 @@ def test_storage_controller_compute_hook_revert(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
tenant_id = env.initial_tenant
@@ -1398,6 +1409,11 @@ def test_storage_controller_tenant_deletion(
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
neon_env_builder.storage_controller_config = {
# Route to `compute_reconfigure_listener` instead
"use_local_compute_notifications": False,
}
env = neon_env_builder.init_configs()
env.start()
@@ -1599,6 +1615,12 @@ def test_storage_controller_heartbeats(
env.storage_controller.allowed_errors.append(
".*Call to node.*management API.*failed.*failpoint.*"
)
# The server starts listening on the socket before sending the re-attach request,
# but it starts serving HTTP only when re-attach is completed.
# If re-attach is slow (last scenario), storcon's heartbeat requests will time out.
env.storage_controller.allowed_errors.append(
".*Call to node.*management API.*failed.* Timeout.*"
)
# Initially we have two online pageservers
nodes = env.storage_controller.node_list()
@@ -2170,7 +2192,12 @@ def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_sto
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
@pytest.mark.parametrize("num_azs", [1, 2])
def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder, num_azs: int, combination):
def test_graceful_cluster_restart(
neon_env_builder: NeonEnvBuilder,
num_azs: int,
compute_reconfigure_listener: ComputeReconfigure,
combination,
):
"""
Graceful restart of storage controller clusters uses the drain and
fill hooks in order to migrate attachments away from pageservers before
@@ -2182,6 +2209,7 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder, num_azs: int
"""
neon_env_builder.num_azs = num_azs
neon_env_builder.num_pageservers = 2
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
env = neon_env_builder.init_configs()
env.start()
@@ -2437,7 +2465,6 @@ def test_background_operation_cancellation(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("while_offline", [True, False])
def test_storage_controller_node_deletion(
neon_env_builder: NeonEnvBuilder,
compute_reconfigure_listener: ComputeReconfigure,
while_offline: bool,
):
"""
@@ -2863,6 +2890,143 @@ def test_storage_controller_leadership_transfer(
)
def test_storage_controller_leadership_transfer_during_split(
neon_env_builder: NeonEnvBuilder,
storage_controller_proxy: StorageControllerProxy,
port_distributor: PortDistributor,
):
"""
Exercise a race between shard splitting and graceful leadership transfer. This is
a reproducer for https://github.com/neondatabase/neon/issues/11254
"""
neon_env_builder.auth_enabled = True
neon_env_builder.num_pageservers = 3
neon_env_builder.storage_controller_config = {
"database_url": f"127.0.0.1:{port_distributor.get_port()}",
"start_as_candidate": True,
}
neon_env_builder.storage_controller_port_override = storage_controller_proxy.port()
storage_controller_1_port = port_distributor.get_port()
storage_controller_2_port = port_distributor.get_port()
storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
env = neon_env_builder.init_configs()
start_env(env, storage_controller_1_port)
assert (
env.storage_controller.get_leadership_status() == StorageControllerLeadershipStatus.LEADER
)
leader = env.storage_controller.get_leader()
assert leader["address"] == f"http://127.0.0.1:{storage_controller_1_port}/"
tenant_count = 2
shard_count = 4
tenants = set(TenantId.generate() for _ in range(0, tenant_count))
for tid in tenants:
env.storage_controller.tenant_create(
tid, shard_count=shard_count, placement_policy={"Attached": 1}
)
env.storage_controller.reconcile_until_idle()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# Start a shard split
env.storage_controller.allowed_errors.extend(
[".*Unexpected child shard count.*", ".*Enqueuing background abort.*"]
)
pause_failpoint = "shard-split-pre-complete"
env.storage_controller.configure_failpoints((pause_failpoint, "pause"))
split_fut = executor.submit(
env.storage_controller.tenant_shard_split, list(tenants)[0], shard_count * 2
)
def hit_failpoint():
log.info("Checking log for pattern...")
try:
assert env.storage_controller.log_contains(f".*at failpoint {pause_failpoint}.*")
except Exception:
log.exception("Failed to find pattern in log")
raise
wait_until(hit_failpoint, interval=0.1, status_interval=1.0)
env.storage_controller.start(
timeout_in_seconds=30, instance_id=2, base_port=storage_controller_2_port
)
def passed_split_abort():
try:
log.info("Checking log for pattern...")
assert env.storage_controller.log_contains(
".*Using observed state received from leader.*"
)
except Exception:
log.exception("Failed to find pattern in log")
raise
log.info("Awaiting split abort")
wait_until(passed_split_abort, interval=0.1, status_interval=1.0)
assert env.storage_controller.log_contains(".*Aborting shard split.*")
# Proxy is still talking to original controller here: disable its pause failpoint so
# that its shard split can run to completion.
log.info("Disabling failpoint")
# Bypass the proxy: the Python test HTTPServer is single-threaded and still blocked
# on handling the shard split request.
env.storage_controller.request(
"PUT",
f"http://127.0.0.1:{storage_controller_1_port}/debug/v1/failpoints",
json=[{"name": "shard-split-pre-complete", "actions": "off"}],
headers=env.storage_controller.headers(TokenScope.ADMIN),
)
def previous_stepped_down():
assert (
env.storage_controller.get_leadership_status()
== StorageControllerLeadershipStatus.STEPPED_DOWN
)
log.info("Awaiting step down")
wait_until(previous_stepped_down)
# Let the shard split complete: this may happen _after_ the replacement has come up
# and tried to clean up the databases
log.info("Unblocking & awaiting shard split")
with pytest.raises(Exception, match="Unexpected child shard count"):
# This split fails when it tries to persist results, because it encounters
# changes already made by the new controller's abort-on-startup
split_fut.result()
log.info("Routing to new leader")
storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_2_port}")
def new_becomes_leader():
assert (
env.storage_controller.get_leadership_status()
== StorageControllerLeadershipStatus.LEADER
)
wait_until(new_becomes_leader)
leader = env.storage_controller.get_leader()
assert leader["address"] == f"http://127.0.0.1:{storage_controller_2_port}/"
env.storage_controller.wait_until_ready()
env.storage_controller.consistency_check()
# Check that the stepped down instance forwards requests
# to the new leader while it's still running.
storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
env.storage_controller.tenant_shard_dump()
env.storage_controller.node_configure(env.pageservers[0].id, {"scheduling": "Pause"})
status = env.storage_controller.node_status(env.pageservers[0].id)
assert status["scheduling"] == "Pause"
def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvBuilder):
# single unsharded tenant, two locations
neon_env_builder.num_pageservers = 2