mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 04:20:39 +00:00
Compare commits
33 Commits
log_subscr
...
cicd/debug
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b66d4e0bdc | ||
|
|
6d9fd2bbb5 | ||
|
|
e5d9c003f5 | ||
|
|
290ce3ed46 | ||
|
|
1138e286b9 | ||
|
|
9173847f81 | ||
|
|
6ae62574d0 | ||
|
|
5e074d8536 | ||
|
|
3a22daf33e | ||
|
|
f40f627730 | ||
|
|
56b94b7d1b | ||
|
|
fe445b2945 | ||
|
|
7f49f45a53 | ||
|
|
980b212789 | ||
|
|
2b17a03911 | ||
|
|
b50a9d84d4 | ||
|
|
6fd3c9daa5 | ||
|
|
409171ab08 | ||
|
|
f26240c9dc | ||
|
|
1c771267ab | ||
|
|
9f1b7b72ed | ||
|
|
2476f7ef74 | ||
|
|
f555cb3970 | ||
|
|
10a726503a | ||
|
|
7ba86e15fa | ||
|
|
7ba42bfdb4 | ||
|
|
4f8a39d6c6 | ||
|
|
54c5da3981 | ||
|
|
c1378dc43b | ||
|
|
50b9fb430a | ||
|
|
486eaba028 | ||
|
|
d4d70cc314 | ||
|
|
176eefa47a |
4
.github/actionlint.yml
vendored
4
.github/actionlint.yml
vendored
@@ -6,6 +6,10 @@ self-hosted-runner:
|
||||
- small
|
||||
- small-arm64
|
||||
- us-east-2
|
||||
- aws-arm64-8core
|
||||
- aws-arm64-16core
|
||||
- aws-arm64-32core
|
||||
- qemu-arm64
|
||||
config-variables:
|
||||
- BENCHMARK_PROJECT_ID_PUB
|
||||
- BENCHMARK_PROJECT_ID_SUB
|
||||
|
||||
@@ -131,7 +131,7 @@ runs:
|
||||
fi
|
||||
if [[ "${{ inputs.run_in_parallel }}" == "true" ]]; then
|
||||
# -n sets the number of parallel processes that pytest-xdist will run
|
||||
EXTRA_PARAMS="-n12 $EXTRA_PARAMS"
|
||||
EXTRA_PARAMS="-n auto $EXTRA_PARAMS"
|
||||
|
||||
# --dist=loadgroup points tests marked with @pytest.mark.xdist_group
|
||||
# to the same worker to make @pytest.mark.order work with xdist
|
||||
@@ -169,10 +169,8 @@ runs:
|
||||
EXTRA_PARAMS="--durations-path $TEST_OUTPUT/benchmark_durations.json $EXTRA_PARAMS"
|
||||
fi
|
||||
|
||||
if [[ "${{ inputs.build_type }}" == "debug" ]]; then
|
||||
if [[ $BUILD_TYPE == "debug" && $RUNNER_ARCH == 'X64' ]]; then
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
|
||||
elif [[ "${{ inputs.build_type }}" == "release" ]]; then
|
||||
cov_prefix=()
|
||||
else
|
||||
cov_prefix=()
|
||||
fi
|
||||
|
||||
26
.github/workflows/_build-and-test-locally.yml
vendored
26
.github/workflows/_build-and-test-locally.yml
vendored
@@ -36,7 +36,7 @@ env:
|
||||
|
||||
jobs:
|
||||
build-neon:
|
||||
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'large-arm64' || 'large')) }}
|
||||
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'qemu-arm64' || 'large')) }}
|
||||
container:
|
||||
image: ${{ inputs.build-tools-image }}
|
||||
credentials:
|
||||
@@ -94,11 +94,16 @@ jobs:
|
||||
# We run tests with addtional features, that are turned off by default (e.g. in release builds), see
|
||||
# corresponding Cargo.toml files for their descriptions.
|
||||
- name: Set env variables
|
||||
env:
|
||||
ARCH: ${{ inputs.arch }}
|
||||
run: |
|
||||
CARGO_FEATURES="--features testing"
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
|
||||
cov_prefix="scripts/coverage --profraw-prefix=$GITHUB_JOB --dir=/tmp/coverage run"
|
||||
CARGO_FLAGS="--locked"
|
||||
elif [[ $BUILD_TYPE == "debug" ]]; then
|
||||
cov_prefix=""
|
||||
CARGO_FLAGS="--locked"
|
||||
elif [[ $BUILD_TYPE == "release" ]]; then
|
||||
cov_prefix=""
|
||||
CARGO_FLAGS="--locked --release"
|
||||
@@ -110,6 +115,11 @@ jobs:
|
||||
echo "CARGO_HOME=${GITHUB_WORKSPACE}/.cargo"
|
||||
} >> $GITHUB_ENV
|
||||
|
||||
# See https://github.com/aws/aws-graviton-getting-started/blob/57dc813626d0266f1cc12ef83474745bb1f31fb4/rust.md
|
||||
- name: Set RUSTFLAGS for ARM
|
||||
if: inputs.arch == 'arm64'
|
||||
run: echo "RUSTFLAGS=-Ctarget-feature=+lse -Ctarget-cpu=neoverse-n1" >> $GITHUB_ENV
|
||||
|
||||
- name: Cache postgres v14 build
|
||||
id: cache_pg_14
|
||||
uses: actions/cache@v4
|
||||
@@ -153,11 +163,13 @@ jobs:
|
||||
run: |
|
||||
PQ_LIB_DIR=$(pwd)/pg_install/v16/lib
|
||||
export PQ_LIB_DIR
|
||||
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests
|
||||
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests -j$(nproc)
|
||||
|
||||
# Do install *before* running rust tests because they might recompile the
|
||||
# binaries with different features/flags.
|
||||
- name: Install rust binaries
|
||||
env:
|
||||
ARCH: ${{ inputs.arch }}
|
||||
run: |
|
||||
# Install target binaries
|
||||
mkdir -p /tmp/neon/bin/
|
||||
@@ -172,7 +184,7 @@ jobs:
|
||||
done
|
||||
|
||||
# Install test executables and write list of all binaries (for code coverage)
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
|
||||
# Keep bloated coverage data files away from the rest of the artifact
|
||||
mkdir -p /tmp/coverage/
|
||||
|
||||
@@ -243,10 +255,10 @@ jobs:
|
||||
uses: ./.github/actions/save-coverage-data
|
||||
|
||||
regress-tests:
|
||||
# Run test on x64 only
|
||||
if: inputs.arch == 'x64'
|
||||
# Don't run regression tests on debug arm64 builds
|
||||
if: inputs.build-type != 'debug' || inputs.arch != 'arm64'
|
||||
needs: [ build-neon ]
|
||||
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'large-arm64' || 'large')) }}
|
||||
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'qemu-arm64' || 'large')) }}
|
||||
container:
|
||||
image: ${{ inputs.build-tools-image }}
|
||||
credentials:
|
||||
|
||||
2
.github/workflows/build_and_test.yml
vendored
2
.github/workflows/build_and_test.yml
vendored
@@ -198,7 +198,7 @@ jobs:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
arch: [ x64 ]
|
||||
arch: [ x64, arm64 ]
|
||||
# Do not build or run tests in debug for release branches
|
||||
build-type: ${{ fromJson((startsWith(github.ref_name, 'release') && github.event_name == 'push') && '["release"]' || '["debug", "release"]') }}
|
||||
include:
|
||||
|
||||
@@ -217,7 +217,7 @@ impl StorageController {
|
||||
Ok(exitcode.success())
|
||||
}
|
||||
|
||||
/// Create our database if it doesn't exist
|
||||
/// Create our database if it doesn't exist, and run migrations.
|
||||
///
|
||||
/// This function is equivalent to the `diesel setup` command in the diesel CLI. We implement
|
||||
/// the same steps by hand to avoid imposing a dependency on installing diesel-cli for developers
|
||||
@@ -382,6 +382,7 @@ impl StorageController {
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Run migrations on every startup, in case something changed.
|
||||
self.setup_database(postgres_port).await?;
|
||||
}
|
||||
|
||||
@@ -453,11 +454,6 @@ impl StorageController {
|
||||
let jwt_token =
|
||||
encode_from_key_file(&claims, private_key).expect("failed to generate jwt token");
|
||||
args.push(format!("--jwt-token={jwt_token}"));
|
||||
|
||||
let peer_claims = Claims::new(None, Scope::Admin);
|
||||
let peer_jwt_token = encode_from_key_file(&peer_claims, private_key)
|
||||
.expect("failed to generate jwt token");
|
||||
args.push(format!("--peer-jwt-token={peer_jwt_token}"));
|
||||
}
|
||||
|
||||
if let Some(public_key) = &self.public_key {
|
||||
|
||||
@@ -413,10 +413,6 @@ where
|
||||
let mut content = Vec::with_capacity(n_origins * 16 + 8);
|
||||
content.extend_from_slice(&pg_constants::REPLICATION_STATE_MAGIC.to_le_bytes());
|
||||
for (origin_id, origin_lsn) in repl_origins {
|
||||
info!(
|
||||
"Include origin_id={}, origin_lsn={} in basebackup",
|
||||
origin_id, origin_lsn
|
||||
);
|
||||
content.extend_from_slice(&origin_id.to_le_bytes());
|
||||
content.extend_from_slice(&[0u8; 6]); // align to 8 bytes
|
||||
content.extend_from_slice(&origin_lsn.0.to_le_bytes());
|
||||
|
||||
@@ -1803,15 +1803,6 @@ pub(crate) static SECONDARY_RESIDENT_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::n
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static SECONDARY_HEATMAP_TOTAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_secondary_heatmap_total_size",
|
||||
"The total size in bytes of all layers in the most recently downloaded heatmap.",
|
||||
&["tenant_id", "shard_id"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub enum RemoteOpKind {
|
||||
Upload,
|
||||
|
||||
@@ -8,7 +8,6 @@ use std::{sync::Arc, time::SystemTime};
|
||||
use crate::{
|
||||
context::RequestContext,
|
||||
disk_usage_eviction_task::DiskUsageEvictionInfo,
|
||||
metrics::SECONDARY_HEATMAP_TOTAL_SIZE,
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
};
|
||||
|
||||
@@ -106,9 +105,6 @@ pub(crate) struct SecondaryTenant {
|
||||
|
||||
// Sum of layer sizes on local disk
|
||||
pub(super) resident_size_metric: UIntGauge,
|
||||
|
||||
// Sum of layer sizes in the most recently downloaded heatmap
|
||||
pub(super) heatmap_total_size_metric: UIntGauge,
|
||||
}
|
||||
|
||||
impl Drop for SecondaryTenant {
|
||||
@@ -116,7 +112,6 @@ impl Drop for SecondaryTenant {
|
||||
let tenant_id = self.tenant_shard_id.tenant_id.to_string();
|
||||
let shard_id = format!("{}", self.tenant_shard_id.shard_slug());
|
||||
let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
|
||||
let _ = SECONDARY_HEATMAP_TOTAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -133,10 +128,6 @@ impl SecondaryTenant {
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id])
|
||||
.unwrap();
|
||||
|
||||
let heatmap_total_size_metric = SECONDARY_HEATMAP_TOTAL_SIZE
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id])
|
||||
.unwrap();
|
||||
|
||||
Arc::new(Self {
|
||||
tenant_shard_id,
|
||||
// todo: shall we make this a descendent of the
|
||||
@@ -154,7 +145,6 @@ impl SecondaryTenant {
|
||||
progress: std::sync::Mutex::default(),
|
||||
|
||||
resident_size_metric,
|
||||
heatmap_total_size_metric,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -829,12 +829,6 @@ impl<'a> TenantDownloader<'a> {
|
||||
layers_downloaded: 0,
|
||||
bytes_downloaded: 0,
|
||||
};
|
||||
|
||||
// Also expose heatmap bytes_total as a metric
|
||||
self.secondary_state
|
||||
.heatmap_total_size_metric
|
||||
.set(heatmap_stats.bytes);
|
||||
|
||||
// Accumulate list of things to delete while holding the detail lock, for execution after dropping the lock
|
||||
let mut delete_layers = Vec::new();
|
||||
let mut delete_timelines = Vec::new();
|
||||
|
||||
@@ -756,23 +756,11 @@ impl VirtualFile {
|
||||
})
|
||||
}
|
||||
|
||||
/// The function aborts the process if the error is fatal.
|
||||
async fn write_at<B: IoBuf + Send>(
|
||||
&self,
|
||||
buf: FullSlice<B>,
|
||||
offset: u64,
|
||||
_ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
|
||||
) -> (FullSlice<B>, Result<usize, Error>) {
|
||||
let (slice, result) = self.write_at_inner(buf, offset, _ctx).await;
|
||||
let result = result.maybe_fatal_err("write_at");
|
||||
(slice, result)
|
||||
}
|
||||
|
||||
async fn write_at_inner<B: IoBuf + Send>(
|
||||
&self,
|
||||
buf: FullSlice<B>,
|
||||
offset: u64,
|
||||
_ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
|
||||
) -> (FullSlice<B>, Result<usize, Error>) {
|
||||
let file_guard = match self.lock_file().await {
|
||||
Ok(file_guard) => file_guard,
|
||||
|
||||
@@ -235,7 +235,6 @@ impl WalIngest {
|
||||
&parsed_xact,
|
||||
info == pg_constants::XLOG_XACT_COMMIT,
|
||||
decoded.origin_id,
|
||||
lsn,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -249,7 +248,6 @@ impl WalIngest {
|
||||
&parsed_xact,
|
||||
info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
|
||||
decoded.origin_id,
|
||||
lsn,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -410,16 +408,11 @@ impl WalIngest {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
if info == pg_constants::XLOG_REPLORIGIN_SET {
|
||||
let xlrec = crate::walrecord::XlReploriginSet::decode(&mut buf);
|
||||
info!(
|
||||
"Set replication origin_id={}, origin_lsn={}",
|
||||
xlrec.node_id, xlrec.remote_lsn
|
||||
);
|
||||
modification
|
||||
.set_replorigin(xlrec.node_id, xlrec.remote_lsn)
|
||||
.await?
|
||||
} else if info == pg_constants::XLOG_REPLORIGIN_DROP {
|
||||
let xlrec = crate::walrecord::XlReploriginDrop::decode(&mut buf);
|
||||
info!("Drop replication origin_id={}", xlrec.node_id);
|
||||
modification.drop_replorigin(xlrec.node_id).await?
|
||||
}
|
||||
}
|
||||
@@ -1227,7 +1220,6 @@ impl WalIngest {
|
||||
parsed: &XlXactParsedRecord,
|
||||
is_commit: bool,
|
||||
origin_id: u16,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
// Record update of CLOG pages
|
||||
@@ -1294,10 +1286,6 @@ impl WalIngest {
|
||||
}
|
||||
}
|
||||
if origin_id != 0 {
|
||||
info!(
|
||||
"Commit at {} origin_id={}, origin_lsn={}",
|
||||
lsn, origin_id, parsed.origin_lsn
|
||||
);
|
||||
modification
|
||||
.set_replorigin(origin_id, parsed.origin_lsn)
|
||||
.await?;
|
||||
|
||||
@@ -173,6 +173,9 @@ struct ProxyCliArgs {
|
||||
/// cache for `role_secret` (use `size=0` to disable)
|
||||
#[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
|
||||
role_secret_cache: String,
|
||||
/// disable ip check for http requests. If it is too time consuming, it could be turned off.
|
||||
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
|
||||
disable_ip_check_for_http: bool,
|
||||
/// redis url for notifications (if empty, redis_host:port will be used for both notifications and streaming connections)
|
||||
#[clap(long)]
|
||||
redis_notifications: Option<String>,
|
||||
@@ -658,7 +661,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
)?;
|
||||
|
||||
let http_config = HttpConfig {
|
||||
accept_websockets: true,
|
||||
pool_options: GlobalConnPoolOptions {
|
||||
max_conns_per_endpoint: args.sql_over_http.sql_over_http_pool_max_conns_per_endpoint,
|
||||
gc_epoch: args.sql_over_http.sql_over_http_pool_gc_epoch,
|
||||
|
||||
@@ -52,7 +52,6 @@ pub struct TlsConfig {
|
||||
}
|
||||
|
||||
pub struct HttpConfig {
|
||||
pub accept_websockets: bool,
|
||||
pub pool_options: GlobalConnPoolOptions,
|
||||
pub cancel_set: CancelSet,
|
||||
pub client_conn_threshold: u64,
|
||||
|
||||
@@ -10,7 +10,6 @@ mod json;
|
||||
mod sql_over_http;
|
||||
mod websocket;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use atomic_take::AtomicTake;
|
||||
use bytes::Bytes;
|
||||
pub use conn_pool::GlobalConnPoolOptions;
|
||||
@@ -27,9 +26,8 @@ use rand::rngs::StdRng;
|
||||
use rand::SeedableRng;
|
||||
pub use reqwest_middleware::{ClientWithMiddleware, Error};
|
||||
pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::time::timeout;
|
||||
use tokio_rustls::TlsAcceptor;
|
||||
use tokio_rustls::{server::TlsStream, TlsAcceptor};
|
||||
use tokio_util::task::TaskTracker;
|
||||
|
||||
use crate::cancellation::CancellationHandlerMain;
|
||||
@@ -43,7 +41,7 @@ use crate::serverless::backend::PoolingBackend;
|
||||
use crate::serverless::http_util::{api_error_into_response, json_response};
|
||||
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::pin::{pin, Pin};
|
||||
use std::pin::pin;
|
||||
use std::sync::Arc;
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -88,18 +86,18 @@ pub async fn task_main(
|
||||
config,
|
||||
endpoint_rate_limiter: Arc::clone(&endpoint_rate_limiter),
|
||||
});
|
||||
let tls_acceptor: Arc<dyn MaybeTlsAcceptor> = match config.tls_config.as_ref() {
|
||||
Some(config) => {
|
||||
let mut tls_server_config = rustls::ServerConfig::clone(&config.to_server_config());
|
||||
// prefer http2, but support http/1.1
|
||||
tls_server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
|
||||
Arc::new(tls_server_config) as Arc<_>
|
||||
}
|
||||
|
||||
let tls_config = match config.tls_config.as_ref() {
|
||||
Some(config) => config,
|
||||
None => {
|
||||
warn!("TLS config is missing");
|
||||
Arc::new(NoTls) as Arc<_>
|
||||
warn!("TLS config is missing, WebSocket Secure server will not be started");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
let mut tls_server_config = rustls::ServerConfig::clone(&tls_config.to_server_config());
|
||||
// prefer http2, but support http/1.1
|
||||
tls_server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
|
||||
let tls_acceptor: tokio_rustls::TlsAcceptor = Arc::new(tls_server_config).into();
|
||||
|
||||
let connections = tokio_util::task::task_tracker::TaskTracker::new();
|
||||
connections.close(); // allows `connections.wait to complete`
|
||||
@@ -178,41 +176,16 @@ pub async fn task_main(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub trait AsyncReadWrite: AsyncRead + AsyncWrite + Send + 'static {}
|
||||
impl<T: AsyncRead + AsyncWrite + Send + 'static> AsyncReadWrite for T {}
|
||||
pub type AsyncRW = Pin<Box<dyn AsyncReadWrite>>;
|
||||
|
||||
#[async_trait]
|
||||
trait MaybeTlsAcceptor: Send + Sync + 'static {
|
||||
async fn accept(self: Arc<Self>, conn: ChainRW<TcpStream>) -> std::io::Result<AsyncRW>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl MaybeTlsAcceptor for rustls::ServerConfig {
|
||||
async fn accept(self: Arc<Self>, conn: ChainRW<TcpStream>) -> std::io::Result<AsyncRW> {
|
||||
Ok(Box::pin(TlsAcceptor::from(self).accept(conn).await?))
|
||||
}
|
||||
}
|
||||
|
||||
struct NoTls;
|
||||
|
||||
#[async_trait]
|
||||
impl MaybeTlsAcceptor for NoTls {
|
||||
async fn accept(self: Arc<Self>, conn: ChainRW<TcpStream>) -> std::io::Result<AsyncRW> {
|
||||
Ok(Box::pin(conn))
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles the TCP startup lifecycle.
|
||||
/// 1. Parses PROXY protocol V2
|
||||
/// 2. Handles TLS handshake
|
||||
async fn connection_startup(
|
||||
config: &ProxyConfig,
|
||||
tls_acceptor: Arc<dyn MaybeTlsAcceptor>,
|
||||
tls_acceptor: TlsAcceptor,
|
||||
session_id: uuid::Uuid,
|
||||
conn: TcpStream,
|
||||
peer_addr: SocketAddr,
|
||||
) -> Option<(AsyncRW, IpAddr)> {
|
||||
) -> Option<(TlsStream<ChainRW<TcpStream>>, IpAddr)> {
|
||||
// handle PROXY protocol
|
||||
let (conn, peer) = match read_proxy_protocol(conn).await {
|
||||
Ok(c) => c,
|
||||
@@ -268,7 +241,7 @@ async fn connection_handler(
|
||||
cancellation_handler: Arc<CancellationHandlerMain>,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
cancellation_token: CancellationToken,
|
||||
conn: AsyncRW,
|
||||
conn: TlsStream<ChainRW<TcpStream>>,
|
||||
peer_addr: IpAddr,
|
||||
session_id: uuid::Uuid,
|
||||
) {
|
||||
@@ -353,9 +326,7 @@ async fn request_handler(
|
||||
.map(|s| s.to_string());
|
||||
|
||||
// Check if the request is a websocket upgrade request.
|
||||
if config.http_config.accept_websockets
|
||||
&& framed_websockets::upgrade::is_upgrade_request(&request)
|
||||
{
|
||||
if framed_websockets::upgrade::is_upgrade_request(&request) {
|
||||
let ctx = RequestMonitoring::new(
|
||||
session_id,
|
||||
peer_addr,
|
||||
|
||||
@@ -758,7 +758,6 @@ mod tests {
|
||||
async fn test_pool() {
|
||||
let _ = env_logger::try_init();
|
||||
let config = Box::leak(Box::new(crate::config::HttpConfig {
|
||||
accept_websockets: false,
|
||||
pool_options: GlobalConnPoolOptions {
|
||||
max_conns_per_endpoint: 2,
|
||||
gc_epoch: Duration::from_secs(1),
|
||||
|
||||
@@ -147,7 +147,7 @@ impl UserFacingError for ConnInfoError {
|
||||
fn get_conn_info(
|
||||
ctx: &RequestMonitoring,
|
||||
headers: &HeaderMap,
|
||||
tls: Option<&TlsConfig>,
|
||||
tls: &TlsConfig,
|
||||
) -> Result<ConnInfo, ConnInfoError> {
|
||||
// HTTP only uses cleartext (for now and likely always)
|
||||
ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
|
||||
@@ -184,22 +184,12 @@ fn get_conn_info(
|
||||
.ok_or(ConnInfoError::MissingPassword)?;
|
||||
let password = urlencoding::decode_binary(password.as_bytes());
|
||||
|
||||
let endpoint = match connection_url.host() {
|
||||
Some(url::Host::Domain(hostname)) => {
|
||||
if let Some(tls) = tls {
|
||||
endpoint_sni(hostname, &tls.common_names)?
|
||||
.ok_or(ConnInfoError::MalformedEndpoint)?
|
||||
} else {
|
||||
hostname
|
||||
.split_once(".")
|
||||
.map_or(hostname, |(prefix, _)| prefix)
|
||||
.into()
|
||||
}
|
||||
}
|
||||
Some(url::Host::Ipv4(_)) | Some(url::Host::Ipv6(_)) | None => {
|
||||
return Err(ConnInfoError::MissingHostname)
|
||||
}
|
||||
};
|
||||
let hostname = connection_url
|
||||
.host_str()
|
||||
.ok_or(ConnInfoError::MissingHostname)?;
|
||||
|
||||
let endpoint =
|
||||
endpoint_sni(hostname, &tls.common_names)?.ok_or(ConnInfoError::MalformedEndpoint)?;
|
||||
ctx.set_endpoint_id(endpoint.clone());
|
||||
|
||||
let pairs = connection_url.query_pairs();
|
||||
@@ -512,7 +502,7 @@ async fn handle_inner(
|
||||
let headers = request.headers();
|
||||
|
||||
// TLS config should be there.
|
||||
let conn_info = get_conn_info(ctx, headers, config.tls_config.as_ref())?;
|
||||
let conn_info = get_conn_info(ctx, headers, config.tls_config.as_ref().unwrap())?;
|
||||
info!(user = conn_info.user_info.user.as_str(), "credentials");
|
||||
|
||||
// Allow connection pooling only if explicitly requested
|
||||
|
||||
@@ -18,6 +18,7 @@ import psycopg2
|
||||
from psycopg2.extras import execute_values
|
||||
|
||||
CREATE_TABLE = """
|
||||
CREATE TYPE arch AS ENUM ('ARM64', 'X64', 'UNKNOWN');
|
||||
CREATE TABLE IF NOT EXISTS results (
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
parent_suite TEXT NOT NULL,
|
||||
@@ -28,6 +29,7 @@ CREATE TABLE IF NOT EXISTS results (
|
||||
stopped_at TIMESTAMPTZ NOT NULL,
|
||||
duration INT NOT NULL,
|
||||
flaky BOOLEAN NOT NULL,
|
||||
arch arch DEFAULT 'X64',
|
||||
build_type TEXT NOT NULL,
|
||||
pg_version INT NOT NULL,
|
||||
run_id BIGINT NOT NULL,
|
||||
@@ -35,7 +37,7 @@ CREATE TABLE IF NOT EXISTS results (
|
||||
reference TEXT NOT NULL,
|
||||
revision CHAR(40) NOT NULL,
|
||||
raw JSONB COMPRESSION lz4 NOT NULL,
|
||||
UNIQUE (parent_suite, suite, name, build_type, pg_version, started_at, stopped_at, run_id)
|
||||
UNIQUE (parent_suite, suite, name, arch, build_type, pg_version, started_at, stopped_at, run_id)
|
||||
);
|
||||
"""
|
||||
|
||||
@@ -50,6 +52,7 @@ class Row:
|
||||
stopped_at: datetime
|
||||
duration: int
|
||||
flaky: bool
|
||||
arch: str
|
||||
build_type: str
|
||||
pg_version: int
|
||||
run_id: int
|
||||
@@ -121,6 +124,14 @@ def ingest_test_result(
|
||||
raw.pop("labels")
|
||||
raw.pop("extra")
|
||||
|
||||
# All allure parameters are prefixed with "__", see test_runner/fixtures/parametrize.py
|
||||
parameters = {
|
||||
p["name"].removeprefix("__"): p["value"]
|
||||
for p in test["parameters"]
|
||||
if p["name"].startswith("__")
|
||||
}
|
||||
arch = parameters.get("arch", "UNKNOWN").strip("'")
|
||||
|
||||
build_type, pg_version, unparametrized_name = parse_test_name(test["name"])
|
||||
labels = {label["name"]: label["value"] for label in test["labels"]}
|
||||
row = Row(
|
||||
@@ -132,6 +143,7 @@ def ingest_test_result(
|
||||
stopped_at=datetime.fromtimestamp(test["time"]["stop"] / 1000, tz=timezone.utc),
|
||||
duration=test["time"]["duration"],
|
||||
flaky=test["flaky"] or test["retriesStatusChange"],
|
||||
arch=arch,
|
||||
build_type=build_type,
|
||||
pg_version=pg_version,
|
||||
run_id=run_id,
|
||||
|
||||
@@ -1,135 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use hyper::Uri;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::{
|
||||
peer_client::{GlobalObservedState, PeerClient},
|
||||
persistence::{ControllerPersistence, DatabaseError, DatabaseResult, Persistence},
|
||||
service::Config,
|
||||
};
|
||||
|
||||
/// Helper for storage controller leadership acquisition
|
||||
pub(crate) struct Leadership {
|
||||
persistence: Arc<Persistence>,
|
||||
config: Config,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum Error {
|
||||
#[error(transparent)]
|
||||
Database(#[from] DatabaseError),
|
||||
}
|
||||
|
||||
pub(crate) type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
impl Leadership {
|
||||
pub(crate) fn new(
|
||||
persistence: Arc<Persistence>,
|
||||
config: Config,
|
||||
cancel: CancellationToken,
|
||||
) -> Self {
|
||||
Self {
|
||||
persistence,
|
||||
config,
|
||||
cancel,
|
||||
}
|
||||
}
|
||||
|
||||
/// Find the current leader in the database and request it to step down if required.
|
||||
/// Should be called early on in within the start-up sequence.
|
||||
///
|
||||
/// Returns a tuple of two optionals: the current leader and its observed state
|
||||
pub(crate) async fn step_down_current_leader(
|
||||
&self,
|
||||
) -> Result<(Option<ControllerPersistence>, Option<GlobalObservedState>)> {
|
||||
let leader = self.current_leader().await?;
|
||||
let leader_step_down_state = if let Some(ref leader) = leader {
|
||||
if self.config.start_as_candidate {
|
||||
self.request_step_down(leader).await
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
tracing::info!("No leader found to request step down from. Will build observed state.");
|
||||
None
|
||||
};
|
||||
|
||||
Ok((leader, leader_step_down_state))
|
||||
}
|
||||
|
||||
/// Mark the current storage controller instance as the leader in the database
|
||||
pub(crate) async fn become_leader(
|
||||
&self,
|
||||
current_leader: Option<ControllerPersistence>,
|
||||
) -> Result<()> {
|
||||
if let Some(address_for_peers) = &self.config.address_for_peers {
|
||||
// TODO: `address-for-peers` can become a mandatory cli arg
|
||||
// after we update the k8s setup
|
||||
let proposed_leader = ControllerPersistence {
|
||||
address: address_for_peers.to_string(),
|
||||
started_at: chrono::Utc::now(),
|
||||
};
|
||||
|
||||
self.persistence
|
||||
.update_leader(current_leader, proposed_leader)
|
||||
.await
|
||||
.map_err(Error::Database)
|
||||
} else {
|
||||
tracing::info!("No address-for-peers provided. Skipping leader persistence.");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn current_leader(&self) -> DatabaseResult<Option<ControllerPersistence>> {
|
||||
let res = self.persistence.get_leader().await;
|
||||
if let Err(DatabaseError::Query(diesel::result::Error::DatabaseError(_kind, ref err))) = res
|
||||
{
|
||||
const REL_NOT_FOUND_MSG: &str = "relation \"controllers\" does not exist";
|
||||
if err.message().trim() == REL_NOT_FOUND_MSG {
|
||||
// Special case: if this is a brand new storage controller, migrations will not
|
||||
// have run at this point yet, and, hence, the controllers table does not exist.
|
||||
// Detect this case via the error string (diesel doesn't type it) and allow it.
|
||||
tracing::info!("Detected first storage controller start-up. Allowing missing controllers table ...");
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
/// Request step down from the currently registered leader in the database
|
||||
///
|
||||
/// If such an entry is persisted, the success path returns the observed
|
||||
/// state and details of the leader. Otherwise, None is returned indicating
|
||||
/// there is no leader currently.
|
||||
async fn request_step_down(
|
||||
&self,
|
||||
leader: &ControllerPersistence,
|
||||
) -> Option<GlobalObservedState> {
|
||||
tracing::info!("Sending step down request to {leader:?}");
|
||||
|
||||
let client = PeerClient::new(
|
||||
Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"),
|
||||
self.config.peer_jwt_token.clone(),
|
||||
);
|
||||
let state = client.step_down(&self.cancel).await;
|
||||
match state {
|
||||
Ok(state) => Some(state),
|
||||
Err(err) => {
|
||||
// TODO: Make leaders periodically update a timestamp field in the
|
||||
// database and, if the leader is not reachable from the current instance,
|
||||
// but inferred as alive from the timestamp, abort start-up. This avoids
|
||||
// a potential scenario in which we have two controllers acting as leaders.
|
||||
tracing::error!(
|
||||
"Leader ({}) did not respond to step-down request: {}",
|
||||
leader.address,
|
||||
err
|
||||
);
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -8,7 +8,6 @@ mod drain_utils;
|
||||
mod heartbeater;
|
||||
pub mod http;
|
||||
mod id_lock_map;
|
||||
mod leadership;
|
||||
pub mod metrics;
|
||||
mod node;
|
||||
mod pageserver_client;
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use anyhow::{anyhow, Context};
|
||||
use clap::Parser;
|
||||
use diesel::Connection;
|
||||
use hyper::Uri;
|
||||
use metrics::launch_timestamp::LaunchTimestamp;
|
||||
use metrics::BuildInfo;
|
||||
@@ -26,6 +27,9 @@ use utils::{project_build_tag, project_git_version, tcp_listener};
|
||||
project_git_version!(GIT_VERSION);
|
||||
project_build_tag!(BUILD_TAG);
|
||||
|
||||
use diesel_migrations::{embed_migrations, EmbeddedMigrations};
|
||||
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(author, version, about, long_about = None)]
|
||||
#[command(arg_required_else_help(true))]
|
||||
@@ -47,9 +51,6 @@ struct Cli {
|
||||
#[arg(long)]
|
||||
control_plane_jwt_token: Option<String>,
|
||||
|
||||
#[arg(long)]
|
||||
peer_jwt_token: Option<String>,
|
||||
|
||||
/// URL to control plane compute notification endpoint
|
||||
#[arg(long)]
|
||||
compute_hook_url: Option<String>,
|
||||
@@ -129,28 +130,28 @@ struct Secrets {
|
||||
public_key: Option<JwtAuth>,
|
||||
jwt_token: Option<String>,
|
||||
control_plane_jwt_token: Option<String>,
|
||||
peer_jwt_token: Option<String>,
|
||||
}
|
||||
|
||||
impl Secrets {
|
||||
const DATABASE_URL_ENV: &'static str = "DATABASE_URL";
|
||||
const PAGESERVER_JWT_TOKEN_ENV: &'static str = "PAGESERVER_JWT_TOKEN";
|
||||
const CONTROL_PLANE_JWT_TOKEN_ENV: &'static str = "CONTROL_PLANE_JWT_TOKEN";
|
||||
const PEER_JWT_TOKEN_ENV: &'static str = "PEER_JWT_TOKEN";
|
||||
const PUBLIC_KEY_ENV: &'static str = "PUBLIC_KEY";
|
||||
|
||||
/// Load secrets from, in order of preference:
|
||||
/// - CLI args if database URL is provided on the CLI
|
||||
/// - Environment variables if DATABASE_URL is set.
|
||||
/// - AWS Secrets Manager secrets
|
||||
async fn load(args: &Cli) -> anyhow::Result<Self> {
|
||||
let Some(database_url) = Self::load_secret(&args.database_url, Self::DATABASE_URL_ENV)
|
||||
let Some(database_url) =
|
||||
Self::load_secret(&args.database_url, Self::DATABASE_URL_ENV).await
|
||||
else {
|
||||
anyhow::bail!(
|
||||
"Database URL is not set (set `--database-url`, or `DATABASE_URL` environment)"
|
||||
)
|
||||
};
|
||||
|
||||
let public_key = match Self::load_secret(&args.public_key, Self::PUBLIC_KEY_ENV) {
|
||||
let public_key = match Self::load_secret(&args.public_key, Self::PUBLIC_KEY_ENV).await {
|
||||
Some(v) => Some(JwtAuth::from_key(v).context("Loading public key")?),
|
||||
None => None,
|
||||
};
|
||||
@@ -158,18 +159,18 @@ impl Secrets {
|
||||
let this = Self {
|
||||
database_url,
|
||||
public_key,
|
||||
jwt_token: Self::load_secret(&args.jwt_token, Self::PAGESERVER_JWT_TOKEN_ENV),
|
||||
jwt_token: Self::load_secret(&args.jwt_token, Self::PAGESERVER_JWT_TOKEN_ENV).await,
|
||||
control_plane_jwt_token: Self::load_secret(
|
||||
&args.control_plane_jwt_token,
|
||||
Self::CONTROL_PLANE_JWT_TOKEN_ENV,
|
||||
),
|
||||
peer_jwt_token: Self::load_secret(&args.peer_jwt_token, Self::PEER_JWT_TOKEN_ENV),
|
||||
)
|
||||
.await,
|
||||
};
|
||||
|
||||
Ok(this)
|
||||
}
|
||||
|
||||
fn load_secret(cli: &Option<String>, env_name: &str) -> Option<String> {
|
||||
async fn load_secret(cli: &Option<String>, env_name: &str) -> Option<String> {
|
||||
if let Some(v) = cli {
|
||||
Some(v.clone())
|
||||
} else if let Ok(v) = std::env::var(env_name) {
|
||||
@@ -180,6 +181,20 @@ impl Secrets {
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute the diesel migrations that are built into this binary
|
||||
async fn migration_run(database_url: &str) -> anyhow::Result<()> {
|
||||
use diesel::PgConnection;
|
||||
use diesel_migrations::{HarnessWithOutput, MigrationHarness};
|
||||
let mut conn = PgConnection::establish(database_url)?;
|
||||
|
||||
HarnessWithOutput::write_to_stdout(&mut conn)
|
||||
.run_pending_migrations(MIGRATIONS)
|
||||
.map(|_| ())
|
||||
.map_err(|e| anyhow::anyhow!(e))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
logging::init(
|
||||
LogFormat::Plain,
|
||||
@@ -269,7 +284,6 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
let config = Config {
|
||||
jwt_token: secrets.jwt_token,
|
||||
control_plane_jwt_token: secrets.control_plane_jwt_token,
|
||||
peer_jwt_token: secrets.peer_jwt_token,
|
||||
compute_hook_url: args.compute_hook_url,
|
||||
max_offline_interval: args
|
||||
.max_offline_interval
|
||||
@@ -290,9 +304,13 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
http_service_port: args.listen.port() as i32,
|
||||
};
|
||||
|
||||
// Validate that we can connect to the database
|
||||
// After loading secrets & config, but before starting anything else, apply database migrations
|
||||
Persistence::await_connection(&secrets.database_url, args.db_connect_timeout.into()).await?;
|
||||
|
||||
migration_run(&secrets.database_url)
|
||||
.await
|
||||
.context("Running database migrations")?;
|
||||
|
||||
let persistence = Arc::new(Persistence::new(secrets.database_url));
|
||||
|
||||
let service = Service::spawn(config, persistence.clone()).await?;
|
||||
|
||||
@@ -230,7 +230,6 @@ pub(crate) enum DatabaseErrorLabel {
|
||||
Connection,
|
||||
ConnectionPool,
|
||||
Logical,
|
||||
Migration,
|
||||
}
|
||||
|
||||
impl DatabaseError {
|
||||
@@ -240,7 +239,6 @@ impl DatabaseError {
|
||||
Self::Connection(_) => DatabaseErrorLabel::Connection,
|
||||
Self::ConnectionPool(_) => DatabaseErrorLabel::ConnectionPool,
|
||||
Self::Logical(_) => DatabaseErrorLabel::Logical,
|
||||
Self::Migration(_) => DatabaseErrorLabel::Migration,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,9 +25,6 @@ use crate::metrics::{
|
||||
};
|
||||
use crate::node::Node;
|
||||
|
||||
use diesel_migrations::{embed_migrations, EmbeddedMigrations};
|
||||
const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
|
||||
|
||||
/// ## What do we store?
|
||||
///
|
||||
/// The storage controller service does not store most of its state durably.
|
||||
@@ -75,8 +72,6 @@ pub(crate) enum DatabaseError {
|
||||
ConnectionPool(#[from] r2d2::Error),
|
||||
#[error("Logical error: {0}")]
|
||||
Logical(String),
|
||||
#[error("Migration error: {0}")]
|
||||
Migration(String),
|
||||
}
|
||||
|
||||
#[derive(measured::FixedCardinalityLabel, Copy, Clone)]
|
||||
@@ -172,19 +167,6 @@ impl Persistence {
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute the diesel migrations that are built into this binary
|
||||
pub(crate) async fn migration_run(&self) -> DatabaseResult<()> {
|
||||
use diesel_migrations::{HarnessWithOutput, MigrationHarness};
|
||||
|
||||
self.with_conn(move |conn| -> DatabaseResult<()> {
|
||||
HarnessWithOutput::write_to_stdout(conn)
|
||||
.run_pending_migrations(MIGRATIONS)
|
||||
.map(|_| ())
|
||||
.map_err(|e| DatabaseError::Migration(e.to_string()))
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Wraps `with_conn` in order to collect latency and error metrics
|
||||
async fn with_measured_conn<F, R>(&self, op: DatabaseOperation, func: F) -> DatabaseResult<R>
|
||||
where
|
||||
|
||||
@@ -17,9 +17,8 @@ use crate::{
|
||||
compute_hook::NotifyError,
|
||||
drain_utils::{self, TenantShardDrain, TenantShardIterator},
|
||||
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
|
||||
leadership::Leadership,
|
||||
metrics,
|
||||
peer_client::GlobalObservedState,
|
||||
peer_client::{GlobalObservedState, PeerClient},
|
||||
persistence::{
|
||||
AbortShardSplitStatus, ControllerPersistence, DatabaseResult, MetadataHealthPersistence,
|
||||
TenantFilter,
|
||||
@@ -288,9 +287,6 @@ pub struct Config {
|
||||
// This JWT token will be used to authenticate this service to the control plane.
|
||||
pub control_plane_jwt_token: Option<String>,
|
||||
|
||||
// This JWT token will be used to authenticate with other storage controller instances
|
||||
pub peer_jwt_token: Option<String>,
|
||||
|
||||
/// Where the compute hook should send notifications of pageserver attachment locations
|
||||
/// (this URL points to the control plane in prod). If this is None, the compute hook will
|
||||
/// assume it is running in a test environment and try to update neon_local.
|
||||
@@ -337,7 +333,7 @@ impl From<DatabaseError> for ApiError {
|
||||
DatabaseError::Connection(_) | DatabaseError::ConnectionPool(_) => {
|
||||
ApiError::ShuttingDown
|
||||
}
|
||||
DatabaseError::Logical(reason) | DatabaseError::Migration(reason) => {
|
||||
DatabaseError::Logical(reason) => {
|
||||
ApiError::InternalServerError(anyhow::anyhow!(reason))
|
||||
}
|
||||
}
|
||||
@@ -610,15 +606,22 @@ impl Service {
|
||||
|
||||
// Before making any obeservable changes to the cluster, persist self
|
||||
// as leader in database and memory.
|
||||
let leadership = Leadership::new(
|
||||
self.persistence.clone(),
|
||||
self.config.clone(),
|
||||
self.cancel.child_token(),
|
||||
);
|
||||
if let Some(address_for_peers) = &self.config.address_for_peers {
|
||||
// TODO: `address-for-peers` can become a mandatory cli arg
|
||||
// after we update the k8s setup
|
||||
let proposed_leader = ControllerPersistence {
|
||||
address: address_for_peers.to_string(),
|
||||
started_at: chrono::Utc::now(),
|
||||
};
|
||||
|
||||
if let Err(e) = leadership.become_leader(current_leader).await {
|
||||
tracing::error!("Failed to persist self as leader: {e}. Aborting start-up ...");
|
||||
std::process::exit(1);
|
||||
if let Err(err) = self
|
||||
.persistence
|
||||
.update_leader(current_leader, proposed_leader)
|
||||
.await
|
||||
{
|
||||
tracing::error!("Failed to persist self as leader: {err}. Aborting start-up ...");
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
self.inner.write().unwrap().become_leader();
|
||||
@@ -1156,16 +1159,6 @@ impl Service {
|
||||
let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (abort_tx, abort_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
|
||||
let leadership_cancel = CancellationToken::new();
|
||||
let leadership = Leadership::new(persistence.clone(), config.clone(), leadership_cancel);
|
||||
let (leader, leader_step_down_state) = leadership.step_down_current_leader().await?;
|
||||
|
||||
// Apply the migrations **after** the current leader has stepped down
|
||||
// (or we've given up waiting for it), but **before** reading from the
|
||||
// database. The only exception is reading the current leader before
|
||||
// migrating.
|
||||
persistence.migration_run().await?;
|
||||
|
||||
tracing::info!("Loading nodes from database...");
|
||||
let nodes = persistence
|
||||
.list_nodes()
|
||||
@@ -1383,6 +1376,32 @@ impl Service {
|
||||
return;
|
||||
};
|
||||
|
||||
let leadership_status = this.inner.read().unwrap().get_leadership_status();
|
||||
let leader = match this.get_leader().await {
|
||||
Ok(ok) => ok,
|
||||
Err(err) => {
|
||||
tracing::error!(
|
||||
"Failed to query database for current leader: {err}. Aborting start-up ..."
|
||||
);
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
let leader_step_down_state = match leadership_status {
|
||||
LeadershipStatus::Candidate => {
|
||||
if let Some(ref leader) = leader {
|
||||
this.request_step_down(leader).await
|
||||
} else {
|
||||
tracing::info!(
|
||||
"No leader found to request step down from. Will build observed state."
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
LeadershipStatus::Leader => None,
|
||||
LeadershipStatus::SteppedDown => unreachable!(),
|
||||
};
|
||||
|
||||
this.startup_reconcile(leader, leader_step_down_state, bg_compute_notify_result_tx)
|
||||
.await;
|
||||
|
||||
@@ -6358,4 +6377,42 @@ impl Service {
|
||||
|
||||
global_observed
|
||||
}
|
||||
|
||||
/// Request step down from the currently registered leader in the database
|
||||
///
|
||||
/// If such an entry is persisted, the success path returns the observed
|
||||
/// state and details of the leader. Otherwise, None is returned indicating
|
||||
/// there is no leader currently.
|
||||
///
|
||||
/// On failures to query the database or step down error responses the process is killed
|
||||
/// and we rely on k8s to retry.
|
||||
async fn request_step_down(
|
||||
&self,
|
||||
leader: &ControllerPersistence,
|
||||
) -> Option<GlobalObservedState> {
|
||||
tracing::info!("Sending step down request to {leader:?}");
|
||||
|
||||
// TODO: jwt token
|
||||
let client = PeerClient::new(
|
||||
Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"),
|
||||
self.config.jwt_token.clone(),
|
||||
);
|
||||
let state = client.step_down(&self.cancel).await;
|
||||
match state {
|
||||
Ok(state) => Some(state),
|
||||
Err(err) => {
|
||||
// TODO: Make leaders periodically update a timestamp field in the
|
||||
// database and, if the leader is not reachable from the current instance,
|
||||
// but inferred as alive from the timestamp, abort start-up. This avoids
|
||||
// a potential scenario in which we have two controllers acting as leaders.
|
||||
tracing::error!(
|
||||
"Leader ({}) did not respond to step-down request: {}",
|
||||
leader.address,
|
||||
err
|
||||
);
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import os
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
import allure
|
||||
import pytest
|
||||
import toml
|
||||
from _pytest.python import Metafunc
|
||||
@@ -91,3 +92,23 @@ def pytest_generate_tests(metafunc: Metafunc):
|
||||
and (platform := os.getenv("PLATFORM")) is not None
|
||||
):
|
||||
metafunc.parametrize("platform", [platform.lower()])
|
||||
|
||||
|
||||
@pytest.hookimpl(hookwrapper=True, tryfirst=True)
|
||||
def pytest_runtest_makereport(*args, **kwargs):
|
||||
# Add test parameters to Allue report to distinguish the same tests with different parameters.
|
||||
# Names has `__` prefix to avoid conflicts with `pytest.mark.parametrize` parameters
|
||||
|
||||
# A mapping between `uname -m` and `RUNNER_ARCH` values.
|
||||
# `RUNNER_ARCH` environment variable is set on GitHub Runners,
|
||||
# possible values are X86, X64, ARM, or ARM64.
|
||||
# See https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables
|
||||
uname_m = {
|
||||
"aarch64": "ARM64",
|
||||
"arm64": "ARM64",
|
||||
"x86_64": "X64",
|
||||
}.get(os.uname().machine, "UNKNOWN")
|
||||
arch = os.getenv("RUNNER_ARCH", uname_m)
|
||||
allure.dynamic.parameter("__arch", arch)
|
||||
|
||||
yield
|
||||
|
||||
@@ -2144,8 +2144,6 @@ def test_storage_controller_leadership_transfer(
|
||||
port_distributor: PortDistributor,
|
||||
step_down_times_out: bool,
|
||||
):
|
||||
neon_env_builder.auth_enabled = True
|
||||
|
||||
neon_env_builder.num_pageservers = 3
|
||||
|
||||
neon_env_builder.storage_controller_config = {
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import threading
|
||||
import time
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
@@ -22,13 +21,9 @@ def test_subscriber_restart(neon_simple_env: NeonEnv):
|
||||
n_restarts = 100
|
||||
|
||||
def check_that_changes_propagated():
|
||||
scur.execute("SELECT received_lsn from pg_stat_subscription")
|
||||
received_lsn = scur.fetchall()[0][0]
|
||||
log.info(f"received_lsn={received_lsn}")
|
||||
scur.execute("SELECT count(*) FROM t")
|
||||
count = scur.fetchall()[0][0]
|
||||
log.info(f"count={count}")
|
||||
assert count == n_records
|
||||
res = scur.fetchall()
|
||||
assert res[0][0] == n_records
|
||||
|
||||
def insert_data(pub):
|
||||
with pub.cursor() as pcur:
|
||||
@@ -60,8 +55,5 @@ def test_subscriber_restart(neon_simple_env: NeonEnv):
|
||||
thread.join()
|
||||
pcur.execute(f"INSERT into t values ({n_records}, 0)")
|
||||
n_records += 1
|
||||
pcur.execute("SELECT pg_current_wal_flush_lsn()")
|
||||
flush_lsn = pcur.fetchall()[0][0]
|
||||
log.info(f"flush_lsn={flush_lsn}")
|
||||
with sub.cursor() as scur:
|
||||
wait_until(60, 0.5, check_that_changes_propagated)
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 796e0cfa11...3fd7a45f8a
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 5c4076c873...46b4b235f3
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 60905ba5d5...47a9122a5a
6
vendor/revisions.json
vendored
6
vendor/revisions.json
vendored
@@ -1,14 +1,14 @@
|
||||
{
|
||||
"v16": [
|
||||
"16.3",
|
||||
"60905ba5d5c3f507c0a12b71cb3e95c818ef5fda"
|
||||
"47a9122a5a150a3217fafd3f3d4fe8e020ea718a"
|
||||
],
|
||||
"v15": [
|
||||
"15.7",
|
||||
"5c4076c873026f2a3314f1432b6b3d0cf8e57544"
|
||||
"46b4b235f38413ab5974bb22c022f9b829257674"
|
||||
],
|
||||
"v14": [
|
||||
"14.12",
|
||||
"796e0cfa111617f5186aa1ee29a1c974317dbeea"
|
||||
"3fd7a45f8aae85c080df6329e3c85887b7f3a737"
|
||||
]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user