Compare commits

..

33 Commits

Author SHA1 Message Date
Andrey Taranik
b66d4e0bdc Merge branch 'main' into cicd/debug-regress-tests-on-arm 2024-08-20 15:45:42 +03:00
Andrey Taranik
6d9fd2bbb5 Merge branch 'main' into cicd/debug-regress-tests-on-arm 2024-08-20 00:27:01 +03:00
Andrey Taranik
e5d9c003f5 try new qemu based runners 2024-08-19 17:10:17 +03:00
Andrey Taranik
290ce3ed46 try aws arm64 8 core 2024-08-18 02:45:55 +03:00
Andrey Taranik
1138e286b9 try aws arm64 16core runners 2024-08-17 23:04:50 +03:00
Andrey Taranik
9173847f81 try 80 core metal again 2024-08-17 20:20:07 +03:00
Andrey Taranik
6ae62574d0 try 16 core hcloud again 2024-08-17 19:34:31 +03:00
Andrey Taranik
5e074d8536 Merge branch 'main' into cicd/debug-regress-tests-on-arm 2024-08-17 17:49:47 +03:00
Andrey Taranik
3a22daf33e runner lables fix 2024-08-17 16:37:17 +03:00
Andrey Taranik
f40f627730 tey aws arm64 runners 2024-08-17 16:30:24 +03:00
Andrey Taranik
56b94b7d1b return to large-arm64 2024-08-16 14:18:01 +03:00
Andrey Taranik
fe445b2945 more parallelism 2024-08-16 13:21:19 +03:00
Andrey Taranik
7f49f45a53 tune parallelism 2024-08-16 13:08:36 +03:00
Andrey Taranik
980b212789 try more parallelism 2024-08-16 11:56:11 +03:00
Andrey Taranik
2b17a03911 arm64 80 cores debian 12 2024-08-16 10:20:23 +03:00
Andrey Taranik
b50a9d84d4 Merge branch 'main' into cicd/debug-regress-tests-on-arm 2024-08-16 09:06:09 +03:00
Andrey Taranik
6fd3c9daa5 trigger build 2024-08-15 20:27:24 +03:00
Andrey Taranik
409171ab08 try 16 cores 2024-08-15 16:46:57 +03:00
Andrey Taranik
f26240c9dc Merge branch 'main' into cicd/debug-regress-tests-on-arm 2024-08-15 16:46:00 +03:00
Andrey Taranik
1c771267ab Merge branch 'main' into cicd/debug-regress-tests-on-arm 2024-08-15 16:15:17 +03:00
Andrey Taranik
9f1b7b72ed Merge branch 'main' into cicd/debug-regress-tests-on-arm 2024-08-15 14:56:37 +03:00
Andrey Taranik
2476f7ef74 try arm64 with 80 cores 2024-08-15 14:56:14 +03:00
Andrey Taranik
f555cb3970 try cloud arm64 servers 2024-08-15 03:36:00 +03:00
Andrey Taranik
10a726503a Merge branch 'main' into cicd/debug-regress-tests-on-arm 2024-08-15 03:34:07 +03:00
Andrey Taranik
7ba86e15fa debug arm64 builds 2024-08-14 18:57:28 +03:00
Andrey Taranik
7ba42bfdb4 Merge branch 'main' into cicd/debug-regress-tests-on-arm 2024-08-14 18:33:19 +03:00
Andrey Taranik
4f8a39d6c6 try metal arm64 runners 2024-08-14 17:46:13 +03:00
Alexander Bayandin
54c5da3981 CI(build-and-test): set RUSTFLAGS for ARM 2024-08-14 13:57:20 +01:00
Alexander Bayandin
c1378dc43b CI: don't collect code coverage on arm64 runners 2024-08-14 13:53:16 +01:00
Alexander Bayandin
50b9fb430a test_runner: add __arch parameter to Allure report 2024-08-14 13:53:16 +01:00
Alexander Bayandin
486eaba028 CI(build-and-test): run regression tests on arm 2024-08-14 13:53:16 +01:00
Alexander Bayandin
d4d70cc314 CI(build-and-test): make pg-versions configurable 2024-08-14 13:53:16 +01:00
Alexander Bayandin
176eefa47a CI(regress-tests): run debug builds only with the latest Postgres version 2024-08-14 13:53:16 +01:00
30 changed files with 208 additions and 348 deletions

View File

@@ -6,6 +6,10 @@ self-hosted-runner:
- small
- small-arm64
- us-east-2
- aws-arm64-8core
- aws-arm64-16core
- aws-arm64-32core
- qemu-arm64
config-variables:
- BENCHMARK_PROJECT_ID_PUB
- BENCHMARK_PROJECT_ID_SUB

View File

