mirror of https://github.com/neondatabase/neon.git
synced 2026-01-30 16:50:37 +00:00

Compare commits: release-co... ... cloneable/...

8 Commits:
- a7946dffec
- 3e5884ff01
- 9fc7c22cc9
- 23ad228310
- aeb53fea94
- 0f367cb665
- 76088c16d2
- 0d99609870

Dockerfile (11 changed lines)
@@ -103,12 +103,17 @@ RUN set -e \
&& echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries \
&& apt update \
&& apt install -y \
bpftrace \
ca-certificates \
libreadline-dev \
libseccomp-dev \
ca-certificates \
# System postgres for use with client libraries (e.g. in storage controller)
postgresql-15 \
iproute2 \
lsof \
openssl \
# System postgres for use with client libraries (e.g. in storage controller)
postgresql-15 \
screen \
tcpdump \
&& rm -f /etc/apt/apt.conf.d/80-retries \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \
&& useradd -d /data neon \
@@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::os::unix::fs::{PermissionsExt, symlink};
use std::path::{Path, PathBuf};
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicU32, Ordering};

@@ -153,11 +153,6 @@ pub struct ComputeState {
pub startup_span: Option<tracing::span::Span>,
pub metrics: ComputeMetrics,
/// current audit log level
/// to know if it is already configured, or we need to set up audit
/// when compute receives a new spec
pub audit_log_level: ComputeAudit,
}
impl ComputeState {

@@ -170,7 +165,6 @@ impl ComputeState {
pspec: None,
startup_span: None,
metrics: ComputeMetrics::default(),
audit_log_level: ComputeAudit::default(),
}
}

@@ -626,10 +620,16 @@ impl ComputeNode {
});
}
// If extended compute audit is enabled configure and start rsyslog
if pspec.spec.audit_log_level == ComputeAudit::Hipaa {
let log_directory_path = self.get_audit_log_dir().to_string_lossy().to_string();
configure_audit_rsyslog(&log_directory_path, pspec.spec.audit_log_level.as_str())?;
// Configure and start rsyslog for HIPAA if necessary
if let ComputeAudit::Hipaa = pspec.spec.audit_log_level {
let remote_endpoint = std::env::var("AUDIT_LOGGING_ENDPOINT").unwrap_or("".to_string());
if remote_endpoint.is_empty() {
anyhow::bail!("AUDIT_LOGGING_ENDPOINT is empty");
}
let log_directory_path = Path::new(&self.params.pgdata).join("log");
let log_directory_path = log_directory_path.to_string_lossy().to_string();
configure_audit_rsyslog(log_directory_path.clone(), "hipaa", &remote_endpoint)?;
// Launch a background task to clean up the audit logs
launch_pgaudit_gc(log_directory_path);

@@ -684,11 +684,6 @@ impl ComputeNode {
});
}
// after all the configuration is done
// preserve the information about the current audit log level
// so that we don't relaunch rsyslog on every spec change
self.set_audit_log_level(pspec.spec.audit_log_level);
// All done!
let startup_end_time = Utc::now();
let metrics = {

@@ -843,19 +838,6 @@ impl ComputeNode {
self.state.lock().unwrap().status
}
pub fn set_audit_log_level(&self, audit_log_level: ComputeAudit) {
let mut state = self.state.lock().unwrap();
state.audit_log_level = audit_log_level;
}
pub fn get_audit_log_level(&self) -> ComputeAudit {
self.state.lock().unwrap().audit_log_level
}
pub fn get_audit_log_dir(&self) -> PathBuf {
Path::new(&self.params.pgdata).join("log")
}
pub fn get_timeline_id(&self) -> Option<TimelineId> {
self.state
.lock()

@@ -1566,29 +1548,6 @@ impl ComputeNode {
});
}
// If extended compute audit is enabled configure and start rsyslog
// We check that the audit_log_level changed compared to the previous spec and skip this step if not.
let audit_log_level = self.get_audit_log_level();
if spec.audit_log_level == ComputeAudit::Hipaa && audit_log_level != spec.audit_log_level {
info!(
"Configuring audit logging because audit_log_level changed from {:?} to {:?}",
audit_log_level, spec.audit_log_level
);
let log_directory_path = self.get_audit_log_dir().to_string_lossy().to_string();
configure_audit_rsyslog(&log_directory_path, spec.audit_log_level.as_str())?;
// Launch a background task to clean up the audit logs
// If rsyslog was already configured, we don't need to start this process again.
match audit_log_level {
ComputeAudit::Disabled | ComputeAudit::Log => {
launch_pgaudit_gc(log_directory_path);
}
_ => {}
}
}
// Write new config
let pgdata_path = Path::new(&self.params.pgdata);
config::write_postgres_conf(

@@ -1598,14 +1557,7 @@ impl ComputeNode {
&self.compute_ctl_config.tls,
)?;
// Override the skip_catalog_updates flag
// if we need to install new extensions
//
// Check that audit_log_level changed compared to the previous spec and skip this step if not.
// All operations are idempotent, so this is just a performance optimization.
let force_catalog_updates = audit_log_level != spec.audit_log_level;
if !spec.skip_pg_catalog_updates || force_catalog_updates {
if !spec.skip_pg_catalog_updates {
let max_concurrent_connections = spec.reconfigure_concurrency;
// Temporarily reset max_cluster_size in config
// to avoid the possibility of hitting the limit, while we are reconfiguring:

@@ -1630,11 +1582,6 @@ impl ComputeNode {
self.pg_reload_conf()?;
// after all the configuration is done
// preserve the information about the current audit log level
// so that we don't relaunch rsyslog on every spec change
self.set_audit_log_level(spec.audit_log_level);
let unknown_op = "unknown".to_string();
let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
info!(
@@ -200,9 +200,8 @@ pub fn write_postgres_conf(
)?;
// This log level is very verbose
// but this is necessary for HIPAA compliance.
// Exclude 'misc' category, because it doesn't contain anything relevant.
// Exclude 'misc' category, because it doesn't contain anythig relevant.
writeln!(file, "pgaudit.log='all, -misc'")?;
// Log parameters for all queries
writeln!(file, "pgaudit.log_parameter=on")?;
// Disable logging of catalog queries
// The catalog doesn't contain sensitive data, so we don't need to audit it.
@@ -9,7 +9,6 @@ use anyhow::{Context, Result, anyhow};
use tracing::{error, info, instrument, warn};
const POSTGRES_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/postgres_logs.conf";
const AUDIT_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/compute_audit_rsyslog.conf";
fn get_rsyslog_pid() -> Option<String> {
let output = Command::new("pgrep")

@@ -49,43 +48,32 @@ fn restart_rsyslog() -> Result<()> {
Ok(())
}
pub fn configure_audit_rsyslog(log_directory: &str, audit_log_level: &str) -> Result<()> {
let remote_endpoint = std::env::var("AUDIT_LOGGING_ENDPOINT")?;
if remote_endpoint.is_empty() {
return Err(anyhow!("AUDIT_LOGGING_ENDPOINT is not set"));
}
let old_config_content = match std::fs::read_to_string(AUDIT_LOGS_CONF_PATH) {
Ok(c) => c,
Err(err) if err.kind() == ErrorKind::NotFound => String::new(),
Err(err) => return Err(err.into()),
};
pub fn configure_audit_rsyslog(
log_directory: String,
tag: &str,
remote_endpoint: &str,
) -> Result<()> {
let config_content: String = format!(
include_str!("config_template/compute_audit_rsyslog_template.conf"),
log_directory = log_directory,
tag = audit_log_level,
tag = tag,
remote_endpoint = remote_endpoint
);
if old_config_content == config_content {
info!("rsyslog configuration is up-to-date");
return Ok(());
}
info!("rsyslog config_content: {}", config_content);
let rsyslog_conf_path = "/etc/rsyslog.d/compute_audit_rsyslog.conf";
let mut file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(AUDIT_LOGS_CONF_PATH)?;
.open(rsyslog_conf_path)?;
file.write_all(config_content.as_bytes())?;
info!(
"rsyslog configuration file {} added successfully. Starting rsyslogd",
AUDIT_LOGS_CONF_PATH
rsyslog_conf_path
);
// start the service, using the configuration
@@ -184,6 +184,8 @@ pub struct NeonStorageControllerConf {
pub timelines_onto_safekeepers: bool,
pub use_https_safekeeper_api: bool,
pub use_local_compute_notifications: bool,
}
impl NeonStorageControllerConf {

@@ -213,6 +215,7 @@ impl Default for NeonStorageControllerConf {
use_https_pageserver_api: false,
timelines_onto_safekeepers: false,
use_https_safekeeper_api: false,
use_local_compute_notifications: true,
}
}
}
@@ -51,11 +51,19 @@ impl PageServerNode {
parse_host_port(&conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
let port = port.unwrap_or(5432);
let ssl_ca_cert = env.ssl_ca_cert_path().map(|ssl_ca_file| {
let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| {
let buf = std::fs::read(ssl_ca_file).expect("SSL root CA file should exist");
Certificate::from_pem(&buf).expect("CA certificate should be valid")
Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid")
});
let mut http_client = reqwest::Client::builder();
for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client
.build()
.expect("Client constructs with no errors");
let endpoint = if env.storage_controller.use_https_pageserver_api {
format!(
"https://{}",

@@ -72,6 +80,7 @@ impl PageServerNode {
conf: conf.clone(),
env: env.clone(),
http_client: mgmt_api::Client::new(
http_client,
endpoint,
{
match conf.http_auth_type {

@@ -83,9 +92,7 @@ impl PageServerNode {
}
}
.as_deref(),
ssl_ca_cert,
)
.expect("Client constructs with no errors"),
),
}
}
@@ -17,7 +17,7 @@ use pageserver_api::models::{TenantConfigRequest, TimelineCreateRequest, Timelin
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
use reqwest::Method;
use reqwest::{Certificate, Method};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::process::Command;

@@ -143,11 +143,14 @@ impl StorageController {
}
};
let mut http_client = reqwest::Client::builder();
if let Some(ssl_ca_file) = env.ssl_ca_cert_path() {
let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| {
let buf = std::fs::read(ssl_ca_file).expect("SSL CA file should exist");
let cert = reqwest::Certificate::from_pem(&buf).expect("SSL CA file should be valid");
http_client = http_client.add_root_certificate(cert);
Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid")
});
let mut http_client = reqwest::Client::builder();
for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client
.build()

@@ -552,6 +555,10 @@ impl StorageController {
args.push("--use-https-safekeeper-api".to_string());
}
if self.config.use_local_compute_notifications {
args.push("--use-local-compute-notifications".to_string());
}
if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap()));
}
@@ -20,7 +20,7 @@ use pageserver_api::models::{
};
use pageserver_api::shard::{ShardStripeSize, TenantShardId};
use pageserver_client::mgmt_api::{self};
use reqwest::{Method, StatusCode, Url};
use reqwest::{Certificate, Method, StatusCode, Url};
use storage_controller_client::control_api::Client;
use utils::id::{NodeId, TenantId, TimelineId};

@@ -274,7 +274,7 @@ struct Cli {
jwt: Option<String>,
#[arg(long)]
/// Trusted root CA certificate to use in https APIs.
/// Trusted root CA certificates to use in https APIs.
ssl_ca_file: Option<PathBuf>,
#[command(subcommand)]

@@ -387,17 +387,23 @@ async fn main() -> anyhow::Result<()> {
let storcon_client = Client::new(cli.api.clone(), cli.jwt.clone());
let ssl_ca_cert = match &cli.ssl_ca_file {
let ssl_ca_certs = match &cli.ssl_ca_file {
Some(ssl_ca_file) => {
let buf = tokio::fs::read(ssl_ca_file).await?;
Some(reqwest::Certificate::from_pem(&buf)?)
Certificate::from_pem_bundle(&buf)?
}
None => None,
None => Vec::new(),
};
let mut http_client = reqwest::Client::builder();
for ssl_ca_cert in ssl_ca_certs {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client.build()?;
let mut trimmed = cli.api.to_string();
trimmed.pop();
let vps_client = mgmt_api::Client::new(trimmed, cli.jwt.as_deref(), ssl_ca_cert)?;
let vps_client = mgmt_api::Client::new(http_client, trimmed, cli.jwt.as_deref());
match cli.command {
Command::NodeRegister {
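The hunks above (and several below) converge on one pattern: read a single PEM file that may contain multiple CA certificates, parse it with `Certificate::from_pem_bundle`, and add every certificate to the `reqwest` client builder. A minimal self-contained sketch of that pattern, using `anyhow` for brevity (the path argument is a placeholder, not a path from this repository):

```rust
use reqwest::Certificate;

fn build_client(ssl_ca_file: Option<&std::path::Path>) -> anyhow::Result<reqwest::Client> {
    // An absent file means an empty list of extra roots, mirroring `None => Vec::new()`.
    let ssl_ca_certs = match ssl_ca_file {
        Some(path) => Certificate::from_pem_bundle(&std::fs::read(path)?)?,
        None => Vec::new(),
    };

    let mut builder = reqwest::Client::builder();
    for cert in ssl_ca_certs {
        // Each certificate in the bundle becomes a trusted root for the https APIs.
        builder = builder.add_root_certificate(cert);
    }
    Ok(builder.build()?)
}
```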
docs/rfcs/2025-02-14-storage-controller.md (new file, 196 lines)

@@ -0,0 +1,196 @@

## Summary

This is a retrospective RFC to document the design of the `storage-controller` service.

This service manages the physical mapping of Tenants and Timelines to Pageservers and Safekeepers. It
acts as the API for "storage" as an abstract concept: enabling other parts of the system to reason
about things like creating/deleting tenants and timelines without having to understand exactly which
pageserver and safekeeper to communicate with, or any subtle rules about how to orchestrate these things.

The storage controller was implemented in the first half of 2024 as an essential part
of storage sharding, especially [shard splitting](032-shard-splitting.md).

It initially managed only pageservers, but has extended in 2025 to also manage safekeepers. In
some places you may see unqualified references to 'nodes' -- those are pageservers.

## Design Choices

### Durability

We rely on an external postgres for all durable state. No local storage is used.

We avoid any unnecessary I/O to durable storage. For example:
- most tracking of in-flight changes to the system is done in-memory rather than recording progress/steps in a database
- When migrating tenant shards between pageservers we only touch the database to increment generation numbers,
we do not persist the total state of a tenant shard.

Being frugal with database I/O has two benefits:
- It avoids the database becoming a practical scaling bottleneck (we expect in-memory scale issues to be hit
before we hit e.g. transactions-per-second issues)
- It reduces cost when using a cloud database service to run the controller's postgres database.

The trade-off is that there is a "bootstrapping" problem: a controller can't be deployed in isolation, one
must first have some existing database system. In practice, we expect that Neon is deployed in one of the
following ways:
- into a cloud which has a postgres service that can be used to run the controller
- into a mature on-prem environment that has existing facilities for running databases
- into a test/dev environment where a simple one-node vanilla postgres installation is sufficient

### Consensus

The controller does _not_ implement any strong consensus mechanism of its own. Instead:
- Where strong consistency is required (for example, for pageserver generation numbers), this
responsibility is delegated to a transaction in our postgres database.
- Highly available deploys are done using a simple in-database record of what controller instances
are available, distinguished by timestamps, rather than having controllers directly negotiate a leader.

Avoiding strong consensus among controller processes is a cost saving (we avoid running three controllers
all the time), and simplifies implementation (we do not have to phrase all configuration changes as e.g. raft
transactions).

The trade-off is that under some circumstances a controller with partial network isolation can cause availability
issues in the cluster, by making changes to pageserver state that might disagree with what the "true" active
controller is trying to do. The impact of this is bounded by our `controllers` database table, which enables
a rogue node to eventually realise that it is not the leader and step down. If a rogue node can't reach
the database, then it implicitly stops making progress. A rogue controller cannot durably damage the system
because pageserver data and safekeeper configs are protected by generation numbers that are only updated
via postgres transactions (i.e. no controller "trusts itself" to independently make decisions about generations).
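To make the "delegate consistency to a database transaction" idea concrete, here is a minimal sketch of a compare-and-set generation increment. The table and column names are hypothetical, not the controller's actual schema, and the real code uses `diesel` rather than raw SQL:

```rust
use tokio_postgres::{Client, Error};

/// Advance a shard's generation only if it still has the value we last saw.
/// Returns the new generation, or None if another controller won the race.
async fn increment_generation(
    client: &mut Client,
    tenant_shard_id: &str,
    expected_generation: i64,
) -> Result<Option<i64>, Error> {
    let txn = client.transaction().await?;
    // The WHERE clause is the compare-and-set: if someone else already advanced
    // the generation, zero rows match and we get no row back.
    let row = txn
        .query_opt(
            "UPDATE tenant_shards \
             SET generation = generation + 1 \
             WHERE tenant_shard_id = $1 AND generation = $2 \
             RETURNING generation",
            &[&tenant_shard_id, &expected_generation],
        )
        .await?;
    txn.commit().await?;
    Ok(row.map(|r| r.get::<_, i64>(0)))
}
```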
### Scale

We design for high but not unlimited scale. The memory footprint of each tenant shard is small (~8kB), so
it is realistic to scale up to a million attached shards on a server with modest resources. Tenants in
a detached state (i.e. not active on pageservers) do not need to be managed by the storage controller, and can
be relegated from memory to the database.

Typically, a tenant shard is updated about once a week, when we do a deploy. During deploys, we relocate
a few thousand tenants from each pageserver while it is restarted, so it is extremely rare for the controller
to have to do O(N) work (on all shards at once).

There are places where we do O(N) work:
- On normal startup, when loading from the database into memory
- On unclean startup (with no handover of observed state from a previous controller), where we will
scan all shards on all pageservers.

It is important that these locations are written efficiently. At high scale we should still expect runtimes
on the order of tens of seconds to complete a storage controller start.

When the practical scale limit of a single storage controller is reached, just deploy another one with its
own pageservers & safekeepers: each controller + its storage servers should be thought of as a logical cluster
or "cell" of storage.
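For intuition, using the RFC's own per-shard estimate (the arithmetic is only illustrative): 1,000,000 shards × ~8 kB per shard ≈ 8 GB of resident shard state, which is why "a million attached shards on a server with modest resources" is plausible.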
# High Level Design

The storage controller is an in-memory system (i.e. state for all attached
tenants is held in memory _as well as_ being represented in durable postgres storage).

## Infrastructure

The storage controller is an async rust binary using tokio.

The storage controller is built around the `Service` type. This implements
all the entry points for the outside world's interaction with the controller (HTTP handlers are mostly thin wrappers of service functions),
and holds most in-memory state (e.g. the list of tenant shards).

The state is held in a `ServiceInner` wrapped in a RwLock. This monolithic
lock is used to simplify reasoning about code that mutates state: each function that takes a write lock may be thought of as a serializable transaction on the in-memory state. This lock is clearly a bottleneck, but
nevertheless is scalable to managing millions of tenants.

Persistent state is held in a postgres database, and we use the `diesel` crate to provide database client functionality. All database access is wrapped in the `Persistence` type -- this makes it easy to understand which
code is touching the database. The database is only used when necessary, i.e. for state that cannot be recovered another way. For example, we do not store the secondary pageserver locations of tenant shards in the database, rather we learn these at startup from running pageservers, and/or make scheduling decisions to fill in the gaps. This adds some complexity, but massively reduces the load on the database, and enables running the storage controller with a very cheap postgres instance.
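A minimal sketch of the shape this describes (simplified, illustrative names; the real `Service` and `ServiceInner` carry far more state):

```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

struct TenantShard; // stand-in for the real per-shard state

struct ServiceInner {
    tenant_shards: HashMap<String, TenantShard>,
}

#[derive(Clone)]
struct Service {
    // One monolithic lock: every write-locked section behaves like a
    // serializable transaction on the in-memory state.
    inner: Arc<RwLock<ServiceInner>>,
}

impl Service {
    async fn shard_count(&self) -> usize {
        self.inner.read().await.tenant_shards.len()
    }

    async fn upsert_shard(&self, id: String, shard: TenantShard) {
        // Holding the write lock across the whole mutation keeps it atomic
        // with respect to every other state-mutating function.
        self.inner.write().await.tenant_shards.insert(id, shard);
    }
}
```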
## Pageserver tenant scheduling & reconciliation

### Intent & observed state

Each tenant shard is represented by type `TenantShard`, which has an 'intent' and 'observed' state. Setting the
intent state is called _scheduling_, and doing remote I/O to make observed
state match intent state is called _reconciliation_.

The `Scheduler` type is responsible for making choices about the intent
state, such as choosing a pageserver for a new tenant shard, or assigning
a replacement pageserver when the original one fails.

The observed state is updated after tenant reconciliation (see below), and
has the concept of a `None` state for a pageserver, indicating unknown state. This is used to ensure that we can safely clean up after we start
but do not finish a remote call to a pageserver, or if a pageserver restarts and we are uncertain of its state.
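An illustrative sketch of the split between intent and observed state (shapes only, not the controller's real definitions):

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct NodeId(u64);

struct LocationConf; // what we asked a pageserver to hold for this shard

struct IntentState {
    attached: Option<NodeId>,
    secondary: Vec<NodeId>,
}

struct ObservedState {
    // `None` for a location means "unknown": we may have started a remote call
    // and never seen it complete, or the pageserver restarted under us.
    locations: HashMap<NodeId, Option<LocationConf>>,
}

struct TenantShard {
    intent: IntentState,   // set by scheduling
    observed: ObservedState, // updated by reconciliation
}
```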
### Tenant Reconciliation

The `Reconciler` type is responsible for updating pageservers to achieve
the intent state. It is instantiated when `Service` determines that a shard requires reconciliation, and owned by a background tokio task that
runs it to completion. Reconciler does not have access to the `Service` state: it is populated with a snapshot of relevant information when constructed, and submits its results to a channel that `Service` consumes
to update the tenant shard's observed state.

The Reconciler does have access to the database, but only uses it for
a single purpose: updating shards' generation numbers immediately before
attaching them to a pageserver.

Operations that change a tenant's scheduling will spawn a reconciler if
necessary, and there is also a background loop which checks every shard
for the need to reconcile -- this background loop ensures eventual progress
if some earlier reconciliations failed for some reason.

The reconciler has a general purpose code path which will attach/detach from pageservers as necessary, and a special case path for live migrations. The live migration case is more common in practice, and is taken whenever the current observed state indicates that we have a healthy attached location to migrate from. This implements live migration as described in the earlier [live migration RFC](028-pageserver-migration.md).
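A hypothetical sketch of the ownership pattern described above: the reconciler runs in its own task on a snapshot of state and reports back over a channel (all names invented for illustration):

```rust
use tokio::sync::mpsc;

struct ReconcileResult {
    tenant_shard_id: String,
    success: bool,
}

struct Reconciler {
    tenant_shard_id: String,
    // plus a snapshot of intent/observed state captured at construction time
}

impl Reconciler {
    async fn run(self) -> ReconcileResult {
        // ... remote calls to pageservers would happen here ...
        ReconcileResult { tenant_shard_id: self.tenant_shard_id, success: true }
    }
}

fn spawn_reconciler(reconciler: Reconciler, results_tx: mpsc::UnboundedSender<ReconcileResult>) {
    tokio::spawn(async move {
        // The service never shares its locked state with this task; it only
        // consumes the result message to update observed state afterwards.
        let result = reconciler.run().await;
        let _ = results_tx.send(result);
    });
}
```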
### Scheduling optimisation

During the periodic background reconciliation loop, the controller also
performs _scheduling optimization_. This is the process of looking for
shards that are in sub-optimal locations, and moving them.

Typically, this means:
- Shards attached outside their preferred AZ (e.g. after a node failure), to migrate them back to their preferred AZ
- Shards attached on the same pageserver as some other shards in the same
tenant, to migrate them elsewhere (e.g. after a shard split)

Scheduling optimisation is a multi-step process to ensure graceful cutovers, e.g. by creating a new secondary location, waiting for it to
warm up, then cutting over. This is not done as an explicit queue
of operations, but rather by iteratively calling the optimisation
function, which will recognise each intervening state as something
that can generate the next optimisation.
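An illustrative sketch of "no explicit queue": each call inspects the current state and proposes at most one next step, so the multi-step cutover emerges from repeated invocations (types and fields are invented for the example):

```rust
enum Optimization {
    CreateSecondary { node: u64 },
    WaitForWarmup { node: u64 },
    CutOverAttachment { from: u64, to: u64 },
}

struct ShardView {
    attached: u64,
    preferred_node: u64,
    secondary_on_preferred: bool,
    secondary_warm: bool,
}

fn next_optimization(shard: &ShardView) -> Option<Optimization> {
    if shard.attached == shard.preferred_node {
        return None; // already optimal
    }
    if !shard.secondary_on_preferred {
        Some(Optimization::CreateSecondary { node: shard.preferred_node })
    } else if !shard.secondary_warm {
        Some(Optimization::WaitForWarmup { node: shard.preferred_node })
    } else {
        Some(Optimization::CutOverAttachment {
            from: shard.attached,
            to: shard.preferred_node,
        })
    }
}
```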
### Pageserver heartbeats and failure

The `Heartbeater` type is responsible for detecting when a pageserver
becomes unavailable. This is fed back into `Service` for action: when
a pageserver is marked unavailable, tenant shards on that pageserver are
rescheduled and Reconcilers are spawned to cut them over to their new location.

## Pageserver timeline CRUD operations

By CRUD operations, we mean creating and deleting timelines. The authoritative storage for which timelines exist on the pageserver
is in S3, and is governed by the pageserver's system of generation
numbers. Because a shard can be attached to multiple pageservers
concurrently, we need to handle this when doing timeline CRUD operations:
- A timeline operation is only persistent if _after_ the ack from a pageserver, that pageserver's generation is still the latest.
- For deletions in particular, they are only persistent if _all_ attached
locations have acked the deletion operation, since if only the latest one
has acked then the timeline could still return from the dead if some old-generation attachment writes an index for it.
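A hedged sketch of the two persistence rules above, with all names invented for illustration:

```rust
struct Ack {
    generation: u32,
}

/// Creation rule: the ack only proves the write happened under `ack.generation`.
/// It counts as durable only if that generation is still the latest one *after*
/// the ack, so no newer attachment can be unaware of the timeline.
fn creation_is_durable(ack: &Ack, latest_generation_after_ack: u32) -> bool {
    ack.generation == latest_generation_after_ack
}

/// Deletion rule: every attached location must have acked, not just the latest,
/// otherwise an old-generation attachment could still write an index for the
/// timeline and bring it back from the dead.
fn deletion_is_durable(acked_generations: &[u32], all_attached_generations: &[u32]) -> bool {
    all_attached_generations
        .iter()
        .all(|g| acked_generations.contains(g))
}
```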
## Zero-downtime controller deployments

When two storage controllers run at the same time, they coordinate via
the database to establish one leader, and the other controller may proxy
requests to this leader.

See [Storage controller restarts RFC](037-storage-controller-restarts.md).

Note that this is not a strong consensus mechanism: the controller must also survive split-brain situations. This is respected by code that
e.g. increments version numbers, which uses database transactions that
check the expected value before modifying it. A split-brain situation can
impact availability (e.g. if two controllers are fighting over where to
attach a shard), but it should never impact durability and data integrity.

## Graceful drain & fill of pageservers during deploys

The storage controller has functionality for draining + filling pageservers
while deploying new pageserver binaries, so that clients are not actively
using a pageserver while it restarts.

See [Graceful restarts RFC](033-storage-controller-drain-and-fill.md).

## Safekeeper timeline scheduling

This is currently under development, see [Safekeeper dynamic membership change RFC](035-safekeeper-dynamic-membership-change.md).
@@ -160,6 +160,12 @@ pub struct ComputeSpec {
pub drop_subscriptions_before_start: bool,
/// Log level for audit logging:
///
/// Disabled - no audit logging. This is the default.
/// log - log masked statements to the postgres log using pgaudit extension
/// hipaa - log unmasked statements to the file using pgaudit and pgauditlogtofile extension
///
/// Extensions should be present in shared_preload_libraries
#[serde(default)]
pub audit_log_level: ComputeAudit,
}

@@ -282,27 +288,16 @@ impl ComputeMode {
}
/// Log level for audit logging
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
/// Disabled, log, hipaa
/// Default is Disabled
#[derive(Clone, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
pub enum ComputeAudit {
#[default]
/// no audit logging. This is the default.
Disabled,
/// write masked audit log statements to the postgres log using pgaudit extension
Log,
/// log unmasked statements to the file using pgaudit and pgauditlogtofile extensions
Hipaa,
}
impl ComputeAudit {
pub fn as_str(&self) -> &str {
match self {
ComputeAudit::Disabled => "disabled",
ComputeAudit::Log => "log",
ComputeAudit::Hipaa => "hipaa",
}
}
}
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
pub struct Cluster {
pub cluster_id: Option<String>,
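The `#[serde(default)]` attribute on `audit_log_level` is what lets specs that predate the field deserialize to the default variant. A standalone sketch of that behaviour, with simplified stand-in types rather than the crate's actual definitions:

```rust
use serde::Deserialize;

#[derive(Debug, Default, PartialEq, Deserialize)]
enum Audit {
    #[default]
    Disabled,
    Log,
    Hipaa,
}

#[derive(Debug, Deserialize)]
struct Spec {
    #[serde(default)]
    audit_log_level: Audit,
}

fn main() {
    // A spec without the field falls back to the default variant.
    let old: Spec = serde_json::from_str("{}").unwrap();
    assert_eq!(old.audit_log_level, Audit::Disabled);

    // A spec that sets it deserializes to the named variant.
    let new: Spec = serde_json::from_str(r#"{"audit_log_level": "Hipaa"}"#).unwrap();
    assert_eq!(new.audit_log_level, Audit::Hipaa);
}
```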
@@ -7,7 +7,7 @@ use http_utils::error::HttpErrorBody;
use pageserver_api::models::*;
use pageserver_api::shard::TenantShardId;
pub use reqwest::Body as ReqwestBody;
use reqwest::{Certificate, IntoUrl, Method, StatusCode, Url};
use reqwest::{IntoUrl, Method, StatusCode, Url};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;

@@ -39,8 +39,8 @@ pub enum Error {
#[error("Cancelled")]
Cancelled,
#[error("create client: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
CreateClient(reqwest::Error),
#[error("request timed out: {0}")]
Timeout(String),
}
pub type Result<T> = std::result::Result<T, Error>;

@@ -72,24 +72,7 @@ pub enum ForceAwaitLogicalSize {
}
impl Client {
pub fn new(
mgmt_api_endpoint: String,
jwt: Option<&str>,
ssl_ca_cert: Option<Certificate>,
) -> Result<Self> {
let mut http_client = reqwest::Client::builder();
if let Some(ssl_ca_cert) = ssl_ca_cert {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client.build().map_err(Error::CreateClient)?;
Ok(Self::from_client(http_client, mgmt_api_endpoint, jwt))
}
pub fn from_client(
client: reqwest::Client,
mgmt_api_endpoint: String,
jwt: Option<&str>,
) -> Self {
pub fn new(client: reqwest::Client, mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
Self {
mgmt_api_endpoint,
authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
@@ -34,10 +34,10 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
));
// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(

@@ -75,10 +75,10 @@ async fn main_impl(
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
));
// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(

@@ -123,10 +123,10 @@ async fn main_impl(
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
));
if let Some(engine_str) = &args.set_io_engine {
mgmt_api_client.put_io_engine(engine_str).await?;

@@ -81,10 +81,10 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
));
if let Some(engine_str) = &args.set_io_engine {
mgmt_api_client.put_io_engine(engine_str).await?;

@@ -38,10 +38,10 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
));
// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
@@ -65,8 +65,8 @@ pub struct PageServerConf {
/// Period to reload certificate and private key from files.
/// Default: 60s.
pub ssl_cert_reload_period: Duration,
/// Trusted root CA certificate to use in https APIs.
pub ssl_ca_cert: Option<Certificate>,
/// Trusted root CA certificates to use in https APIs.
pub ssl_ca_certs: Vec<Certificate>,
/// Current availability zone. Used for traffic metrics.
pub availability_zone: Option<String>,

@@ -481,12 +481,12 @@ impl PageServerConf {
validate_wal_contiguity: validate_wal_contiguity.unwrap_or(false),
load_previous_heatmap: load_previous_heatmap.unwrap_or(true),
generate_unarchival_heatmap: generate_unarchival_heatmap.unwrap_or(true),
ssl_ca_cert: match ssl_ca_file {
ssl_ca_certs: match ssl_ca_file {
Some(ssl_ca_file) => {
let buf = std::fs::read(ssl_ca_file)?;
Some(Certificate::from_pem(&buf)?)
Certificate::from_pem_bundle(&buf)?
}
None => None,
None => Vec::new(),
},
};
@@ -76,7 +76,7 @@ impl StorageControllerUpcallClient {
client = client.default_headers(headers);
}
if let Some(ssl_ca_cert) = &conf.ssl_ca_cert {
for ssl_ca_cert in &conf.ssl_ca_certs {
client = client.add_root_certificate(ssl_ca_cert.clone());
}
@@ -82,7 +82,8 @@ static int max_reconnect_attempts = 60;
static int stripe_size;
static int pageserver_response_log_timeout = 10000;
static int pageserver_response_disconnect_timeout = 120000; /* 2 minutes */
/* 2.5 minutes. A bit higher than highest default TCP retransmission timeout */
static int pageserver_response_disconnect_timeout = 150000;
typedef struct
{

@@ -1450,7 +1451,7 @@ pg_init_libpagestore(void)
"If the pageserver doesn't respond to a request within this timeout, "
"disconnect and reconnect.",
&pageserver_response_disconnect_timeout,
120000, 100, INT_MAX,
150000, 100, INT_MAX,
PGC_SUSET,
GUC_UNIT_MS,
NULL, NULL, NULL);
@@ -38,9 +38,8 @@ pub enum Error {
#[error("Cancelled")]
Cancelled,
/// Failed to create client.
#[error("create client: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
CreateClient(reqwest::Error),
#[error("request timed out: {0}")]
Timeout(String),
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -217,7 +217,7 @@ struct Args {
/// Period to reload certificate and private key from files.
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)]
pub ssl_cert_reload_period: Duration,
/// Trusted root CA certificate to use in https APIs.
/// Trusted root CA certificates to use in https APIs.
#[arg(long)]
ssl_ca_file: Option<Utf8PathBuf>,
}

@@ -353,13 +353,13 @@ async fn main() -> anyhow::Result<()> {
}
};
let ssl_ca_cert = match args.ssl_ca_file.as_ref() {
let ssl_ca_certs = match args.ssl_ca_file.as_ref() {
Some(ssl_ca_file) => {
tracing::info!("Using ssl root CA file: {ssl_ca_file:?}");
let buf = tokio::fs::read(ssl_ca_file).await?;
Some(Certificate::from_pem(&buf)?)
Certificate::from_pem_bundle(&buf)?
}
None => None,
None => Vec::new(),
};
let conf = Arc::new(SafeKeeperConf {

@@ -398,7 +398,7 @@ async fn main() -> anyhow::Result<()> {
ssl_key_file: args.ssl_key_file,
ssl_cert_file: args.ssl_cert_file,
ssl_cert_reload_period: args.ssl_cert_reload_period,
ssl_ca_cert,
ssl_ca_certs,
});
// initialize sentry if SENTRY_DSN is provided
@@ -235,7 +235,7 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
let resp = pull_timeline::handle_request(
data,
conf.sk_auth_token.clone(),
conf.ssl_ca_cert.clone(),
conf.ssl_ca_certs.clone(),
global_timelines,
)
.await

@@ -120,7 +120,7 @@ pub struct SafeKeeperConf {
pub ssl_key_file: Utf8PathBuf,
pub ssl_cert_file: Utf8PathBuf,
pub ssl_cert_reload_period: Duration,
pub ssl_ca_cert: Option<Certificate>,
pub ssl_ca_certs: Vec<Certificate>,
}
impl SafeKeeperConf {

@@ -169,7 +169,7 @@ impl SafeKeeperConf {
ssl_key_file: Utf8PathBuf::from(defaults::DEFAULT_SSL_KEY_FILE),
ssl_cert_file: Utf8PathBuf::from(defaults::DEFAULT_SSL_CERT_FILE),
ssl_cert_reload_period: Duration::from_secs(60),
ssl_ca_cert: None,
ssl_ca_certs: Vec::new(),
}
}
}
@@ -393,7 +393,7 @@ pub struct DebugDumpResponse {
pub async fn handle_request(
request: PullTimelineRequest,
sk_auth_token: Option<SecretString>,
ssl_ca_cert: Option<Certificate>,
ssl_ca_certs: Vec<Certificate>,
global_timelines: Arc<GlobalTimelines>,
) -> Result<PullTimelineResponse> {
let existing_tli = global_timelines.get(TenantTimelineId::new(

@@ -405,7 +405,7 @@ pub async fn handle_request(
}
let mut http_client = reqwest::Client::builder();
if let Some(ssl_ca_cert) = ssl_ca_cert {
for ssl_ca_cert in ssl_ca_certs {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client.build()?;

@@ -183,7 +183,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
ssl_key_file: Utf8PathBuf::from(""),
ssl_cert_file: Utf8PathBuf::from(""),
ssl_cert_reload_period: Duration::ZERO,
ssl_ca_cert: None,
ssl_ca_certs: Vec::new(),
};
let mut global = GlobalMap::new(disk, conf.clone())?;
@@ -624,16 +624,19 @@ impl ComputeHook {
MaybeSendResult::Transmit((request, lock)) => (request, lock),
};
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
Some(if control_plane_url.ends_with('/') {
format!("{control_plane_url}notify-attach")
let result = if !self.config.use_local_compute_notifications {
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
Some(if control_plane_url.ends_with('/') {
format!("{control_plane_url}notify-attach")
} else {
format!("{control_plane_url}/notify-attach")
})
} else {
format!("{control_plane_url}/notify-attach")
})
} else {
self.config.compute_hook_url.clone()
};
let result = if let Some(notify_url) = &compute_hook_url {
self.config.compute_hook_url.clone()
};
// We validate this at startup
let notify_url = compute_hook_url.as_ref().unwrap();
self.do_notify(notify_url, &request, cancel).await
} else {
self.do_notify_local(&request).await.map_err(|e| {
@@ -8,7 +8,6 @@ use futures::StreamExt;
use futures::stream::FuturesUnordered;
use pageserver_api::controller_api::{NodeAvailability, SkSchedulingPolicy};
use pageserver_api::models::PageserverUtilization;
use reqwest::Certificate;
use safekeeper_api::models::SafekeeperUtilization;
use safekeeper_client::mgmt_api;
use thiserror::Error;

@@ -27,8 +26,8 @@ struct HeartbeaterTask<Server, State> {
max_offline_interval: Duration,
max_warming_up_interval: Duration,
http_client: reqwest::Client,
jwt_token: Option<String>,
ssl_ca_cert: Option<Certificate>,
}
#[derive(Debug, Clone)]

@@ -76,8 +75,8 @@ where
HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
{
pub(crate) fn new(
http_client: reqwest::Client,
jwt_token: Option<String>,
ssl_ca_cert: Option<Certificate>,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
cancel: CancellationToken,

@@ -86,8 +85,8 @@ where
tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest<Server, State>>();
let mut heartbeater = HeartbeaterTask::new(
receiver,
http_client,
jwt_token,
ssl_ca_cert,
max_offline_interval,
max_warming_up_interval,
cancel,

@@ -122,8 +121,8 @@ where
{
fn new(
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
http_client: reqwest::Client,
jwt_token: Option<String>,
ssl_ca_cert: Option<Certificate>,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
cancel: CancellationToken,

@@ -134,8 +133,8 @@ where
state: HashMap::new(),
max_offline_interval,
max_warming_up_interval,
http_client,
jwt_token,
ssl_ca_cert,
}
}
async fn run(&mut self) {

@@ -178,7 +177,7 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
let mut heartbeat_futs = FuturesUnordered::new();
for (node_id, node) in &*pageservers {
heartbeat_futs.push({
let ssl_ca_cert = self.ssl_ca_cert.clone();
let http_client = self.http_client.clone();
let jwt_token = self.jwt_token.clone();
let cancel = self.cancel.clone();

@@ -193,8 +192,8 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
let response = node_clone
.with_client_retries(
|client| async move { client.get_utilization().await },
&http_client,
&jwt_token,
&ssl_ca_cert,
3,
3,
Duration::from_secs(1),

@@ -329,19 +328,19 @@ impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, Safe
continue;
}
heartbeat_futs.push({
let http_client = self.http_client.clone();
let jwt_token = self
.jwt_token
.as_ref()
.map(|t| SecretString::from(t.to_owned()));
let ssl_ca_cert = self.ssl_ca_cert.clone();
let cancel = self.cancel.clone();
async move {
let response = sk
.with_client_retries(
|client| async move { client.get_utilization().await },
&http_client,
&jwt_token,
&ssl_ca_cert,
3,
3,
Duration::from_secs(1),
@@ -656,11 +656,10 @@ async fn handle_tenant_timeline_passthrough(
let _timer = latency.start_timer(labels.clone());
let client = mgmt_api::Client::new(
service.get_http_client().clone(),
node.base_url(),
service.get_config().pageserver_jwt_token.as_deref(),
service.get_config().ssl_ca_cert.clone(),
)
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
);
let resp = client.get_raw(path).await.map_err(|e|
// We return 503 here because if we can't successfully send a request to the pageserver,
// either we aren't available or the pageserver is unavailable.
@@ -200,9 +200,14 @@ struct Cli {
/// Period to reload certificate and private key from files.
#[arg(long, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)]
ssl_cert_reload_period: humantime::Duration,
/// Trusted root CA certificate to use in https APIs.
/// Trusted root CA certificates to use in https APIs.
#[arg(long)]
ssl_ca_file: Option<PathBuf>,
/// Neon local specific flag. When set, ignore [`Cli::control_plane_url`] and deliver
/// the compute notification directly (instead of via control plane).
#[arg(long, default_value = "false")]
use_local_compute_notifications: bool,
}
enum StrictMode {

@@ -368,6 +373,9 @@ async fn async_main() -> anyhow::Result<()> {
"neither `--compute-hook-url` nor `--control-plane-url` are set: this is only permitted in `--dev` mode"
);
}
StrictMode::Strict if args.use_local_compute_notifications => {
anyhow::bail!("`--use-local-compute-notifications` is only permitted in `--dev` mode");
}
StrictMode::Strict => {
tracing::info!("Starting in strict mode: configuration is OK.")
}

@@ -376,13 +384,13 @@ async fn async_main() -> anyhow::Result<()> {
}
}
let ssl_ca_cert = match args.ssl_ca_file.as_ref() {
let ssl_ca_certs = match args.ssl_ca_file.as_ref() {
Some(ssl_ca_file) => {
tracing::info!("Using ssl root CA file: {ssl_ca_file:?}");
let buf = tokio::fs::read(ssl_ca_file).await?;
Some(Certificate::from_pem(&buf)?)
Certificate::from_pem_bundle(&buf)?
}
None => None,
None => Vec::new(),
};
let config = Config {

@@ -425,8 +433,9 @@ async fn async_main() -> anyhow::Result<()> {
start_as_candidate: args.start_as_candidate,
use_https_pageserver_api: args.use_https_pageserver_api,
use_https_safekeeper_api: args.use_https_safekeeper_api,
ssl_ca_cert,
ssl_ca_certs,
timelines_onto_safekeepers: args.timelines_onto_safekeepers,
use_local_compute_notifications: args.use_local_compute_notifications,
};
// Validate that we can connect to the database
@@ -7,7 +7,7 @@ use pageserver_api::controller_api::{
};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use reqwest::{Certificate, StatusCode};
use reqwest::StatusCode;
use serde::Serialize;
use tokio_util::sync::CancellationToken;
use utils::backoff;

@@ -280,8 +280,8 @@ impl Node {
pub(crate) async fn with_client_retries<T, O, F>(
&self,
mut op: O,
http_client: &reqwest::Client,
jwt: &Option<String>,
ssl_ca_cert: &Option<Certificate>,
warn_threshold: u32,
max_retries: u32,
timeout: Duration,

@@ -300,24 +300,13 @@ impl Node {
| ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
ApiError(_, _) => true,
Cancelled => true,
CreateClient(_) => true,
Timeout(_) => false,
}
}
// TODO: refactor PageserverClient and with_client_retires (#11113).
let mut http_client = reqwest::ClientBuilder::new().timeout(timeout);
if let Some(ssl_ca_cert) = ssl_ca_cert.as_ref() {
http_client = http_client.add_root_certificate(ssl_ca_cert.clone())
}
let http_client = match http_client.build() {
Ok(http_client) => http_client,
Err(err) => return Some(Err(mgmt_api::Error::CreateClient(err))),
};
backoff::retry(
|| {
let client = PageserverClient::from_client(
let client = PageserverClient::new(
self.get_id(),
http_client.clone(),
self.base_url(),

@@ -326,11 +315,14 @@ impl Node {
let node_cancel_fut = self.cancel.cancelled();
let op_fut = op(client);
let op_fut = tokio::time::timeout(timeout, op(client));
async {
tokio::select! {
r = op_fut=> {r},
r = op_fut => match r {
Ok(r) => r,
Err(e) => Err(mgmt_api::Error::Timeout(format!("{e}"))),
},
_ = node_cancel_fut => {
Err(mgmt_api::Error::Cancelled)
}}
@@ -8,7 +8,7 @@ use pageserver_api::models::{
use pageserver_api::shard::TenantShardId;
use pageserver_client::BlockUnblock;
use pageserver_client::mgmt_api::{Client, Result};
use reqwest::{Certificate, StatusCode};
use reqwest::StatusCode;
use utils::id::{NodeId, TenantId, TimelineId};
/// Thin wrapper around [`pageserver_client::mgmt_api::Client`]. It allows the storage

@@ -47,25 +47,13 @@ macro_rules! measured_request {
impl PageserverClient {
pub(crate) fn new(
node_id: NodeId,
mgmt_api_endpoint: String,
jwt: Option<&str>,
ssl_ca_cert: Option<Certificate>,
) -> Result<Self> {
Ok(Self {
inner: Client::new(mgmt_api_endpoint, jwt, ssl_ca_cert)?,
node_id_label: node_id.0.to_string(),
})
}
pub(crate) fn from_client(
node_id: NodeId,
raw_client: reqwest::Client,
mgmt_api_endpoint: String,
jwt: Option<&str>,
) -> Self {
Self {
inner: Client::from_client(raw_client, mgmt_api_endpoint, jwt),
inner: Client::new(raw_client, mgmt_api_endpoint, jwt),
node_id_label: node_id.0.to_string(),
}
}
@@ -997,10 +997,11 @@ impl Persistence {
// Clear sharding flag
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.eq(new_shard_count.literal() as i32))
.set((splitting.eq(0),))
.execute(conn)
.await?;
debug_assert!(updated > 0);
assert!(updated == new_shard_count.count() as usize);
Ok(())
})
@@ -86,6 +86,9 @@ pub(super) struct Reconciler {
/// Access to persistent storage for updating generation numbers
pub(crate) persistence: Arc<Persistence>,
/// HTTP client with proper CA certs.
pub(crate) http_client: reqwest::Client,
}
pub(crate) struct ReconcilerConfigBuilder {

@@ -298,8 +301,8 @@ impl Reconciler {
.location_config(tenant_shard_id, config.clone(), flush_ms, lazy)
.await
},
&self.http_client,
&self.service_config.pageserver_jwt_token,
&self.service_config.ssl_ca_cert,
1,
3,
timeout,

@@ -419,10 +422,10 @@ impl Reconciler {
let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
node.base_url(),
self.service_config.pageserver_jwt_token.as_deref(),
self.service_config.ssl_ca_cert.clone(),
)?;
);
client
.wait_lsn(

@@ -443,10 +446,10 @@ impl Reconciler {
) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
node.base_url(),
self.service_config.pageserver_jwt_token.as_deref(),
self.service_config.ssl_ca_cert.clone(),
)?;
);
let timelines = client.timeline_list(&tenant_shard_id).await?;
Ok(timelines

@@ -483,8 +486,8 @@ impl Reconciler {
)
.await
},
&self.http_client,
&self.service_config.pageserver_jwt_token,
&self.service_config.ssl_ca_cert,
1,
3,
request_download_timeout * 2,

@@ -778,8 +781,8 @@ impl Reconciler {
let observed_conf = match attached_node
.with_client_retries(
|client| async move { client.get_location_config(tenant_shard_id).await },
&self.http_client,
&self.service_config.pageserver_jwt_token,
&self.service_config.ssl_ca_cert,
1,
1,
Duration::from_secs(5),

@@ -1127,8 +1130,8 @@ impl Reconciler {
match origin
.with_client_retries(
|client| async move { client.get_location_config(tenant_shard_id).await },
&self.http_client,
&self.service_config.pageserver_jwt_token,
&self.service_config.ssl_ca_cert,
1,
3,
Duration::from_secs(5),
@@ -1,7 +1,7 @@
use std::time::Duration;
use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
use reqwest::{Certificate, StatusCode};
use reqwest::StatusCode;
use safekeeper_client::mgmt_api;
use tokio_util::sync::CancellationToken;
use utils::backoff;

@@ -94,8 +94,8 @@ impl Safekeeper {
pub(crate) async fn with_client_retries<T, O, F>(
&self,
mut op: O,
http_client: &reqwest::Client,
jwt: &Option<SecretString>,
ssl_ca_cert: &Option<Certificate>,
warn_threshold: u32,
max_retries: u32,
timeout: Duration,

@@ -114,17 +114,10 @@ impl Safekeeper {
| ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
ApiError(_, _) => true,
Cancelled => true,
CreateClient(_) => true,
Timeout(_) => false,
}
}
// TODO: refactor SafekeeperClient and with_client_retires (#11113).
let mut http_client = reqwest::Client::builder().timeout(timeout);
if let Some(ssl_ca_cert) = ssl_ca_cert.as_ref() {
http_client = http_client.add_root_certificate(ssl_ca_cert.clone());
}
let http_client = http_client.build().map_err(mgmt_api::Error::CreateClient)?;
backoff::retry(
|| {
let client = SafekeeperClient::new(

@@ -136,11 +129,14 @@ impl Safekeeper {
let node_cancel_fut = self.cancel.cancelled();
let op_fut = op(client);
let op_fut = tokio::time::timeout(timeout, op(client));
async {
tokio::select! {
r = op_fut=> {r},
r = op_fut => match r {
Ok(r) => r,
Err(e) => Err(mgmt_api::Error::Timeout(format!("{e}"))),
},
_ = node_cancel_fut => {
Err(mgmt_api::Error::Cancelled)
}}
@@ -267,7 +267,7 @@ fn passthrough_api_error(node: &Node, e: mgmt_api::Error) -> ApiError {
ApiError::Conflict(format!("{node} {status}: {status} {msg}"))
}
mgmt_api::Error::Cancelled => ApiError::ShuttingDown,
mgmt_api::Error::CreateClient(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
mgmt_api::Error::Timeout(e) => ApiError::Timeout(e.into()),
}
}

@@ -445,9 +445,11 @@ pub struct Config {

pub use_https_safekeeper_api: bool,

pub ssl_ca_cert: Option<Certificate>,
pub ssl_ca_certs: Vec<Certificate>,

pub timelines_onto_safekeepers: bool,

pub use_local_compute_notifications: bool,
}

impl From<DatabaseError> for ApiError {
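For orientation (not part of the diff): a minimal sketch of how the new ssl_ca_certs: Vec<Certificate> field could be populated from PEM files. The paths, the one-certificate-per-file assumption, and the use of anyhow are illustrative only.

use reqwest::Certificate;

fn load_ca_certs(paths: &[std::path::PathBuf]) -> anyhow::Result<Vec<Certificate>> {
    let mut certs = Vec::new();
    for path in paths {
        // Read each PEM file and parse it as a single certificate.
        let pem = std::fs::read(path)?;
        certs.push(Certificate::from_pem(&pem)?);
    }
    Ok(certs)
}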
@@ -524,6 +526,9 @@ pub struct Service {
/// This waits for initial reconciliation with pageservers to complete. Until this barrier
/// passes, it isn't safe to do any actions that mutate tenants.
pub(crate) startup_complete: Barrier,

/// HTTP client with proper CA certs.
http_client: reqwest::Client,
}

impl From<ReconcileWaitError> for ApiError {
@@ -667,6 +672,10 @@ impl Service {
&self.config
}

pub fn get_http_client(&self) -> &reqwest::Client {
&self.http_client
}

/// Called once on startup, this function attempts to contact all pageservers to build an up-to-date
/// view of the world, and determine which pageservers are responsive.
#[instrument(skip_all)]
@@ -965,8 +974,8 @@ impl Service {
let response = node
.with_client_retries(
|client| async move { client.list_location_config().await },
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
1,
5,
timeout,
@@ -1064,20 +1073,12 @@ impl Service {
break;
}

let client = match PageserverClient::new(
let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
node.base_url(),
self.config.pageserver_jwt_token.as_deref(),
self.config.ssl_ca_cert.clone(),
) {
Ok(client) => client,
Err(e) => {
tracing::error!(
"Failed to create client to detach unknown shard {tenant_shard_id} on pageserver {node_id}: {e}"
);
continue;
}
};
);
match client
.location_config(
tenant_shard_id,
@@ -1655,17 +1656,36 @@ impl Service {
let cancel = CancellationToken::new();
let reconcilers_cancel = cancel.child_token();

let mut http_client = reqwest::Client::builder();
// We intentionally disable the connection pool, so every request will create its own TCP connection.
// It's especially important for heartbeaters to notice more network problems.
//
// TODO: It makes sense to use this client only in heartbeaters and create a second one with
// connection pooling for everything else. But reqwest::Client may create a connection without
// ever using it (it uses hyper's Client under the hood):
// https://github.com/hyperium/hyper-util/blob/d51318df3461d40e5f5e5ca163cb3905ac960209/src/client/legacy/client.rs#L415
//
// Because of a bug in hyper0::Connection::graceful_shutdown such connections hang during
// graceful server shutdown: https://github.com/hyperium/hyper/issues/2730
//
// The bug has been fixed in hyper v1, so keep alive may be enabled only after we migrate to hyper1.
http_client = http_client.pool_max_idle_per_host(0);
for ssl_ca_cert in &config.ssl_ca_certs {
http_client = http_client.add_root_certificate(ssl_ca_cert.clone());
}
let http_client = http_client.build()?;

let heartbeater_ps = Heartbeater::new(
http_client.clone(),
config.pageserver_jwt_token.clone(),
config.ssl_ca_cert.clone(),
config.max_offline_interval,
config.max_warming_up_interval,
cancel.clone(),
);

let heartbeater_sk = Heartbeater::new(
http_client.clone(),
config.safekeeper_jwt_token.clone(),
config.ssl_ca_cert.clone(),
config.max_offline_interval,
config.max_warming_up_interval,
cancel.clone(),
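For orientation (not part of the diff): a minimal sketch of building the single shared reqwest::Client the way the constructor above does, with the connection pool disabled and every configured CA added as a root certificate (reqwest 0.12 API assumed).

use reqwest::Certificate;

fn build_shared_client(ssl_ca_certs: &[Certificate]) -> reqwest::Result<reqwest::Client> {
    let mut builder = reqwest::Client::builder()
        // Keep no idle connections: every request opens a fresh TCP connection,
        // so heartbeaters see network failures instead of reusing a dead socket.
        .pool_max_idle_per_host(0);
    for cert in ssl_ca_certs {
        builder = builder.add_root_certificate(cert.clone());
    }
    builder.build()
}

Disabling idle connections trades a little per-request latency for prompt detection of dead peers, which is the rationale spelled out in the comment block in the hunk above.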
@@ -1708,6 +1728,7 @@ impl Service {
reconcilers_gate: Gate::default(),
tenant_op_locks: Default::default(),
node_op_locks: Default::default(),
http_client,
});

let result_task_this = this.clone();
@@ -2013,8 +2034,8 @@ impl Service {
let configs = match node
.with_client_retries(
|client| async move { client.list_location_config().await },
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
1,
5,
SHORT_RECONCILE_TIMEOUT,
@@ -2092,8 +2113,8 @@ impl Service {
.location_config(tenant_shard_id, config, None, false)
.await
},
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
1,
5,
SHORT_RECONCILE_TIMEOUT,
@@ -3235,11 +3256,10 @@ impl Service {
for tenant_shard_id in shard_ids {
let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
node.base_url(),
self.config.pageserver_jwt_token.as_deref(),
self.config.ssl_ca_cert.clone(),
)
.map_err(|e| passthrough_api_error(&node, e))?;
);

tracing::info!("Doing time travel recovery for shard {tenant_shard_id}",);

@@ -3298,11 +3318,10 @@ impl Service {
for (tenant_shard_id, node) in targets {
let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
node.base_url(),
self.config.pageserver_jwt_token.as_deref(),
self.config.ssl_ca_cert.clone(),
)
.map_err(|e| passthrough_api_error(&node, e))?;
);
futs.push(async move {
let result = client
.tenant_secondary_download(tenant_shard_id, wait)
@@ -3427,8 +3446,8 @@ impl Service {
.tenant_delete(TenantShardId::unsharded(tenant_id))
.await
},
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
1,
3,
RECONCILE_TIMEOUT,
@@ -3580,8 +3599,8 @@ impl Service {
async fn create_one(
tenant_shard_id: TenantShardId,
locations: ShardMutationLocations,
http_client: reqwest::Client,
jwt: Option<String>,
ssl_ca_cert: Option<Certificate>,
create_req: TimelineCreateRequest,
) -> Result<TimelineInfo, ApiError> {
let latest = locations.latest.node;
@@ -3594,8 +3613,7 @@ impl Service {
);

let client =
PageserverClient::new(latest.get_id(), latest.base_url(), jwt.as_deref(), ssl_ca_cert.clone())
.map_err(|e| passthrough_api_error(&latest, e))?;
PageserverClient::new(latest.get_id(), http_client.clone(), latest.base_url(), jwt.as_deref());

let timeline_info = client
.timeline_create(tenant_shard_id, &create_req)
@@ -3616,11 +3634,10 @@ impl Service {

let client = PageserverClient::new(
location.node.get_id(),
http_client.clone(),
location.node.base_url(),
jwt.as_deref(),
ssl_ca_cert.clone(),
)
.map_err(|e| passthrough_api_error(&location.node, e))?;
);

let res = client
.timeline_create(tenant_shard_id, &create_req)
@@ -3648,8 +3665,8 @@ impl Service {
let timeline_info = create_one(
shard_zero_tid,
shard_zero_locations,
self.http_client.clone(),
self.config.pageserver_jwt_token.clone(),
self.config.ssl_ca_cert.clone(),
create_req.clone(),
)
.await?;
@@ -3678,8 +3695,8 @@ impl Service {
Box::pin(create_one(
tenant_shard_id,
mutation_locations,
self.http_client.clone(),
jwt.clone(),
self.config.ssl_ca_cert.clone(),
create_req,
))
},
@@ -3762,16 +3779,15 @@ impl Service {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
node: Node,
http_client: reqwest::Client,
jwt: Option<String>,
ssl_ca_cert: Option<Certificate>,
req: TimelineArchivalConfigRequest,
) -> Result<(), ApiError> {
tracing::info!(
"Setting archival config of timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
);

let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref(), ssl_ca_cert)
.map_err(|e| passthrough_api_error(&node, e))?;
let client = PageserverClient::new(node.get_id(), http_client, node.base_url(), jwt.as_deref());

client
.timeline_archival_config(tenant_shard_id, timeline_id, &req)
@@ -3793,8 +3809,8 @@ impl Service {
tenant_shard_id,
timeline_id,
node,
self.http_client.clone(),
self.config.pageserver_jwt_token.clone(),
self.config.ssl_ca_cert.clone(),
req.clone(),
))
})
@@ -3831,16 +3847,15 @@ impl Service {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
node: Node,
http_client: reqwest::Client,
jwt: Option<String>,
ssl_ca_cert: Option<Certificate>,
behavior: Option<DetachBehavior>,
) -> Result<(ShardNumber, models::detach_ancestor::AncestorDetached), ApiError> {
tracing::info!(
"Detaching timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
);

let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref(), ssl_ca_cert)
.map_err(|e| passthrough_api_error(&node, e))?;
let client = PageserverClient::new(node.get_id(), http_client, node.base_url(), jwt.as_deref());

client
.timeline_detach_ancestor(tenant_shard_id, timeline_id, behavior)
@@ -3879,8 +3894,8 @@ impl Service {
tenant_shard_id,
timeline_id,
node,
self.http_client.clone(),
self.config.pageserver_jwt_token.clone(),
self.config.ssl_ca_cert.clone(),
behavior,
))
})
@@ -3933,17 +3948,16 @@ impl Service {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
node: Node,
http_client: reqwest::Client,
jwt: Option<String>,
ssl_ca_cert: Option<Certificate>,
dir: BlockUnblock,
) -> Result<(), ApiError> {
let client = PageserverClient::new(
node.get_id(),
http_client,
node.base_url(),
jwt.as_deref(),
ssl_ca_cert,
)
.map_err(|e| passthrough_api_error(&node, e))?;
);

client
.timeline_block_unblock_gc(tenant_shard_id, timeline_id, dir)
@@ -3962,8 +3976,8 @@ impl Service {
tenant_shard_id,
timeline_id,
node,
self.http_client.clone(),
self.config.pageserver_jwt_token.clone(),
self.config.ssl_ca_cert.clone(),
dir,
))
})
@@ -4091,8 +4105,8 @@ impl Service {
let r = node
.with_client_retries(
|client| op(tenant_shard_id, client),
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
warn_threshold,
max_retries,
timeout,
@@ -4316,15 +4330,14 @@ impl Service {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
node: Node,
http_client: reqwest::Client,
jwt: Option<String>,
ssl_ca_cert: Option<Certificate>,
) -> Result<StatusCode, ApiError> {
tracing::info!(
"Deleting timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
);

let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref(), ssl_ca_cert)
.map_err(|e| passthrough_api_error(&node, e))?;
let client = PageserverClient::new(node.get_id(), http_client, node.base_url(), jwt.as_deref());
let res = client
.timeline_delete(tenant_shard_id, timeline_id)
.await;
@@ -4350,8 +4363,8 @@ impl Service {
tenant_shard_id,
timeline_id,
node,
self.http_client.clone(),
self.config.pageserver_jwt_token.clone(),
self.config.ssl_ca_cert.clone(),
))
})
.await?;
@@ -4373,8 +4386,8 @@ impl Service {
shard_zero_tid,
timeline_id,
shard_zero_locations.latest.node,
self.http_client.clone(),
self.config.pageserver_jwt_token.clone(),
self.config.ssl_ca_cert.clone(),
)
.await?;
Ok(shard_zero_status)
@@ -4809,8 +4822,8 @@ impl Service {

client.location_config(child_id, config, None, false).await
},
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
1,
10,
Duration::from_secs(5),
@@ -5412,11 +5425,10 @@ impl Service {
} = target;
let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
node.base_url(),
self.config.pageserver_jwt_token.as_deref(),
self.config.ssl_ca_cert.clone(),
)
.map_err(|e| passthrough_api_error(node, e))?;
);
let response = client
.tenant_shard_split(
*parent_id,
@@ -5456,6 +5468,8 @@ impl Service {
}
}

pausable_failpoint!("shard-split-pre-complete");

// TODO: if the pageserver restarted concurrently with our split API call,
// the actual generation of the child shard might differ from the generation
// we expect it to have. In order for our in-database generation to end up
@@ -5898,11 +5912,10 @@ impl Service {

let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
node.base_url(),
self.config.pageserver_jwt_token.as_deref(),
self.config.ssl_ca_cert.clone(),
)
.map_err(|e| passthrough_api_error(&node, e))?;
);

let scan_result = client
.tenant_scan_remote_storage(tenant_id)
@@ -7136,6 +7149,7 @@ impl Service {
units,
gate_guard,
&self.reconcilers_cancel,
self.http_client.clone(),
)
}

@@ -7543,8 +7557,8 @@ impl Service {
match attached_node
.with_client_retries(
|client| async move { client.tenant_heatmap_upload(tenant_shard_id).await },
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
3,
10,
SHORT_RECONCILE_TIMEOUT,
@@ -7580,8 +7594,8 @@ impl Service {
)
.await
},
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
3,
10,
SHORT_RECONCILE_TIMEOUT,
@@ -7854,8 +7868,8 @@ impl Service {
futures.push(async move {
node.with_client_retries(
|client| async move { client.top_tenant_shards(request.clone()).await },
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
3,
3,
Duration::from_secs(5),
@@ -7974,8 +7988,8 @@ impl Service {
match node
.with_client_retries(
|client| async move { client.tenant_secondary_status(tenant_shard_id).await },
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
1,
3,
Duration::from_millis(250),

@@ -338,7 +338,6 @@ impl SafekeeperReconciler {
.safekeeper_jwt_token
.clone()
.map(SecretString::from);
let ssl_ca_cert = self.service.config.ssl_ca_cert.clone();
loop {
let res = req
.safekeeper
@@ -347,8 +346,8 @@ impl SafekeeperReconciler {
let closure = &closure;
async move { closure(client).await }
},
self.service.get_http_client(),
&jwt,
&ssl_ca_cert,
3,
10,
Duration::from_secs(10),

@@ -78,8 +78,8 @@ impl Service {
for sk in timeline_persistence.sk_set.iter() {
let sk_id = NodeId(*sk as u64);
let safekeepers = safekeepers.clone();
let http_client = self.http_client.clone();
let jwt = jwt.clone();
let ssl_ca_cert = self.config.ssl_ca_cert.clone();
let req = req.clone();
joinset.spawn(async move {
// Unwrap is fine as we already would have returned error above
@@ -90,8 +90,8 @@ impl Service {
let req = req.clone();
async move { client.create_timeline(&req).await }
},
&http_client,
&jwt,
&ssl_ca_cert,
3,
3,
SK_CREATE_TIMELINE_RECONCILE_TIMEOUT,

@@ -1588,6 +1588,7 @@ impl TenantShard {
units: ReconcileUnits,
gate_guard: GateGuard,
cancel: &CancellationToken,
http_client: reqwest::Client,
) -> Option<ReconcilerWaiter> {
// Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
// doing our sequence's work.
@@ -1633,6 +1634,7 @@ impl TenantShard {
cancel: reconciler_cancel.clone(),
persistence: persistence.clone(),
compute_notify_failure: false,
http_client,
};

let reconcile_seq = self.sequence;

@@ -1169,6 +1169,12 @@ class NeonEnv:
if storage_controller_config is not None:
cfg["storage_controller"] = storage_controller_config

if config.test_may_use_compatibility_snapshot_binaries:
if "storage_controller" in cfg:
cfg["storage_controller"]["use_local_compute_notifications"] = False
else:
cfg["storage_controller"] = {"use_local_compute_notifications": False}

# Create config for pageserver
http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
@@ -1725,6 +1731,8 @@ class LogUtils:
log.warning(f"Skipping log check: {logfile} does not exist")
return None

log.info(f"Checking log {logfile} for pattern '{pattern}'")

contains_re = re.compile(pattern)

# XXX: Our rust logging machinery buffers the messages, so if you
@@ -2618,10 +2626,13 @@ class NeonProxiedStorageController(NeonStorageController):
self.running = False
return self

def instance_log_path(self, instance_id: int) -> Path:
return self.env.repo_dir / f"storage_controller_{instance_id}" / "storage_controller.log"

def assert_no_errors(self):
for instance_id in self.instances.keys():
assert_no_errors(
self.env.repo_dir / f"storage_controller_{instance_id}" / "storage_controller.log",
self.instance_log_path(instance_id),
"storage_controller",
self.allowed_errors,
)
@@ -2629,7 +2640,14 @@ class NeonProxiedStorageController(NeonStorageController):
def log_contains(
self, pattern: str, offset: None | LogCursor = None
) -> tuple[str, LogCursor] | None:
raise NotImplementedError()
for instance_id in self.instances.keys():
log_path = self.instance_log_path(instance_id)
checker = LogUtils(log_path)
found = checker.log_contains(pattern, offset)
if found is not None:
return found

return None


@dataclass

@@ -82,6 +82,7 @@ def test_storage_controller_many_tenants(
# guard against regressions in restart time.
"max_offline": "30s",
"max_warming_up": "300s",
"use_local_compute_notifications": False,
}
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api


@@ -5,11 +5,9 @@ import asyncio
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.remote_storage import RemoteStorageKind
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response


def test_change_pageserver(neon_env_builder: NeonEnvBuilder, make_httpserver):
def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
"""
A relatively low level test of reconfiguring a compute's pageserver at runtime. Usually this
is all done via the storage controller, but this test will disable the storage controller's compute
@@ -23,19 +21,6 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder, make_httpserver):
)
env = neon_env_builder.init_start()

neon_env_builder.control_plane_hooks_api = (
f"http://{make_httpserver.host}:{make_httpserver.port}/"
)

def ignore_notify(request: Request):
# This test does direct updates to compute configuration: disable the storage controller's notification
log.info(f"Ignoring storage controller compute notification: {request.json}")
return Response(status=200)

make_httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(
ignore_notify
)

env.create_branch("test_change_pageserver")
endpoint = env.endpoints.create_start("test_change_pageserver")


@@ -12,6 +12,7 @@ import fixtures.utils
import pytest
import toml
from fixtures.common_types import TenantId, TimelineId
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
@@ -592,17 +593,22 @@ def test_historic_storage_formats(

@check_ondisk_data_compatibility_if_enabled
@pytest.mark.xdist_group("compatibility")
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
@pytest.mark.parametrize(
**fixtures.utils.allpairs_versions(),
)
def test_versions_mismatch(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
pg_version: PgVersion,
compatibility_snapshot_dir,
compute_reconfigure_listener: ComputeReconfigure,
combination,
):
"""
Checks compatibility of different combinations of versions of the components
"""
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api

neon_env_builder.num_safekeepers = 3
env = neon_env_builder.from_repo_dir(
compatibility_snapshot_dir / "repo",

@@ -91,6 +91,8 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver,
f"http://{make_httpserver.host}:{make_httpserver.port}/"
)

neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}

def ignore_notify(request: Request):
# This test does all its own compute configuration (by passing explicit pageserver ID to Workload functions),
# so we send controller notifications to /dev/null to prevent it fighting the test for control of the compute.

@@ -808,6 +808,8 @@ def test_sharding_split_stripe_size(

httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)

neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}

env = neon_env_builder.init_start(
initial_tenant_shard_count=1, initial_tenant_shard_stripe_size=initial_stripe_size
)
@@ -1316,6 +1318,11 @@ def test_sharding_split_failures(
initial_shard_count = 2
split_shard_count = 4

neon_env_builder.storage_controller_config = {
# Route to `compute_reconfigure_listener` instead
"use_local_compute_notifications": False,
}

env = neon_env_builder.init_configs()
env.start()


@@ -73,7 +73,9 @@ def get_node_shard_counts(env: NeonEnv, tenant_ids):


@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
def test_storage_controller_smoke(neon_env_builder: NeonEnvBuilder, combination):
def test_storage_controller_smoke(
neon_env_builder: NeonEnvBuilder, compute_reconfigure_listener: ComputeReconfigure, combination
):
"""
Test the basic lifecycle of a storage controller:
- Restarting
@@ -83,6 +85,7 @@ def test_storage_controller_smoke(neon_env_builder: NeonEnvBuilder, combination)
"""

neon_env_builder.num_pageservers = 3
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
env = neon_env_builder.init_configs()

# Start services by hand so that we can skip a pageserver (this will start + register later)
@@ -620,6 +623,8 @@ def test_storage_controller_compute_hook(

httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)

neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}

# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})

@@ -738,6 +743,8 @@ def test_storage_controller_stuck_compute_hook(

httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)

neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}

# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})

@@ -885,6 +892,8 @@ def test_storage_controller_compute_hook_retry(

httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)

neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}

# Start running
env = neon_env_builder.init_configs()
env.start()
@@ -1008,6 +1017,8 @@ def test_storage_controller_compute_hook_revert(

httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)

neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}

# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
tenant_id = env.initial_tenant
@@ -1398,6 +1409,11 @@ def test_storage_controller_tenant_deletion(
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api

neon_env_builder.storage_controller_config = {
# Route to `compute_reconfigure_listener` instead
"use_local_compute_notifications": False,
}

env = neon_env_builder.init_configs()
env.start()

@@ -1599,6 +1615,12 @@ def test_storage_controller_heartbeats(
env.storage_controller.allowed_errors.append(
".*Call to node.*management API.*failed.*failpoint.*"
)
# The server starts listening to the socket before sending re-attach request,
# but it starts serving HTTP only when re-attach is completed.
# If re-attach is slow (last scenario), storcon's heartbeat requests will time out.
env.storage_controller.allowed_errors.append(
".*Call to node.*management API.*failed.* Timeout.*"
)

# Initially we have two online pageservers
nodes = env.storage_controller.node_list()
@@ -2170,7 +2192,12 @@ def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_sto

@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
@pytest.mark.parametrize("num_azs", [1, 2])
def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder, num_azs: int, combination):
def test_graceful_cluster_restart(
neon_env_builder: NeonEnvBuilder,
num_azs: int,
compute_reconfigure_listener: ComputeReconfigure,
combination,
):
"""
Graceful reststart of storage controller clusters use the drain and
fill hooks in order to migrate attachments away from pageservers before
@@ -2182,6 +2209,7 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder, num_azs: int
"""
neon_env_builder.num_azs = num_azs
neon_env_builder.num_pageservers = 2
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
env = neon_env_builder.init_configs()
env.start()

@@ -2437,7 +2465,6 @@ def test_background_operation_cancellation(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("while_offline", [True, False])
def test_storage_controller_node_deletion(
neon_env_builder: NeonEnvBuilder,
compute_reconfigure_listener: ComputeReconfigure,
while_offline: bool,
):
"""
@@ -2863,6 +2890,143 @@ def test_storage_controller_leadership_transfer(
)


def test_storage_controller_leadership_transfer_during_split(
neon_env_builder: NeonEnvBuilder,
storage_controller_proxy: StorageControllerProxy,
port_distributor: PortDistributor,
):
"""
Exercise a race between shard splitting and graceful leadership transfer. This is
a reproducer for https://github.com/neondatabase/neon/issues/11254
"""
neon_env_builder.auth_enabled = True

neon_env_builder.num_pageservers = 3

neon_env_builder.storage_controller_config = {
"database_url": f"127.0.0.1:{port_distributor.get_port()}",
"start_as_candidate": True,
}

neon_env_builder.storage_controller_port_override = storage_controller_proxy.port()

storage_controller_1_port = port_distributor.get_port()
storage_controller_2_port = port_distributor.get_port()

storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")

env = neon_env_builder.init_configs()
start_env(env, storage_controller_1_port)

assert (
env.storage_controller.get_leadership_status() == StorageControllerLeadershipStatus.LEADER
)
leader = env.storage_controller.get_leader()
assert leader["address"] == f"http://127.0.0.1:{storage_controller_1_port}/"

tenant_count = 2
shard_count = 4
tenants = set(TenantId.generate() for _ in range(0, tenant_count))

for tid in tenants:
env.storage_controller.tenant_create(
tid, shard_count=shard_count, placement_policy={"Attached": 1}
)
env.storage_controller.reconcile_until_idle()

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# Start a shard split
env.storage_controller.allowed_errors.extend(
[".*Unexpected child shard count.*", ".*Enqueuing background abort.*"]
)
pause_failpoint = "shard-split-pre-complete"
env.storage_controller.configure_failpoints((pause_failpoint, "pause"))
split_fut = executor.submit(
env.storage_controller.tenant_shard_split, list(tenants)[0], shard_count * 2
)

def hit_failpoint():
log.info("Checking log for pattern...")
try:
assert env.storage_controller.log_contains(f".*at failpoint {pause_failpoint}.*")
except Exception:
log.exception("Failed to find pattern in log")
raise

wait_until(hit_failpoint, interval=0.1, status_interval=1.0)

env.storage_controller.start(
timeout_in_seconds=30, instance_id=2, base_port=storage_controller_2_port
)

def passed_split_abort():
try:
log.info("Checking log for pattern...")
assert env.storage_controller.log_contains(
".*Using observed state received from leader.*"
)
except Exception:
log.exception("Failed to find pattern in log")
raise

log.info("Awaiting split abort")
wait_until(passed_split_abort, interval=0.1, status_interval=1.0)
assert env.storage_controller.log_contains(".*Aborting shard split.*")

# Proxy is still talking to original controller here: disable its pause failpoint so
# that its shard split can run to completion.
log.info("Disabling failpoint")
# Bypass the proxy: the python test HTTPServer is single threaded and still blocked
# on handling the shard split request.
env.storage_controller.request(
"PUT",
f"http://127.0.0.1:{storage_controller_1_port}/debug/v1/failpoints",
json=[{"name": "shard-split-pre-complete", "actions": "off"}],
headers=env.storage_controller.headers(TokenScope.ADMIN),
)

def previous_stepped_down():
assert (
env.storage_controller.get_leadership_status()
== StorageControllerLeadershipStatus.STEPPED_DOWN
)

log.info("Awaiting step down")
wait_until(previous_stepped_down)

# Let the shard split complete: this may happen _after_ the replacement has come up
# and tried to clean up the databases
log.info("Unblocking & awaiting shard split")
with pytest.raises(Exception, match="Unexpected child shard count"):
# This split fails when it tries to persist results, because it encounters
# changes already made by the new controller's abort-on-startup
split_fut.result()

log.info("Routing to new leader")
storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_2_port}")

def new_becomes_leader():
assert (
env.storage_controller.get_leadership_status()
== StorageControllerLeadershipStatus.LEADER
)

wait_until(new_becomes_leader)
leader = env.storage_controller.get_leader()
assert leader["address"] == f"http://127.0.0.1:{storage_controller_2_port}/"

env.storage_controller.wait_until_ready()
env.storage_controller.consistency_check()

# Check that the stepped down instance forwards requests
# to the new leader while it's still running.
storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
env.storage_controller.tenant_shard_dump()
env.storage_controller.node_configure(env.pageservers[0].id, {"scheduling": "Pause"})
status = env.storage_controller.node_status(env.pageservers[0].id)
assert status["scheduling"] == "Pause"


def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvBuilder):
# single unsharded tenant, two locations
neon_env_builder.num_pageservers = 2