@@ -131,7 +131,7 @@ runs:
fi
if [[ "${{ inputs.run_in_parallel }}" == "true" ]]; then
# -n sets the number of parallel processes that pytest-xdist will run
EXTRA_PARAMS="-n12 $EXTRA_PARAMS"
EXTRA_PARAMS="-n auto $EXTRA_PARAMS"
# --dist=loadgroup points tests marked with @pytest.mark.xdist_group
# to the same worker to make @pytest.mark.order work with xdist
@@ -169,10 +169,8 @@ runs:
EXTRA_PARAMS="--durations-path $TEST_OUTPUT/benchmark_durations.json $EXTRA_PARAMS"
fi
if [[ "${{ inputs.build_type }}" == "debug" ]]; then
if [[ $BUILD_TYPE == "debug" && $RUNNER_ARCH == 'X64' ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
elif [[ "${{ inputs.build_type }}" == "release" ]]; then
cov_prefix=()
else
cov_prefix=()
fi

View File

@@ -36,7 +36,7 @@ env:
jobs:
build-neon:
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'large-arm64' || 'large')) }}
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'qemu-arm64' || 'large')) }}
container:
image: ${{ inputs.build-tools-image }}
credentials:
@@ -94,11 +94,16 @@ jobs:
# We run tests with additional features, that are turned off by default (e.g. in release builds), see
# corresponding Cargo.toml files for their descriptions.
- name: Set env variables
env:
ARCH: ${{ inputs.arch }}
run: |
CARGO_FEATURES="--features testing"
if [[ $BUILD_TYPE == "debug" ]]; then
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
cov_prefix="scripts/coverage --profraw-prefix=$GITHUB_JOB --dir=/tmp/coverage run"
CARGO_FLAGS="--locked"
elif [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=""
CARGO_FLAGS="--locked"
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=""
CARGO_FLAGS="--locked --release"
@@ -110,6 +115,11 @@ jobs:
echo "CARGO_HOME=${GITHUB_WORKSPACE}/.cargo"
} >> $GITHUB_ENV
# See https://github.com/aws/aws-graviton-getting-started/blob/57dc813626d0266f1cc12ef83474745bb1f31fb4/rust.md
- name: Set RUSTFLAGS for ARM
if: inputs.arch == 'arm64'
run: echo "RUSTFLAGS=-Ctarget-feature=+lse -Ctarget-cpu=neoverse-n1" >> $GITHUB_ENV
- name: Cache postgres v14 build
id: cache_pg_14
uses: actions/cache@v4
@@ -153,11 +163,13 @@ jobs:
run: |
PQ_LIB_DIR=$(pwd)/pg_install/v16/lib
export PQ_LIB_DIR
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests -j$(nproc)
# Do install *before* running rust tests because they might recompile the
# binaries with different features/flags.
- name: Install rust binaries
env:
ARCH: ${{ inputs.arch }}
run: |
# Install target binaries
mkdir -p /tmp/neon/bin/
@@ -172,7 +184,7 @@ jobs:
done
# Install test executables and write list of all binaries (for code coverage)
if [[ $BUILD_TYPE == "debug" ]]; then
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
# Keep bloated coverage data files away from the rest of the artifact
mkdir -p /tmp/coverage/
@@ -243,10 +255,10 @@ jobs:
uses: ./.github/actions/save-coverage-data
regress-tests:
# Run test on x64 only
if: inputs.arch == 'x64'
# Don't run regression tests on debug arm64 builds
if: inputs.build-type != 'debug' || inputs.arch != 'arm64'
needs: [ build-neon ]
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'large-arm64' || 'large')) }}
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'qemu-arm64' || 'large')) }}
container:
image: ${{ inputs.build-tools-image }}
credentials:

View File

@@ -198,7 +198,7 @@ jobs:
strategy:
fail-fast: false
matrix:
arch: [ x64 ]
arch: [ x64, arm64 ]
# Do not build or run tests in debug for release branches
build-type: ${{ fromJson((startsWith(github.ref_name, 'release') && github.event_name == 'push') && '["release"]' || '["debug", "release"]') }}
include:

View File

@@ -217,7 +217,7 @@ impl StorageController {
Ok(exitcode.success())
}
/// Create our database if it doesn't exist
/// Create our database if it doesn't exist, and run migrations.
///
/// This function is equivalent to the `diesel setup` command in the diesel CLI. We implement
/// the same steps by hand to avoid imposing a dependency on installing diesel-cli for developers
@@ -382,6 +382,7 @@ impl StorageController {
)
.await?;
// Run migrations on every startup, in case something changed.
self.setup_database(postgres_port).await?;
}
@@ -453,11 +454,6 @@ impl StorageController {
let jwt_token =
encode_from_key_file(&claims, private_key).expect("failed to generate jwt token");
args.push(format!("--jwt-token={jwt_token}"));
let peer_claims = Claims::new(None, Scope::Admin);
let peer_jwt_token = encode_from_key_file(&peer_claims, private_key)
.expect("failed to generate jwt token");
args.push(format!("--peer-jwt-token={peer_jwt_token}"));
}
if let Some(public_key) = &self.public_key {

View File

@@ -413,10 +413,6 @@ where
let mut content = Vec::with_capacity(n_origins * 16 + 8);
content.extend_from_slice(&pg_constants::REPLICATION_STATE_MAGIC.to_le_bytes());
for (origin_id, origin_lsn) in repl_origins {
info!(
"Include origin_id={}, origin_lsn={} in basebackup",
origin_id, origin_lsn
);
content.extend_from_slice(&origin_id.to_le_bytes());
content.extend_from_slice(&[0u8; 6]); // align to 8 bytes
content.extend_from_slice(&origin_lsn.0.to_le_bytes());

View File

@@ -1803,15 +1803,6 @@ pub(crate) static SECONDARY_RESIDENT_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::n
.expect("failed to define a metric")
});
pub(crate) static SECONDARY_HEATMAP_TOTAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_secondary_heatmap_total_size",
"The total size in bytes of all layers in the most recently downloaded heatmap.",
&["tenant_id", "shard_id"]
)
.expect("failed to define a metric")
});
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RemoteOpKind {
Upload,

View File

@@ -8,7 +8,6 @@ use std::{sync::Arc, time::SystemTime};
use crate::{
context::RequestContext,
disk_usage_eviction_task::DiskUsageEvictionInfo,
metrics::SECONDARY_HEATMAP_TOTAL_SIZE,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
};
@@ -106,9 +105,6 @@ pub(crate) struct SecondaryTenant {
// Sum of layer sizes on local disk
pub(super) resident_size_metric: UIntGauge,
// Sum of layer sizes in the most recently downloaded heatmap
pub(super) heatmap_total_size_metric: UIntGauge,
}
impl Drop for SecondaryTenant {
@@ -116,7 +112,6 @@ impl Drop for SecondaryTenant {
let tenant_id = self.tenant_shard_id.tenant_id.to_string();
let shard_id = format!("{}", self.tenant_shard_id.shard_slug());
let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
let _ = SECONDARY_HEATMAP_TOTAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
}
}
@@ -133,10 +128,6 @@ impl SecondaryTenant {
.get_metric_with_label_values(&[&tenant_id, &shard_id])
.unwrap();
let heatmap_total_size_metric = SECONDARY_HEATMAP_TOTAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id])
.unwrap();
Arc::new(Self {
tenant_shard_id,
// todo: shall we make this a descendent of the
@@ -154,7 +145,6 @@ impl SecondaryTenant {
progress: std::sync::Mutex::default(),
resident_size_metric,
heatmap_total_size_metric,
})
}

View File

@@ -829,12 +829,6 @@ impl<'a> TenantDownloader<'a> {
layers_downloaded: 0,
bytes_downloaded: 0,
};
// Also expose heatmap bytes_total as a metric
self.secondary_state
.heatmap_total_size_metric
.set(heatmap_stats.bytes);
// Accumulate list of things to delete while holding the detail lock, for execution after dropping the lock
let mut delete_layers = Vec::new();
let mut delete_timelines = Vec::new();

View File

@@ -756,23 +756,11 @@ impl VirtualFile {
})
}
/// The function aborts the process if the error is fatal.
async fn write_at<B: IoBuf + Send>(
&self,
buf: FullSlice<B>,
offset: u64,
_ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
) -> (FullSlice<B>, Result<usize, Error>) {
let (slice, result) = self.write_at_inner(buf, offset, _ctx).await;
let result = result.maybe_fatal_err("write_at");
(slice, result)
}
async fn write_at_inner<B: IoBuf + Send>(
&self,
buf: FullSlice<B>,
offset: u64,
_ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
) -> (FullSlice<B>, Result<usize, Error>) {
let file_guard = match self.lock_file().await {
Ok(file_guard) => file_guard,

View File

@@ -235,7 +235,6 @@ impl WalIngest {
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT,
decoded.origin_id,
lsn,
ctx,
)
.await?;
@@ -249,7 +248,6 @@ impl WalIngest {
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
decoded.origin_id,
lsn,
ctx,
)
.await?;
@@ -410,16 +408,11 @@ impl WalIngest {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_REPLORIGIN_SET {
let xlrec = crate::walrecord::XlReploriginSet::decode(&mut buf);
info!(
"Set replication origin_id={}, origin_lsn={}",
xlrec.node_id, xlrec.remote_lsn
);
modification
.set_replorigin(xlrec.node_id, xlrec.remote_lsn)
.await?
} else if info == pg_constants::XLOG_REPLORIGIN_DROP {
let xlrec = crate::walrecord::XlReploriginDrop::decode(&mut buf);
info!("Drop replication origin_id={}", xlrec.node_id);
modification.drop_replorigin(xlrec.node_id).await?
}
}
@@ -1227,7 +1220,6 @@ impl WalIngest {
parsed: &XlXactParsedRecord,
is_commit: bool,
origin_id: u16,
lsn: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// Record update of CLOG pages
@@ -1294,10 +1286,6 @@ impl WalIngest {
}
}
if origin_id != 0 {
info!(
"Commit at {} origin_id={}, origin_lsn={}",
lsn, origin_id, parsed.origin_lsn
);
modification
.set_replorigin(origin_id, parsed.origin_lsn)
.await?;

View File

@@ -173,6 +173,9 @@ struct ProxyCliArgs {
/// cache for `role_secret` (use `size=0` to disable)
#[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
role_secret_cache: String,
/// disable ip check for http requests. If it is too time consuming, it could be turned off.
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
disable_ip_check_for_http: bool,
/// redis url for notifications (if empty, redis_host:port will be used for both notifications and streaming connections)
#[clap(long)]
redis_notifications: Option<String>,
@@ -658,7 +661,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
)?;
let http_config = HttpConfig {
accept_websockets: true,
pool_options: GlobalConnPoolOptions {
max_conns_per_endpoint: args.sql_over_http.sql_over_http_pool_max_conns_per_endpoint,
gc_epoch: args.sql_over_http.sql_over_http_pool_gc_epoch,

View File

@@ -52,7 +52,6 @@ pub struct TlsConfig {
}
pub struct HttpConfig {
pub accept_websockets: bool,
pub pool_options: GlobalConnPoolOptions,
pub cancel_set: CancelSet,
pub client_conn_threshold: u64,

View File

@@ -10,7 +10,6 @@ mod json;
mod sql_over_http;
mod websocket;
use async_trait::async_trait;
use atomic_take::AtomicTake;
use bytes::Bytes;
pub use conn_pool::GlobalConnPoolOptions;
@@ -27,9 +26,8 @@ use rand::rngs::StdRng;
use rand::SeedableRng;
pub use reqwest_middleware::{ClientWithMiddleware, Error};
pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::time::timeout;
use tokio_rustls::TlsAcceptor;
use tokio_rustls::{server::TlsStream, TlsAcceptor};
use tokio_util::task::TaskTracker;
use crate::cancellation::CancellationHandlerMain;
@@ -43,7 +41,7 @@ use crate::serverless::backend::PoolingBackend;
use crate::serverless::http_util::{api_error_into_response, json_response};
use std::net::{IpAddr, SocketAddr};
use std::pin::{pin, Pin};
use std::pin::pin;
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio_util::sync::CancellationToken;
@@ -88,18 +86,18 @@ pub async fn task_main(
config,
endpoint_rate_limiter: Arc::clone(&endpoint_rate_limiter),
});
let tls_acceptor: Arc<dyn MaybeTlsAcceptor> = match config.tls_config.as_ref() {
Some(config) => {
let mut tls_server_config = rustls::ServerConfig::clone(&config.to_server_config());
// prefer http2, but support http/1.1
tls_server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
Arc::new(tls_server_config) as Arc<_>
}
let tls_config = match config.tls_config.as_ref() {
Some(config) => config,
None => {
warn!("TLS config is missing");
Arc::new(NoTls) as Arc<_>
warn!("TLS config is missing, WebSocket Secure server will not be started");
return Ok(());
}
};
let mut tls_server_config = rustls::ServerConfig::clone(&tls_config.to_server_config());
// prefer http2, but support http/1.1
tls_server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
let tls_acceptor: tokio_rustls::TlsAcceptor = Arc::new(tls_server_config).into();
let connections = tokio_util::task::task_tracker::TaskTracker::new();
connections.close(); // allows `connections.wait to complete`
@@ -178,41 +176,16 @@ pub async fn task_main(
Ok(())
}
pub trait AsyncReadWrite: AsyncRead + AsyncWrite + Send + 'static {}
impl<T: AsyncRead + AsyncWrite + Send + 'static> AsyncReadWrite for T {}
pub type AsyncRW = Pin<Box<dyn AsyncReadWrite>>;
#[async_trait]
trait MaybeTlsAcceptor: Send + Sync + 'static {
async fn accept(self: Arc<Self>, conn: ChainRW<TcpStream>) -> std::io::Result<AsyncRW>;
}
#[async_trait]
impl MaybeTlsAcceptor for rustls::ServerConfig {
async fn accept(self: Arc<Self>, conn: ChainRW<TcpStream>) -> std::io::Result<AsyncRW> {
Ok(Box::pin(TlsAcceptor::from(self).accept(conn).await?))
}
}
struct NoTls;
#[async_trait]
impl MaybeTlsAcceptor for NoTls {
async fn accept(self: Arc<Self>, conn: ChainRW<TcpStream>) -> std::io::Result<AsyncRW> {
Ok(Box::pin(conn))
}
}
/// Handles the TCP startup lifecycle.
/// 1. Parses PROXY protocol V2
/// 2. Handles TLS handshake
async fn connection_startup(
config: &ProxyConfig,
tls_acceptor: Arc<dyn MaybeTlsAcceptor>,
tls_acceptor: TlsAcceptor,
session_id: uuid::Uuid,
conn: TcpStream,
peer_addr: SocketAddr,
) -> Option<(AsyncRW, IpAddr)> {
) -> Option<(TlsStream<ChainRW<TcpStream>>, IpAddr)> {
// handle PROXY protocol
let (conn, peer) = match read_proxy_protocol(conn).await {
Ok(c) => c,
@@ -268,7 +241,7 @@ async fn connection_handler(
cancellation_handler: Arc<CancellationHandlerMain>,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
cancellation_token: CancellationToken,
conn: AsyncRW,
conn: TlsStream<ChainRW<TcpStream>>,
peer_addr: IpAddr,
session_id: uuid::Uuid,
) {
@@ -353,9 +326,7 @@ async fn request_handler(
.map(|s| s.to_string());
// Check if the request is a websocket upgrade request.
if config.http_config.accept_websockets
&& framed_websockets::upgrade::is_upgrade_request(&request)
{
if framed_websockets::upgrade::is_upgrade_request(&request) {
let ctx = RequestMonitoring::new(
session_id,
peer_addr,

View File

@@ -758,7 +758,6 @@ mod tests {
async fn test_pool() {
let _ = env_logger::try_init();
let config = Box::leak(Box::new(crate::config::HttpConfig {
accept_websockets: false,
pool_options: GlobalConnPoolOptions {
max_conns_per_endpoint: 2,
gc_epoch: Duration::from_secs(1),

View File

@@ -147,7 +147,7 @@ impl UserFacingError for ConnInfoError {
fn get_conn_info(
ctx: &RequestMonitoring,
headers: &HeaderMap,
tls: Option<&TlsConfig>,
tls: &TlsConfig,
) -> Result<ConnInfo, ConnInfoError> {
// HTTP only uses cleartext (for now and likely always)
ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
@@ -184,22 +184,12 @@ fn get_conn_info(
.ok_or(ConnInfoError::MissingPassword)?;
let password = urlencoding::decode_binary(password.as_bytes());
let endpoint = match connection_url.host() {
Some(url::Host::Domain(hostname)) => {
if let Some(tls) = tls {
endpoint_sni(hostname, &tls.common_names)?
.ok_or(ConnInfoError::MalformedEndpoint)?
} else {
hostname
.split_once(".")
.map_or(hostname, |(prefix, _)| prefix)
.into()
}
}
Some(url::Host::Ipv4(_)) | Some(url::Host::Ipv6(_)) | None => {
return Err(ConnInfoError::MissingHostname)
}
};
let hostname = connection_url
.host_str()
.ok_or(ConnInfoError::MissingHostname)?;
let endpoint =
endpoint_sni(hostname, &tls.common_names)?.ok_or(ConnInfoError::MalformedEndpoint)?;
ctx.set_endpoint_id(endpoint.clone());
let pairs = connection_url.query_pairs();
@@ -512,7 +502,7 @@ async fn handle_inner(
let headers = request.headers();
// TLS config should be there.
let conn_info = get_conn_info(ctx, headers, config.tls_config.as_ref())?;
let conn_info = get_conn_info(ctx, headers, config.tls_config.as_ref().unwrap())?;
info!(user = conn_info.user_info.user.as_str(), "credentials");
// Allow connection pooling only if explicitly requested

View File

@@ -18,6 +18,7 @@ import psycopg2
from psycopg2.extras import execute_values
CREATE_TABLE = """
CREATE TYPE arch AS ENUM ('ARM64', 'X64', 'UNKNOWN');
CREATE TABLE IF NOT EXISTS results (
id BIGSERIAL PRIMARY KEY,
parent_suite TEXT NOT NULL,
@@ -28,6 +29,7 @@ CREATE TABLE IF NOT EXISTS results (
stopped_at TIMESTAMPTZ NOT NULL,
duration INT NOT NULL,
flaky BOOLEAN NOT NULL,
arch arch DEFAULT 'X64',
build_type TEXT NOT NULL,
pg_version INT NOT NULL,
run_id BIGINT NOT NULL,
@@ -35,7 +37,7 @@ CREATE TABLE IF NOT EXISTS results (
reference TEXT NOT NULL,
revision CHAR(40) NOT NULL,
raw JSONB COMPRESSION lz4 NOT NULL,
UNIQUE (parent_suite, suite, name, build_type, pg_version, started_at, stopped_at, run_id)
UNIQUE (parent_suite, suite, name, arch, build_type, pg_version, started_at, stopped_at, run_id)
);
"""
@@ -50,6 +52,7 @@ class Row:
stopped_at: datetime
duration: int
flaky: bool
arch: str
build_type: str
pg_version: int
run_id: int
@@ -121,6 +124,14 @@ def ingest_test_result(
raw.pop("labels")
raw.pop("extra")
# All allure parameters are prefixed with "__", see test_runner/fixtures/parametrize.py
parameters = {
p["name"].removeprefix("__"): p["value"]
for p in test["parameters"]
if p["name"].startswith("__")
}
arch = parameters.get("arch", "UNKNOWN").strip("'")
build_type, pg_version, unparametrized_name = parse_test_name(test["name"])
labels = {label["name"]: label["value"] for label in test["labels"]}
row = Row(
@@ -132,6 +143,7 @@ def ingest_test_result(
stopped_at=datetime.fromtimestamp(test["time"]["stop"] / 1000, tz=timezone.utc),
duration=test["time"]["duration"],
flaky=test["flaky"] or test["retriesStatusChange"],
arch=arch,
build_type=build_type,
pg_version=pg_version,
run_id=run_id,

View File

@@ -1,135 +0,0 @@
use std::sync::Arc;
use hyper::Uri;
use tokio_util::sync::CancellationToken;
use crate::{
peer_client::{GlobalObservedState, PeerClient},
persistence::{ControllerPersistence, DatabaseError, DatabaseResult, Persistence},
service::Config,
};
/// Helper for storage controller leadership acquisition
pub(crate) struct Leadership {
persistence: Arc<Persistence>,
config: Config,
cancel: CancellationToken,
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum Error {
#[error(transparent)]
Database(#[from] DatabaseError),
}
pub(crate) type Result<T> = std::result::Result<T, Error>;
impl Leadership {
pub(crate) fn new(
persistence: Arc<Persistence>,
config: Config,
cancel: CancellationToken,
) -> Self {
Self {
persistence,
config,
cancel,
}
}
/// Find the current leader in the database and request it to step down if required.
/// Should be called early on within the start-up sequence.
///
/// Returns a tuple of two optionals: the current leader and its observed state
pub(crate) async fn step_down_current_leader(
&self,
) -> Result<(Option<ControllerPersistence>, Option<GlobalObservedState>)> {
let leader = self.current_leader().await?;
let leader_step_down_state = if let Some(ref leader) = leader {
if self.config.start_as_candidate {
self.request_step_down(leader).await
} else {
None
}
} else {
tracing::info!("No leader found to request step down from. Will build observed state.");
None
};
Ok((leader, leader_step_down_state))
}
/// Mark the current storage controller instance as the leader in the database
pub(crate) async fn become_leader(
&self,
current_leader: Option<ControllerPersistence>,
) -> Result<()> {
if let Some(address_for_peers) = &self.config.address_for_peers {
// TODO: `address-for-peers` can become a mandatory cli arg
// after we update the k8s setup
let proposed_leader = ControllerPersistence {
address: address_for_peers.to_string(),
started_at: chrono::Utc::now(),
};
self.persistence
.update_leader(current_leader, proposed_leader)
.await
.map_err(Error::Database)
} else {
tracing::info!("No address-for-peers provided. Skipping leader persistence.");
Ok(())
}
}
async fn current_leader(&self) -> DatabaseResult<Option<ControllerPersistence>> {
let res = self.persistence.get_leader().await;
if let Err(DatabaseError::Query(diesel::result::Error::DatabaseError(_kind, ref err))) = res
{
const REL_NOT_FOUND_MSG: &str = "relation \"controllers\" does not exist";
if err.message().trim() == REL_NOT_FOUND_MSG {
// Special case: if this is a brand new storage controller, migrations will not
// have run at this point yet, and, hence, the controllers table does not exist.
// Detect this case via the error string (diesel doesn't type it) and allow it.
tracing::info!("Detected first storage controller start-up. Allowing missing controllers table ...");
return Ok(None);
}
}
res
}
/// Request step down from the currently registered leader in the database
///
/// If such an entry is persisted, the success path returns the observed
/// state and details of the leader. Otherwise, None is returned indicating
/// there is no leader currently.
async fn request_step_down(
&self,
leader: &ControllerPersistence,
) -> Option<GlobalObservedState> {
tracing::info!("Sending step down request to {leader:?}");
let client = PeerClient::new(
Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"),
self.config.peer_jwt_token.clone(),
);
let state = client.step_down(&self.cancel).await;
match state {
Ok(state) => Some(state),
Err(err) => {
// TODO: Make leaders periodically update a timestamp field in the
// database and, if the leader is not reachable from the current instance,
// but inferred as alive from the timestamp, abort start-up. This avoids
// a potential scenario in which we have two controllers acting as leaders.
tracing::error!(
"Leader ({}) did not respond to step-down request: {}",
leader.address,
err
);
None
}
}
}
}

View File

@@ -8,7 +8,6 @@ mod drain_utils;
mod heartbeater;
pub mod http;
mod id_lock_map;
mod leadership;
pub mod metrics;
mod node;
mod pageserver_client;

View File

@@ -1,5 +1,6 @@
use anyhow::{anyhow, Context};
use clap::Parser;
use diesel::Connection;
use hyper::Uri;
use metrics::launch_timestamp::LaunchTimestamp;
use metrics::BuildInfo;
@@ -26,6 +27,9 @@ use utils::{project_build_tag, project_git_version, tcp_listener};
project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);
use diesel_migrations::{embed_migrations, EmbeddedMigrations};
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
#[command(arg_required_else_help(true))]
@@ -47,9 +51,6 @@ struct Cli {
#[arg(long)]
control_plane_jwt_token: Option<String>,
#[arg(long)]
peer_jwt_token: Option<String>,
/// URL to control plane compute notification endpoint
#[arg(long)]
compute_hook_url: Option<String>,
@@ -129,28 +130,28 @@ struct Secrets {
public_key: Option<JwtAuth>,
jwt_token: Option<String>,
control_plane_jwt_token: Option<String>,
peer_jwt_token: Option<String>,
}
impl Secrets {
const DATABASE_URL_ENV: &'static str = "DATABASE_URL";
const PAGESERVER_JWT_TOKEN_ENV: &'static str = "PAGESERVER_JWT_TOKEN";
const CONTROL_PLANE_JWT_TOKEN_ENV: &'static str = "CONTROL_PLANE_JWT_TOKEN";
const PEER_JWT_TOKEN_ENV: &'static str = "PEER_JWT_TOKEN";
const PUBLIC_KEY_ENV: &'static str = "PUBLIC_KEY";
/// Load secrets from, in order of preference:
/// - CLI args if database URL is provided on the CLI
/// - Environment variables if DATABASE_URL is set.
/// - AWS Secrets Manager secrets
async fn load(args: &Cli) -> anyhow::Result<Self> {
let Some(database_url) = Self::load_secret(&args.database_url, Self::DATABASE_URL_ENV)
let Some(database_url) =
Self::load_secret(&args.database_url, Self::DATABASE_URL_ENV).await
else {
anyhow::bail!(
"Database URL is not set (set `--database-url`, or `DATABASE_URL` environment)"
)
};
let public_key = match Self::load_secret(&args.public_key, Self::PUBLIC_KEY_ENV) {
let public_key = match Self::load_secret(&args.public_key, Self::PUBLIC_KEY_ENV).await {
Some(v) => Some(JwtAuth::from_key(v).context("Loading public key")?),
None => None,
};
@@ -158,18 +159,18 @@ impl Secrets {
let this = Self {
database_url,
public_key,
jwt_token: Self::load_secret(&args.jwt_token, Self::PAGESERVER_JWT_TOKEN_ENV),
jwt_token: Self::load_secret(&args.jwt_token, Self::PAGESERVER_JWT_TOKEN_ENV).await,
control_plane_jwt_token: Self::load_secret(
&args.control_plane_jwt_token,
Self::CONTROL_PLANE_JWT_TOKEN_ENV,
),
peer_jwt_token: Self::load_secret(&args.peer_jwt_token, Self::PEER_JWT_TOKEN_ENV),
)
.await,
};
Ok(this)
}
fn load_secret(cli: &Option<String>, env_name: &str) -> Option<String> {
async fn load_secret(cli: &Option<String>, env_name: &str) -> Option<String> {
if let Some(v) = cli {
Some(v.clone())
} else if let Ok(v) = std::env::var(env_name) {
@@ -180,6 +181,20 @@ impl Secrets {
}
}
/// Execute the diesel migrations that are built into this binary
async fn migration_run(database_url: &str) -> anyhow::Result<()> {
use diesel::PgConnection;
use diesel_migrations::{HarnessWithOutput, MigrationHarness};
let mut conn = PgConnection::establish(database_url)?;
HarnessWithOutput::write_to_stdout(&mut conn)
.run_pending_migrations(MIGRATIONS)
.map(|_| ())
.map_err(|e| anyhow::anyhow!(e))?;
Ok(())
}
fn main() -> anyhow::Result<()> {
logging::init(
LogFormat::Plain,
@@ -269,7 +284,6 @@ async fn async_main() -> anyhow::Result<()> {
let config = Config {
jwt_token: secrets.jwt_token,
control_plane_jwt_token: secrets.control_plane_jwt_token,
peer_jwt_token: secrets.peer_jwt_token,
compute_hook_url: args.compute_hook_url,
max_offline_interval: args
.max_offline_interval
@@ -290,9 +304,13 @@ async fn async_main() -> anyhow::Result<()> {
http_service_port: args.listen.port() as i32,
};
// Validate that we can connect to the database
// After loading secrets & config, but before starting anything else, apply database migrations
Persistence::await_connection(&secrets.database_url, args.db_connect_timeout.into()).await?;
migration_run(&secrets.database_url)
.await
.context("Running database migrations")?;
let persistence = Arc::new(Persistence::new(secrets.database_url));
let service = Service::spawn(config, persistence.clone()).await?;

View File

@@ -230,7 +230,6 @@ pub(crate) enum DatabaseErrorLabel {
Connection,
ConnectionPool,
Logical,
Migration,
}
impl DatabaseError {
@@ -240,7 +239,6 @@ impl DatabaseError {
Self::Connection(_) => DatabaseErrorLabel::Connection,
Self::ConnectionPool(_) => DatabaseErrorLabel::ConnectionPool,
Self::Logical(_) => DatabaseErrorLabel::Logical,
Self::Migration(_) => DatabaseErrorLabel::Migration,
}
}
}

View File

@@ -25,9 +25,6 @@ use crate::metrics::{
};
use crate::node::Node;
use diesel_migrations::{embed_migrations, EmbeddedMigrations};
const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
/// ## What do we store?
///
/// The storage controller service does not store most of its state durably.
@@ -75,8 +72,6 @@ pub(crate) enum DatabaseError {
ConnectionPool(#[from] r2d2::Error),
#[error("Logical error: {0}")]
Logical(String),
#[error("Migration error: {0}")]
Migration(String),
}
#[derive(measured::FixedCardinalityLabel, Copy, Clone)]
@@ -172,19 +167,6 @@ impl Persistence {
}
}
/// Execute the diesel migrations that are built into this binary
pub(crate) async fn migration_run(&self) -> DatabaseResult<()> {
use diesel_migrations::{HarnessWithOutput, MigrationHarness};
self.with_conn(move |conn| -> DatabaseResult<()> {
HarnessWithOutput::write_to_stdout(conn)
.run_pending_migrations(MIGRATIONS)
.map(|_| ())
.map_err(|e| DatabaseError::Migration(e.to_string()))
})
.await
}
/// Wraps `with_conn` in order to collect latency and error metrics
async fn with_measured_conn<F, R>(&self, op: DatabaseOperation, func: F) -> DatabaseResult<R>
where

View File

@@ -17,9 +17,8 @@ use crate::{
compute_hook::NotifyError,
drain_utils::{self, TenantShardDrain, TenantShardIterator},
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
leadership::Leadership,
metrics,
peer_client::GlobalObservedState,
peer_client::{GlobalObservedState, PeerClient},
persistence::{
AbortShardSplitStatus, ControllerPersistence, DatabaseResult, MetadataHealthPersistence,
TenantFilter,
@@ -288,9 +287,6 @@ pub struct Config {
// This JWT token will be used to authenticate this service to the control plane.
pub control_plane_jwt_token: Option<String>,
// This JWT token will be used to authenticate with other storage controller instances
pub peer_jwt_token: Option<String>,
/// Where the compute hook should send notifications of pageserver attachment locations
/// (this URL points to the control plane in prod). If this is None, the compute hook will
/// assume it is running in a test environment and try to update neon_local.
@@ -337,7 +333,7 @@ impl From<DatabaseError> for ApiError {
DatabaseError::Connection(_) | DatabaseError::ConnectionPool(_) => {
ApiError::ShuttingDown
}
DatabaseError::Logical(reason) | DatabaseError::Migration(reason) => {
DatabaseError::Logical(reason) => {
ApiError::InternalServerError(anyhow::anyhow!(reason))
}
}
@@ -610,15 +606,22 @@ impl Service {
// Before making any observable changes to the cluster, persist self
// as leader in database and memory.
let leadership = Leadership::new(
self.persistence.clone(),
self.config.clone(),
self.cancel.child_token(),
);
if let Some(address_for_peers) = &self.config.address_for_peers {
// TODO: `address-for-peers` can become a mandatory cli arg
// after we update the k8s setup
let proposed_leader = ControllerPersistence {
address: address_for_peers.to_string(),
started_at: chrono::Utc::now(),
};
if let Err(e) = leadership.become_leader(current_leader).await {
tracing::error!("Failed to persist self as leader: {e}. Aborting start-up ...");
std::process::exit(1);
if let Err(err) = self
.persistence
.update_leader(current_leader, proposed_leader)
.await
{
tracing::error!("Failed to persist self as leader: {err}. Aborting start-up ...");
std::process::exit(1);
}
}
self.inner.write().unwrap().become_leader();
@@ -1156,16 +1159,6 @@ impl Service {
let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel();
let (abort_tx, abort_rx) = tokio::sync::mpsc::unbounded_channel();
let leadership_cancel = CancellationToken::new();
let leadership = Leadership::new(persistence.clone(), config.clone(), leadership_cancel);
let (leader, leader_step_down_state) = leadership.step_down_current_leader().await?;
// Apply the migrations **after** the current leader has stepped down
// (or we've given up waiting for it), but **before** reading from the
// database. The only exception is reading the current leader before
// migrating.
persistence.migration_run().await?;
tracing::info!("Loading nodes from database...");
let nodes = persistence
.list_nodes()
@@ -1383,6 +1376,32 @@ impl Service {
return;
};
let leadership_status = this.inner.read().unwrap().get_leadership_status();
let leader = match this.get_leader().await {
Ok(ok) => ok,
Err(err) => {
tracing::error!(
"Failed to query database for current leader: {err}. Aborting start-up ..."
);
std::process::exit(1);
}
};
let leader_step_down_state = match leadership_status {
LeadershipStatus::Candidate => {
if let Some(ref leader) = leader {
this.request_step_down(leader).await
} else {
tracing::info!(
"No leader found to request step down from. Will build observed state."
);
None
}
}
LeadershipStatus::Leader => None,
LeadershipStatus::SteppedDown => unreachable!(),
};
this.startup_reconcile(leader, leader_step_down_state, bg_compute_notify_result_tx)
.await;
@@ -6358,4 +6377,42 @@ impl Service {
global_observed
}
/// Ask the given leader to step down and hand over its observed state.
///
/// Returns `Some(state)` when the leader answers the step-down request.
/// If the request fails, the error is logged and `None` is returned, so the
/// caller proceeds without the previous leader's observed state.
///
/// Panics if `leader.address` cannot be parsed as a URI.
async fn request_step_down(
    &self,
    leader: &ControllerPersistence,
) -> Option<GlobalObservedState> {
    tracing::info!("Sending step down request to {leader:?}");

    // TODO: jwt token
    let leader_uri =
        Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI");
    let client = PeerClient::new(leader_uri, self.config.jwt_token.clone());

    let outcome = client.step_down(&self.cancel).await;
    if let Err(err) = &outcome {
        // TODO: Make leaders periodically update a timestamp field in the
        // database and, if the leader is not reachable from the current instance,
        // but inferred as alive from the timestamp, abort start-up. This avoids
        // a potential scenario in which we have two controllers acting as leaders.
        tracing::error!(
            "Leader ({}) did not respond to step-down request: {}",
            leader.address,
            err
        );
    }

    // A failed request is treated the same as having no observed state.
    outcome.ok()
}
}

View File

@@ -1,6 +1,7 @@
import os
from typing import Any, Dict, Optional
import allure
import pytest
import toml
from _pytest.python import Metafunc
@@ -91,3 +92,23 @@ def pytest_generate_tests(metafunc: Metafunc):
and (platform := os.getenv("PLATFORM")) is not None
):
metafunc.parametrize("platform", [platform.lower()])
@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_runtest_makereport(*args, **kwargs):
    """Attach the CPU architecture to the Allure report as a test parameter.

    The extra parameter distinguishes otherwise identical tests that ran with
    different parameters. Its name carries a `__` prefix to avoid conflicts
    with `pytest.mark.parametrize` parameters.
    """
    # Mapping from `uname -m` output to `RUNNER_ARCH` values.
    # The `RUNNER_ARCH` environment variable is set on GitHub Runners;
    # possible values are X86, X64, ARM, or ARM64.
    # See https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables
    machine_to_runner_arch = {
        "aarch64": "ARM64",
        "arm64": "ARM64",
        "x86_64": "X64",
    }
    detected_arch = machine_to_runner_arch.get(os.uname().machine, "UNKNOWN")

    # `RUNNER_ARCH` wins when present; otherwise fall back to the local machine.
    allure.dynamic.parameter("__arch", os.getenv("RUNNER_ARCH", detected_arch))

    yield

View File

@@ -2144,8 +2144,6 @@ def test_storage_controller_leadership_transfer(
port_distributor: PortDistributor,
step_down_times_out: bool,
):
neon_env_builder.auth_enabled = True
neon_env_builder.num_pageservers = 3
neon_env_builder.storage_controller_config = {

View File

@@ -1,7 +1,6 @@
import threading
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import wait_until
@@ -22,13 +21,9 @@ def test_subscriber_restart(neon_simple_env: NeonEnv):
n_restarts = 100
def check_that_changes_propagated():
scur.execute("SELECT received_lsn from pg_stat_subscription")
received_lsn = scur.fetchall()[0][0]
log.info(f"received_lsn={received_lsn}")
scur.execute("SELECT count(*) FROM t")
count = scur.fetchall()[0][0]
log.info(f"count={count}")
assert count == n_records
res = scur.fetchall()
assert res[0][0] == n_records
def insert_data(pub):
with pub.cursor() as pcur:
@@ -60,8 +55,5 @@ def test_subscriber_restart(neon_simple_env: NeonEnv):
thread.join()
pcur.execute(f"INSERT into t values ({n_records}, 0)")
n_records += 1
pcur.execute("SELECT pg_current_wal_flush_lsn()")
flush_lsn = pcur.fetchall()[0][0]
log.info(f"flush_lsn={flush_lsn}")
with sub.cursor() as scur:
wait_until(60, 0.5, check_that_changes_propagated)

View File

@@ -1,14 +1,14 @@
{
"v16": [
"16.3",
"60905ba5d5c3f507c0a12b71cb3e95c818ef5fda"
"47a9122a5a150a3217fafd3f3d4fe8e020ea718a"
],
"v15": [
"15.7",
"5c4076c873026f2a3314f1432b6b3d0cf8e57544"
"46b4b235f38413ab5974bb22c022f9b829257674"
],
"v14": [
"14.12",
"796e0cfa111617f5186aa1ee29a1c974317dbeea"
"3fd7a45f8aae85c080df6329e3c85887b7f3a737"
]
}