mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-04 11:10:37 +00:00
Compare commits
40 Commits
jcsp/no-co
...
cloneable/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4f88c4b8f3 | ||
|
|
c08759f367 | ||
|
|
ba9722a2fd | ||
|
|
2d4f267983 | ||
|
|
7a598b9842 | ||
|
|
eefad27538 | ||
|
|
cd10c719f9 | ||
|
|
363ea97f69 | ||
|
|
56e6ebfe17 | ||
|
|
1622fd8bda | ||
|
|
8c7dcd2598 | ||
|
|
ee22d4c9ef | ||
|
|
26600f2973 | ||
|
|
b3cd883f93 | ||
|
|
38c7a2abfc | ||
|
|
f94248a594 | ||
|
|
9c53b41245 | ||
|
|
197a89ab3d | ||
|
|
b89e02f3e8 | ||
|
|
04517c6ff3 | ||
|
|
628451d68e | ||
|
|
502d512fe2 | ||
|
|
afda6d4700 | ||
|
|
65042cbadd | ||
|
|
b135194090 | ||
|
|
43dc03459d | ||
|
|
a1b0558493 | ||
|
|
cc138b56f9 | ||
|
|
61fcf64c22 | ||
|
|
6d3e8096fc | ||
|
|
3d1c3a80ae | ||
|
|
835287ba3a | ||
|
|
d63602cc78 | ||
|
|
1668d39b7c | ||
|
|
1d12efc428 | ||
|
|
85696297c5 | ||
|
|
aaf980f70d | ||
|
|
c52514ab02 | ||
|
|
2ee6bc5ec4 | ||
|
|
fd230227f2 |
@@ -169,7 +169,7 @@ runs:
|
||||
fi
|
||||
|
||||
if [[ $BUILD_TYPE == "debug" && $RUNNER_ARCH == 'X64' ]]; then
|
||||
cov_prefix=()
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
|
||||
else
|
||||
cov_prefix=()
|
||||
fi
|
||||
|
||||
@@ -90,7 +90,7 @@ jobs:
|
||||
run: |
|
||||
CARGO_FEATURES="--features testing"
|
||||
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
|
||||
cov_prefix=""
|
||||
cov_prefix="scripts/coverage --profraw-prefix=$GITHUB_JOB --dir=/tmp/coverage run"
|
||||
CARGO_FLAGS="--locked"
|
||||
elif [[ $BUILD_TYPE == "debug" ]]; then
|
||||
cov_prefix=""
|
||||
|
||||
25
.github/workflows/benchmarking.yml
vendored
25
.github/workflows/benchmarking.yml
vendored
@@ -308,6 +308,7 @@ jobs:
|
||||
"image": [ "'"$image_default"'" ],
|
||||
"include": [{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-freetier", "db_size": "3gb" ,"runner": '"$runner_default"', "image": "'"$image_default"'" },
|
||||
{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-new", "db_size": "10gb","runner": '"$runner_default"', "image": "'"$image_default"'" },
|
||||
{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-new-many-tables","db_size": "10gb","runner": '"$runner_default"', "image": "'"$image_default"'" },
|
||||
{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-new", "db_size": "50gb","runner": '"$runner_default"', "image": "'"$image_default"'" },
|
||||
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-freetier", "db_size": "3gb" ,"runner": '"$runner_azure"', "image": "neondatabase/build-tools:pinned-bookworm" },
|
||||
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-new", "db_size": "10gb","runner": '"$runner_azure"', "image": "neondatabase/build-tools:pinned-bookworm" },
|
||||
@@ -410,7 +411,7 @@ jobs:
|
||||
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
|
||||
|
||||
- name: Create Neon Project
|
||||
if: contains(fromJson('["neonvm-captest-new", "neonvm-captest-freetier", "neonvm-azure-captest-freetier", "neonvm-azure-captest-new"]'), matrix.platform)
|
||||
if: contains(fromJson('["neonvm-captest-new", "neonvm-captest-new-many-tables", "neonvm-captest-freetier", "neonvm-azure-captest-freetier", "neonvm-azure-captest-new"]'), matrix.platform)
|
||||
id: create-neon-project
|
||||
uses: ./.github/actions/neon-project-create
|
||||
with:
|
||||
@@ -429,7 +430,7 @@ jobs:
|
||||
neonvm-captest-sharding-reuse)
|
||||
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_SHARDING_CONNSTR }}
|
||||
;;
|
||||
neonvm-captest-new | neonvm-captest-freetier | neonvm-azure-captest-new | neonvm-azure-captest-freetier)
|
||||
neonvm-captest-new | neonvm-captest-new-many-tables | neonvm-captest-freetier | neonvm-azure-captest-new | neonvm-azure-captest-freetier)
|
||||
CONNSTR=${{ steps.create-neon-project.outputs.dsn }}
|
||||
;;
|
||||
rds-aurora)
|
||||
@@ -446,6 +447,26 @@ jobs:
|
||||
|
||||
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
|
||||
|
||||
# we want to compare Neon project OLTP throughput and latency at scale factor 10 GB
|
||||
# without (neonvm-captest-new)
|
||||
# and with (neonvm-captest-new-many-tables) many relations in the database
|
||||
- name: Create many relations before the run
|
||||
if: contains(fromJson('["neonvm-captest-new-many-tables"]'), matrix.platform)
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
with:
|
||||
build_type: ${{ env.BUILD_TYPE }}
|
||||
test_selection: performance
|
||||
run_in_parallel: false
|
||||
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
|
||||
extra_params: -m remote_cluster --timeout 21600 -k test_perf_many_relations
|
||||
pg_version: ${{ env.DEFAULT_PG_VERSION }}
|
||||
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
|
||||
env:
|
||||
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
TEST_NUM_RELATIONS: 10000
|
||||
|
||||
- name: Benchmark init
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
with:
|
||||
|
||||
2
.github/workflows/build_and_test.yml
vendored
2
.github/workflows/build_and_test.yml
vendored
@@ -212,7 +212,7 @@ jobs:
|
||||
fi
|
||||
echo "CLIPPY_COMMON_ARGS=${CLIPPY_COMMON_ARGS}" >> $GITHUB_ENV
|
||||
- name: Run cargo clippy (debug)
|
||||
run: cargo hack --feature-powerset clippy $CLIPPY_COMMON_ARGS
|
||||
run: cargo hack --features default --ignore-unknown-features --feature-powerset clippy $CLIPPY_COMMON_ARGS
|
||||
|
||||
- name: Check documentation generation
|
||||
run: cargo doc --workspace --no-deps --document-private-items
|
||||
|
||||
2
.github/workflows/cloud-regress.yml
vendored
2
.github/workflows/cloud-regress.yml
vendored
@@ -21,6 +21,8 @@ concurrency:
|
||||
|
||||
permissions:
|
||||
id-token: write # aws-actions/configure-aws-credentials
|
||||
statuses: write
|
||||
contents: write
|
||||
|
||||
jobs:
|
||||
regress:
|
||||
|
||||
9
Cargo.lock
generated
9
Cargo.lock
generated
@@ -1274,6 +1274,7 @@ dependencies = [
|
||||
"chrono",
|
||||
"clap",
|
||||
"compute_api",
|
||||
"fail",
|
||||
"flate2",
|
||||
"futures",
|
||||
"hyper 0.14.30",
|
||||
@@ -1732,9 +1733,9 @@ checksum = "ab03c107fafeb3ee9f5925686dbb7a73bc76e3932abb0d2b365cb64b169cf04c"
|
||||
|
||||
[[package]]
|
||||
name = "diesel"
|
||||
version = "2.2.3"
|
||||
version = "2.2.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "65e13bab2796f412722112327f3e575601a3e9cdcbe426f0d30dbf43f3f5dc71"
|
||||
checksum = "ccf1bedf64cdb9643204a36dd15b19a6ce8e7aa7f7b105868e9f1fad5ffa7d12"
|
||||
dependencies = [
|
||||
"bitflags 2.4.1",
|
||||
"byteorder",
|
||||
@@ -4493,9 +4494,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
|
||||
|
||||
[[package]]
|
||||
name = "pq-sys"
|
||||
version = "0.4.8"
|
||||
version = "0.6.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "31c0052426df997c0cbd30789eb44ca097e3541717a7b8fa36b1c464ee7edebd"
|
||||
checksum = "f6cc05d7ea95200187117196eee9edd0644424911821aeb28a18ce60ea0b8793"
|
||||
dependencies = [
|
||||
"vcpkg",
|
||||
]
|
||||
|
||||
@@ -1285,7 +1285,7 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Compile and run the Neon-specific `compute_ctl` and `fast_import` binaries
|
||||
# Compile the Neon-specific `compute_ctl`, `fast_import`, and `local_proxy` binaries
|
||||
#
|
||||
#########################################################################################
|
||||
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools
|
||||
@@ -1295,7 +1295,7 @@ ENV BUILD_TAG=$BUILD_TAG
|
||||
USER nonroot
|
||||
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
|
||||
COPY --chown=nonroot . .
|
||||
RUN cd compute_tools && mold -run cargo build --locked --profile release-line-debug-size-lto
|
||||
RUN mold -run cargo build --locked --profile release-line-debug-size-lto --bin compute_ctl --bin fast_import --bin local_proxy
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
@@ -1338,20 +1338,6 @@ RUN set -e \
|
||||
&& make -j $(nproc) dist_man_MANS= \
|
||||
&& make install dist_man_MANS=
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Compile the Neon-specific `local_proxy` binary
|
||||
#
|
||||
#########################################################################################
|
||||
FROM $REPOSITORY/$IMAGE:$TAG AS local_proxy
|
||||
ARG BUILD_TAG
|
||||
ENV BUILD_TAG=$BUILD_TAG
|
||||
|
||||
USER nonroot
|
||||
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
|
||||
COPY --chown=nonroot . .
|
||||
RUN mold -run cargo build --locked --profile release-line-debug-size-lto --bin local_proxy
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layers "postgres-exporter" and "sql-exporter"
|
||||
@@ -1491,7 +1477,7 @@ COPY --from=pgbouncer /usr/local/pgbouncer/bin/pgbouncer /usr/local/bin/
|
||||
COPY --chmod=0666 --chown=postgres compute/etc/pgbouncer.ini /etc/pgbouncer.ini
|
||||
|
||||
# local_proxy and its config
|
||||
COPY --from=local_proxy --chown=postgres /home/nonroot/target/release-line-debug-size-lto/local_proxy /usr/local/bin/local_proxy
|
||||
COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/local_proxy /usr/local/bin/local_proxy
|
||||
RUN mkdir -p /etc/local_proxy && chown postgres:postgres /etc/local_proxy
|
||||
|
||||
# Metrics exporter binaries and configuration files
|
||||
@@ -1556,28 +1542,30 @@ RUN apt update && \
|
||||
locales \
|
||||
procps \
|
||||
ca-certificates \
|
||||
curl \
|
||||
unzip \
|
||||
$VERSION_INSTALLS && \
|
||||
apt clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
|
||||
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
|
||||
|
||||
# s5cmd 2.2.2 from https://github.com/peak/s5cmd/releases/tag/v2.2.2
|
||||
# used by fast_import
|
||||
# aws cli is used by fast_import (curl and unzip above are at this time only used for this installation step)
|
||||
ARG TARGETARCH
|
||||
ADD https://github.com/peak/s5cmd/releases/download/v2.2.2/s5cmd_2.2.2_linux_$TARGETARCH.deb /tmp/s5cmd.deb
|
||||
RUN set -ex; \
|
||||
\
|
||||
# Determine the expected checksum based on TARGETARCH
|
||||
if [ "${TARGETARCH}" = "amd64" ]; then \
|
||||
CHECKSUM="392c385320cd5ffa435759a95af77c215553d967e4b1c0fffe52e4f14c29cf85"; \
|
||||
TARGETARCH_ALT="x86_64"; \
|
||||
CHECKSUM="c9a9df3770a3ff9259cb469b6179e02829687a464e0824d5c32d378820b53a00"; \
|
||||
elif [ "${TARGETARCH}" = "arm64" ]; then \
|
||||
CHECKSUM="939bee3cf4b5604ddb00e67f8c157b91d7c7a5b553d1fbb6890fad32894b7b46"; \
|
||||
TARGETARCH_ALT="aarch64"; \
|
||||
CHECKSUM="8181730be7891582b38b028112e81b4899ca817e8c616aad807c9e9d1289223a"; \
|
||||
else \
|
||||
echo "Unsupported architecture: ${TARGETARCH}"; exit 1; \
|
||||
fi; \
|
||||
\
|
||||
# Compute and validate the checksum
|
||||
echo "${CHECKSUM} /tmp/s5cmd.deb" | sha256sum -c -
|
||||
RUN dpkg -i /tmp/s5cmd.deb && rm /tmp/s5cmd.deb
|
||||
curl -L "https://awscli.amazonaws.com/awscli-exe-linux-${TARGETARCH_ALT}-2.17.5.zip" -o /tmp/awscliv2.zip; \
|
||||
echo "${CHECKSUM} /tmp/awscliv2.zip" | sha256sum -c -; \
|
||||
unzip /tmp/awscliv2.zip -d /tmp/awscliv2; \
|
||||
/tmp/awscliv2/aws/install; \
|
||||
rm -rf /tmp/awscliv2.zip /tmp/awscliv2; \
|
||||
true
|
||||
|
||||
ENV LANG=en_US.utf8
|
||||
USER postgres
|
||||
|
||||
@@ -7,7 +7,7 @@ license.workspace = true
|
||||
[features]
|
||||
default = []
|
||||
# Enables test specific features.
|
||||
testing = []
|
||||
testing = ["fail/failpoints"]
|
||||
|
||||
[dependencies]
|
||||
base64.workspace = true
|
||||
@@ -19,6 +19,7 @@ camino.workspace = true
|
||||
chrono.workspace = true
|
||||
cfg-if.workspace = true
|
||||
clap.workspace = true
|
||||
fail.workspace = true
|
||||
flate2.workspace = true
|
||||
futures.workspace = true
|
||||
hyper0 = { workspace = true, features = ["full"] }
|
||||
|
||||
@@ -67,12 +67,15 @@ use compute_tools::params::*;
|
||||
use compute_tools::spec::*;
|
||||
use compute_tools::swap::resize_swap;
|
||||
use rlimit::{setrlimit, Resource};
|
||||
use utils::failpoint_support;
|
||||
|
||||
// this is an arbitrary build tag. Fine as a default / for testing purposes
|
||||
// in-case of not-set environment var
|
||||
const BUILD_TAG_DEFAULT: &str = "latest";
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let scenario = failpoint_support::init();
|
||||
|
||||
let (build_tag, clap_args) = init()?;
|
||||
|
||||
// enable core dumping for all child processes
|
||||
@@ -100,6 +103,8 @@ fn main() -> Result<()> {
|
||||
|
||||
maybe_delay_exit(delay_exit);
|
||||
|
||||
scenario.teardown();
|
||||
|
||||
deinit_and_exit(wait_pg_result);
|
||||
}
|
||||
|
||||
@@ -419,9 +424,13 @@ fn start_postgres(
|
||||
"running compute with features: {:?}",
|
||||
state.pspec.as_ref().unwrap().spec.features
|
||||
);
|
||||
// before we release the mutex, fetch the swap size (if any) for later.
|
||||
let swap_size_bytes = state.pspec.as_ref().unwrap().spec.swap_size_bytes;
|
||||
let disk_quota_bytes = state.pspec.as_ref().unwrap().spec.disk_quota_bytes;
|
||||
// before we release the mutex, fetch some parameters for later.
|
||||
let &ComputeSpec {
|
||||
swap_size_bytes,
|
||||
disk_quota_bytes,
|
||||
disable_lfc_resizing,
|
||||
..
|
||||
} = &state.pspec.as_ref().unwrap().spec;
|
||||
drop(state);
|
||||
|
||||
// Launch remaining service threads
|
||||
@@ -526,11 +535,18 @@ fn start_postgres(
|
||||
// This token is used internally by the monitor to clean up all threads
|
||||
let token = CancellationToken::new();
|
||||
|
||||
// don't pass postgres connection string to vm-monitor if we don't want it to resize LFC
|
||||
let pgconnstr = if disable_lfc_resizing.unwrap_or(false) {
|
||||
None
|
||||
} else {
|
||||
file_cache_connstr.cloned()
|
||||
};
|
||||
|
||||
let vm_monitor = rt.as_ref().map(|rt| {
|
||||
rt.spawn(vm_monitor::start(
|
||||
Box::leak(Box::new(vm_monitor::Args {
|
||||
cgroup: cgroup.cloned(),
|
||||
pgconnstr: file_cache_connstr.cloned(),
|
||||
pgconnstr,
|
||||
addr: vm_monitor_addr.clone(),
|
||||
})),
|
||||
token.clone(),
|
||||
|
||||
@@ -34,12 +34,12 @@ use nix::unistd::Pid;
|
||||
use tracing::{info, info_span, warn, Instrument};
|
||||
use utils::fs_ext::is_directory_empty;
|
||||
|
||||
#[path = "fast_import/aws_s3_sync.rs"]
|
||||
mod aws_s3_sync;
|
||||
#[path = "fast_import/child_stdio_to_log.rs"]
|
||||
mod child_stdio_to_log;
|
||||
#[path = "fast_import/s3_uri.rs"]
|
||||
mod s3_uri;
|
||||
#[path = "fast_import/s5cmd.rs"]
|
||||
mod s5cmd;
|
||||
|
||||
#[derive(clap::Parser)]
|
||||
struct Args {
|
||||
@@ -326,7 +326,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
info!("upload pgdata");
|
||||
s5cmd::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/"))
|
||||
aws_s3_sync::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/pgdata/"))
|
||||
.await
|
||||
.context("sync dump directory to destination")?;
|
||||
|
||||
@@ -334,10 +334,10 @@ pub(crate) async fn main() -> anyhow::Result<()> {
|
||||
{
|
||||
let status_dir = working_directory.join("status");
|
||||
std::fs::create_dir(&status_dir).context("create status directory")?;
|
||||
let status_file = status_dir.join("status");
|
||||
let status_file = status_dir.join("pgdata");
|
||||
std::fs::write(&status_file, serde_json::json!({"done": true}).to_string())
|
||||
.context("write status file")?;
|
||||
s5cmd::sync(&status_file, &s3_prefix.append("/status/pgdata"))
|
||||
aws_s3_sync::sync(&status_dir, &s3_prefix.append("/status/"))
|
||||
.await
|
||||
.context("sync status directory to destination")?;
|
||||
}
|
||||
|
||||
@@ -4,24 +4,21 @@ use camino::Utf8Path;
|
||||
use super::s3_uri::S3Uri;
|
||||
|
||||
pub(crate) async fn sync(local: &Utf8Path, remote: &S3Uri) -> anyhow::Result<()> {
|
||||
let mut builder = tokio::process::Command::new("s5cmd");
|
||||
// s5cmd uses aws-sdk-go v1, hence doesn't support AWS_ENDPOINT_URL
|
||||
if let Some(val) = std::env::var_os("AWS_ENDPOINT_URL") {
|
||||
builder.arg("--endpoint-url").arg(val);
|
||||
}
|
||||
let mut builder = tokio::process::Command::new("aws");
|
||||
builder
|
||||
.arg("s3")
|
||||
.arg("sync")
|
||||
.arg(local.as_str())
|
||||
.arg(remote.to_string());
|
||||
let st = builder
|
||||
.spawn()
|
||||
.context("spawn s5cmd")?
|
||||
.context("spawn aws s3 sync")?
|
||||
.wait()
|
||||
.await
|
||||
.context("wait for s5cmd")?;
|
||||
.context("wait for aws s3 sync")?;
|
||||
if st.success() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(anyhow::anyhow!("s5cmd failed"))
|
||||
Err(anyhow::anyhow!("aws s3 sync failed"))
|
||||
}
|
||||
}
|
||||
@@ -1181,8 +1181,19 @@ impl ComputeNode {
|
||||
let mut conf = postgres::config::Config::from(conf);
|
||||
conf.application_name("compute_ctl:migrations");
|
||||
|
||||
let mut client = conf.connect(NoTls)?;
|
||||
handle_migrations(&mut client).context("apply_config handle_migrations")
|
||||
match conf.connect(NoTls) {
|
||||
Ok(mut client) => {
|
||||
if let Err(e) = handle_migrations(&mut client) {
|
||||
error!("Failed to run migrations: {}", e);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Failed to connect to the compute for running migrations: {}",
|
||||
e
|
||||
);
|
||||
}
|
||||
};
|
||||
});
|
||||
|
||||
Ok::<(), anyhow::Error>(())
|
||||
|
||||
@@ -24,8 +24,11 @@ use metrics::proto::MetricFamily;
|
||||
use metrics::Encoder;
|
||||
use metrics::TextEncoder;
|
||||
use tokio::task;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use tracing_utils::http::OtelName;
|
||||
use utils::failpoint_support::failpoints_handler;
|
||||
use utils::http::error::ApiError;
|
||||
use utils::http::request::must_get_query_param;
|
||||
|
||||
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
|
||||
@@ -310,6 +313,18 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
||||
}
|
||||
}
|
||||
|
||||
(&Method::POST, "/failpoints") if cfg!(feature = "testing") => {
|
||||
match failpoints_handler(req, CancellationToken::new()).await {
|
||||
Ok(r) => r,
|
||||
Err(ApiError::BadRequest(e)) => {
|
||||
render_json_error(&e.to_string(), StatusCode::BAD_REQUEST)
|
||||
}
|
||||
Err(_) => {
|
||||
render_json_error("Internal server error", StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// download extension files from remote extension storage on demand
|
||||
(&Method::POST, route) if route.starts_with("/extension_server/") => {
|
||||
info!("serving {:?} POST request", route);
|
||||
|
||||
@@ -1,13 +1,16 @@
|
||||
use anyhow::{Context, Result};
|
||||
use fail::fail_point;
|
||||
use postgres::Client;
|
||||
use tracing::info;
|
||||
|
||||
/// Runs a series of migrations on a target database
|
||||
pub(crate) struct MigrationRunner<'m> {
|
||||
client: &'m mut Client,
|
||||
migrations: &'m [&'m str],
|
||||
}
|
||||
|
||||
impl<'m> MigrationRunner<'m> {
|
||||
/// Create a new migration runner
|
||||
pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
|
||||
// The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
|
||||
assert!(migrations.len() + 1 < i64::MAX as usize);
|
||||
@@ -15,6 +18,7 @@ impl<'m> MigrationRunner<'m> {
|
||||
Self { client, migrations }
|
||||
}
|
||||
|
||||
/// Get the current value neon_migration.migration_id
|
||||
fn get_migration_id(&mut self) -> Result<i64> {
|
||||
let query = "SELECT id FROM neon_migration.migration_id";
|
||||
let row = self
|
||||
@@ -25,37 +29,61 @@ impl<'m> MigrationRunner<'m> {
|
||||
Ok(row.get::<&str, i64>("id"))
|
||||
}
|
||||
|
||||
/// Update the neon_migration.migration_id value
|
||||
///
|
||||
/// This function has a fail point called compute-migration, which can be
|
||||
/// used if you would like to fail the application of a series of migrations
|
||||
/// at some point.
|
||||
fn update_migration_id(&mut self, migration_id: i64) -> Result<()> {
|
||||
let setval = format!("UPDATE neon_migration.migration_id SET id={}", migration_id);
|
||||
// We use this fail point in order to check that failing in the
|
||||
// middle of applying a series of migrations fails in an expected
|
||||
// manner
|
||||
if cfg!(feature = "testing") {
|
||||
let fail = (|| {
|
||||
fail_point!("compute-migration", |fail_migration_id| {
|
||||
migration_id == fail_migration_id.unwrap().parse::<i64>().unwrap()
|
||||
});
|
||||
|
||||
false
|
||||
})();
|
||||
|
||||
if fail {
|
||||
return Err(anyhow::anyhow!(format!(
|
||||
"migration {} was configured to fail because of a failpoint",
|
||||
migration_id
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
self.client
|
||||
.simple_query(&setval)
|
||||
.query(
|
||||
"UPDATE neon_migration.migration_id SET id = $1",
|
||||
&[&migration_id],
|
||||
)
|
||||
.context("run_migrations update id")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn prepare_migrations(&mut self) -> Result<()> {
|
||||
let query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
|
||||
self.client.simple_query(query)?;
|
||||
|
||||
let query = "CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)";
|
||||
self.client.simple_query(query)?;
|
||||
|
||||
let query = "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING";
|
||||
self.client.simple_query(query)?;
|
||||
|
||||
let query = "ALTER SCHEMA neon_migration OWNER TO cloud_admin";
|
||||
self.client.simple_query(query)?;
|
||||
|
||||
let query = "REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC";
|
||||
self.client.simple_query(query)?;
|
||||
/// Prepare the migrations the target database for handling migrations
|
||||
fn prepare_database(&mut self) -> Result<()> {
|
||||
self.client
|
||||
.simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")?;
|
||||
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)")?;
|
||||
self.client.simple_query(
|
||||
"INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
|
||||
)?;
|
||||
self.client
|
||||
.simple_query("ALTER SCHEMA neon_migration OWNER TO cloud_admin")?;
|
||||
self.client
|
||||
.simple_query("REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Run the configrured set of migrations
|
||||
pub fn run_migrations(mut self) -> Result<()> {
|
||||
self.prepare_migrations()?;
|
||||
self.prepare_database()?;
|
||||
|
||||
let mut current_migration = self.get_migration_id()? as usize;
|
||||
while current_migration < self.migrations.len() {
|
||||
@@ -69,6 +97,11 @@ impl<'m> MigrationRunner<'m> {
|
||||
|
||||
if migration.starts_with("-- SKIP") {
|
||||
info!("Skipping migration id={}", migration_id!(current_migration));
|
||||
|
||||
// Even though we are skipping the migration, updating the
|
||||
// migration ID should help keep logic easy to understand when
|
||||
// trying to understand the state of a cluster.
|
||||
self.update_migration_id(migration_id!(current_migration))?;
|
||||
} else {
|
||||
info!(
|
||||
"Running migration id={}:\n{}\n",
|
||||
@@ -87,7 +120,6 @@ impl<'m> MigrationRunner<'m> {
|
||||
)
|
||||
})?;
|
||||
|
||||
// Migration IDs start at 1
|
||||
self.update_migration_id(migration_id!(current_migration))?;
|
||||
|
||||
self.client
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
DO $$
|
||||
DECLARE
|
||||
bypassrls boolean;
|
||||
BEGIN
|
||||
SELECT rolbypassrls INTO bypassrls FROM pg_roles WHERE rolname = 'neon_superuser';
|
||||
IF NOT bypassrls THEN
|
||||
RAISE EXCEPTION 'neon_superuser cannot bypass RLS';
|
||||
END IF;
|
||||
END $$;
|
||||
25
compute_tools/src/migrations/tests/0002-alter_roles.sql
Normal file
25
compute_tools/src/migrations/tests/0002-alter_roles.sql
Normal file
@@ -0,0 +1,25 @@
|
||||
DO $$
|
||||
DECLARE
|
||||
role record;
|
||||
BEGIN
|
||||
FOR role IN
|
||||
SELECT rolname AS name, rolinherit AS inherit
|
||||
FROM pg_roles
|
||||
WHERE pg_has_role(rolname, 'neon_superuser', 'member')
|
||||
LOOP
|
||||
IF NOT role.inherit THEN
|
||||
RAISE EXCEPTION '% cannot inherit', quote_ident(role.name);
|
||||
END IF;
|
||||
END LOOP;
|
||||
|
||||
FOR role IN
|
||||
SELECT rolname AS name, rolbypassrls AS bypassrls
|
||||
FROM pg_roles
|
||||
WHERE NOT pg_has_role(rolname, 'neon_superuser', 'member')
|
||||
AND NOT starts_with(rolname, 'pg_')
|
||||
LOOP
|
||||
IF role.bypassrls THEN
|
||||
RAISE EXCEPTION '% can bypass RLS', quote_ident(role.name);
|
||||
END IF;
|
||||
END LOOP;
|
||||
END $$;
|
||||
@@ -0,0 +1,10 @@
|
||||
DO $$
|
||||
BEGIN
|
||||
IF (SELECT current_setting('server_version_num')::numeric < 160000) THEN
|
||||
RETURN;
|
||||
END IF;
|
||||
|
||||
IF NOT (SELECT pg_has_role('neon_superuser', 'pg_create_subscription', 'member')) THEN
|
||||
RAISE EXCEPTION 'neon_superuser cannot execute pg_create_subscription';
|
||||
END IF;
|
||||
END $$;
|
||||
@@ -0,0 +1,19 @@
|
||||
DO $$
|
||||
DECLARE
|
||||
monitor record;
|
||||
BEGIN
|
||||
SELECT pg_has_role('neon_superuser', 'pg_monitor', 'member') AS member,
|
||||
admin_option AS admin
|
||||
INTO monitor
|
||||
FROM pg_auth_members
|
||||
WHERE roleid = 'pg_monitor'::regrole
|
||||
AND member = 'pg_monitor'::regrole;
|
||||
|
||||
IF NOT monitor.member THEN
|
||||
RAISE EXCEPTION 'neon_superuser is not a member of pg_monitor';
|
||||
END IF;
|
||||
|
||||
IF NOT monitor.admin THEN
|
||||
RAISE EXCEPTION 'neon_superuser cannot grant pg_monitor';
|
||||
END IF;
|
||||
END $$;
|
||||
@@ -0,0 +1,2 @@
|
||||
-- This test was never written becuase at the time migration tests were added
|
||||
-- the accompanying migration was already skipped.
|
||||
@@ -0,0 +1,2 @@
|
||||
-- This test was never written becuase at the time migration tests were added
|
||||
-- the accompanying migration was already skipped.
|
||||
@@ -0,0 +1,2 @@
|
||||
-- This test was never written becuase at the time migration tests were added
|
||||
-- the accompanying migration was already skipped.
|
||||
@@ -0,0 +1,2 @@
|
||||
-- This test was never written becuase at the time migration tests were added
|
||||
-- the accompanying migration was already skipped.
|
||||
@@ -0,0 +1,2 @@
|
||||
-- This test was never written becuase at the time migration tests were added
|
||||
-- the accompanying migration was already skipped.
|
||||
@@ -0,0 +1,13 @@
|
||||
DO $$
|
||||
DECLARE
|
||||
can_execute boolean;
|
||||
BEGIN
|
||||
SELECT bool_and(has_function_privilege('neon_superuser', oid, 'execute'))
|
||||
INTO can_execute
|
||||
FROM pg_proc
|
||||
WHERE proname IN ('pg_export_snapshot', 'pg_log_standby_snapshot')
|
||||
AND pronamespace = 'pg_catalog'::regnamespace;
|
||||
IF NOT can_execute THEN
|
||||
RAISE EXCEPTION 'neon_superuser cannot execute both pg_export_snapshot and pg_log_standby_snapshot';
|
||||
END IF;
|
||||
END $$;
|
||||
@@ -0,0 +1,13 @@
|
||||
DO $$
|
||||
DECLARE
|
||||
can_execute boolean;
|
||||
BEGIN
|
||||
SELECT has_function_privilege('neon_superuser', oid, 'execute')
|
||||
INTO can_execute
|
||||
FROM pg_proc
|
||||
WHERE proname = 'pg_show_replication_origin_status'
|
||||
AND pronamespace = 'pg_catalog'::regnamespace;
|
||||
IF NOT can_execute THEN
|
||||
RAISE EXCEPTION 'neon_superuser cannot execute pg_show_replication_origin_status';
|
||||
END IF;
|
||||
END $$;
|
||||
@@ -19,6 +19,7 @@ use control_plane::storage_controller::{
|
||||
NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
|
||||
};
|
||||
use control_plane::{broker, local_env};
|
||||
use nix::fcntl::{flock, FlockArg};
|
||||
use pageserver_api::config::{
|
||||
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
|
||||
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
|
||||
@@ -36,6 +37,8 @@ use safekeeper_api::{
|
||||
};
|
||||
use std::borrow::Cow;
|
||||
use std::collections::{BTreeSet, HashMap};
|
||||
use std::fs::File;
|
||||
use std::os::fd::AsRawFd;
|
||||
use std::path::PathBuf;
|
||||
use std::process::exit;
|
||||
use std::str::FromStr;
|
||||
@@ -689,6 +692,21 @@ struct TimelineTreeEl {
|
||||
pub children: BTreeSet<TimelineId>,
|
||||
}
|
||||
|
||||
/// A flock-based guard over the neon_local repository directory
|
||||
struct RepoLock {
|
||||
_file: File,
|
||||
}
|
||||
|
||||
impl RepoLock {
|
||||
fn new() -> Result<Self> {
|
||||
let repo_dir = File::open(local_env::base_path())?;
|
||||
let repo_dir_fd = repo_dir.as_raw_fd();
|
||||
flock(repo_dir_fd, FlockArg::LockExclusive)?;
|
||||
|
||||
Ok(Self { _file: repo_dir })
|
||||
}
|
||||
}
|
||||
|
||||
// Main entry point for the 'neon_local' CLI utility
|
||||
//
|
||||
// This utility helps to manage neon installation. That includes following:
|
||||
@@ -700,9 +718,14 @@ fn main() -> Result<()> {
|
||||
let cli = Cli::parse();
|
||||
|
||||
// Check for 'neon init' command first.
|
||||
let subcommand_result = if let NeonLocalCmd::Init(args) = cli.command {
|
||||
handle_init(&args).map(|env| Some(Cow::Owned(env)))
|
||||
let (subcommand_result, _lock) = if let NeonLocalCmd::Init(args) = cli.command {
|
||||
(handle_init(&args).map(|env| Some(Cow::Owned(env))), None)
|
||||
} else {
|
||||
// This tool uses a collection of simple files to store its state, and consequently
|
||||
// it is not generally safe to run multiple commands concurrently. Rather than expect
|
||||
// all callers to know this, use a lock file to protect against concurrent execution.
|
||||
let _repo_lock = RepoLock::new().unwrap();
|
||||
|
||||
// all other commands need an existing config
|
||||
let env = LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
|
||||
let original_env = env.clone();
|
||||
@@ -728,11 +751,12 @@ fn main() -> Result<()> {
|
||||
NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env),
|
||||
};
|
||||
|
||||
if &original_env != env {
|
||||
let subcommand_result = if &original_env != env {
|
||||
subcommand_result.map(|()| Some(Cow::Borrowed(env)))
|
||||
} else {
|
||||
subcommand_result.map(|()| None)
|
||||
}
|
||||
};
|
||||
(subcommand_result, Some(_repo_lock))
|
||||
};
|
||||
|
||||
match subcommand_result {
|
||||
@@ -922,7 +946,7 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
|
||||
} else {
|
||||
// User (likely interactive) did not provide a description of the environment, give them the default
|
||||
NeonLocalInitConf {
|
||||
control_plane_api: Some(Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap())),
|
||||
control_plane_api: Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap()),
|
||||
broker: NeonBroker {
|
||||
listen_addr: DEFAULT_BROKER_ADDR.parse().unwrap(),
|
||||
},
|
||||
@@ -1718,18 +1742,15 @@ async fn handle_start_all_impl(
|
||||
broker::start_broker_process(env, &retry_timeout).await
|
||||
});
|
||||
|
||||
// Only start the storage controller if the pageserver is configured to need it
|
||||
if env.control_plane_api.is_some() {
|
||||
js.spawn(async move {
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
storage_controller
|
||||
.start(NeonStorageControllerStartArgs::with_default_instance_id(
|
||||
retry_timeout,
|
||||
))
|
||||
.await
|
||||
.map_err(|e| e.context("start storage_controller"))
|
||||
});
|
||||
}
|
||||
js.spawn(async move {
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
storage_controller
|
||||
.start(NeonStorageControllerStartArgs::with_default_instance_id(
|
||||
retry_timeout,
|
||||
))
|
||||
.await
|
||||
.map_err(|e| e.context("start storage_controller"))
|
||||
});
|
||||
|
||||
for ps_conf in &env.pageservers {
|
||||
js.spawn(async move {
|
||||
@@ -1774,10 +1795,6 @@ async fn neon_start_status_check(
|
||||
const RETRY_INTERVAL: Duration = Duration::from_millis(100);
|
||||
const NOTICE_AFTER_RETRIES: Duration = Duration::from_secs(5);
|
||||
|
||||
if env.control_plane_api.is_none() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let storcon = StorageController::from_env(env);
|
||||
|
||||
let retries = retry_timeout.as_millis() / RETRY_INTERVAL.as_millis();
|
||||
|
||||
@@ -316,6 +316,10 @@ impl Endpoint {
|
||||
// and can cause errors like 'no unpinned buffers available', see
|
||||
// <https://github.com/neondatabase/neon/issues/9956>
|
||||
conf.append("shared_buffers", "1MB");
|
||||
// Postgres defaults to effective_io_concurrency=1, which does not exercise the pageserver's
|
||||
// batching logic. Set this to 2 so that we exercise the code a bit without letting
|
||||
// individual tests do a lot of concurrent work on underpowered test machines
|
||||
conf.append("effective_io_concurrency", "2");
|
||||
conf.append("fsync", "off");
|
||||
conf.append("max_connections", "100");
|
||||
conf.append("wal_level", "logical");
|
||||
@@ -581,6 +585,7 @@ impl Endpoint {
|
||||
features: self.features.clone(),
|
||||
swap_size_bytes: None,
|
||||
disk_quota_bytes: None,
|
||||
disable_lfc_resizing: None,
|
||||
cluster: Cluster {
|
||||
cluster_id: None, // project ID: not used
|
||||
name: None, // project name: not used
|
||||
|
||||
@@ -76,7 +76,7 @@ pub struct LocalEnv {
|
||||
|
||||
// Control plane upcall API for pageserver: if None, we will not run storage_controller If set, this will
|
||||
// be propagated into each pageserver's configuration.
|
||||
pub control_plane_api: Option<Url>,
|
||||
pub control_plane_api: Url,
|
||||
|
||||
// Control plane upcall API for storage controller. If set, this will be propagated into the
|
||||
// storage controller's configuration.
|
||||
@@ -133,7 +133,7 @@ pub struct NeonLocalInitConf {
|
||||
pub storage_controller: Option<NeonStorageControllerConf>,
|
||||
pub pageservers: Vec<NeonLocalInitPageserverConf>,
|
||||
pub safekeepers: Vec<SafekeeperConf>,
|
||||
pub control_plane_api: Option<Option<Url>>,
|
||||
pub control_plane_api: Option<Url>,
|
||||
pub control_plane_compute_hook_api: Option<Option<Url>>,
|
||||
}
|
||||
|
||||
@@ -180,7 +180,7 @@ impl NeonStorageControllerConf {
|
||||
const DEFAULT_MAX_WARMING_UP_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
|
||||
|
||||
// Very tight heartbeat interval to speed up tests
|
||||
const DEFAULT_HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
|
||||
const DEFAULT_HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_millis(1000);
|
||||
}
|
||||
|
||||
impl Default for NeonStorageControllerConf {
|
||||
@@ -535,7 +535,7 @@ impl LocalEnv {
|
||||
storage_controller,
|
||||
pageservers,
|
||||
safekeepers,
|
||||
control_plane_api,
|
||||
control_plane_api: control_plane_api.unwrap(),
|
||||
control_plane_compute_hook_api,
|
||||
branch_name_mappings,
|
||||
}
|
||||
@@ -638,7 +638,7 @@ impl LocalEnv {
|
||||
storage_controller: self.storage_controller.clone(),
|
||||
pageservers: vec![], // it's skip_serializing anyway
|
||||
safekeepers: self.safekeepers.clone(),
|
||||
control_plane_api: self.control_plane_api.clone(),
|
||||
control_plane_api: Some(self.control_plane_api.clone()),
|
||||
control_plane_compute_hook_api: self.control_plane_compute_hook_api.clone(),
|
||||
branch_name_mappings: self.branch_name_mappings.clone(),
|
||||
},
|
||||
@@ -768,7 +768,7 @@ impl LocalEnv {
|
||||
storage_controller: storage_controller.unwrap_or_default(),
|
||||
pageservers: pageservers.iter().map(Into::into).collect(),
|
||||
safekeepers,
|
||||
control_plane_api: control_plane_api.unwrap_or_default(),
|
||||
control_plane_api: control_plane_api.unwrap(),
|
||||
control_plane_compute_hook_api: control_plane_compute_hook_api.unwrap_or_default(),
|
||||
branch_name_mappings: Default::default(),
|
||||
};
|
||||
|
||||
@@ -95,21 +95,19 @@ impl PageServerNode {
|
||||
|
||||
let mut overrides = vec![pg_distrib_dir_param, broker_endpoint_param];
|
||||
|
||||
if let Some(control_plane_api) = &self.env.control_plane_api {
|
||||
overrides.push(format!(
|
||||
"control_plane_api='{}'",
|
||||
control_plane_api.as_str()
|
||||
));
|
||||
overrides.push(format!(
|
||||
"control_plane_api='{}'",
|
||||
self.env.control_plane_api.as_str()
|
||||
));
|
||||
|
||||
// Storage controller uses the same auth as pageserver: if JWT is enabled
|
||||
// for us, we will also need it to talk to them.
|
||||
if matches!(conf.http_auth_type, AuthType::NeonJWT) {
|
||||
let jwt_token = self
|
||||
.env
|
||||
.generate_auth_token(&Claims::new(None, Scope::GenerationsApi))
|
||||
.unwrap();
|
||||
overrides.push(format!("control_plane_api_token='{}'", jwt_token));
|
||||
}
|
||||
// Storage controller uses the same auth as pageserver: if JWT is enabled
|
||||
// for us, we will also need it to talk to them.
|
||||
if matches!(conf.http_auth_type, AuthType::NeonJWT) {
|
||||
let jwt_token = self
|
||||
.env
|
||||
.generate_auth_token(&Claims::new(None, Scope::GenerationsApi))
|
||||
.unwrap();
|
||||
overrides.push(format!("control_plane_api_token='{}'", jwt_token));
|
||||
}
|
||||
|
||||
if !conf.other.contains_key("remote_storage") {
|
||||
|
||||
@@ -338,7 +338,7 @@ impl StorageController {
|
||||
.port(),
|
||||
)
|
||||
} else {
|
||||
let listen_url = self.env.control_plane_api.clone().unwrap();
|
||||
let listen_url = self.env.control_plane_api.clone();
|
||||
|
||||
let listen = format!(
|
||||
"{}:{}",
|
||||
@@ -708,7 +708,7 @@ impl StorageController {
|
||||
} else {
|
||||
// The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
|
||||
// for general purpose API access.
|
||||
let listen_url = self.env.control_plane_api.clone().unwrap();
|
||||
let listen_url = self.env.control_plane_api.clone();
|
||||
Url::from_str(&format!(
|
||||
"http://{}:{}/{path}",
|
||||
listen_url.host_str().unwrap(),
|
||||
|
||||
@@ -5,7 +5,8 @@ use clap::{Parser, Subcommand};
|
||||
use pageserver_api::{
|
||||
controller_api::{
|
||||
AvailabilityZone, NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse,
|
||||
ShardSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
|
||||
SafekeeperDescribeResponse, ShardSchedulingPolicy, TenantCreateRequest,
|
||||
TenantDescribeResponse, TenantPolicyRequest,
|
||||
},
|
||||
models::{
|
||||
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
|
||||
@@ -211,6 +212,8 @@ enum Command {
|
||||
#[arg(long)]
|
||||
timeout: humantime::Duration,
|
||||
},
|
||||
/// List safekeepers known to the storage controller
|
||||
Safekeepers {},
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
@@ -1020,6 +1023,31 @@ async fn main() -> anyhow::Result<()> {
|
||||
"Fill was cancelled for node {node_id}. Schedulling policy is now {final_policy:?}"
|
||||
);
|
||||
}
|
||||
Command::Safekeepers {} => {
|
||||
let mut resp = storcon_client
|
||||
.dispatch::<(), Vec<SafekeeperDescribeResponse>>(
|
||||
Method::GET,
|
||||
"control/v1/safekeeper".to_string(),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
resp.sort_by(|a, b| a.id.cmp(&b.id));
|
||||
|
||||
let mut table = comfy_table::Table::new();
|
||||
table.set_header(["Id", "Version", "Host", "Port", "Http Port", "AZ Id"]);
|
||||
for sk in resp {
|
||||
table.add_row([
|
||||
format!("{}", sk.id),
|
||||
format!("{}", sk.version),
|
||||
sk.host,
|
||||
format!("{}", sk.port),
|
||||
format!("{}", sk.http_port),
|
||||
sk.availability_zone_id.to_string(),
|
||||
]);
|
||||
}
|
||||
println!("{table}");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -67,6 +67,15 @@ pub struct ComputeSpec {
|
||||
#[serde(default)]
|
||||
pub disk_quota_bytes: Option<u64>,
|
||||
|
||||
/// Disables the vm-monitor behavior that resizes LFC on upscale/downscale, instead relying on
|
||||
/// the initial size of LFC.
|
||||
///
|
||||
/// This is intended for use when the LFC size is being overridden from the default but
|
||||
/// autoscaling is still enabled, and we don't want the vm-monitor to interfere with the custom
|
||||
/// LFC sizing.
|
||||
#[serde(default)]
|
||||
pub disable_lfc_resizing: Option<bool>,
|
||||
|
||||
/// Expected cluster state at the end of transition process.
|
||||
pub cluster: Cluster,
|
||||
pub delta_operations: Option<Vec<DeltaOp>>,
|
||||
|
||||
@@ -372,6 +372,23 @@ pub struct MetadataHealthListOutdatedResponse {
|
||||
pub health_records: Vec<MetadataHealthRecord>,
|
||||
}
|
||||
|
||||
/// Publicly exposed safekeeper description
|
||||
///
|
||||
/// The `active` flag which we have in the DB is not included on purpose: it is deprecated.
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct SafekeeperDescribeResponse {
|
||||
pub id: NodeId,
|
||||
pub region_id: String,
|
||||
/// 1 is special, it means just created (not currently posted to storcon).
|
||||
/// Zero or negative is not really expected.
|
||||
/// Otherwise the number from `release-$(number_of_commits_on_branch)` tag.
|
||||
pub version: i64,
|
||||
pub host: String,
|
||||
pub port: i32,
|
||||
pub http_port: i32,
|
||||
pub availability_zone_id: String,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
@@ -6,6 +6,7 @@ pub mod utilization;
|
||||
use camino::Utf8PathBuf;
|
||||
pub use utilization::PageserverUtilization;
|
||||
|
||||
use core::ops::Range;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fmt::Display,
|
||||
@@ -28,6 +29,7 @@ use utils::{
|
||||
};
|
||||
|
||||
use crate::{
|
||||
key::Key,
|
||||
reltag::RelTag,
|
||||
shard::{ShardCount, ShardStripeSize, TenantShardId},
|
||||
};
|
||||
@@ -210,6 +212,68 @@ pub enum TimelineState {
|
||||
Broken { reason: String, backtrace: String },
|
||||
}
|
||||
|
||||
#[serde_with::serde_as]
|
||||
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
|
||||
pub struct CompactLsnRange {
|
||||
pub start: Lsn,
|
||||
pub end: Lsn,
|
||||
}
|
||||
|
||||
#[serde_with::serde_as]
|
||||
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
|
||||
pub struct CompactKeyRange {
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub start: Key,
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub end: Key,
|
||||
}
|
||||
|
||||
impl From<Range<Lsn>> for CompactLsnRange {
|
||||
fn from(range: Range<Lsn>) -> Self {
|
||||
Self {
|
||||
start: range.start,
|
||||
end: range.end,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Range<Key>> for CompactKeyRange {
|
||||
fn from(range: Range<Key>) -> Self {
|
||||
Self {
|
||||
start: range.start,
|
||||
end: range.end,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<CompactLsnRange> for Range<Lsn> {
|
||||
fn from(range: CompactLsnRange) -> Self {
|
||||
range.start..range.end
|
||||
}
|
||||
}
|
||||
|
||||
impl From<CompactKeyRange> for Range<Key> {
|
||||
fn from(range: CompactKeyRange) -> Self {
|
||||
range.start..range.end
|
||||
}
|
||||
}
|
||||
|
||||
impl CompactLsnRange {
|
||||
pub fn above(lsn: Lsn) -> Self {
|
||||
Self {
|
||||
start: lsn,
|
||||
end: Lsn::MAX,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct CompactInfoResponse {
|
||||
pub compact_key_range: Option<CompactKeyRange>,
|
||||
pub compact_lsn_range: Option<CompactLsnRange>,
|
||||
pub sub_compaction: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct TimelineCreateRequest {
|
||||
pub new_timeline_id: TimelineId,
|
||||
|
||||
@@ -106,11 +106,11 @@ impl<R: RecordGenerator> WalGenerator<R> {
|
||||
const TIMELINE_ID: u32 = 1;
|
||||
|
||||
/// Creates a new WAL generator with the given record generator.
|
||||
pub fn new(record_generator: R) -> WalGenerator<R> {
|
||||
pub fn new(record_generator: R, start_lsn: Lsn) -> WalGenerator<R> {
|
||||
Self {
|
||||
record_generator,
|
||||
lsn: Lsn(0),
|
||||
prev_lsn: Lsn(0),
|
||||
lsn: start_lsn,
|
||||
prev_lsn: start_lsn,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "postgres-protocol2"
|
||||
version = "0.1.0"
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
license = "MIT/Apache-2.0"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -9,8 +9,7 @@
|
||||
//!
|
||||
//! This library assumes that the `client_encoding` backend parameter has been
|
||||
//! set to `UTF8`. It will most likely not behave properly if that is not the case.
|
||||
#![doc(html_root_url = "https://docs.rs/postgres-protocol/0.6")]
|
||||
#![warn(missing_docs, rust_2018_idioms, clippy::all)]
|
||||
#![warn(missing_docs, clippy::all)]
|
||||
|
||||
use byteorder::{BigEndian, ByteOrder};
|
||||
use bytes::{BufMut, BytesMut};
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
|
||||
use byteorder::{BigEndian, ByteOrder};
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use std::convert::TryFrom;
|
||||
use std::error::Error;
|
||||
use std::io;
|
||||
use std::marker;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "postgres-types2"
|
||||
version = "0.1.0"
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
license = "MIT/Apache-2.0"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -2,8 +2,7 @@
|
||||
//!
|
||||
//! This crate is used by the `tokio-postgres` and `postgres` crates. You normally don't need to depend directly on it
|
||||
//! unless you want to define your own `ToSql` or `FromSql` definitions.
|
||||
#![doc(html_root_url = "https://docs.rs/postgres-types/0.2")]
|
||||
#![warn(clippy::all, rust_2018_idioms, missing_docs)]
|
||||
#![warn(clippy::all, missing_docs)]
|
||||
|
||||
use fallible_iterator::FallibleIterator;
|
||||
use postgres_protocol2::types;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "tokio-postgres2"
|
||||
version = "0.1.0"
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
license = "MIT/Apache-2.0"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -33,10 +33,14 @@ pub struct Response {
|
||||
#[derive(PartialEq, Debug)]
|
||||
enum State {
|
||||
Active,
|
||||
Terminating,
|
||||
Closing,
|
||||
}
|
||||
|
||||
enum WriteReady {
|
||||
Terminating,
|
||||
WaitingOnRead,
|
||||
}
|
||||
|
||||
/// A connection to a PostgreSQL database.
|
||||
///
|
||||
/// This is one half of what is returned when a new connection is established. It performs the actual IO with the
|
||||
@@ -51,7 +55,6 @@ pub struct Connection<S, T> {
|
||||
/// HACK: we need this in the Neon Proxy to forward params.
|
||||
pub parameters: HashMap<String, String>,
|
||||
receiver: mpsc::UnboundedReceiver<Request>,
|
||||
pending_request: Option<RequestMessages>,
|
||||
pending_responses: VecDeque<BackendMessage>,
|
||||
responses: VecDeque<Response>,
|
||||
state: State,
|
||||
@@ -72,7 +75,6 @@ where
|
||||
stream,
|
||||
parameters,
|
||||
receiver,
|
||||
pending_request: None,
|
||||
pending_responses,
|
||||
responses: VecDeque::new(),
|
||||
state: State::Active,
|
||||
@@ -93,26 +95,23 @@ where
|
||||
.map(|o| o.map(|r| r.map_err(Error::io)))
|
||||
}
|
||||
|
||||
fn poll_read(&mut self, cx: &mut Context<'_>) -> Result<Option<AsyncMessage>, Error> {
|
||||
if self.state != State::Active {
|
||||
trace!("poll_read: done");
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
/// Read and process messages from the connection to postgres.
|
||||
/// client <- postgres
|
||||
fn poll_read(&mut self, cx: &mut Context<'_>) -> Poll<Result<AsyncMessage, Error>> {
|
||||
loop {
|
||||
let message = match self.poll_response(cx)? {
|
||||
Poll::Ready(Some(message)) => message,
|
||||
Poll::Ready(None) => return Err(Error::closed()),
|
||||
Poll::Ready(None) => return Poll::Ready(Err(Error::closed())),
|
||||
Poll::Pending => {
|
||||
trace!("poll_read: waiting on response");
|
||||
return Ok(None);
|
||||
return Poll::Pending;
|
||||
}
|
||||
};
|
||||
|
||||
let (mut messages, request_complete) = match message {
|
||||
BackendMessage::Async(Message::NoticeResponse(body)) => {
|
||||
let error = DbError::parse(&mut body.fields()).map_err(Error::parse)?;
|
||||
return Ok(Some(AsyncMessage::Notice(error)));
|
||||
return Poll::Ready(Ok(AsyncMessage::Notice(error)));
|
||||
}
|
||||
BackendMessage::Async(Message::NotificationResponse(body)) => {
|
||||
let notification = Notification {
|
||||
@@ -120,7 +119,7 @@ where
|
||||
channel: body.channel().map_err(Error::parse)?.to_string(),
|
||||
payload: body.message().map_err(Error::parse)?.to_string(),
|
||||
};
|
||||
return Ok(Some(AsyncMessage::Notification(notification)));
|
||||
return Poll::Ready(Ok(AsyncMessage::Notification(notification)));
|
||||
}
|
||||
BackendMessage::Async(Message::ParameterStatus(body)) => {
|
||||
self.parameters.insert(
|
||||
@@ -139,8 +138,10 @@ where
|
||||
let mut response = match self.responses.pop_front() {
|
||||
Some(response) => response,
|
||||
None => match messages.next().map_err(Error::parse)? {
|
||||
Some(Message::ErrorResponse(error)) => return Err(Error::db(error)),
|
||||
_ => return Err(Error::unexpected_message()),
|
||||
Some(Message::ErrorResponse(error)) => {
|
||||
return Poll::Ready(Err(Error::db(error)))
|
||||
}
|
||||
_ => return Poll::Ready(Err(Error::unexpected_message())),
|
||||
},
|
||||
};
|
||||
|
||||
@@ -164,18 +165,14 @@ where
|
||||
request_complete,
|
||||
});
|
||||
trace!("poll_read: waiting on sender");
|
||||
return Ok(None);
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Fetch the next client request and enqueue the response sender.
|
||||
fn poll_request(&mut self, cx: &mut Context<'_>) -> Poll<Option<RequestMessages>> {
|
||||
if let Some(messages) = self.pending_request.take() {
|
||||
trace!("retrying pending request");
|
||||
return Poll::Ready(Some(messages));
|
||||
}
|
||||
|
||||
if self.receiver.is_closed() {
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
@@ -193,74 +190,80 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_write(&mut self, cx: &mut Context<'_>) -> Result<bool, Error> {
|
||||
/// Process client requests and write them to the postgres connection, flushing if necessary.
|
||||
/// client -> postgres
|
||||
fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<WriteReady, Error>> {
|
||||
loop {
|
||||
if self.state == State::Closing {
|
||||
trace!("poll_write: done");
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
if Pin::new(&mut self.stream)
|
||||
.poll_ready(cx)
|
||||
.map_err(Error::io)?
|
||||
.is_pending()
|
||||
{
|
||||
trace!("poll_write: waiting on socket");
|
||||
return Ok(false);
|
||||
|
||||
// poll_ready is self-flushing.
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
let request = match self.poll_request(cx) {
|
||||
Poll::Ready(Some(request)) => request,
|
||||
Poll::Ready(None) if self.responses.is_empty() && self.state == State::Active => {
|
||||
match self.poll_request(cx) {
|
||||
// send the message to postgres
|
||||
Poll::Ready(Some(RequestMessages::Single(request))) => {
|
||||
Pin::new(&mut self.stream)
|
||||
.start_send(request)
|
||||
.map_err(Error::io)?;
|
||||
}
|
||||
// No more messages from the client, and no more responses to wait for.
|
||||
// Send a terminate message to postgres
|
||||
Poll::Ready(None) if self.responses.is_empty() => {
|
||||
trace!("poll_write: at eof, terminating");
|
||||
self.state = State::Terminating;
|
||||
let mut request = BytesMut::new();
|
||||
frontend::terminate(&mut request);
|
||||
RequestMessages::Single(FrontendMessage::Raw(request.freeze()))
|
||||
let request = FrontendMessage::Raw(request.freeze());
|
||||
|
||||
Pin::new(&mut self.stream)
|
||||
.start_send(request)
|
||||
.map_err(Error::io)?;
|
||||
|
||||
trace!("poll_write: sent eof, closing");
|
||||
trace!("poll_write: done");
|
||||
return Poll::Ready(Ok(WriteReady::Terminating));
|
||||
}
|
||||
// No more messages from the client, but there are still some responses to wait for.
|
||||
Poll::Ready(None) => {
|
||||
trace!(
|
||||
"poll_write: at eof, pending responses {}",
|
||||
self.responses.len()
|
||||
);
|
||||
return Ok(true);
|
||||
ready!(self.poll_flush(cx))?;
|
||||
return Poll::Ready(Ok(WriteReady::WaitingOnRead));
|
||||
}
|
||||
// Still waiting for a message from the client.
|
||||
Poll::Pending => {
|
||||
trace!("poll_write: waiting on request");
|
||||
return Ok(true);
|
||||
}
|
||||
};
|
||||
|
||||
match request {
|
||||
RequestMessages::Single(request) => {
|
||||
Pin::new(&mut self.stream)
|
||||
.start_send(request)
|
||||
.map_err(Error::io)?;
|
||||
if self.state == State::Terminating {
|
||||
trace!("poll_write: sent eof, closing");
|
||||
self.state = State::Closing;
|
||||
}
|
||||
ready!(self.poll_flush(cx))?;
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_flush(&mut self, cx: &mut Context<'_>) -> Result<(), Error> {
|
||||
fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
|
||||
match Pin::new(&mut self.stream)
|
||||
.poll_flush(cx)
|
||||
.map_err(Error::io)?
|
||||
{
|
||||
Poll::Ready(()) => trace!("poll_flush: flushed"),
|
||||
Poll::Pending => trace!("poll_flush: waiting on socket"),
|
||||
Poll::Ready(()) => {
|
||||
trace!("poll_flush: flushed");
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
Poll::Pending => {
|
||||
trace!("poll_flush: waiting on socket");
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
|
||||
if self.state != State::Closing {
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
match Pin::new(&mut self.stream)
|
||||
.poll_close(cx)
|
||||
.map_err(Error::io)?
|
||||
@@ -289,18 +292,30 @@ where
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<AsyncMessage, Error>>> {
|
||||
let message = self.poll_read(cx)?;
|
||||
let want_flush = self.poll_write(cx)?;
|
||||
if want_flush {
|
||||
self.poll_flush(cx)?;
|
||||
if self.state != State::Closing {
|
||||
// if the state is still active, try read from and write to postgres.
|
||||
let message = self.poll_read(cx)?;
|
||||
let closing = self.poll_write(cx)?;
|
||||
if let Poll::Ready(WriteReady::Terminating) = closing {
|
||||
self.state = State::Closing;
|
||||
}
|
||||
|
||||
if let Poll::Ready(message) = message {
|
||||
return Poll::Ready(Some(Ok(message)));
|
||||
}
|
||||
|
||||
// poll_read returned Pending.
|
||||
// poll_write returned Pending or Ready(WriteReady::WaitingOnRead).
|
||||
// if poll_write returned Ready(WriteReady::WaitingOnRead), then we are waiting to read more data from postgres.
|
||||
if self.state != State::Closing {
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
match message {
|
||||
Some(message) => Poll::Ready(Some(Ok(message))),
|
||||
None => match self.poll_shutdown(cx) {
|
||||
Poll::Ready(Ok(())) => Poll::Ready(None),
|
||||
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
|
||||
Poll::Pending => Poll::Pending,
|
||||
},
|
||||
|
||||
match self.poll_shutdown(cx) {
|
||||
Poll::Ready(Ok(())) => Poll::Ready(None),
|
||||
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
//! An asynchronous, pipelined, PostgreSQL client.
|
||||
#![warn(rust_2018_idioms, clippy::all)]
|
||||
#![warn(clippy::all)]
|
||||
|
||||
pub use crate::cancel_token::CancelToken;
|
||||
pub use crate::client::{Client, SocketConfig};
|
||||
|
||||
@@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
|
||||
/// Declare a failpoint that can use the `pause` failpoint action.
|
||||
/// Declare a failpoint that can use to `pause` failpoint action.
|
||||
/// We don't want to block the executor thread, hence, spawn_blocking + await.
|
||||
#[macro_export]
|
||||
macro_rules! pausable_failpoint {
|
||||
@@ -181,7 +181,7 @@ pub async fn failpoints_handler(
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
if !fail::has_failpoints() {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"Cannot manage failpoints because storage was compiled without failpoints support"
|
||||
"Cannot manage failpoints because neon was compiled without failpoints support"
|
||||
)));
|
||||
}
|
||||
|
||||
|
||||
@@ -97,8 +97,8 @@ use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
|
||||
use crate::DEFAULT_PG_VERSION;
|
||||
use crate::{disk_usage_eviction_task, tenant};
|
||||
use pageserver_api::models::{
|
||||
StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
|
||||
TimelineInfo,
|
||||
CompactInfoResponse, StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest,
|
||||
TimelineGcRequest, TimelineInfo,
|
||||
};
|
||||
use utils::{
|
||||
auth::SwappableJwtAuth,
|
||||
@@ -2039,6 +2039,34 @@ async fn timeline_cancel_compact_handler(
|
||||
.await
|
||||
}
|
||||
|
||||
// Get compact info of a timeline
|
||||
async fn timeline_compact_info_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
let state = get_state(&request);
|
||||
async {
|
||||
let tenant = state
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(tenant_shard_id)?;
|
||||
let res = tenant.get_scheduled_compaction_tasks(timeline_id);
|
||||
let mut resp = Vec::new();
|
||||
for item in res {
|
||||
resp.push(CompactInfoResponse {
|
||||
compact_key_range: item.compact_key_range,
|
||||
compact_lsn_range: item.compact_lsn_range,
|
||||
sub_compaction: item.sub_compaction,
|
||||
});
|
||||
}
|
||||
json_response(StatusCode::OK, resp)
|
||||
}
|
||||
.instrument(info_span!("timeline_compact_info", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
|
||||
.await
|
||||
}
|
||||
|
||||
// Run compaction immediately on given timeline.
|
||||
async fn timeline_compact_handler(
|
||||
mut request: Request<Body>,
|
||||
@@ -3400,6 +3428,10 @@ pub fn make_router(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/do_gc",
|
||||
|r| api_handler(r, timeline_gc_handler),
|
||||
)
|
||||
.get(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
|
||||
|r| api_handler(r, timeline_compact_info_handler),
|
||||
)
|
||||
.put(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
|
||||
|r| api_handler(r, timeline_compact_handler),
|
||||
|
||||
@@ -3122,6 +3122,23 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_scheduled_compaction_tasks(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
) -> Vec<CompactOptions> {
|
||||
use itertools::Itertools;
|
||||
let guard = self.scheduled_compaction_tasks.lock().unwrap();
|
||||
guard
|
||||
.get(&timeline_id)
|
||||
.map(|tline_pending_tasks| {
|
||||
tline_pending_tasks
|
||||
.iter()
|
||||
.map(|x| x.options.clone())
|
||||
.collect_vec()
|
||||
})
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Schedule a compaction task for a timeline.
|
||||
pub(crate) async fn schedule_compaction(
|
||||
&self,
|
||||
@@ -5759,13 +5776,13 @@ mod tests {
|
||||
use timeline::{CompactOptions, DeltaLayerTestDesc};
|
||||
use utils::id::TenantId;
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
use models::CompactLsnRange;
|
||||
#[cfg(feature = "testing")]
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
#[cfg(feature = "testing")]
|
||||
use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn};
|
||||
#[cfg(feature = "testing")]
|
||||
use timeline::CompactLsnRange;
|
||||
#[cfg(feature = "testing")]
|
||||
use timeline::GcInfo;
|
||||
|
||||
static TEST_KEY: Lazy<Key> =
|
||||
@@ -9634,7 +9651,7 @@ mod tests {
|
||||
#[cfg(feature = "testing")]
|
||||
#[tokio::test]
|
||||
async fn test_simple_bottom_most_compaction_on_branch() -> anyhow::Result<()> {
|
||||
use timeline::CompactLsnRange;
|
||||
use models::CompactLsnRange;
|
||||
|
||||
let harness = TenantHarness::create("test_simple_bottom_most_compaction_on_branch").await?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
use std::collections::BTreeSet;
|
||||
|
||||
use itertools::Itertools;
|
||||
use pageserver_compaction::helpers::overlaps_with;
|
||||
|
||||
use super::storage_layer::LayerName;
|
||||
|
||||
/// Checks whether a layer map is valid (i.e., is a valid result of the current compaction algorithm if nothing goes wrong).
|
||||
///
|
||||
/// The function checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
|
||||
/// The function implements a fast path check and a slow path check.
|
||||
///
|
||||
/// The fast path checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
|
||||
///
|
||||
/// ```plain
|
||||
/// | | | |
|
||||
@@ -25,31 +28,47 @@ use super::storage_layer::LayerName;
|
||||
/// | | | 4 | | |
|
||||
///
|
||||
/// If layer 2 and 4 contain the same single key, this is also a valid layer map.
|
||||
///
|
||||
/// However, if a partial compaction is still going on, it is possible that we get a layer map not satisfying the above condition.
|
||||
/// Therefore, we fallback to simply check if any of the two delta layers overlap. (See "A slow path...")
|
||||
pub fn check_valid_layermap(metadata: &[LayerName]) -> Option<String> {
|
||||
let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
|
||||
let mut all_delta_layers = Vec::new();
|
||||
for name in metadata {
|
||||
if let LayerName::Delta(layer) = name {
|
||||
if layer.key_range.start.next() != layer.key_range.end {
|
||||
all_delta_layers.push(layer.clone());
|
||||
}
|
||||
all_delta_layers.push(layer.clone());
|
||||
}
|
||||
}
|
||||
for layer in &all_delta_layers {
|
||||
let lsn_range = &layer.lsn_range;
|
||||
lsn_split_point.insert(lsn_range.start);
|
||||
lsn_split_point.insert(lsn_range.end);
|
||||
if layer.key_range.start.next() != layer.key_range.end {
|
||||
let lsn_range = &layer.lsn_range;
|
||||
lsn_split_point.insert(lsn_range.start);
|
||||
lsn_split_point.insert(lsn_range.end);
|
||||
}
|
||||
}
|
||||
for layer in &all_delta_layers {
|
||||
for (idx, layer) in all_delta_layers.iter().enumerate() {
|
||||
if layer.key_range.start.next() == layer.key_range.end {
|
||||
continue;
|
||||
}
|
||||
let lsn_range = layer.lsn_range.clone();
|
||||
let intersects = lsn_split_point.range(lsn_range).collect_vec();
|
||||
if intersects.len() > 1 {
|
||||
let err = format!(
|
||||
"layer violates the layer map LSN split assumption: layer {} intersects with LSN [{}]",
|
||||
layer,
|
||||
intersects.into_iter().map(|lsn| lsn.to_string()).join(", ")
|
||||
);
|
||||
return Some(err);
|
||||
// A slow path to check if the layer intersects with any other delta layer.
|
||||
for (other_idx, other_layer) in all_delta_layers.iter().enumerate() {
|
||||
if other_idx == idx {
|
||||
// do not check self intersects with self
|
||||
continue;
|
||||
}
|
||||
if overlaps_with(&layer.lsn_range, &other_layer.lsn_range)
|
||||
&& overlaps_with(&layer.key_range, &other_layer.key_range)
|
||||
{
|
||||
let err = format!(
|
||||
"layer violates the layer map LSN split assumption: layer {} intersects with layer {}",
|
||||
layer, other_layer
|
||||
);
|
||||
return Some(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
|
||||
@@ -31,9 +31,9 @@ use pageserver_api::{
|
||||
},
|
||||
keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
|
||||
models::{
|
||||
CompactionAlgorithm, CompactionAlgorithmSettings, DownloadRemoteLayersTaskInfo,
|
||||
DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, InMemoryLayerInfo, LayerMapInfo,
|
||||
LsnLease, TimelineState,
|
||||
CompactKeyRange, CompactLsnRange, CompactionAlgorithm, CompactionAlgorithmSettings,
|
||||
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
|
||||
InMemoryLayerInfo, LayerMapInfo, LsnLease, TimelineState,
|
||||
},
|
||||
reltag::BlockNumber,
|
||||
shard::{ShardIdentity, ShardNumber, TenantShardId},
|
||||
@@ -788,63 +788,6 @@ pub(crate) struct CompactRequest {
|
||||
pub sub_compaction_max_job_size_mb: Option<u64>,
|
||||
}
|
||||
|
||||
#[serde_with::serde_as]
|
||||
#[derive(Debug, Clone, serde::Deserialize)]
|
||||
pub(crate) struct CompactLsnRange {
|
||||
pub start: Lsn,
|
||||
pub end: Lsn,
|
||||
}
|
||||
|
||||
#[serde_with::serde_as]
|
||||
#[derive(Debug, Clone, serde::Deserialize)]
|
||||
pub(crate) struct CompactKeyRange {
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub start: Key,
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub end: Key,
|
||||
}
|
||||
|
||||
impl From<Range<Lsn>> for CompactLsnRange {
|
||||
fn from(range: Range<Lsn>) -> Self {
|
||||
Self {
|
||||
start: range.start,
|
||||
end: range.end,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Range<Key>> for CompactKeyRange {
|
||||
fn from(range: Range<Key>) -> Self {
|
||||
Self {
|
||||
start: range.start,
|
||||
end: range.end,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<CompactLsnRange> for Range<Lsn> {
|
||||
fn from(range: CompactLsnRange) -> Self {
|
||||
range.start..range.end
|
||||
}
|
||||
}
|
||||
|
||||
impl From<CompactKeyRange> for Range<Key> {
|
||||
fn from(range: CompactKeyRange) -> Self {
|
||||
range.start..range.end
|
||||
}
|
||||
}
|
||||
|
||||
impl CompactLsnRange {
|
||||
#[cfg(test)]
|
||||
#[cfg(feature = "testing")]
|
||||
pub fn above(lsn: Lsn) -> Self {
|
||||
Self {
|
||||
start: lsn,
|
||||
end: Lsn::MAX,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub(crate) struct CompactOptions {
|
||||
pub flags: EnumSet<CompactFlags>,
|
||||
|
||||
@@ -29,6 +29,7 @@ use utils::id::TimelineId;
|
||||
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
|
||||
use crate::page_cache;
|
||||
use crate::statvfs::Statvfs;
|
||||
use crate::tenant::checks::check_valid_layermap;
|
||||
use crate::tenant::remote_timeline_client::WaitCompletionError;
|
||||
use crate::tenant::storage_layer::batch_split_writer::{
|
||||
BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
|
||||
@@ -1823,7 +1824,7 @@ impl Timeline {
|
||||
// by estimating the amount of files read for a compaction job. We should also partition on LSN.
|
||||
let ((dense_ks, sparse_ks), _) = {
|
||||
let Ok(partition) = self.partitioning.try_lock() else {
|
||||
bail!("failed to acquire partition lock");
|
||||
bail!("failed to acquire partition lock during gc-compaction");
|
||||
};
|
||||
partition.clone()
|
||||
};
|
||||
@@ -2156,15 +2157,14 @@ impl Timeline {
|
||||
|
||||
// Step 1: construct a k-merge iterator over all layers.
|
||||
// Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
|
||||
// disable the check for now because we need to adjust the check for partial compactions, will enable later.
|
||||
// let layer_names = job_desc
|
||||
// .selected_layers
|
||||
// .iter()
|
||||
// .map(|layer| layer.layer_desc().layer_name())
|
||||
// .collect_vec();
|
||||
// if let Some(err) = check_valid_layermap(&layer_names) {
|
||||
// warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err);
|
||||
// }
|
||||
let layer_names = job_desc
|
||||
.selected_layers
|
||||
.iter()
|
||||
.map(|layer| layer.layer_desc().layer_name())
|
||||
.collect_vec();
|
||||
if let Some(err) = check_valid_layermap(&layer_names) {
|
||||
bail!("gc-compaction layer map check failed because {}, cannot proceed with compaction due to potential data loss", err);
|
||||
}
|
||||
// The maximum LSN we are processing in this compaction loop
|
||||
let end_lsn = job_desc
|
||||
.selected_layers
|
||||
@@ -2546,13 +2546,48 @@ impl Timeline {
|
||||
);
|
||||
|
||||
// Step 3: Place back to the layer map.
|
||||
|
||||
// First, do a sanity check to ensure the newly-created layer map does not contain overlaps.
|
||||
let all_layers = {
|
||||
let guard = self.layers.read().await;
|
||||
let layer_map = guard.layer_map()?;
|
||||
layer_map.iter_historic_layers().collect_vec()
|
||||
};
|
||||
|
||||
let mut final_layers = all_layers
|
||||
.iter()
|
||||
.map(|layer| layer.layer_name())
|
||||
.collect::<HashSet<_>>();
|
||||
for layer in &layer_selection {
|
||||
final_layers.remove(&layer.layer_desc().layer_name());
|
||||
}
|
||||
for layer in &compact_to {
|
||||
final_layers.insert(layer.layer_desc().layer_name());
|
||||
}
|
||||
let final_layers = final_layers.into_iter().collect_vec();
|
||||
|
||||
// TODO: move this check before we call `finish` on image layer writers. However, this will require us to get the layer name before we finish
|
||||
// the writer, so potentially, we will need a function like `ImageLayerBatchWriter::get_all_pending_layer_keys` to get all the keys that are
|
||||
// in the writer before finalizing the persistent layers. Now we would leave some dangling layers on the disk if the check fails.
|
||||
if let Some(err) = check_valid_layermap(&final_layers) {
|
||||
bail!("gc-compaction layer map check failed after compaction because {}, compaction result not applied to the layer map due to potential data loss", err);
|
||||
}
|
||||
|
||||
// Between the sanity check and this compaction update, there could be new layers being flushed, but it should be fine because we only
|
||||
// operate on L1 layers.
|
||||
{
|
||||
// TODO: sanity check if the layer map is valid (i.e., should not have overlaps)
|
||||
let mut guard = self.layers.write().await;
|
||||
guard
|
||||
.open_mut()?
|
||||
.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
|
||||
};
|
||||
|
||||
// Schedule an index-only upload to update the `latest_gc_cutoff` in the index_part.json.
|
||||
// Otherwise, after restart, the index_part only contains the old `latest_gc_cutoff` and
|
||||
// find_gc_cutoffs will try accessing things below the cutoff. TODO: ideally, this should
|
||||
// be batched into `schedule_compaction_update`.
|
||||
let disk_consistent_lsn = self.disk_consistent_lsn.load();
|
||||
self.schedule_uploads(disk_consistent_lsn, None)?;
|
||||
self.remote_client
|
||||
.schedule_compaction_update(&layer_selection, &compact_to)?;
|
||||
|
||||
|
||||
@@ -541,6 +541,7 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
}
|
||||
else
|
||||
{
|
||||
LWLockRelease(lfc_lock);
|
||||
return found;
|
||||
}
|
||||
|
||||
|
||||
@@ -827,7 +827,6 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
|
||||
{
|
||||
while (!pageserver_connect(shard_no, shard->n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
|
||||
{
|
||||
HandleMainLoopInterrupts();
|
||||
shard->n_reconnect_attempts += 1;
|
||||
}
|
||||
shard->n_reconnect_attempts = 0;
|
||||
|
||||
@@ -102,23 +102,39 @@ User can pass several optional headers that will affect resulting json.
|
||||
2. `Neon-Array-Mode: true`. Return postgres rows as arrays instead of objects. That is more compact representation and also helps in some edge
|
||||
cases where it is hard to use rows represented as objects (e.g. when several fields have the same name).
|
||||
|
||||
## Test proxy locally
|
||||
|
||||
## Using SNI-based routing on localhost
|
||||
|
||||
Now proxy determines project name from the subdomain, request to the `round-rice-566201.somedomain.tld` will be routed to the project named `round-rice-566201`. Unfortunately, `/etc/hosts` does not support domain wildcards, so I usually use `*.localtest.me` which resolves to `127.0.0.1`. Now we can create self-signed certificate and play with proxy:
|
||||
Proxy determines project name from the subdomain, request to the `round-rice-566201.somedomain.tld` will be routed to the project named `round-rice-566201`. Unfortunately, `/etc/hosts` does not support domain wildcards, so we can use *.localtest.me` which resolves to `127.0.0.1`.
|
||||
|
||||
Let's create self-signed certificate by running:
|
||||
```sh
|
||||
openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key -subj "/CN=*.localtest.me"
|
||||
```
|
||||
|
||||
start proxy
|
||||
|
||||
Then we need to build proxy with 'testing' feature and run, e.g.:
|
||||
```sh
|
||||
./target/debug/proxy -c server.crt -k server.key
|
||||
RUST_LOG=proxy cargo run -p proxy --bin proxy --features testing -- --auth-backend postgres --auth-endpoint 'postgresql://proxy:password@endpoint.localtest.me:5432/postgres' --is-private-access-proxy true -c server.crt -k server.key
|
||||
```
|
||||
|
||||
and connect to it
|
||||
We will also need to have a postgres instance. Assuming that we have setted up docker we can set it up as follows:
|
||||
```sh
|
||||
docker run \
|
||||
--detach \
|
||||
--name proxy-postgres \
|
||||
--env POSTGRES_PASSWORD=proxy-postgres \
|
||||
--publish 5432:5432 \
|
||||
postgres:17-bookworm
|
||||
```
|
||||
|
||||
Next step is setting up auth table and schema as well as creating role (without the JWT table):
|
||||
```sh
|
||||
docker exec -it proxy-postgres psql -U postgres -c "CREATE SCHEMA IF NOT EXISTS neon_control_plane"
|
||||
docker exec -it proxy-postgres psql -U postgres -c "CREATE TABLE neon_control_plane.endpoints (endpoint_id VARCHAR(255) PRIMARY KEY, allowed_ips VARCHAR(255))"
|
||||
docker exec -it proxy-postgres psql -U postgres -c "CREATE ROLE proxy WITH SUPERUSER LOGIN PASSWORD 'password';"
|
||||
```
|
||||
|
||||
Now from client you can start a new session:
|
||||
|
||||
```sh
|
||||
PGSSLROOTCERT=./server.crt psql 'postgres://my-cluster-42.localtest.me:1234?sslmode=verify-full'
|
||||
```
|
||||
PGSSLROOTCERT=./server.crt psql "postgresql://proxy:password@endpoint.localtest.me:4432/postgres?sslmode=verify-full"
|
||||
```
|
||||
@@ -678,6 +678,9 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// flush the final server message
|
||||
stream.flush().await.unwrap();
|
||||
|
||||
handle.await.unwrap();
|
||||
}
|
||||
|
||||
|
||||
@@ -10,7 +10,6 @@ use tracing::info;
|
||||
|
||||
use super::backend::ComputeCredentialKeys;
|
||||
use super::{AuthError, PasswordHackPayload};
|
||||
use crate::config::TlsServerEndPoint;
|
||||
use crate::context::RequestContext;
|
||||
use crate::control_plane::AuthSecret;
|
||||
use crate::intern::EndpointIdInt;
|
||||
@@ -18,6 +17,7 @@ use crate::sasl;
|
||||
use crate::scram::threadpool::ThreadPool;
|
||||
use crate::scram::{self};
|
||||
use crate::stream::{PqStream, Stream};
|
||||
use crate::tls::TlsServerEndPoint;
|
||||
|
||||
/// Every authentication selector is supposed to implement this trait.
|
||||
pub(crate) trait AuthMethod {
|
||||
|
||||
@@ -13,7 +13,10 @@ use proxy::auth::backend::jwt::JwkCache;
|
||||
use proxy::auth::backend::local::{LocalBackend, JWKS_ROLE_MAP};
|
||||
use proxy::auth::{self};
|
||||
use proxy::cancellation::CancellationHandlerMain;
|
||||
use proxy::config::{self, AuthenticationConfig, HttpConfig, ProxyConfig, RetryConfig};
|
||||
use proxy::config::{
|
||||
self, AuthenticationConfig, ComputeConfig, HttpConfig, ProxyConfig, RetryConfig,
|
||||
};
|
||||
use proxy::conn::TokioTcpAcceptor;
|
||||
use proxy::control_plane::locks::ApiLocks;
|
||||
use proxy::control_plane::messages::{EndpointJwksResponse, JwksSettings};
|
||||
use proxy::http::health_server::AppMetrics;
|
||||
@@ -25,6 +28,7 @@ use proxy::rate_limiter::{
|
||||
use proxy::scram::threadpool::ThreadPool;
|
||||
use proxy::serverless::cancel_set::CancelSet;
|
||||
use proxy::serverless::{self, GlobalConnPoolOptions};
|
||||
use proxy::tls::client_config::compute_client_config_with_root_certs;
|
||||
use proxy::types::RoleName;
|
||||
use proxy::url::ApiUrl;
|
||||
|
||||
@@ -33,7 +37,6 @@ project_build_tag!(BUILD_TAG);
|
||||
|
||||
use clap::Parser;
|
||||
use thiserror::Error;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -163,8 +166,8 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
};
|
||||
|
||||
let metrics_listener = TcpListener::bind(args.metrics).await?.into_std()?;
|
||||
let http_listener = TcpListener::bind(args.http).await?;
|
||||
let metrics_listener = TokioTcpAcceptor::bind(args.metrics).await?;
|
||||
let http_listener = TokioTcpAcceptor::bind(args.http).await?;
|
||||
let shutdown = CancellationToken::new();
|
||||
|
||||
// todo: should scale with CU
|
||||
@@ -209,6 +212,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
http_listener,
|
||||
shutdown.clone(),
|
||||
Arc::new(CancellationHandlerMain::new(
|
||||
&config.connect_to_compute,
|
||||
Arc::new(DashMap::new()),
|
||||
None,
|
||||
proxy::metrics::CancellationSource::Local,
|
||||
@@ -268,10 +272,15 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
|
||||
max_response_size_bytes: args.sql_over_http.sql_over_http_max_response_size_bytes,
|
||||
};
|
||||
|
||||
let compute_config = ComputeConfig {
|
||||
retry: RetryConfig::parse(RetryConfig::CONNECT_TO_COMPUTE_DEFAULT_VALUES)?,
|
||||
tls: Arc::new(compute_client_config_with_root_certs()?),
|
||||
timeout: Duration::from_secs(2),
|
||||
};
|
||||
|
||||
Ok(Box::leak(Box::new(ProxyConfig {
|
||||
tls_config: None,
|
||||
metric_collection: None,
|
||||
allow_self_signed_compute: false,
|
||||
http_config,
|
||||
authentication_config: AuthenticationConfig {
|
||||
jwks_cache: JwkCache::default(),
|
||||
@@ -290,9 +299,7 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
|
||||
region: "local".into(),
|
||||
wake_compute_retry_config: RetryConfig::parse(RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)?,
|
||||
connect_compute_locks,
|
||||
connect_to_compute_retry_config: RetryConfig::parse(
|
||||
RetryConfig::CONNECT_TO_COMPUTE_DEFAULT_VALUES,
|
||||
)?,
|
||||
connect_to_compute: compute_config,
|
||||
})))
|
||||
}
|
||||
|
||||
|
||||
@@ -10,12 +10,13 @@ use clap::Arg;
|
||||
use futures::future::Either;
|
||||
use futures::TryFutureExt;
|
||||
use itertools::Itertools;
|
||||
use proxy::config::TlsServerEndPoint;
|
||||
use proxy::conn::{Acceptor, TokioTcpAcceptor};
|
||||
use proxy::context::RequestContext;
|
||||
use proxy::metrics::{Metrics, ThreadPoolMetrics};
|
||||
use proxy::protocol2::ConnectionInfo;
|
||||
use proxy::proxy::{copy_bidirectional_client_compute, run_until_cancelled, ErrorSource};
|
||||
use proxy::stream::{PqStream, Stream};
|
||||
use proxy::tls::TlsServerEndPoint;
|
||||
use rustls::crypto::ring;
|
||||
use rustls::pki_types::PrivateKeyDer;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
@@ -122,7 +123,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
// Start listening for incoming client connections
|
||||
let proxy_address: SocketAddr = args.get_one::<String>("listen").unwrap().parse()?;
|
||||
info!("Starting sni router on {proxy_address}");
|
||||
let proxy_listener = TcpListener::bind(proxy_address).await?;
|
||||
let proxy_listener = TokioTcpAcceptor::bind(proxy_address).await?;
|
||||
|
||||
let cancellation_token = CancellationToken::new();
|
||||
|
||||
@@ -152,17 +153,13 @@ async fn task_main(
|
||||
dest_suffix: Arc<String>,
|
||||
tls_config: Arc<rustls::ServerConfig>,
|
||||
tls_server_end_point: TlsServerEndPoint,
|
||||
listener: tokio::net::TcpListener,
|
||||
acceptor: TokioTcpAcceptor,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
// When set for the server socket, the keepalive setting
|
||||
// will be inherited by all accepted client sockets.
|
||||
socket2::SockRef::from(&listener).set_keepalive(true)?;
|
||||
|
||||
let connections = tokio_util::task::task_tracker::TaskTracker::new();
|
||||
|
||||
while let Some(accept_result) =
|
||||
run_until_cancelled(listener.accept(), &cancellation_token).await
|
||||
run_until_cancelled(acceptor.accept(), &cancellation_token).await
|
||||
{
|
||||
let (socket, peer_addr) = accept_result?;
|
||||
|
||||
@@ -172,10 +169,6 @@ async fn task_main(
|
||||
|
||||
connections.spawn(
|
||||
async move {
|
||||
socket
|
||||
.set_nodelay(true)
|
||||
.context("failed to set socket option")?;
|
||||
|
||||
info!(%peer_addr, "serving");
|
||||
let ctx = RequestContext::new(
|
||||
session_id,
|
||||
@@ -197,7 +190,7 @@ async fn task_main(
|
||||
}
|
||||
|
||||
connections.close();
|
||||
drop(listener);
|
||||
drop(acceptor);
|
||||
|
||||
connections.wait().await;
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::bail;
|
||||
use futures::future::Either;
|
||||
@@ -8,9 +9,10 @@ use proxy::auth::backend::jwt::JwkCache;
|
||||
use proxy::auth::backend::{AuthRateLimiter, ConsoleRedirectBackend, MaybeOwned};
|
||||
use proxy::cancellation::{CancelMap, CancellationHandler};
|
||||
use proxy::config::{
|
||||
self, remote_storage_from_toml, AuthenticationConfig, CacheOptions, HttpConfig,
|
||||
self, remote_storage_from_toml, AuthenticationConfig, CacheOptions, ComputeConfig, HttpConfig,
|
||||
ProjectInfoCacheOptions, ProxyConfig, ProxyProtocolV2,
|
||||
};
|
||||
use proxy::conn::TokioTcpAcceptor;
|
||||
use proxy::context::parquet::ParquetUploadArgs;
|
||||
use proxy::http::health_server::AppMetrics;
|
||||
use proxy::metrics::Metrics;
|
||||
@@ -23,9 +25,9 @@ use proxy::redis::{elasticache, notifications};
|
||||
use proxy::scram::threadpool::ThreadPool;
|
||||
use proxy::serverless::cancel_set::CancelSet;
|
||||
use proxy::serverless::GlobalConnPoolOptions;
|
||||
use proxy::tls::client_config::compute_client_config_with_root_certs;
|
||||
use proxy::{auth, control_plane, http, serverless, usage_metrics};
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -129,9 +131,6 @@ struct ProxyCliArgs {
|
||||
/// lock for `connect_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable).
|
||||
#[clap(long, default_value = config::ConcurrencyLockOptions::DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK)]
|
||||
connect_compute_lock: String,
|
||||
/// Allow self-signed certificates for compute nodes (for testing)
|
||||
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
|
||||
allow_self_signed_compute: bool,
|
||||
#[clap(flatten)]
|
||||
sql_over_http: SqlOverHttpArgs,
|
||||
/// timeout for scram authentication protocol
|
||||
@@ -354,17 +353,17 @@ async fn main() -> anyhow::Result<()> {
|
||||
// Check that we can bind to address before further initialization
|
||||
let http_address: SocketAddr = args.http.parse()?;
|
||||
info!("Starting http on {http_address}");
|
||||
let http_listener = TcpListener::bind(http_address).await?.into_std()?;
|
||||
let http_listener = TokioTcpAcceptor::bind(http_address).await?;
|
||||
|
||||
let mgmt_address: SocketAddr = args.mgmt.parse()?;
|
||||
info!("Starting mgmt on {mgmt_address}");
|
||||
let mgmt_listener = TcpListener::bind(mgmt_address).await?;
|
||||
let mgmt_listener = TokioTcpAcceptor::bind(mgmt_address).await?;
|
||||
|
||||
let proxy_listener = if !args.is_auth_broker {
|
||||
let proxy_address: SocketAddr = args.proxy.parse()?;
|
||||
info!("Starting proxy on {proxy_address}");
|
||||
|
||||
Some(TcpListener::bind(proxy_address).await?)
|
||||
Some(TokioTcpAcceptor::bind(proxy_address).await?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@@ -374,7 +373,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
let serverless_listener = if let Some(serverless_address) = args.wss {
|
||||
let serverless_address: SocketAddr = serverless_address.parse()?;
|
||||
info!("Starting wss on {serverless_address}");
|
||||
Some(TcpListener::bind(serverless_address).await?)
|
||||
Some(TokioTcpAcceptor::bind(serverless_address).await?)
|
||||
} else if args.is_auth_broker {
|
||||
bail!("wss arg must be present for auth-broker")
|
||||
} else {
|
||||
@@ -400,6 +399,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
let cancellation_handler = Arc::new(CancellationHandler::<
|
||||
Option<Arc<Mutex<RedisPublisherClient>>>,
|
||||
>::new(
|
||||
&config.connect_to_compute,
|
||||
cancel_map.clone(),
|
||||
redis_publisher,
|
||||
proxy::metrics::CancellationSource::FromClient,
|
||||
@@ -495,6 +495,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
let cache = api.caches.project_info.clone();
|
||||
if let Some(client) = client1 {
|
||||
maintenance_tasks.spawn(notifications::task_main(
|
||||
config,
|
||||
client,
|
||||
cache.clone(),
|
||||
cancel_map.clone(),
|
||||
@@ -503,6 +504,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
if let Some(client) = client2 {
|
||||
maintenance_tasks.spawn(notifications::task_main(
|
||||
config,
|
||||
client,
|
||||
cache.clone(),
|
||||
cancel_map.clone(),
|
||||
@@ -564,9 +566,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
_ => bail!("either both or neither tls-key and tls-cert must be specified"),
|
||||
};
|
||||
|
||||
if args.allow_self_signed_compute {
|
||||
warn!("allowing self-signed compute certificates");
|
||||
}
|
||||
let backup_metric_collection_config = config::MetricBackupCollectionConfig {
|
||||
interval: args.metric_backup_collection_interval,
|
||||
remote_storage_config: args.metric_backup_collection_remote_storage.clone(),
|
||||
@@ -638,10 +637,15 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
console_redirect_confirmation_timeout: args.webauth_confirmation_timeout,
|
||||
};
|
||||
|
||||
let compute_config = ComputeConfig {
|
||||
retry: config::RetryConfig::parse(&args.connect_to_compute_retry)?,
|
||||
tls: Arc::new(compute_client_config_with_root_certs()?),
|
||||
timeout: Duration::from_secs(2),
|
||||
};
|
||||
|
||||
let config = ProxyConfig {
|
||||
tls_config,
|
||||
metric_collection,
|
||||
allow_self_signed_compute: args.allow_self_signed_compute,
|
||||
http_config,
|
||||
authentication_config,
|
||||
proxy_protocol_v2: args.proxy_protocol_v2,
|
||||
@@ -649,9 +653,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
region: args.region.clone(),
|
||||
wake_compute_retry_config: config::RetryConfig::parse(&args.wake_compute_retry)?,
|
||||
connect_compute_locks,
|
||||
connect_to_compute_retry_config: config::RetryConfig::parse(
|
||||
&args.connect_to_compute_retry,
|
||||
)?,
|
||||
connect_to_compute: compute_config,
|
||||
};
|
||||
|
||||
let config = Box::leak(Box::new(config));
|
||||
|
||||
@@ -3,10 +3,9 @@ use std::sync::Arc;
|
||||
|
||||
use dashmap::DashMap;
|
||||
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
|
||||
use once_cell::sync::OnceCell;
|
||||
use postgres_client::{tls::MakeTlsConnect, CancelToken};
|
||||
use postgres_client::tls::MakeTlsConnect;
|
||||
use postgres_client::CancelToken;
|
||||
use pq_proto::CancelKeyData;
|
||||
use rustls::crypto::ring;
|
||||
use thiserror::Error;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::Mutex;
|
||||
@@ -14,6 +13,7 @@ use tracing::{debug, info};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::auth::{check_peer_addr_is_in_list, IpPattern};
|
||||
use crate::config::ComputeConfig;
|
||||
use crate::error::ReportableError;
|
||||
use crate::ext::LockExt;
|
||||
use crate::metrics::{CancellationRequest, CancellationSource, Metrics};
|
||||
@@ -21,9 +21,7 @@ use crate::rate_limiter::LeakyBucketRateLimiter;
|
||||
use crate::redis::cancellation_publisher::{
|
||||
CancellationPublisher, CancellationPublisherMut, RedisPublisherClient,
|
||||
};
|
||||
|
||||
use crate::compute::{load_certs, AcceptEverythingVerifier};
|
||||
use crate::postgres_rustls::MakeRustlsConnect;
|
||||
use crate::tls::postgres_rustls::MakeRustlsConnect;
|
||||
|
||||
pub type CancelMap = Arc<DashMap<CancelKeyData, Option<CancelClosure>>>;
|
||||
pub type CancellationHandlerMain = CancellationHandler<Option<Arc<Mutex<RedisPublisherClient>>>>;
|
||||
@@ -35,6 +33,7 @@ type IpSubnetKey = IpNet;
|
||||
///
|
||||
/// If `CancellationPublisher` is available, cancel request will be used to publish the cancellation key to other proxy instances.
|
||||
pub struct CancellationHandler<P> {
|
||||
compute_config: &'static ComputeConfig,
|
||||
map: CancelMap,
|
||||
client: P,
|
||||
/// This field used for the monitoring purposes.
|
||||
@@ -183,7 +182,7 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
|
||||
"cancelling query per user's request using key {key}, hostname {}, address: {}",
|
||||
cancel_closure.hostname, cancel_closure.socket_addr
|
||||
);
|
||||
cancel_closure.try_cancel_query().await
|
||||
cancel_closure.try_cancel_query(self.compute_config).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -198,8 +197,13 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
|
||||
}
|
||||
|
||||
impl CancellationHandler<()> {
|
||||
pub fn new(map: CancelMap, from: CancellationSource) -> Self {
|
||||
pub fn new(
|
||||
compute_config: &'static ComputeConfig,
|
||||
map: CancelMap,
|
||||
from: CancellationSource,
|
||||
) -> Self {
|
||||
Self {
|
||||
compute_config,
|
||||
map,
|
||||
client: (),
|
||||
from,
|
||||
@@ -214,8 +218,14 @@ impl CancellationHandler<()> {
|
||||
}
|
||||
|
||||
impl<P: CancellationPublisherMut> CancellationHandler<Option<Arc<Mutex<P>>>> {
|
||||
pub fn new(map: CancelMap, client: Option<Arc<Mutex<P>>>, from: CancellationSource) -> Self {
|
||||
pub fn new(
|
||||
compute_config: &'static ComputeConfig,
|
||||
map: CancelMap,
|
||||
client: Option<Arc<Mutex<P>>>,
|
||||
from: CancellationSource,
|
||||
) -> Self {
|
||||
Self {
|
||||
compute_config,
|
||||
map,
|
||||
client,
|
||||
from,
|
||||
@@ -229,8 +239,6 @@ impl<P: CancellationPublisherMut> CancellationHandler<Option<Arc<Mutex<P>>>> {
|
||||
}
|
||||
}
|
||||
|
||||
static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
|
||||
|
||||
/// This should've been a [`std::future::Future`], but
|
||||
/// it's impossible to name a type of an unboxed future
|
||||
/// (we'd need something like `#![feature(type_alias_impl_trait)]`).
|
||||
@@ -240,7 +248,6 @@ pub struct CancelClosure {
|
||||
cancel_token: CancelToken,
|
||||
ip_allowlist: Vec<IpPattern>,
|
||||
hostname: String, // for pg_sni router
|
||||
allow_self_signed_compute: bool,
|
||||
}
|
||||
|
||||
impl CancelClosure {
|
||||
@@ -249,47 +256,23 @@ impl CancelClosure {
|
||||
cancel_token: CancelToken,
|
||||
ip_allowlist: Vec<IpPattern>,
|
||||
hostname: String,
|
||||
allow_self_signed_compute: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
socket_addr,
|
||||
cancel_token,
|
||||
ip_allowlist,
|
||||
hostname,
|
||||
allow_self_signed_compute,
|
||||
}
|
||||
}
|
||||
/// Cancels the query running on user's compute node.
|
||||
pub(crate) async fn try_cancel_query(self) -> Result<(), CancelError> {
|
||||
pub(crate) async fn try_cancel_query(
|
||||
self,
|
||||
compute_config: &ComputeConfig,
|
||||
) -> Result<(), CancelError> {
|
||||
let socket = TcpStream::connect(self.socket_addr).await?;
|
||||
|
||||
let client_config = if self.allow_self_signed_compute {
|
||||
// Allow all certificates for creating the connection. Used only for tests
|
||||
let verifier = Arc::new(AcceptEverythingVerifier);
|
||||
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
|
||||
.with_safe_default_protocol_versions()
|
||||
.expect("ring should support the default protocol versions")
|
||||
.dangerous()
|
||||
.with_custom_certificate_verifier(verifier)
|
||||
} else {
|
||||
let root_store = TLS_ROOTS
|
||||
.get_or_try_init(load_certs)
|
||||
.map_err(|_e| {
|
||||
CancelError::IO(std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
"TLS root store initialization failed".to_string(),
|
||||
))
|
||||
})?
|
||||
.clone();
|
||||
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
|
||||
.with_safe_default_protocol_versions()
|
||||
.expect("ring should support the default protocol versions")
|
||||
.with_root_certificates(root_store)
|
||||
};
|
||||
|
||||
let client_config = client_config.with_no_client_auth();
|
||||
|
||||
let mut mk_tls = crate::postgres_rustls::MakeRustlsConnect::new(client_config);
|
||||
let mut mk_tls =
|
||||
crate::tls::postgres_rustls::MakeRustlsConnect::new(compute_config.tls.clone());
|
||||
let tls = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::make_tls_connect(
|
||||
&mut mk_tls,
|
||||
&self.hostname,
|
||||
@@ -341,11 +324,30 @@ impl<P> Drop for Session<P> {
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use super::*;
|
||||
use crate::config::RetryConfig;
|
||||
use crate::tls::client_config::compute_client_config_with_certs;
|
||||
|
||||
fn config() -> ComputeConfig {
|
||||
let retry = RetryConfig {
|
||||
base_delay: Duration::from_secs(1),
|
||||
max_retries: 5,
|
||||
backoff_factor: 2.0,
|
||||
};
|
||||
|
||||
ComputeConfig {
|
||||
retry,
|
||||
tls: Arc::new(compute_client_config_with_certs(std::iter::empty())),
|
||||
timeout: Duration::from_secs(2),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn check_session_drop() -> anyhow::Result<()> {
|
||||
let cancellation_handler = Arc::new(CancellationHandler::<()>::new(
|
||||
Box::leak(Box::new(config())),
|
||||
CancelMap::default(),
|
||||
CancellationSource::FromRedis,
|
||||
));
|
||||
@@ -361,8 +363,11 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn cancel_session_noop_regression() {
|
||||
let handler =
|
||||
CancellationHandler::<()>::new(CancelMap::default(), CancellationSource::Local);
|
||||
let handler = CancellationHandler::<()>::new(
|
||||
Box::leak(Box::new(config())),
|
||||
CancelMap::default(),
|
||||
CancellationSource::Local,
|
||||
);
|
||||
handler
|
||||
.cancel_session(
|
||||
CancelKeyData {
|
||||
|
||||
@@ -1,17 +1,13 @@
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::{FutureExt, TryFutureExt};
|
||||
use itertools::Itertools;
|
||||
use once_cell::sync::OnceCell;
|
||||
use postgres_client::tls::MakeTlsConnect;
|
||||
use postgres_client::{CancelToken, RawConnection};
|
||||
use postgres_protocol::message::backend::NoticeResponseBody;
|
||||
use pq_proto::StartupMessageParams;
|
||||
use rustls::client::danger::ServerCertVerifier;
|
||||
use rustls::crypto::ring;
|
||||
use rustls::pki_types::InvalidDnsNameError;
|
||||
use thiserror::Error;
|
||||
use tokio::net::TcpStream;
|
||||
@@ -19,14 +15,15 @@ use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::auth::parse_endpoint_param;
|
||||
use crate::cancellation::CancelClosure;
|
||||
use crate::config::ComputeConfig;
|
||||
use crate::context::RequestContext;
|
||||
use crate::control_plane::client::ApiLockError;
|
||||
use crate::control_plane::errors::WakeComputeError;
|
||||
use crate::control_plane::messages::MetricsAuxInfo;
|
||||
use crate::error::{ReportableError, UserFacingError};
|
||||
use crate::metrics::{Metrics, NumDbConnectionsGuard};
|
||||
use crate::postgres_rustls::MakeRustlsConnect;
|
||||
use crate::proxy::neon_option;
|
||||
use crate::tls::postgres_rustls::MakeRustlsConnect;
|
||||
use crate::types::Host;
|
||||
|
||||
pub const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node";
|
||||
@@ -41,9 +38,6 @@ pub(crate) enum ConnectionError {
|
||||
#[error("{COULD_NOT_CONNECT}: {0}")]
|
||||
CouldNotConnect(#[from] io::Error),
|
||||
|
||||
#[error("Couldn't load native TLS certificates: {0:?}")]
|
||||
TlsCertificateError(Vec<rustls_native_certs::Error>),
|
||||
|
||||
#[error("{COULD_NOT_CONNECT}: {0}")]
|
||||
TlsError(#[from] InvalidDnsNameError),
|
||||
|
||||
@@ -90,7 +84,6 @@ impl ReportableError for ConnectionError {
|
||||
}
|
||||
ConnectionError::Postgres(_) => crate::error::ErrorKind::Compute,
|
||||
ConnectionError::CouldNotConnect(_) => crate::error::ErrorKind::Compute,
|
||||
ConnectionError::TlsCertificateError(_) => crate::error::ErrorKind::Service,
|
||||
ConnectionError::TlsError(_) => crate::error::ErrorKind::Compute,
|
||||
ConnectionError::WakeComputeError(e) => e.get_error_kind(),
|
||||
ConnectionError::TooManyConnectionAttempts(e) => e.get_error_kind(),
|
||||
@@ -200,11 +193,15 @@ impl ConnCfg {
|
||||
|
||||
let connect_once = |host, port| {
|
||||
debug!("trying to connect to compute node at {host}:{port}");
|
||||
connect_with_timeout(host, port).and_then(|socket| async {
|
||||
let socket_addr = socket.peer_addr()?;
|
||||
connect_with_timeout(host, port).and_then(|stream| async {
|
||||
let socket_addr = stream.peer_addr()?;
|
||||
let socket = socket2::SockRef::from(&stream);
|
||||
// Disable Nagle's algorithm to not introduce latency between
|
||||
// client and compute.
|
||||
socket.set_nodelay(true)?;
|
||||
// This prevents load balancer from severing the connection.
|
||||
socket2::SockRef::from(&socket).set_keepalive(true)?;
|
||||
Ok((socket_addr, socket))
|
||||
socket.set_keepalive(true)?;
|
||||
Ok((socket_addr, stream))
|
||||
})
|
||||
};
|
||||
|
||||
@@ -251,35 +248,14 @@ impl ConnCfg {
|
||||
pub(crate) async fn connect(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
allow_self_signed_compute: bool,
|
||||
aux: MetricsAuxInfo,
|
||||
timeout: Duration,
|
||||
config: &ComputeConfig,
|
||||
) -> Result<PostgresConnection, ConnectionError> {
|
||||
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
|
||||
let (socket_addr, stream, host) = self.connect_raw(timeout).await?;
|
||||
let (socket_addr, stream, host) = self.connect_raw(config.timeout).await?;
|
||||
drop(pause);
|
||||
|
||||
let client_config = if allow_self_signed_compute {
|
||||
// Allow all certificates for creating the connection
|
||||
let verifier = Arc::new(AcceptEverythingVerifier);
|
||||
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
|
||||
.with_safe_default_protocol_versions()
|
||||
.expect("ring should support the default protocol versions")
|
||||
.dangerous()
|
||||
.with_custom_certificate_verifier(verifier)
|
||||
} else {
|
||||
let root_store = TLS_ROOTS
|
||||
.get_or_try_init(load_certs)
|
||||
.map_err(ConnectionError::TlsCertificateError)?
|
||||
.clone();
|
||||
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
|
||||
.with_safe_default_protocol_versions()
|
||||
.expect("ring should support the default protocol versions")
|
||||
.with_root_certificates(root_store)
|
||||
};
|
||||
let client_config = client_config.with_no_client_auth();
|
||||
|
||||
let mut mk_tls = crate::postgres_rustls::MakeRustlsConnect::new(client_config);
|
||||
let mut mk_tls = crate::tls::postgres_rustls::MakeRustlsConnect::new(config.tls.clone());
|
||||
let tls = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::make_tls_connect(
|
||||
&mut mk_tls,
|
||||
host,
|
||||
@@ -320,7 +296,6 @@ impl ConnCfg {
|
||||
},
|
||||
vec![],
|
||||
host.to_string(),
|
||||
allow_self_signed_compute,
|
||||
);
|
||||
|
||||
let connection = PostgresConnection {
|
||||
@@ -352,63 +327,6 @@ fn filtered_options(options: &str) -> Option<String> {
|
||||
Some(options)
|
||||
}
|
||||
|
||||
pub(crate) fn load_certs() -> Result<Arc<rustls::RootCertStore>, Vec<rustls_native_certs::Error>> {
|
||||
let der_certs = rustls_native_certs::load_native_certs();
|
||||
|
||||
if !der_certs.errors.is_empty() {
|
||||
return Err(der_certs.errors);
|
||||
}
|
||||
|
||||
let mut store = rustls::RootCertStore::empty();
|
||||
store.add_parsable_certificates(der_certs.certs);
|
||||
Ok(Arc::new(store))
|
||||
}
|
||||
static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct AcceptEverythingVerifier;
|
||||
impl ServerCertVerifier for AcceptEverythingVerifier {
|
||||
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
|
||||
use rustls::SignatureScheme;
|
||||
// The schemes for which `SignatureScheme::supported_in_tls13` returns true.
|
||||
vec![
|
||||
SignatureScheme::ECDSA_NISTP521_SHA512,
|
||||
SignatureScheme::ECDSA_NISTP384_SHA384,
|
||||
SignatureScheme::ECDSA_NISTP256_SHA256,
|
||||
SignatureScheme::RSA_PSS_SHA512,
|
||||
SignatureScheme::RSA_PSS_SHA384,
|
||||
SignatureScheme::RSA_PSS_SHA256,
|
||||
SignatureScheme::ED25519,
|
||||
]
|
||||
}
|
||||
fn verify_server_cert(
|
||||
&self,
|
||||
_end_entity: &rustls::pki_types::CertificateDer<'_>,
|
||||
_intermediates: &[rustls::pki_types::CertificateDer<'_>],
|
||||
_server_name: &rustls::pki_types::ServerName<'_>,
|
||||
_ocsp_response: &[u8],
|
||||
_now: rustls::pki_types::UnixTime,
|
||||
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
|
||||
Ok(rustls::client::danger::ServerCertVerified::assertion())
|
||||
}
|
||||
fn verify_tls12_signature(
|
||||
&self,
|
||||
_message: &[u8],
|
||||
_cert: &rustls::pki_types::CertificateDer<'_>,
|
||||
_dss: &rustls::DigitallySignedStruct,
|
||||
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
|
||||
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
|
||||
}
|
||||
fn verify_tls13_signature(
|
||||
&self,
|
||||
_message: &[u8],
|
||||
_cert: &rustls::pki_types::CertificateDer<'_>,
|
||||
_dss: &rustls::DigitallySignedStruct,
|
||||
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
|
||||
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -1,17 +1,10 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{bail, ensure, Context, Ok};
|
||||
use clap::ValueEnum;
|
||||
use itertools::Itertools;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use rustls::crypto::ring::{self, sign};
|
||||
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
|
||||
use sha2::{Digest, Sha256};
|
||||
use tracing::{error, info};
|
||||
use x509_parser::oid_registry;
|
||||
|
||||
use crate::auth::backend::jwt::JwkCache;
|
||||
use crate::auth::backend::AuthRateLimiter;
|
||||
@@ -20,12 +13,12 @@ use crate::rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig}
|
||||
use crate::scram::threadpool::ThreadPool;
|
||||
use crate::serverless::cancel_set::CancelSet;
|
||||
use crate::serverless::GlobalConnPoolOptions;
|
||||
pub use crate::tls::server_config::{configure_tls, TlsConfig};
|
||||
use crate::types::Host;
|
||||
|
||||
pub struct ProxyConfig {
|
||||
pub tls_config: Option<TlsConfig>,
|
||||
pub metric_collection: Option<MetricCollectionConfig>,
|
||||
pub allow_self_signed_compute: bool,
|
||||
pub http_config: HttpConfig,
|
||||
pub authentication_config: AuthenticationConfig,
|
||||
pub proxy_protocol_v2: ProxyProtocolV2,
|
||||
@@ -33,7 +26,13 @@ pub struct ProxyConfig {
|
||||
pub handshake_timeout: Duration,
|
||||
pub wake_compute_retry_config: RetryConfig,
|
||||
pub connect_compute_locks: ApiLocks<Host>,
|
||||
pub connect_to_compute_retry_config: RetryConfig,
|
||||
pub connect_to_compute: ComputeConfig,
|
||||
}
|
||||
|
||||
pub struct ComputeConfig {
|
||||
pub retry: RetryConfig,
|
||||
pub tls: Arc<rustls::ClientConfig>,
|
||||
pub timeout: Duration,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, ValueEnum, PartialEq)]
|
||||
@@ -53,12 +52,6 @@ pub struct MetricCollectionConfig {
|
||||
pub backup_metric_collection_config: MetricBackupCollectionConfig,
|
||||
}
|
||||
|
||||
pub struct TlsConfig {
|
||||
pub config: Arc<rustls::ServerConfig>,
|
||||
pub common_names: HashSet<String>,
|
||||
pub cert_resolver: Arc<CertResolver>,
|
||||
}
|
||||
|
||||
pub struct HttpConfig {
|
||||
pub accept_websockets: bool,
|
||||
pub pool_options: GlobalConnPoolOptions,
|
||||
@@ -81,272 +74,6 @@ pub struct AuthenticationConfig {
|
||||
pub console_redirect_confirmation_timeout: tokio::time::Duration,
|
||||
}
|
||||
|
||||
impl TlsConfig {
|
||||
pub fn to_server_config(&self) -> Arc<rustls::ServerConfig> {
|
||||
self.config.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// <https://github.com/postgres/postgres/blob/ca481d3c9ab7bf69ff0c8d71ad3951d407f6a33c/src/include/libpq/pqcomm.h#L159>
|
||||
pub const PG_ALPN_PROTOCOL: &[u8] = b"postgresql";
|
||||
|
||||
/// Configure TLS for the main endpoint.
|
||||
pub fn configure_tls(
|
||||
key_path: &str,
|
||||
cert_path: &str,
|
||||
certs_dir: Option<&String>,
|
||||
allow_tls_keylogfile: bool,
|
||||
) -> anyhow::Result<TlsConfig> {
|
||||
let mut cert_resolver = CertResolver::new();
|
||||
|
||||
// add default certificate
|
||||
cert_resolver.add_cert_path(key_path, cert_path, true)?;
|
||||
|
||||
// add extra certificates
|
||||
if let Some(certs_dir) = certs_dir {
|
||||
for entry in std::fs::read_dir(certs_dir)? {
|
||||
let entry = entry?;
|
||||
let path = entry.path();
|
||||
if path.is_dir() {
|
||||
// file names aligned with default cert-manager names
|
||||
let key_path = path.join("tls.key");
|
||||
let cert_path = path.join("tls.crt");
|
||||
if key_path.exists() && cert_path.exists() {
|
||||
cert_resolver.add_cert_path(
|
||||
&key_path.to_string_lossy(),
|
||||
&cert_path.to_string_lossy(),
|
||||
false,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let common_names = cert_resolver.get_common_names();
|
||||
|
||||
let cert_resolver = Arc::new(cert_resolver);
|
||||
|
||||
// allow TLS 1.2 to be compatible with older client libraries
|
||||
let mut config =
|
||||
rustls::ServerConfig::builder_with_provider(Arc::new(ring::default_provider()))
|
||||
.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])
|
||||
.context("ring should support TLS1.2 and TLS1.3")?
|
||||
.with_no_client_auth()
|
||||
.with_cert_resolver(cert_resolver.clone());
|
||||
|
||||
config.alpn_protocols = vec![PG_ALPN_PROTOCOL.to_vec()];
|
||||
|
||||
if allow_tls_keylogfile {
|
||||
// KeyLogFile will check for the SSLKEYLOGFILE environment variable.
|
||||
config.key_log = Arc::new(rustls::KeyLogFile::new());
|
||||
}
|
||||
|
||||
Ok(TlsConfig {
|
||||
config: Arc::new(config),
|
||||
common_names,
|
||||
cert_resolver,
|
||||
})
|
||||
}
|
||||
|
||||
/// Channel binding parameter
|
||||
///
|
||||
/// <https://www.rfc-editor.org/rfc/rfc5929#section-4>
|
||||
/// Description: The hash of the TLS server's certificate as it
|
||||
/// appears, octet for octet, in the server's Certificate message. Note
|
||||
/// that the Certificate message contains a certificate_list, in which
|
||||
/// the first element is the server's certificate.
|
||||
///
|
||||
/// The hash function is to be selected as follows:
|
||||
///
|
||||
/// * if the certificate's signatureAlgorithm uses a single hash
|
||||
/// function, and that hash function is either MD5 or SHA-1, then use SHA-256;
|
||||
///
|
||||
/// * if the certificate's signatureAlgorithm uses a single hash
|
||||
/// function and that hash function neither MD5 nor SHA-1, then use
|
||||
/// the hash function associated with the certificate's
|
||||
/// signatureAlgorithm;
|
||||
///
|
||||
/// * if the certificate's signatureAlgorithm uses no hash functions or
|
||||
/// uses multiple hash functions, then this channel binding type's
|
||||
/// channel bindings are undefined at this time (updates to is channel
|
||||
/// binding type may occur to address this issue if it ever arises).
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum TlsServerEndPoint {
|
||||
Sha256([u8; 32]),
|
||||
Undefined,
|
||||
}
|
||||
|
||||
impl TlsServerEndPoint {
|
||||
pub fn new(cert: &CertificateDer<'_>) -> anyhow::Result<Self> {
|
||||
let sha256_oids = [
|
||||
// I'm explicitly not adding MD5 or SHA1 here... They're bad.
|
||||
oid_registry::OID_SIG_ECDSA_WITH_SHA256,
|
||||
oid_registry::OID_PKCS1_SHA256WITHRSA,
|
||||
];
|
||||
|
||||
let pem = x509_parser::parse_x509_certificate(cert)
|
||||
.context("Failed to parse PEM object from cerficiate")?
|
||||
.1;
|
||||
|
||||
info!(subject = %pem.subject, "parsing TLS certificate");
|
||||
|
||||
let reg = oid_registry::OidRegistry::default().with_all_crypto();
|
||||
let oid = pem.signature_algorithm.oid();
|
||||
let alg = reg.get(oid);
|
||||
if sha256_oids.contains(oid) {
|
||||
let tls_server_end_point: [u8; 32] = Sha256::new().chain_update(cert).finalize().into();
|
||||
info!(subject = %pem.subject, signature_algorithm = alg.map(|a| a.description()), tls_server_end_point = %base64::encode(tls_server_end_point), "determined channel binding");
|
||||
Ok(Self::Sha256(tls_server_end_point))
|
||||
} else {
|
||||
error!(subject = %pem.subject, signature_algorithm = alg.map(|a| a.description()), "unknown channel binding");
|
||||
Ok(Self::Undefined)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn supported(&self) -> bool {
|
||||
!matches!(self, TlsServerEndPoint::Undefined)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct CertResolver {
|
||||
certs: HashMap<String, (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
|
||||
default: Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
|
||||
}
|
||||
|
||||
impl CertResolver {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
fn add_cert_path(
|
||||
&mut self,
|
||||
key_path: &str,
|
||||
cert_path: &str,
|
||||
is_default: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let priv_key = {
|
||||
let key_bytes = std::fs::read(key_path)
|
||||
.with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?;
|
||||
rustls_pemfile::private_key(&mut &key_bytes[..])
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
};
|
||||
|
||||
let cert_chain_bytes = std::fs::read(cert_path)
|
||||
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
|
||||
|
||||
let cert_chain = {
|
||||
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
|
||||
.try_collect()
|
||||
.with_context(|| {
|
||||
format!("Failed to read TLS certificate chain from bytes from file at '{cert_path}'.")
|
||||
})?
|
||||
};
|
||||
|
||||
self.add_cert(priv_key, cert_chain, is_default)
|
||||
}
|
||||
|
||||
pub fn add_cert(
|
||||
&mut self,
|
||||
priv_key: PrivateKeyDer<'static>,
|
||||
cert_chain: Vec<CertificateDer<'static>>,
|
||||
is_default: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
|
||||
|
||||
let first_cert = &cert_chain[0];
|
||||
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
|
||||
let pem = x509_parser::parse_x509_certificate(first_cert)
|
||||
.context("Failed to parse PEM object from cerficiate")?
|
||||
.1;
|
||||
|
||||
let common_name = pem.subject().to_string();
|
||||
|
||||
// We need to get the canonical name for this certificate so we can match them against any domain names
|
||||
// seen within the proxy codebase.
|
||||
//
|
||||
// In scram-proxy we use wildcard certificates only, with the database endpoint as the wildcard subdomain, taken from SNI.
|
||||
// We need to remove the wildcard prefix for the purposes of certificate selection.
|
||||
//
|
||||
// auth-broker does not use SNI and instead uses the Neon-Connection-String header.
|
||||
// Auth broker has the subdomain `apiauth` we need to remove for the purposes of validating the Neon-Connection-String.
|
||||
//
|
||||
// Console Redirect proxy does not use any wildcard domains and does not need any certificate selection or conn string
|
||||
// validation, so let's we can continue with any common-name
|
||||
let common_name = if let Some(s) = common_name.strip_prefix("CN=*.") {
|
||||
s.to_string()
|
||||
} else if let Some(s) = common_name.strip_prefix("CN=apiauth.") {
|
||||
s.to_string()
|
||||
} else if let Some(s) = common_name.strip_prefix("CN=") {
|
||||
s.to_string()
|
||||
} else {
|
||||
bail!("Failed to parse common name from certificate")
|
||||
};
|
||||
|
||||
let cert = Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key));
|
||||
|
||||
if is_default {
|
||||
self.default = Some((cert.clone(), tls_server_end_point));
|
||||
}
|
||||
|
||||
self.certs.insert(common_name, (cert, tls_server_end_point));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_common_names(&self) -> HashSet<String> {
|
||||
self.certs.keys().map(|s| s.to_string()).collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl rustls::server::ResolvesServerCert for CertResolver {
|
||||
fn resolve(
|
||||
&self,
|
||||
client_hello: rustls::server::ClientHello<'_>,
|
||||
) -> Option<Arc<rustls::sign::CertifiedKey>> {
|
||||
self.resolve(client_hello.server_name()).map(|x| x.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl CertResolver {
|
||||
pub fn resolve(
|
||||
&self,
|
||||
server_name: Option<&str>,
|
||||
) -> Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)> {
|
||||
// loop here and cut off more and more subdomains until we find
|
||||
// a match to get a proper wildcard support. OTOH, we now do not
|
||||
// use nested domains, so keep this simple for now.
|
||||
//
|
||||
// With the current coding foo.com will match *.foo.com and that
|
||||
// repeats behavior of the old code.
|
||||
if let Some(mut sni_name) = server_name {
|
||||
loop {
|
||||
if let Some(cert) = self.certs.get(sni_name) {
|
||||
return Some(cert.clone());
|
||||
}
|
||||
if let Some((_, rest)) = sni_name.split_once('.') {
|
||||
sni_name = rest;
|
||||
} else {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// No SNI, use the default certificate, otherwise we can't get to
|
||||
// options parameter which can be used to set endpoint name too.
|
||||
// That means that non-SNI flow will not work for CNAME domains in
|
||||
// verify-full mode.
|
||||
//
|
||||
// If that will be a problem we can:
|
||||
//
|
||||
// a) Instead of multi-cert approach use single cert with extra
|
||||
// domains listed in Subject Alternative Name (SAN).
|
||||
// b) Deploy separate proxy instances for extra domains.
|
||||
self.default.clone()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct EndpointCacheConfig {
|
||||
/// Batch size to receive all endpoints on the startup.
|
||||
|
||||
221
proxy/src/conn.rs
Normal file
221
proxy/src/conn.rs
Normal file
@@ -0,0 +1,221 @@
|
||||
use std::future::{poll_fn, Future};
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
|
||||
|
||||
pub trait Acceptor {
|
||||
type Connection: AsyncRead + AsyncWrite + Send + Unpin + 'static;
|
||||
type Error: std::error::Error + Send + Sync + 'static;
|
||||
|
||||
#[inline]
|
||||
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
let _ = cx;
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn accept(
|
||||
&self,
|
||||
) -> impl Future<Output = Result<(Self::Connection, SocketAddr), Self::Error>> + Send;
|
||||
}
|
||||
|
||||
pub trait Connector {
|
||||
type Connection: AsyncRead + AsyncWrite + Send + Unpin + 'static;
|
||||
type Error: std::error::Error + Send + Sync + 'static;
|
||||
|
||||
#[inline]
|
||||
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
let _ = cx;
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn connect(
|
||||
&self,
|
||||
addr: SocketAddr,
|
||||
) -> impl Future<Output = Result<Self::Connection, Self::Error>> + Send;
|
||||
}
|
||||
|
||||
pub struct TokioTcpAcceptor {
|
||||
listener: TcpListener,
|
||||
tcp_nodelay: Option<bool>,
|
||||
tcp_keepalive: Option<bool>,
|
||||
}
|
||||
|
||||
impl TokioTcpAcceptor {
|
||||
pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
// When set for the server socket, the keepalive setting
|
||||
// will be inherited by all accepted client sockets.
|
||||
socket2::SockRef::from(&listener).set_keepalive(true)?;
|
||||
Ok(Self {
|
||||
listener,
|
||||
tcp_nodelay: Some(true),
|
||||
tcp_keepalive: None,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn into_std(self) -> io::Result<std::net::TcpListener> {
|
||||
self.listener.into_std()
|
||||
}
|
||||
}
|
||||
|
||||
impl Acceptor for TokioTcpAcceptor {
|
||||
type Connection = TcpStream;
|
||||
type Error = io::Error;
|
||||
|
||||
fn accept(&self) -> impl Future<Output = Result<(Self::Connection, SocketAddr), Self::Error>> {
|
||||
async move {
|
||||
let (stream, addr) = self.listener.accept().await?;
|
||||
|
||||
let socket = socket2::SockRef::from(&stream);
|
||||
if let Some(nodelay) = self.tcp_nodelay {
|
||||
socket.set_nodelay(nodelay)?;
|
||||
}
|
||||
if let Some(keepalive) = self.tcp_keepalive {
|
||||
socket.set_keepalive(keepalive)?;
|
||||
}
|
||||
|
||||
Ok((stream, addr))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TokioTcpConnector;
|
||||
|
||||
impl Connector for TokioTcpConnector {
|
||||
type Connection = TcpStream;
|
||||
type Error = io::Error;
|
||||
|
||||
fn connect(
|
||||
&self,
|
||||
addr: SocketAddr,
|
||||
) -> impl Future<Output = Result<Self::Connection, Self::Error>> {
|
||||
async move {
|
||||
let socket = TcpStream::connect(addr).await?;
|
||||
socket.set_nodelay(true)?;
|
||||
Ok(socket)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Stream: AsyncRead + AsyncWrite + Send + Unpin + 'static {}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite + Send + Unpin + 'static> Stream for T {}
|
||||
|
||||
pub trait AsyncRead {
|
||||
fn readable(&self) -> impl Future<Output = io::Result<()>> + Send
|
||||
where
|
||||
Self: Send + Sync,
|
||||
{
|
||||
poll_fn(move |cx| self.poll_read_ready(cx))
|
||||
}
|
||||
|
||||
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
|
||||
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<io::Result<usize>>;
|
||||
|
||||
fn poll_read_vectored(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
bufs: &mut [io::IoSliceMut<'_>],
|
||||
) -> Poll<io::Result<usize>>;
|
||||
}
|
||||
|
||||
pub trait AsyncWrite {
|
||||
fn writable(&self) -> impl Future<Output = io::Result<()>> + Send
|
||||
where
|
||||
Self: Send + Sync,
|
||||
{
|
||||
poll_fn(move |cx| self.poll_write_ready(cx))
|
||||
}
|
||||
|
||||
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
|
||||
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>>;
|
||||
|
||||
fn poll_write_vectored(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
bufs: &[io::IoSlice<'_>],
|
||||
) -> Poll<io::Result<usize>>;
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
|
||||
}
|
||||
|
||||
impl AsyncRead for tokio::net::TcpStream {
|
||||
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
tokio::net::TcpStream::poll_read_ready(self, cx)
|
||||
}
|
||||
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
match tokio::net::TcpStream::try_read(Pin::new(&mut *self).get_mut(), buf) {
|
||||
Ok(n) => Poll::Ready(Ok(n)),
|
||||
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
cx.waker().wake_by_ref();
|
||||
Poll::Pending
|
||||
}
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_read_vectored(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
bufs: &mut [io::IoSliceMut<'_>],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
match tokio::net::TcpStream::try_read_vectored(Pin::new(&mut *self).get_mut(), bufs) {
|
||||
Ok(n) => Poll::Ready(Ok(n)),
|
||||
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
cx.waker().wake_by_ref();
|
||||
Poll::Pending
|
||||
}
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for tokio::net::TcpStream {
|
||||
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
tokio::net::TcpStream::poll_write_ready(self, cx)
|
||||
}
|
||||
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
<Self as tokio::io::AsyncWrite>::poll_write(self, cx, buf)
|
||||
}
|
||||
|
||||
fn poll_write_vectored(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
bufs: &[io::IoSlice<'_>],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
<Self as tokio::io::AsyncWrite>::poll_write_vectored(self, cx, bufs)
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
<Self as tokio::io::AsyncWrite>::poll_flush(self, cx)
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
<Self as tokio::io::AsyncWrite>::poll_shutdown(self, cx)
|
||||
}
|
||||
}
|
||||
@@ -8,6 +8,7 @@ use tracing::{debug, error, info, Instrument};
|
||||
use crate::auth::backend::ConsoleRedirectBackend;
|
||||
use crate::cancellation::{CancellationHandlerMain, CancellationHandlerMainInternal};
|
||||
use crate::config::{ProxyConfig, ProxyProtocolV2};
|
||||
use crate::conn::{Acceptor, TokioTcpAcceptor};
|
||||
use crate::context::RequestContext;
|
||||
use crate::error::ReportableError;
|
||||
use crate::metrics::{Metrics, NumClientConnectionsGuard};
|
||||
@@ -22,7 +23,7 @@ use crate::proxy::{
|
||||
pub async fn task_main(
|
||||
config: &'static ProxyConfig,
|
||||
backend: &'static ConsoleRedirectBackend,
|
||||
listener: tokio::net::TcpListener,
|
||||
acceptor: TokioTcpAcceptor,
|
||||
cancellation_token: CancellationToken,
|
||||
cancellation_handler: Arc<CancellationHandlerMain>,
|
||||
) -> anyhow::Result<()> {
|
||||
@@ -30,15 +31,11 @@ pub async fn task_main(
|
||||
info!("proxy has shut down");
|
||||
}
|
||||
|
||||
// When set for the server socket, the keepalive setting
|
||||
// will be inherited by all accepted client sockets.
|
||||
socket2::SockRef::from(&listener).set_keepalive(true)?;
|
||||
|
||||
let connections = tokio_util::task::task_tracker::TaskTracker::new();
|
||||
let cancellations = tokio_util::task::task_tracker::TaskTracker::new();
|
||||
|
||||
while let Some(accept_result) =
|
||||
run_until_cancelled(listener.accept(), &cancellation_token).await
|
||||
run_until_cancelled(acceptor.accept(), &cancellation_token).await
|
||||
{
|
||||
let (socket, peer_addr) = accept_result?;
|
||||
|
||||
@@ -115,7 +112,7 @@ pub async fn task_main(
|
||||
Ok(Some(p)) => {
|
||||
ctx.set_success();
|
||||
let _disconnect = ctx.log_connect();
|
||||
match p.proxy_pass().await {
|
||||
match p.proxy_pass(&config.connect_to_compute).await {
|
||||
Ok(()) => {}
|
||||
Err(ErrorSource::Client(e)) => {
|
||||
error!(?session_id, "per-client task finished with an IO error from the client: {e:#}");
|
||||
@@ -131,7 +128,7 @@ pub async fn task_main(
|
||||
|
||||
connections.close();
|
||||
cancellations.close();
|
||||
drop(listener);
|
||||
drop(acceptor);
|
||||
|
||||
// Drain connections
|
||||
connections.wait().await;
|
||||
@@ -213,11 +210,10 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
params_compat: true,
|
||||
params: ¶ms,
|
||||
locks: &config.connect_compute_locks,
|
||||
allow_self_signed_compute: config.allow_self_signed_compute,
|
||||
},
|
||||
&user_info,
|
||||
config.wake_compute_retry_config,
|
||||
config.connect_to_compute_retry_config,
|
||||
&config.connect_to_compute,
|
||||
)
|
||||
.or_else(|e| stream.throw_error(e))
|
||||
.await?;
|
||||
|
||||
@@ -4,10 +4,11 @@ use anyhow::Context;
|
||||
use once_cell::sync::Lazy;
|
||||
use postgres_backend::{AuthType, PostgresBackend, PostgresBackendTCP, QueryError};
|
||||
use pq_proto::{BeMessage, SINGLE_COL_ROWDESC};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, info_span, Instrument};
|
||||
|
||||
use crate::conn::{Acceptor, TokioTcpAcceptor};
|
||||
use crate::control_plane::messages::{DatabaseInfo, KickSession};
|
||||
use crate::waiters::{self, Waiter, Waiters};
|
||||
|
||||
@@ -26,19 +27,15 @@ pub(crate) fn notify(psql_session_id: &str, msg: ComputeReady) -> Result<(), wai
|
||||
|
||||
/// Management API listener task.
|
||||
/// It spawns management response handlers needed for the console redirect auth flow.
|
||||
pub async fn task_main(listener: TcpListener) -> anyhow::Result<Infallible> {
|
||||
pub async fn task_main(acceptor: TokioTcpAcceptor) -> anyhow::Result<Infallible> {
|
||||
scopeguard::defer! {
|
||||
info!("mgmt has shut down");
|
||||
}
|
||||
|
||||
loop {
|
||||
let (socket, peer_addr) = listener.accept().await?;
|
||||
let (socket, peer_addr) = acceptor.accept().await?;
|
||||
info!("accepted connection from {peer_addr}");
|
||||
|
||||
socket
|
||||
.set_nodelay(true)
|
||||
.context("failed to set client socket option")?;
|
||||
|
||||
let span = info_span!("mgmt", peer = %peer_addr);
|
||||
|
||||
tokio::task::spawn(
|
||||
|
||||
@@ -10,13 +10,13 @@ pub mod client;
|
||||
pub(crate) mod errors;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::auth::backend::jwt::AuthRule;
|
||||
use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo};
|
||||
use crate::auth::IpPattern;
|
||||
use crate::cache::project_info::ProjectInfoCacheImpl;
|
||||
use crate::cache::{Cached, TimedLru};
|
||||
use crate::config::ComputeConfig;
|
||||
use crate::context::RequestContext;
|
||||
use crate::control_plane::messages::{ControlPlaneErrorMessage, MetricsAuxInfo};
|
||||
use crate::intern::ProjectIdInt;
|
||||
@@ -73,12 +73,9 @@ impl NodeInfo {
|
||||
pub(crate) async fn connect(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
allow_self_signed_compute: bool,
|
||||
timeout: Duration,
|
||||
config: &ComputeConfig,
|
||||
) -> Result<compute::PostgresConnection, compute::ConnectionError> {
|
||||
self.config
|
||||
.connect(ctx, allow_self_signed_compute, self.aux.clone(), timeout)
|
||||
.await
|
||||
self.config.connect(ctx, self.aux.clone(), config).await
|
||||
}
|
||||
|
||||
pub(crate) fn reuse_settings(&mut self, other: Self) {
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use std::convert::Infallible;
|
||||
use std::net::TcpListener;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use anyhow::{anyhow, bail};
|
||||
@@ -14,6 +13,7 @@ use utils::http::error::ApiError;
|
||||
use utils::http::json::json_response;
|
||||
use utils::http::{RouterBuilder, RouterService};
|
||||
|
||||
use crate::conn::TokioTcpAcceptor;
|
||||
use crate::ext::{LockExt, TaskExt};
|
||||
use crate::jemalloc;
|
||||
|
||||
@@ -36,7 +36,7 @@ fn make_router(metrics: AppMetrics) -> RouterBuilder<hyper0::Body, ApiError> {
|
||||
}
|
||||
|
||||
pub async fn task_main(
|
||||
http_listener: TcpListener,
|
||||
http_acceptor: TokioTcpAcceptor,
|
||||
metrics: AppMetrics,
|
||||
) -> anyhow::Result<Infallible> {
|
||||
scopeguard::defer! {
|
||||
@@ -45,7 +45,7 @@ pub async fn task_main(
|
||||
|
||||
let service = || RouterService::new(make_router(metrics).build()?);
|
||||
|
||||
hyper0::Server::from_tcp(http_listener)?
|
||||
hyper0::Server::from_tcp(http_acceptor.into_std()?)?
|
||||
.serve(service().map_err(|e| anyhow!(e))?)
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -78,6 +78,7 @@ pub mod cancellation;
|
||||
pub mod compute;
|
||||
pub mod compute_ctl;
|
||||
pub mod config;
|
||||
pub mod conn;
|
||||
pub mod console_redirect_proxy;
|
||||
pub mod context;
|
||||
pub mod control_plane;
|
||||
@@ -89,7 +90,6 @@ pub mod jemalloc;
|
||||
pub mod logging;
|
||||
pub mod metrics;
|
||||
pub mod parse;
|
||||
pub mod postgres_rustls;
|
||||
pub mod protocol2;
|
||||
pub mod proxy;
|
||||
pub mod rate_limiter;
|
||||
@@ -99,6 +99,7 @@ pub mod scram;
|
||||
pub mod serverless;
|
||||
pub mod signals;
|
||||
pub mod stream;
|
||||
pub mod tls;
|
||||
pub mod types;
|
||||
pub mod url;
|
||||
pub mod usage_metrics;
|
||||
|
||||
@@ -6,7 +6,7 @@ use tracing::{debug, info, warn};
|
||||
use super::retry::ShouldRetryWakeCompute;
|
||||
use crate::auth::backend::ComputeCredentialKeys;
|
||||
use crate::compute::{self, PostgresConnection, COULD_NOT_CONNECT};
|
||||
use crate::config::RetryConfig;
|
||||
use crate::config::{ComputeConfig, RetryConfig};
|
||||
use crate::context::RequestContext;
|
||||
use crate::control_plane::errors::WakeComputeError;
|
||||
use crate::control_plane::locks::ApiLocks;
|
||||
@@ -19,8 +19,6 @@ use crate::proxy::retry::{retry_after, should_retry, CouldRetry};
|
||||
use crate::proxy::wake_compute::wake_compute;
|
||||
use crate::types::Host;
|
||||
|
||||
const CONNECT_TIMEOUT: time::Duration = time::Duration::from_secs(2);
|
||||
|
||||
/// If we couldn't connect, a cached connection info might be to blame
|
||||
/// (e.g. the compute node's address might've changed at the wrong time).
|
||||
/// Invalidate the cache entry (if any) to prevent subsequent errors.
|
||||
@@ -49,7 +47,7 @@ pub(crate) trait ConnectMechanism {
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
node_info: &control_plane::CachedNodeInfo,
|
||||
timeout: time::Duration,
|
||||
config: &ComputeConfig,
|
||||
) -> Result<Self::Connection, Self::ConnectError>;
|
||||
|
||||
fn update_connect_config(&self, conf: &mut compute::ConnCfg);
|
||||
@@ -73,9 +71,6 @@ pub(crate) struct TcpMechanism<'a> {
|
||||
|
||||
/// connect_to_compute concurrency lock
|
||||
pub(crate) locks: &'static ApiLocks<Host>,
|
||||
|
||||
/// Whether we should accept self-signed certificates (for testing)
|
||||
pub(crate) allow_self_signed_compute: bool,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -89,15 +84,11 @@ impl ConnectMechanism for TcpMechanism<'_> {
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
node_info: &control_plane::CachedNodeInfo,
|
||||
timeout: time::Duration,
|
||||
config: &ComputeConfig,
|
||||
) -> Result<PostgresConnection, Self::Error> {
|
||||
let host = node_info.config.get_host();
|
||||
let permit = self.locks.get_permit(&host).await?;
|
||||
permit.release_result(
|
||||
node_info
|
||||
.connect(ctx, self.allow_self_signed_compute, timeout)
|
||||
.await,
|
||||
)
|
||||
permit.release_result(node_info.connect(ctx, config).await)
|
||||
}
|
||||
|
||||
fn update_connect_config(&self, config: &mut compute::ConnCfg) {
|
||||
@@ -112,7 +103,7 @@ pub(crate) async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBac
|
||||
mechanism: &M,
|
||||
user_info: &B,
|
||||
wake_compute_retry_config: RetryConfig,
|
||||
connect_to_compute_retry_config: RetryConfig,
|
||||
compute: &ComputeConfig,
|
||||
) -> Result<M::Connection, M::Error>
|
||||
where
|
||||
M::ConnectError: CouldRetry + ShouldRetryWakeCompute + std::fmt::Debug,
|
||||
@@ -126,10 +117,7 @@ where
|
||||
mechanism.update_connect_config(&mut node_info.config);
|
||||
|
||||
// try once
|
||||
let err = match mechanism
|
||||
.connect_once(ctx, &node_info, CONNECT_TIMEOUT)
|
||||
.await
|
||||
{
|
||||
let err = match mechanism.connect_once(ctx, &node_info, compute).await {
|
||||
Ok(res) => {
|
||||
ctx.success();
|
||||
Metrics::get().proxy.retries_metric.observe(
|
||||
@@ -149,7 +137,7 @@ where
|
||||
let node_info = if !node_info.cached() || !err.should_retry_wake_compute() {
|
||||
// If we just recieved this from cplane and didn't get it from cache, we shouldn't retry.
|
||||
// Do not need to retrieve a new node_info, just return the old one.
|
||||
if should_retry(&err, num_retries, connect_to_compute_retry_config) {
|
||||
if should_retry(&err, num_retries, compute.retry) {
|
||||
Metrics::get().proxy.retries_metric.observe(
|
||||
RetriesMetricGroup {
|
||||
outcome: ConnectOutcome::Failed,
|
||||
@@ -179,10 +167,7 @@ where
|
||||
debug!("wake_compute success. attempting to connect");
|
||||
num_retries = 1;
|
||||
loop {
|
||||
match mechanism
|
||||
.connect_once(ctx, &node_info, CONNECT_TIMEOUT)
|
||||
.await
|
||||
{
|
||||
match mechanism.connect_once(ctx, &node_info, compute).await {
|
||||
Ok(res) => {
|
||||
ctx.success();
|
||||
Metrics::get().proxy.retries_metric.observe(
|
||||
@@ -197,7 +182,7 @@ where
|
||||
return Ok(res);
|
||||
}
|
||||
Err(e) => {
|
||||
if !should_retry(&e, num_retries, connect_to_compute_retry_config) {
|
||||
if !should_retry(&e, num_retries, compute.retry) {
|
||||
// Don't log an error here, caller will print the error
|
||||
Metrics::get().proxy.retries_metric.observe(
|
||||
RetriesMetricGroup {
|
||||
@@ -213,7 +198,7 @@ where
|
||||
}
|
||||
};
|
||||
|
||||
let wait_duration = retry_after(num_retries, connect_to_compute_retry_config);
|
||||
let wait_duration = retry_after(num_retries, compute.retry);
|
||||
num_retries += 1;
|
||||
|
||||
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::RetryTimeout);
|
||||
|
||||
@@ -8,12 +8,13 @@ use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use crate::auth::endpoint_sni;
|
||||
use crate::config::{TlsConfig, PG_ALPN_PROTOCOL};
|
||||
use crate::config::TlsConfig;
|
||||
use crate::context::RequestContext;
|
||||
use crate::error::ReportableError;
|
||||
use crate::metrics::Metrics;
|
||||
use crate::proxy::ERR_INSECURE_CONNECTION;
|
||||
use crate::stream::{PqStream, Stream, StreamUpgradeError};
|
||||
use crate::tls::PG_ALPN_PROTOCOL;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub(crate) enum HandshakeError {
|
||||
|
||||
@@ -25,6 +25,7 @@ use self::connect_compute::{connect_to_compute, TcpMechanism};
|
||||
use self::passthrough::ProxyPassthrough;
|
||||
use crate::cancellation::{self, CancellationHandlerMain, CancellationHandlerMainInternal};
|
||||
use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig};
|
||||
use crate::conn::{Acceptor, TokioTcpAcceptor};
|
||||
use crate::context::RequestContext;
|
||||
use crate::error::ReportableError;
|
||||
use crate::metrics::{Metrics, NumClientConnectionsGuard};
|
||||
@@ -55,7 +56,7 @@ pub async fn run_until_cancelled<F: std::future::Future>(
|
||||
pub async fn task_main(
|
||||
config: &'static ProxyConfig,
|
||||
auth_backend: &'static auth::Backend<'static, ()>,
|
||||
listener: tokio::net::TcpListener,
|
||||
acceptor: TokioTcpAcceptor,
|
||||
cancellation_token: CancellationToken,
|
||||
cancellation_handler: Arc<CancellationHandlerMain>,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
@@ -64,15 +65,11 @@ pub async fn task_main(
|
||||
info!("proxy has shut down");
|
||||
}
|
||||
|
||||
// When set for the server socket, the keepalive setting
|
||||
// will be inherited by all accepted client sockets.
|
||||
socket2::SockRef::from(&listener).set_keepalive(true)?;
|
||||
|
||||
let connections = tokio_util::task::task_tracker::TaskTracker::new();
|
||||
let cancellations = tokio_util::task::task_tracker::TaskTracker::new();
|
||||
|
||||
while let Some(accept_result) =
|
||||
run_until_cancelled(listener.accept(), &cancellation_token).await
|
||||
run_until_cancelled(acceptor.accept(), &cancellation_token).await
|
||||
{
|
||||
let (socket, peer_addr) = accept_result?;
|
||||
|
||||
@@ -152,7 +149,7 @@ pub async fn task_main(
|
||||
Ok(Some(p)) => {
|
||||
ctx.set_success();
|
||||
let _disconnect = ctx.log_connect();
|
||||
match p.proxy_pass().await {
|
||||
match p.proxy_pass(&config.connect_to_compute).await {
|
||||
Ok(()) => {}
|
||||
Err(ErrorSource::Client(e)) => {
|
||||
warn!(?session_id, "per-client task finished with an IO error from the client: {e:#}");
|
||||
@@ -168,7 +165,7 @@ pub async fn task_main(
|
||||
|
||||
connections.close();
|
||||
cancellations.close();
|
||||
drop(listener);
|
||||
drop(acceptor);
|
||||
|
||||
// Drain connections
|
||||
connections.wait().await;
|
||||
@@ -348,12 +345,10 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
params_compat,
|
||||
params: ¶ms,
|
||||
locks: &config.connect_compute_locks,
|
||||
// only used for console redirect testing.
|
||||
allow_self_signed_compute: false,
|
||||
},
|
||||
&user_info,
|
||||
config.wake_compute_retry_config,
|
||||
config.connect_to_compute_retry_config,
|
||||
&config.connect_to_compute,
|
||||
)
|
||||
.or_else(|e| stream.throw_error(e))
|
||||
.await?;
|
||||
|
||||
@@ -5,6 +5,7 @@ use utils::measured_stream::MeasuredStream;
|
||||
use super::copy_bidirectional::ErrorSource;
|
||||
use crate::cancellation;
|
||||
use crate::compute::PostgresConnection;
|
||||
use crate::config::ComputeConfig;
|
||||
use crate::control_plane::messages::MetricsAuxInfo;
|
||||
use crate::metrics::{Direction, Metrics, NumClientConnectionsGuard, NumConnectionRequestsGuard};
|
||||
use crate::stream::Stream;
|
||||
@@ -67,9 +68,17 @@ pub(crate) struct ProxyPassthrough<P, S> {
|
||||
}
|
||||
|
||||
impl<P, S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<P, S> {
|
||||
pub(crate) async fn proxy_pass(self) -> Result<(), ErrorSource> {
|
||||
pub(crate) async fn proxy_pass(
|
||||
self,
|
||||
compute_config: &ComputeConfig,
|
||||
) -> Result<(), ErrorSource> {
|
||||
let res = proxy_pass(self.client, self.compute.stream, self.aux).await;
|
||||
if let Err(err) = self.compute.cancel_closure.try_cancel_query().await {
|
||||
if let Err(err) = self
|
||||
.compute
|
||||
.cancel_closure
|
||||
.try_cancel_query(compute_config)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database");
|
||||
}
|
||||
res
|
||||
|
||||
@@ -22,14 +22,16 @@ use super::*;
|
||||
use crate::auth::backend::{
|
||||
ComputeCredentialKeys, ComputeCredentials, ComputeUserInfo, MaybeOwned,
|
||||
};
|
||||
use crate::config::{CertResolver, RetryConfig};
|
||||
use crate::config::{ComputeConfig, RetryConfig};
|
||||
use crate::control_plane::client::{ControlPlaneClient, TestControlPlaneClient};
|
||||
use crate::control_plane::messages::{ControlPlaneErrorMessage, Details, MetricsAuxInfo, Status};
|
||||
use crate::control_plane::{
|
||||
self, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret, NodeInfo, NodeInfoCache,
|
||||
};
|
||||
use crate::error::ErrorKind;
|
||||
use crate::postgres_rustls::MakeRustlsConnect;
|
||||
use crate::tls::client_config::compute_client_config_with_certs;
|
||||
use crate::tls::postgres_rustls::MakeRustlsConnect;
|
||||
use crate::tls::server_config::CertResolver;
|
||||
use crate::types::{BranchId, EndpointId, ProjectId};
|
||||
use crate::{sasl, scram};
|
||||
|
||||
@@ -67,7 +69,7 @@ fn generate_certs(
|
||||
}
|
||||
|
||||
struct ClientConfig<'a> {
|
||||
config: rustls::ClientConfig,
|
||||
config: Arc<rustls::ClientConfig>,
|
||||
hostname: &'a str,
|
||||
}
|
||||
|
||||
@@ -110,16 +112,7 @@ fn generate_tls_config<'a>(
|
||||
};
|
||||
|
||||
let client_config = {
|
||||
let config =
|
||||
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
|
||||
.with_safe_default_protocol_versions()
|
||||
.context("ring should support the default protocol versions")?
|
||||
.with_root_certificates({
|
||||
let mut store = rustls::RootCertStore::empty();
|
||||
store.add(ca)?;
|
||||
store
|
||||
})
|
||||
.with_no_client_auth();
|
||||
let config = Arc::new(compute_client_config_with_certs([ca]));
|
||||
|
||||
ClientConfig { config, hostname }
|
||||
};
|
||||
@@ -468,7 +461,7 @@ impl ConnectMechanism for TestConnectMechanism {
|
||||
&self,
|
||||
_ctx: &RequestContext,
|
||||
_node_info: &control_plane::CachedNodeInfo,
|
||||
_timeout: std::time::Duration,
|
||||
_config: &ComputeConfig,
|
||||
) -> Result<Self::Connection, Self::ConnectError> {
|
||||
let mut counter = self.counter.lock().unwrap();
|
||||
let action = self.sequence[*counter];
|
||||
@@ -576,6 +569,20 @@ fn helper_create_connect_info(
|
||||
user_info
|
||||
}
|
||||
|
||||
fn config() -> ComputeConfig {
|
||||
let retry = RetryConfig {
|
||||
base_delay: Duration::from_secs(1),
|
||||
max_retries: 5,
|
||||
backoff_factor: 2.0,
|
||||
};
|
||||
|
||||
ComputeConfig {
|
||||
retry,
|
||||
tls: Arc::new(compute_client_config_with_certs(std::iter::empty())),
|
||||
timeout: Duration::from_secs(2),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_success() {
|
||||
let _ = env_logger::try_init();
|
||||
@@ -583,12 +590,8 @@ async fn connect_to_compute_success() {
|
||||
let ctx = RequestContext::test();
|
||||
let mechanism = TestConnectMechanism::new(vec![Wake, Connect]);
|
||||
let user_info = helper_create_connect_info(&mechanism);
|
||||
let config = RetryConfig {
|
||||
base_delay: Duration::from_secs(1),
|
||||
max_retries: 5,
|
||||
backoff_factor: 2.0,
|
||||
};
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
|
||||
let config = config();
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, config.retry, &config)
|
||||
.await
|
||||
.unwrap();
|
||||
mechanism.verify();
|
||||
@@ -601,12 +604,8 @@ async fn connect_to_compute_retry() {
|
||||
let ctx = RequestContext::test();
|
||||
let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Connect]);
|
||||
let user_info = helper_create_connect_info(&mechanism);
|
||||
let config = RetryConfig {
|
||||
base_delay: Duration::from_secs(1),
|
||||
max_retries: 5,
|
||||
backoff_factor: 2.0,
|
||||
};
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
|
||||
let config = config();
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, config.retry, &config)
|
||||
.await
|
||||
.unwrap();
|
||||
mechanism.verify();
|
||||
@@ -620,12 +619,8 @@ async fn connect_to_compute_non_retry_1() {
|
||||
let ctx = RequestContext::test();
|
||||
let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Fail]);
|
||||
let user_info = helper_create_connect_info(&mechanism);
|
||||
let config = RetryConfig {
|
||||
base_delay: Duration::from_secs(1),
|
||||
max_retries: 5,
|
||||
backoff_factor: 2.0,
|
||||
};
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
|
||||
let config = config();
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, config.retry, &config)
|
||||
.await
|
||||
.unwrap_err();
|
||||
mechanism.verify();
|
||||
@@ -639,12 +634,8 @@ async fn connect_to_compute_non_retry_2() {
|
||||
let ctx = RequestContext::test();
|
||||
let mechanism = TestConnectMechanism::new(vec![Wake, Fail, Wake, Connect]);
|
||||
let user_info = helper_create_connect_info(&mechanism);
|
||||
let config = RetryConfig {
|
||||
base_delay: Duration::from_secs(1),
|
||||
max_retries: 5,
|
||||
backoff_factor: 2.0,
|
||||
};
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
|
||||
let config = config();
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, config.retry, &config)
|
||||
.await
|
||||
.unwrap();
|
||||
mechanism.verify();
|
||||
@@ -665,17 +656,13 @@ async fn connect_to_compute_non_retry_3() {
|
||||
max_retries: 1,
|
||||
backoff_factor: 2.0,
|
||||
};
|
||||
let connect_to_compute_retry_config = RetryConfig {
|
||||
base_delay: Duration::from_secs(1),
|
||||
max_retries: 5,
|
||||
backoff_factor: 2.0,
|
||||
};
|
||||
let config = config();
|
||||
connect_to_compute(
|
||||
&ctx,
|
||||
&mechanism,
|
||||
&user_info,
|
||||
wake_compute_retry_config,
|
||||
connect_to_compute_retry_config,
|
||||
&config,
|
||||
)
|
||||
.await
|
||||
.unwrap_err();
|
||||
@@ -690,12 +677,8 @@ async fn wake_retry() {
|
||||
let ctx = RequestContext::test();
|
||||
let mechanism = TestConnectMechanism::new(vec![WakeRetry, Wake, Connect]);
|
||||
let user_info = helper_create_connect_info(&mechanism);
|
||||
let config = RetryConfig {
|
||||
base_delay: Duration::from_secs(1),
|
||||
max_retries: 5,
|
||||
backoff_factor: 2.0,
|
||||
};
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
|
||||
let config = config();
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, config.retry, &config)
|
||||
.await
|
||||
.unwrap();
|
||||
mechanism.verify();
|
||||
@@ -709,12 +692,8 @@ async fn wake_non_retry() {
|
||||
let ctx = RequestContext::test();
|
||||
let mechanism = TestConnectMechanism::new(vec![WakeRetry, WakeFail]);
|
||||
let user_info = helper_create_connect_info(&mechanism);
|
||||
let config = RetryConfig {
|
||||
base_delay: Duration::from_secs(1),
|
||||
max_retries: 5,
|
||||
backoff_factor: 2.0,
|
||||
};
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
|
||||
let config = config();
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, config.retry, &config)
|
||||
.await
|
||||
.unwrap_err();
|
||||
mechanism.verify();
|
||||
|
||||
@@ -12,6 +12,7 @@ use uuid::Uuid;
|
||||
use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
|
||||
use crate::cache::project_info::ProjectInfoCache;
|
||||
use crate::cancellation::{CancelMap, CancellationHandler};
|
||||
use crate::config::ProxyConfig;
|
||||
use crate::intern::{ProjectIdInt, RoleNameInt};
|
||||
use crate::metrics::{Metrics, RedisErrors, RedisEventsCount};
|
||||
|
||||
@@ -39,6 +40,27 @@ pub(crate) enum Notification {
|
||||
AllowedIpsUpdate {
|
||||
allowed_ips_update: AllowedIpsUpdate,
|
||||
},
|
||||
#[serde(
|
||||
rename = "/block_public_or_vpc_access_updated",
|
||||
deserialize_with = "deserialize_json_string"
|
||||
)]
|
||||
BlockPublicOrVpcAccessUpdated {
|
||||
block_public_or_vpc_access_updated: BlockPublicOrVpcAccessUpdated,
|
||||
},
|
||||
#[serde(
|
||||
rename = "/allowed_vpc_endpoints_updated_for_org",
|
||||
deserialize_with = "deserialize_json_string"
|
||||
)]
|
||||
AllowedVpcEndpointsUpdatedForOrg {
|
||||
allowed_vpc_endpoints_updated_for_org: AllowedVpcEndpointsUpdatedForOrg,
|
||||
},
|
||||
#[serde(
|
||||
rename = "/allowed_vpc_endpoints_updated_for_projects",
|
||||
deserialize_with = "deserialize_json_string"
|
||||
)]
|
||||
AllowedVpcEndpointsUpdatedForProjects {
|
||||
allowed_vpc_endpoints_updated_for_projects: AllowedVpcEndpointsUpdatedForProjects,
|
||||
},
|
||||
#[serde(
|
||||
rename = "/password_updated",
|
||||
deserialize_with = "deserialize_json_string"
|
||||
@@ -51,6 +73,24 @@ pub(crate) enum Notification {
|
||||
pub(crate) struct AllowedIpsUpdate {
|
||||
project_id: ProjectIdInt,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub(crate) struct BlockPublicOrVpcAccessUpdated {
|
||||
project_id: ProjectIdInt,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub(crate) struct AllowedVpcEndpointsUpdatedForOrg {
|
||||
// TODO: change type once the implementation is more fully fledged.
|
||||
// See e.g. https://github.com/neondatabase/neon/pull/10073.
|
||||
account_id: ProjectIdInt,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub(crate) struct AllowedVpcEndpointsUpdatedForProjects {
|
||||
project_ids: Vec<ProjectIdInt>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub(crate) struct PasswordUpdate {
|
||||
project_id: ProjectIdInt,
|
||||
@@ -164,7 +204,11 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
|
||||
}
|
||||
}
|
||||
}
|
||||
Notification::AllowedIpsUpdate { .. } | Notification::PasswordUpdate { .. } => {
|
||||
Notification::AllowedIpsUpdate { .. }
|
||||
| Notification::PasswordUpdate { .. }
|
||||
| Notification::BlockPublicOrVpcAccessUpdated { .. }
|
||||
| Notification::AllowedVpcEndpointsUpdatedForOrg { .. }
|
||||
| Notification::AllowedVpcEndpointsUpdatedForProjects { .. } => {
|
||||
invalidate_cache(self.cache.clone(), msg.clone());
|
||||
if matches!(msg, Notification::AllowedIpsUpdate { .. }) {
|
||||
Metrics::get()
|
||||
@@ -177,6 +221,8 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
|
||||
.redis_events_count
|
||||
.inc(RedisEventsCount::PasswordUpdate);
|
||||
}
|
||||
// TODO: add additional metrics for the other event types.
|
||||
|
||||
// It might happen that the invalid entry is on the way to be cached.
|
||||
// To make sure that the entry is invalidated, let's repeat the invalidation in INVALIDATION_LAG seconds.
|
||||
// TODO: include the version (or the timestamp) in the message and invalidate only if the entry is cached before the message.
|
||||
@@ -203,6 +249,15 @@ fn invalidate_cache<C: ProjectInfoCache>(cache: Arc<C>, msg: Notification) {
|
||||
password_update.role_name,
|
||||
),
|
||||
Notification::Cancel(_) => unreachable!("cancel message should be handled separately"),
|
||||
Notification::BlockPublicOrVpcAccessUpdated { .. } => {
|
||||
// https://github.com/neondatabase/neon/pull/10073
|
||||
}
|
||||
Notification::AllowedVpcEndpointsUpdatedForOrg { .. } => {
|
||||
// https://github.com/neondatabase/neon/pull/10073
|
||||
}
|
||||
Notification::AllowedVpcEndpointsUpdatedForProjects { .. } => {
|
||||
// https://github.com/neondatabase/neon/pull/10073
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -249,6 +304,7 @@ async fn handle_messages<C: ProjectInfoCache + Send + Sync + 'static>(
|
||||
/// Handle console's invalidation messages.
|
||||
#[tracing::instrument(name = "redis_notifications", skip_all)]
|
||||
pub async fn task_main<C>(
|
||||
config: &'static ProxyConfig,
|
||||
redis: ConnectionWithCredentialsProvider,
|
||||
cache: Arc<C>,
|
||||
cancel_map: CancelMap,
|
||||
@@ -258,6 +314,7 @@ where
|
||||
C: ProjectInfoCache + Send + Sync + 'static,
|
||||
{
|
||||
let cancellation_handler = Arc::new(CancellationHandler::<()>::new(
|
||||
&config.connect_to_compute,
|
||||
cancel_map,
|
||||
crate::metrics::CancellationSource::FromRedis,
|
||||
));
|
||||
|
||||
@@ -50,6 +50,12 @@ impl<S: AsyncWrite + Unpin> SaslStream<'_, S> {
|
||||
self.stream.write_message(&msg.to_reply()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Queue a SASL message for the client.
|
||||
fn send_noflush(&mut self, msg: &ServerMessage<&str>) -> io::Result<()> {
|
||||
self.stream.write_message_noflush(&msg.to_reply())?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// SASL authentication outcome.
|
||||
@@ -85,7 +91,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> SaslStream<'_, S> {
|
||||
continue;
|
||||
}
|
||||
Step::Success(result, reply) => {
|
||||
self.send(&ServerMessage::Final(&reply)).await?;
|
||||
self.send_noflush(&ServerMessage::Final(&reply))?;
|
||||
Outcome::Success(result)
|
||||
}
|
||||
Step::Failure(reason) => Outcome::Failure(reason),
|
||||
|
||||
@@ -13,7 +13,6 @@ use super::secret::ServerSecret;
|
||||
use super::signature::SignatureBuilder;
|
||||
use super::threadpool::ThreadPool;
|
||||
use super::ScramKey;
|
||||
use crate::config;
|
||||
use crate::intern::EndpointIdInt;
|
||||
use crate::sasl::{self, ChannelBinding, Error as SaslError};
|
||||
|
||||
@@ -59,14 +58,14 @@ enum ExchangeState {
|
||||
pub(crate) struct Exchange<'a> {
|
||||
state: ExchangeState,
|
||||
secret: &'a ServerSecret,
|
||||
tls_server_end_point: config::TlsServerEndPoint,
|
||||
tls_server_end_point: crate::tls::TlsServerEndPoint,
|
||||
}
|
||||
|
||||
impl<'a> Exchange<'a> {
|
||||
pub(crate) fn new(
|
||||
secret: &'a ServerSecret,
|
||||
nonce: fn() -> [u8; SCRAM_RAW_NONCE_LEN],
|
||||
tls_server_end_point: config::TlsServerEndPoint,
|
||||
tls_server_end_point: crate::tls::TlsServerEndPoint,
|
||||
) -> Self {
|
||||
Self {
|
||||
state: ExchangeState::Initial(SaslInitial { nonce }),
|
||||
@@ -120,7 +119,7 @@ impl SaslInitial {
|
||||
fn transition(
|
||||
&self,
|
||||
secret: &ServerSecret,
|
||||
tls_server_end_point: &config::TlsServerEndPoint,
|
||||
tls_server_end_point: &crate::tls::TlsServerEndPoint,
|
||||
input: &str,
|
||||
) -> sasl::Result<sasl::Step<SaslSentInner, Infallible>> {
|
||||
let client_first_message = ClientFirstMessage::parse(input)
|
||||
@@ -155,7 +154,7 @@ impl SaslSentInner {
|
||||
fn transition(
|
||||
&self,
|
||||
secret: &ServerSecret,
|
||||
tls_server_end_point: &config::TlsServerEndPoint,
|
||||
tls_server_end_point: &crate::tls::TlsServerEndPoint,
|
||||
input: &str,
|
||||
) -> sasl::Result<sasl::Step<Infallible, super::ScramKey>> {
|
||||
let Self {
|
||||
@@ -168,8 +167,8 @@ impl SaslSentInner {
|
||||
.ok_or(SaslError::BadClientMessage("invalid client-final-message"))?;
|
||||
|
||||
let channel_binding = cbind_flag.encode(|_| match tls_server_end_point {
|
||||
config::TlsServerEndPoint::Sha256(x) => Ok(x),
|
||||
config::TlsServerEndPoint::Undefined => Err(SaslError::MissingBinding),
|
||||
crate::tls::TlsServerEndPoint::Sha256(x) => Ok(x),
|
||||
crate::tls::TlsServerEndPoint::Undefined => Err(SaslError::MissingBinding),
|
||||
})?;
|
||||
|
||||
// This might've been caused by a MITM attack
|
||||
|
||||
@@ -77,11 +77,8 @@ mod tests {
|
||||
const NONCE: [u8; 18] = [
|
||||
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
|
||||
];
|
||||
let mut exchange = Exchange::new(
|
||||
&secret,
|
||||
|| NONCE,
|
||||
crate::config::TlsServerEndPoint::Undefined,
|
||||
);
|
||||
let mut exchange =
|
||||
Exchange::new(&secret, || NONCE, crate::tls::TlsServerEndPoint::Undefined);
|
||||
|
||||
let client_first = "n,,n=user,r=rOprNGfwEbeRWgbNEkqO";
|
||||
let client_final = "c=biws,r=rOprNGfwEbeRWgbNEkqOAQIDBAUGBwgJCgsMDQ4PEBES,p=rw1r5Kph5ThxmaUBC2GAQ6MfXbPnNkFiTIvdb/Rear0=";
|
||||
|
||||
@@ -22,7 +22,7 @@ use crate::compute;
|
||||
use crate::compute_ctl::{
|
||||
ComputeCtlError, ExtensionInstallRequest, Privilege, SetRoleGrantsRequest,
|
||||
};
|
||||
use crate::config::ProxyConfig;
|
||||
use crate::config::{ComputeConfig, ProxyConfig};
|
||||
use crate::context::RequestContext;
|
||||
use crate::control_plane::client::ApiLockError;
|
||||
use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError};
|
||||
@@ -196,7 +196,7 @@ impl PoolingBackend {
|
||||
},
|
||||
&backend,
|
||||
self.config.wake_compute_retry_config,
|
||||
self.config.connect_to_compute_retry_config,
|
||||
&self.config.connect_to_compute,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -237,7 +237,7 @@ impl PoolingBackend {
|
||||
},
|
||||
&backend,
|
||||
self.config.wake_compute_retry_config,
|
||||
self.config.connect_to_compute_retry_config,
|
||||
&self.config.connect_to_compute,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -502,7 +502,7 @@ impl ConnectMechanism for TokioMechanism {
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
node_info: &CachedNodeInfo,
|
||||
timeout: Duration,
|
||||
compute_config: &ComputeConfig,
|
||||
) -> Result<Self::Connection, Self::ConnectError> {
|
||||
let host = node_info.config.get_host();
|
||||
let permit = self.locks.get_permit(&host).await?;
|
||||
@@ -511,7 +511,7 @@ impl ConnectMechanism for TokioMechanism {
|
||||
let config = config
|
||||
.user(&self.conn_info.user_info.user)
|
||||
.dbname(&self.conn_info.dbname)
|
||||
.connect_timeout(timeout);
|
||||
.connect_timeout(compute_config.timeout);
|
||||
|
||||
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
|
||||
let res = config.connect(postgres_client::NoTls).await;
|
||||
@@ -552,7 +552,7 @@ impl ConnectMechanism for HyperMechanism {
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
node_info: &CachedNodeInfo,
|
||||
timeout: Duration,
|
||||
config: &ComputeConfig,
|
||||
) -> Result<Self::Connection, Self::ConnectError> {
|
||||
let host = node_info.config.get_host();
|
||||
let permit = self.locks.get_permit(&host).await?;
|
||||
@@ -560,7 +560,7 @@ impl ConnectMechanism for HyperMechanism {
|
||||
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
|
||||
|
||||
let port = node_info.config.get_port();
|
||||
let res = connect_http2(&host, port, timeout).await;
|
||||
let res = connect_http2(&host, port, config.timeout).await;
|
||||
drop(pause);
|
||||
let (client, connection) = permit.release_result(res)?;
|
||||
|
||||
|
||||
@@ -35,7 +35,7 @@ use rand::rngs::StdRng;
|
||||
use rand::SeedableRng;
|
||||
use sql_over_http::{uuid_to_header_value, NEON_REQUEST_ID};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::timeout;
|
||||
use tokio_rustls::TlsAcceptor;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -45,6 +45,7 @@ use utils::http::error::ApiError;
|
||||
|
||||
use crate::cancellation::CancellationHandlerMain;
|
||||
use crate::config::{ProxyConfig, ProxyProtocolV2};
|
||||
use crate::conn::{Acceptor, TokioTcpAcceptor};
|
||||
use crate::context::RequestContext;
|
||||
use crate::ext::TaskExt;
|
||||
use crate::metrics::Metrics;
|
||||
@@ -59,7 +60,7 @@ pub(crate) const SERVERLESS_DRIVER_SNI: &str = "api";
|
||||
pub async fn task_main(
|
||||
config: &'static ProxyConfig,
|
||||
auth_backend: &'static crate::auth::Backend<'static, ()>,
|
||||
ws_listener: TcpListener,
|
||||
ws_acceptor: TokioTcpAcceptor,
|
||||
cancellation_token: CancellationToken,
|
||||
cancellation_handler: Arc<CancellationHandlerMain>,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
@@ -134,7 +135,7 @@ pub async fn task_main(
|
||||
connections.close(); // allows `connections.wait to complete`
|
||||
|
||||
let cancellations = tokio_util::task::task_tracker::TaskTracker::new();
|
||||
while let Some(res) = run_until_cancelled(ws_listener.accept(), &cancellation_token).await {
|
||||
while let Some(res) = run_until_cancelled(ws_acceptor.accept(), &cancellation_token).await {
|
||||
let (conn, peer_addr) = res.context("could not accept TCP stream")?;
|
||||
if let Err(e) = conn.set_nodelay(true) {
|
||||
tracing::error!("could not set nodelay: {e}");
|
||||
|
||||
@@ -168,7 +168,7 @@ pub(crate) async fn serve_websocket(
|
||||
Ok(Some(p)) => {
|
||||
ctx.set_success();
|
||||
ctx.log_connect();
|
||||
match p.proxy_pass().await {
|
||||
match p.proxy_pass(&config.connect_to_compute).await {
|
||||
Ok(()) => Ok(()),
|
||||
Err(ErrorSource::Client(err)) => Err(err).context("client"),
|
||||
Err(ErrorSource::Compute(err)) => Err(err).context("compute"),
|
||||
|
||||
@@ -11,9 +11,9 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||
use tokio_rustls::server::TlsStream;
|
||||
use tracing::debug;
|
||||
|
||||
use crate::config::TlsServerEndPoint;
|
||||
use crate::error::{ErrorKind, ReportableError, UserFacingError};
|
||||
use crate::metrics::Metrics;
|
||||
use crate::tls::TlsServerEndPoint;
|
||||
|
||||
/// Stream wrapper which implements libpq's protocol.
|
||||
///
|
||||
|
||||
42
proxy/src/tls/client_config.rs
Normal file
42
proxy/src/tls/client_config.rs
Normal file
@@ -0,0 +1,42 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::bail;
|
||||
use rustls::crypto::ring;
|
||||
|
||||
pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
|
||||
let der_certs = rustls_native_certs::load_native_certs();
|
||||
|
||||
if !der_certs.errors.is_empty() {
|
||||
bail!("could not parse certificates: {:?}", der_certs.errors);
|
||||
}
|
||||
|
||||
let mut store = rustls::RootCertStore::empty();
|
||||
store.add_parsable_certificates(der_certs.certs);
|
||||
Ok(Arc::new(store))
|
||||
}
|
||||
|
||||
/// Loads the root certificates and constructs a client config suitable for connecting to the neon compute.
|
||||
/// This function is blocking.
|
||||
pub fn compute_client_config_with_root_certs() -> anyhow::Result<rustls::ClientConfig> {
|
||||
Ok(
|
||||
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
|
||||
.with_safe_default_protocol_versions()
|
||||
.expect("ring should support the default protocol versions")
|
||||
.with_root_certificates(load_certs()?)
|
||||
.with_no_client_auth(),
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn compute_client_config_with_certs(
|
||||
certs: impl IntoIterator<Item = rustls::pki_types::CertificateDer<'static>>,
|
||||
) -> rustls::ClientConfig {
|
||||
let mut store = rustls::RootCertStore::empty();
|
||||
store.add_parsable_certificates(certs);
|
||||
|
||||
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
|
||||
.with_safe_default_protocol_versions()
|
||||
.expect("ring should support the default protocol versions")
|
||||
.with_root_certificates(store)
|
||||
.with_no_client_auth()
|
||||
}
|
||||
72
proxy/src/tls/mod.rs
Normal file
72
proxy/src/tls/mod.rs
Normal file
@@ -0,0 +1,72 @@
|
||||
pub mod client_config;
|
||||
pub mod postgres_rustls;
|
||||
pub mod server_config;
|
||||
|
||||
use anyhow::Context;
|
||||
use rustls::pki_types::CertificateDer;
|
||||
use sha2::{Digest, Sha256};
|
||||
use tracing::{error, info};
|
||||
use x509_parser::oid_registry;
|
||||
|
||||
/// <https://github.com/postgres/postgres/blob/ca481d3c9ab7bf69ff0c8d71ad3951d407f6a33c/src/include/libpq/pqcomm.h#L159>
|
||||
pub const PG_ALPN_PROTOCOL: &[u8] = b"postgresql";
|
||||
|
||||
/// Channel binding parameter
|
||||
///
|
||||
/// <https://www.rfc-editor.org/rfc/rfc5929#section-4>
|
||||
/// Description: The hash of the TLS server's certificate as it
|
||||
/// appears, octet for octet, in the server's Certificate message. Note
|
||||
/// that the Certificate message contains a certificate_list, in which
|
||||
/// the first element is the server's certificate.
|
||||
///
|
||||
/// The hash function is to be selected as follows:
|
||||
///
|
||||
/// * if the certificate's signatureAlgorithm uses a single hash
|
||||
/// function, and that hash function is either MD5 or SHA-1, then use SHA-256;
|
||||
///
|
||||
/// * if the certificate's signatureAlgorithm uses a single hash
|
||||
/// function and that hash function neither MD5 nor SHA-1, then use
|
||||
/// the hash function associated with the certificate's
|
||||
/// signatureAlgorithm;
|
||||
///
|
||||
/// * if the certificate's signatureAlgorithm uses no hash functions or
|
||||
/// uses multiple hash functions, then this channel binding type's
|
||||
/// channel bindings are undefined at this time (updates to is channel
|
||||
/// binding type may occur to address this issue if it ever arises).
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum TlsServerEndPoint {
|
||||
Sha256([u8; 32]),
|
||||
Undefined,
|
||||
}
|
||||
|
||||
impl TlsServerEndPoint {
|
||||
pub fn new(cert: &CertificateDer<'_>) -> anyhow::Result<Self> {
|
||||
let sha256_oids = [
|
||||
// I'm explicitly not adding MD5 or SHA1 here... They're bad.
|
||||
oid_registry::OID_SIG_ECDSA_WITH_SHA256,
|
||||
oid_registry::OID_PKCS1_SHA256WITHRSA,
|
||||
];
|
||||
|
||||
let pem = x509_parser::parse_x509_certificate(cert)
|
||||
.context("Failed to parse PEM object from cerficiate")?
|
||||
.1;
|
||||
|
||||
info!(subject = %pem.subject, "parsing TLS certificate");
|
||||
|
||||
let reg = oid_registry::OidRegistry::default().with_all_crypto();
|
||||
let oid = pem.signature_algorithm.oid();
|
||||
let alg = reg.get(oid);
|
||||
if sha256_oids.contains(oid) {
|
||||
let tls_server_end_point: [u8; 32] = Sha256::new().chain_update(cert).finalize().into();
|
||||
info!(subject = %pem.subject, signature_algorithm = alg.map(|a| a.description()), tls_server_end_point = %base64::encode(tls_server_end_point), "determined channel binding");
|
||||
Ok(Self::Sha256(tls_server_end_point))
|
||||
} else {
|
||||
error!(subject = %pem.subject, signature_algorithm = alg.map(|a| a.description()), "unknown channel binding");
|
||||
Ok(Self::Undefined)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn supported(&self) -> bool {
|
||||
!matches!(self, TlsServerEndPoint::Undefined)
|
||||
}
|
||||
}
|
||||
@@ -18,7 +18,7 @@ mod private {
|
||||
use tokio_rustls::client::TlsStream;
|
||||
use tokio_rustls::TlsConnector;
|
||||
|
||||
use crate::config::TlsServerEndPoint;
|
||||
use crate::tls::TlsServerEndPoint;
|
||||
|
||||
pub struct TlsConnectFuture<S> {
|
||||
inner: tokio_rustls::Connect<S>,
|
||||
@@ -126,16 +126,14 @@ mod private {
|
||||
/// That way you can connect to PostgreSQL using `rustls` as the TLS stack.
|
||||
#[derive(Clone)]
|
||||
pub struct MakeRustlsConnect {
|
||||
config: Arc<ClientConfig>,
|
||||
pub config: Arc<ClientConfig>,
|
||||
}
|
||||
|
||||
impl MakeRustlsConnect {
|
||||
/// Creates a new `MakeRustlsConnect` from the provided `ClientConfig`.
|
||||
#[must_use]
|
||||
pub fn new(config: ClientConfig) -> Self {
|
||||
Self {
|
||||
config: Arc::new(config),
|
||||
}
|
||||
pub fn new(config: Arc<ClientConfig>) -> Self {
|
||||
Self { config }
|
||||
}
|
||||
}
|
||||
|
||||
218
proxy/src/tls/server_config.rs
Normal file
218
proxy/src/tls/server_config.rs
Normal file
@@ -0,0 +1,218 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
use itertools::Itertools;
|
||||
use rustls::crypto::ring::{self, sign};
|
||||
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
|
||||
|
||||
use super::{TlsServerEndPoint, PG_ALPN_PROTOCOL};
|
||||
|
||||
pub struct TlsConfig {
|
||||
pub config: Arc<rustls::ServerConfig>,
|
||||
pub common_names: HashSet<String>,
|
||||
pub cert_resolver: Arc<CertResolver>,
|
||||
}
|
||||
|
||||
impl TlsConfig {
|
||||
pub fn to_server_config(&self) -> Arc<rustls::ServerConfig> {
|
||||
self.config.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Configure TLS for the main endpoint.
|
||||
pub fn configure_tls(
|
||||
key_path: &str,
|
||||
cert_path: &str,
|
||||
certs_dir: Option<&String>,
|
||||
allow_tls_keylogfile: bool,
|
||||
) -> anyhow::Result<TlsConfig> {
|
||||
let mut cert_resolver = CertResolver::new();
|
||||
|
||||
// add default certificate
|
||||
cert_resolver.add_cert_path(key_path, cert_path, true)?;
|
||||
|
||||
// add extra certificates
|
||||
if let Some(certs_dir) = certs_dir {
|
||||
for entry in std::fs::read_dir(certs_dir)? {
|
||||
let entry = entry?;
|
||||
let path = entry.path();
|
||||
if path.is_dir() {
|
||||
// file names aligned with default cert-manager names
|
||||
let key_path = path.join("tls.key");
|
||||
let cert_path = path.join("tls.crt");
|
||||
if key_path.exists() && cert_path.exists() {
|
||||
cert_resolver.add_cert_path(
|
||||
&key_path.to_string_lossy(),
|
||||
&cert_path.to_string_lossy(),
|
||||
false,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let common_names = cert_resolver.get_common_names();
|
||||
|
||||
let cert_resolver = Arc::new(cert_resolver);
|
||||
|
||||
// allow TLS 1.2 to be compatible with older client libraries
|
||||
let mut config =
|
||||
rustls::ServerConfig::builder_with_provider(Arc::new(ring::default_provider()))
|
||||
.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])
|
||||
.context("ring should support TLS1.2 and TLS1.3")?
|
||||
.with_no_client_auth()
|
||||
.with_cert_resolver(cert_resolver.clone());
|
||||
|
||||
config.alpn_protocols = vec![PG_ALPN_PROTOCOL.to_vec()];
|
||||
|
||||
if allow_tls_keylogfile {
|
||||
// KeyLogFile will check for the SSLKEYLOGFILE environment variable.
|
||||
config.key_log = Arc::new(rustls::KeyLogFile::new());
|
||||
}
|
||||
|
||||
Ok(TlsConfig {
|
||||
config: Arc::new(config),
|
||||
common_names,
|
||||
cert_resolver,
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct CertResolver {
|
||||
certs: HashMap<String, (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
|
||||
default: Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
|
||||
}
|
||||
|
||||
impl CertResolver {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
fn add_cert_path(
|
||||
&mut self,
|
||||
key_path: &str,
|
||||
cert_path: &str,
|
||||
is_default: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let priv_key = {
|
||||
let key_bytes = std::fs::read(key_path)
|
||||
.with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?;
|
||||
rustls_pemfile::private_key(&mut &key_bytes[..])
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
};
|
||||
|
||||
let cert_chain_bytes = std::fs::read(cert_path)
|
||||
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
|
||||
|
||||
let cert_chain = {
|
||||
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
|
||||
.try_collect()
|
||||
.with_context(|| {
|
||||
format!("Failed to read TLS certificate chain from bytes from file at '{cert_path}'.")
|
||||
})?
|
||||
};
|
||||
|
||||
self.add_cert(priv_key, cert_chain, is_default)
|
||||
}
|
||||
|
||||
pub fn add_cert(
|
||||
&mut self,
|
||||
priv_key: PrivateKeyDer<'static>,
|
||||
cert_chain: Vec<CertificateDer<'static>>,
|
||||
is_default: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
|
||||
|
||||
let first_cert = &cert_chain[0];
|
||||
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
|
||||
let pem = x509_parser::parse_x509_certificate(first_cert)
|
||||
.context("Failed to parse PEM object from cerficiate")?
|
||||
.1;
|
||||
|
||||
let common_name = pem.subject().to_string();
|
||||
|
||||
// We need to get the canonical name for this certificate so we can match them against any domain names
|
||||
// seen within the proxy codebase.
|
||||
//
|
||||
// In scram-proxy we use wildcard certificates only, with the database endpoint as the wildcard subdomain, taken from SNI.
|
||||
// We need to remove the wildcard prefix for the purposes of certificate selection.
|
||||
//
|
||||
// auth-broker does not use SNI and instead uses the Neon-Connection-String header.
|
||||
// Auth broker has the subdomain `apiauth` we need to remove for the purposes of validating the Neon-Connection-String.
|
||||
//
|
||||
// Console Redirect proxy does not use any wildcard domains and does not need any certificate selection or conn string
|
||||
// validation, so let's we can continue with any common-name
|
||||
let common_name = if let Some(s) = common_name.strip_prefix("CN=*.") {
|
||||
s.to_string()
|
||||
} else if let Some(s) = common_name.strip_prefix("CN=apiauth.") {
|
||||
s.to_string()
|
||||
} else if let Some(s) = common_name.strip_prefix("CN=") {
|
||||
s.to_string()
|
||||
} else {
|
||||
bail!("Failed to parse common name from certificate")
|
||||
};
|
||||
|
||||
let cert = Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key));
|
||||
|
||||
if is_default {
|
||||
self.default = Some((cert.clone(), tls_server_end_point));
|
||||
}
|
||||
|
||||
self.certs.insert(common_name, (cert, tls_server_end_point));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_common_names(&self) -> HashSet<String> {
|
||||
self.certs.keys().map(|s| s.to_string()).collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl rustls::server::ResolvesServerCert for CertResolver {
|
||||
fn resolve(
|
||||
&self,
|
||||
client_hello: rustls::server::ClientHello<'_>,
|
||||
) -> Option<Arc<rustls::sign::CertifiedKey>> {
|
||||
self.resolve(client_hello.server_name()).map(|x| x.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl CertResolver {
|
||||
pub fn resolve(
|
||||
&self,
|
||||
server_name: Option<&str>,
|
||||
) -> Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)> {
|
||||
// loop here and cut off more and more subdomains until we find
|
||||
// a match to get a proper wildcard support. OTOH, we now do not
|
||||
// use nested domains, so keep this simple for now.
|
||||
//
|
||||
// With the current coding foo.com will match *.foo.com and that
|
||||
// repeats behavior of the old code.
|
||||
if let Some(mut sni_name) = server_name {
|
||||
loop {
|
||||
if let Some(cert) = self.certs.get(sni_name) {
|
||||
return Some(cert.clone());
|
||||
}
|
||||
if let Some((_, rest)) = sni_name.split_once('.') {
|
||||
sni_name = rest;
|
||||
} else {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// No SNI, use the default certificate, otherwise we can't get to
|
||||
// options parameter which can be used to set endpoint name too.
|
||||
// That means that non-SNI flow will not work for CNAME domains in
|
||||
// verify-full mode.
|
||||
//
|
||||
// If that will be a problem we can:
|
||||
//
|
||||
// a) Instead of multi-cert approach use single cert with extra
|
||||
// domains listed in Subject Alternative Name (SAN).
|
||||
// b) Deploy separate proxy instances for extra domains.
|
||||
self.default.clone()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -9,6 +9,7 @@ default = []
|
||||
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
|
||||
# which adds some runtime cost to run tests on outage conditions
|
||||
testing = ["fail/failpoints"]
|
||||
benchmarking = []
|
||||
|
||||
[dependencies]
|
||||
async-stream.workspace = true
|
||||
@@ -77,3 +78,4 @@ tracing-subscriber = { workspace = true, features = ["json"] }
|
||||
[[bench]]
|
||||
name = "receive_wal"
|
||||
harness = false
|
||||
required-features = ["benchmarking"]
|
||||
|
||||
@@ -1,11 +1,7 @@
|
||||
//! WAL ingestion benchmarks.
|
||||
|
||||
#[path = "benchutils.rs"]
|
||||
mod benchutils;
|
||||
|
||||
use std::io::Write as _;
|
||||
|
||||
use benchutils::Env;
|
||||
use bytes::BytesMut;
|
||||
use camino_tempfile::tempfile;
|
||||
use criterion::{criterion_group, criterion_main, BatchSize, Bencher, Criterion};
|
||||
@@ -16,6 +12,7 @@ use safekeeper::receive_wal::{self, WalAcceptor};
|
||||
use safekeeper::safekeeper::{
|
||||
AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
|
||||
};
|
||||
use safekeeper::test_utils::Env;
|
||||
use tokio::io::AsyncWriteExt as _;
|
||||
use utils::id::{NodeId, TenantTimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
@@ -76,12 +73,15 @@ fn bench_process_msg(c: &mut Criterion) {
|
||||
assert!(size >= prefixlen);
|
||||
let message = vec![0; size - prefixlen];
|
||||
|
||||
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message));
|
||||
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), Lsn(0));
|
||||
|
||||
// Set up the Safekeeper.
|
||||
let env = Env::new(fsync)?;
|
||||
let mut safekeeper =
|
||||
runtime.block_on(env.make_safekeeper(NodeId(1), TenantTimelineId::generate()))?;
|
||||
let mut safekeeper = runtime.block_on(env.make_safekeeper(
|
||||
NodeId(1),
|
||||
TenantTimelineId::generate(),
|
||||
Lsn(0),
|
||||
))?;
|
||||
|
||||
b.iter_batched_ref(
|
||||
// Pre-construct WAL records and requests. Criterion will batch them.
|
||||
@@ -134,7 +134,8 @@ fn bench_wal_acceptor(c: &mut Criterion) {
|
||||
let runtime = tokio::runtime::Runtime::new()?; // needs multithreaded
|
||||
|
||||
let env = Env::new(fsync)?;
|
||||
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message"));
|
||||
let walgen =
|
||||
&mut WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message"), Lsn(0));
|
||||
|
||||
// Create buffered channels that can fit all requests, to avoid blocking on channels.
|
||||
let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(n);
|
||||
@@ -145,7 +146,7 @@ fn bench_wal_acceptor(c: &mut Criterion) {
|
||||
// TODO: WalAcceptor doesn't actually need a full timeline, only
|
||||
// Safekeeper::process_msg(). Consider decoupling them to simplify the setup.
|
||||
let tli = env
|
||||
.make_timeline(NodeId(1), TenantTimelineId::generate())
|
||||
.make_timeline(NodeId(1), TenantTimelineId::generate(), Lsn(0))
|
||||
.await?
|
||||
.wal_residence_guard()
|
||||
.await?;
|
||||
@@ -239,7 +240,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {
|
||||
assert!(size >= prefixlen);
|
||||
let message = vec![0; size - prefixlen];
|
||||
|
||||
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message));
|
||||
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), Lsn(0));
|
||||
|
||||
// Construct and spawn the WalAcceptor task.
|
||||
let env = Env::new(fsync)?;
|
||||
@@ -249,7 +250,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {
|
||||
|
||||
runtime.block_on(async {
|
||||
let tli = env
|
||||
.make_timeline(NodeId(1), TenantTimelineId::generate())
|
||||
.make_timeline(NodeId(1), TenantTimelineId::generate(), Lsn(0))
|
||||
.await?
|
||||
.wal_residence_guard()
|
||||
.await?;
|
||||
|
||||
@@ -564,7 +564,7 @@ pub fn make_router(
|
||||
if conf.http_auth.is_some() {
|
||||
router = router.middleware(auth_middleware(|request| {
|
||||
const ALLOWLIST_ROUTES: &[&str] =
|
||||
&["/v1/status", "/metrics", "/profile/cpu", "profile/heap"];
|
||||
&["/v1/status", "/metrics", "/profile/cpu", "/profile/heap"];
|
||||
if ALLOWLIST_ROUTES.contains(&request.uri().path()) {
|
||||
None
|
||||
} else {
|
||||
|
||||
@@ -43,6 +43,9 @@ pub mod wal_reader_stream;
|
||||
pub mod wal_service;
|
||||
pub mod wal_storage;
|
||||
|
||||
#[cfg(any(test, feature = "benchmarking"))]
|
||||
pub mod test_utils;
|
||||
|
||||
mod timelines_global_map;
|
||||
use std::sync::Arc;
|
||||
pub use timelines_global_map::GlobalTimelines;
|
||||
|
||||
@@ -94,9 +94,14 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
|
||||
}
|
||||
}
|
||||
|
||||
let max_next_record_lsn = match max_next_record_lsn {
|
||||
Some(lsn) => lsn,
|
||||
None => { continue; }
|
||||
};
|
||||
|
||||
let batch = InterpretedWalRecords {
|
||||
records,
|
||||
next_record_lsn: max_next_record_lsn
|
||||
next_record_lsn: Some(max_next_record_lsn),
|
||||
};
|
||||
|
||||
tx.send(Batch {wal_end_lsn, available_wal_end_lsn, records: batch}).await.unwrap();
|
||||
|
||||
@@ -1,18 +1,18 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::rate_limit::RateLimiter;
|
||||
use crate::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeeper, TermHistory};
|
||||
use crate::state::{TimelinePersistentState, TimelineState};
|
||||
use crate::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
|
||||
use crate::timelines_set::TimelinesSet;
|
||||
use crate::wal_backup::remote_timeline_path;
|
||||
use crate::{control_file, wal_storage, SafeKeeperConf};
|
||||
use camino_tempfile::Utf8TempDir;
|
||||
use safekeeper::rate_limit::RateLimiter;
|
||||
use safekeeper::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeeper, TermHistory};
|
||||
use safekeeper::state::{TimelinePersistentState, TimelineState};
|
||||
use safekeeper::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
|
||||
use safekeeper::timelines_set::TimelinesSet;
|
||||
use safekeeper::wal_backup::remote_timeline_path;
|
||||
use safekeeper::{control_file, wal_storage, SafeKeeperConf};
|
||||
use tokio::fs::create_dir_all;
|
||||
use utils::id::{NodeId, TenantTimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
/// A Safekeeper benchmarking environment. Uses a tempdir for storage, removed on drop.
|
||||
/// A Safekeeper testing or benchmarking environment. Uses a tempdir for storage, removed on drop.
|
||||
pub struct Env {
|
||||
/// Whether to enable fsync.
|
||||
pub fsync: bool,
|
||||
@@ -21,7 +21,7 @@ pub struct Env {
|
||||
}
|
||||
|
||||
impl Env {
|
||||
/// Creates a new benchmarking environment in a temporary directory. fsync controls whether to
|
||||
/// Creates a new test or benchmarking environment in a temporary directory. fsync controls whether to
|
||||
/// enable fsyncing.
|
||||
pub fn new(fsync: bool) -> anyhow::Result<Self> {
|
||||
let tempdir = camino_tempfile::tempdir()?;
|
||||
@@ -47,6 +47,7 @@ impl Env {
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
ttid: TenantTimelineId,
|
||||
start_lsn: Lsn,
|
||||
) -> anyhow::Result<SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>> {
|
||||
let conf = self.make_conf(node_id);
|
||||
|
||||
@@ -67,9 +68,9 @@ impl Env {
|
||||
safekeeper
|
||||
.process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
|
||||
term: 1,
|
||||
start_streaming_at: Lsn(0),
|
||||
term_history: TermHistory(vec![(1, Lsn(0)).into()]),
|
||||
timeline_start_lsn: Lsn(0),
|
||||
start_streaming_at: start_lsn,
|
||||
term_history: TermHistory(vec![(1, start_lsn).into()]),
|
||||
timeline_start_lsn: start_lsn,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -82,12 +83,13 @@ impl Env {
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
ttid: TenantTimelineId,
|
||||
start_lsn: Lsn,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let conf = Arc::new(self.make_conf(node_id));
|
||||
let timeline_dir = get_timeline_dir(&conf, &ttid);
|
||||
let remote_path = remote_timeline_path(&ttid)?;
|
||||
|
||||
let safekeeper = self.make_safekeeper(node_id, ttid).await?;
|
||||
let safekeeper = self.make_safekeeper(node_id, ttid, start_lsn).await?;
|
||||
let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
|
||||
|
||||
let timeline = Timeline::new(
|
||||
@@ -18,7 +18,7 @@ impl DiskWalProposer {
|
||||
internal_available_lsn: Lsn(0),
|
||||
prev_lsn: Lsn(0),
|
||||
disk: BlockStorage::new(),
|
||||
wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[])),
|
||||
wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[]), Lsn(0)),
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -43,13 +43,13 @@ scopeguard.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
|
||||
diesel = { version = "2.1.4", features = [
|
||||
diesel = { version = "2.2.6", features = [
|
||||
"serde_json",
|
||||
"postgres",
|
||||
"r2d2",
|
||||
"chrono",
|
||||
] }
|
||||
diesel_migrations = { version = "2.1.0" }
|
||||
diesel_migrations = { version = "2.2.0" }
|
||||
r2d2 = { version = "0.8.10" }
|
||||
|
||||
utils = { path = "../libs/utils/" }
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::borrow::Cow;
|
||||
use std::error::Error as _;
|
||||
use std::sync::Arc;
|
||||
use std::{collections::HashMap, time::Duration};
|
||||
@@ -6,6 +7,7 @@ use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
|
||||
use control_plane::local_env::LocalEnv;
|
||||
use futures::StreamExt;
|
||||
use hyper::StatusCode;
|
||||
use pageserver_api::controller_api::AvailabilityZone;
|
||||
use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
|
||||
use postgres_connection::parse_host_port;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -28,6 +30,9 @@ struct UnshardedComputeHookTenant {
|
||||
// Which node is this tenant attached to
|
||||
node_id: NodeId,
|
||||
|
||||
// The tenant's preferred AZ, so that we may pass this on to the control plane
|
||||
preferred_az: Option<AvailabilityZone>,
|
||||
|
||||
// Must hold this lock to send a notification.
|
||||
send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
|
||||
}
|
||||
@@ -36,6 +41,9 @@ struct ShardedComputeHookTenant {
|
||||
shard_count: ShardCount,
|
||||
shards: Vec<(ShardNumber, NodeId)>,
|
||||
|
||||
// The tenant's preferred AZ, so that we may pass this on to the control plane
|
||||
preferred_az: Option<AvailabilityZone>,
|
||||
|
||||
// Must hold this lock to send a notification. The contents represent
|
||||
// the last successfully sent notification, and are used to coalesce multiple
|
||||
// updates by only sending when there is a chance since our last successful send.
|
||||
@@ -64,17 +72,24 @@ enum ComputeHookTenant {
|
||||
|
||||
impl ComputeHookTenant {
|
||||
/// Construct with at least one shard's information
|
||||
fn new(tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize, node_id: NodeId) -> Self {
|
||||
fn new(
|
||||
tenant_shard_id: TenantShardId,
|
||||
stripe_size: ShardStripeSize,
|
||||
preferred_az: Option<AvailabilityZone>,
|
||||
node_id: NodeId,
|
||||
) -> Self {
|
||||
if tenant_shard_id.shard_count.count() > 1 {
|
||||
Self::Sharded(ShardedComputeHookTenant {
|
||||
shards: vec![(tenant_shard_id.shard_number, node_id)],
|
||||
stripe_size,
|
||||
shard_count: tenant_shard_id.shard_count,
|
||||
preferred_az,
|
||||
send_lock: Arc::default(),
|
||||
})
|
||||
} else {
|
||||
Self::Unsharded(UnshardedComputeHookTenant {
|
||||
node_id,
|
||||
preferred_az,
|
||||
send_lock: Arc::default(),
|
||||
})
|
||||
}
|
||||
@@ -120,15 +135,20 @@ impl ComputeHookTenant {
|
||||
|
||||
/// Set one shard's location. If stripe size or shard count have changed, Self is reset
|
||||
/// and drops existing content.
|
||||
fn update(
|
||||
&mut self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
stripe_size: ShardStripeSize,
|
||||
node_id: NodeId,
|
||||
) {
|
||||
fn update(&mut self, shard_update: ShardUpdate) {
|
||||
let tenant_shard_id = shard_update.tenant_shard_id;
|
||||
let node_id = shard_update.node_id;
|
||||
let stripe_size = shard_update.stripe_size;
|
||||
let preferred_az = shard_update.preferred_az;
|
||||
|
||||
match self {
|
||||
Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
|
||||
unsharded_tenant.node_id = node_id
|
||||
unsharded_tenant.node_id = node_id;
|
||||
if unsharded_tenant.preferred_az.as_ref()
|
||||
!= preferred_az.as_ref().map(|az| az.as_ref())
|
||||
{
|
||||
unsharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
|
||||
}
|
||||
}
|
||||
Self::Sharded(sharded_tenant)
|
||||
if sharded_tenant.stripe_size == stripe_size
|
||||
@@ -146,10 +166,21 @@ impl ComputeHookTenant {
|
||||
.push((tenant_shard_id.shard_number, node_id));
|
||||
sharded_tenant.shards.sort_by_key(|s| s.0)
|
||||
}
|
||||
|
||||
if sharded_tenant.preferred_az.as_ref()
|
||||
!= preferred_az.as_ref().map(|az| az.as_ref())
|
||||
{
|
||||
sharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
// Shard count changed: reset struct.
|
||||
*self = Self::new(tenant_shard_id, stripe_size, node_id);
|
||||
*self = Self::new(
|
||||
tenant_shard_id,
|
||||
stripe_size,
|
||||
preferred_az.map(|az| az.into_owned()),
|
||||
node_id,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -165,6 +196,7 @@ struct ComputeHookNotifyRequestShard {
|
||||
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
|
||||
struct ComputeHookNotifyRequest {
|
||||
tenant_id: TenantId,
|
||||
preferred_az: Option<String>,
|
||||
stripe_size: Option<ShardStripeSize>,
|
||||
shards: Vec<ComputeHookNotifyRequestShard>,
|
||||
}
|
||||
@@ -238,6 +270,10 @@ impl ComputeHookTenant {
|
||||
node_id: unsharded_tenant.node_id,
|
||||
}],
|
||||
stripe_size: None,
|
||||
preferred_az: unsharded_tenant
|
||||
.preferred_az
|
||||
.as_ref()
|
||||
.map(|az| az.0.clone()),
|
||||
}),
|
||||
Self::Sharded(sharded_tenant)
|
||||
if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
|
||||
@@ -253,6 +289,7 @@ impl ComputeHookTenant {
|
||||
})
|
||||
.collect(),
|
||||
stripe_size: Some(sharded_tenant.stripe_size),
|
||||
preferred_az: sharded_tenant.preferred_az.as_ref().map(|az| az.0.clone()),
|
||||
})
|
||||
}
|
||||
Self::Sharded(sharded_tenant) => {
|
||||
@@ -313,6 +350,17 @@ pub(super) struct ComputeHook {
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
/// Callers may give us a list of these when asking us to send a bulk batch
|
||||
/// of notifications in the background. This is a 'notification' in the sense of
|
||||
/// other code notifying us of a shard's status, rather than being the final notification
|
||||
/// that we send upwards to the control plane for the whole tenant.
|
||||
pub(crate) struct ShardUpdate<'a> {
|
||||
pub(crate) tenant_shard_id: TenantShardId,
|
||||
pub(crate) node_id: NodeId,
|
||||
pub(crate) stripe_size: ShardStripeSize,
|
||||
pub(crate) preferred_az: Option<Cow<'a, AvailabilityZone>>,
|
||||
}
|
||||
|
||||
impl ComputeHook {
|
||||
pub(super) fn new(config: Config) -> Self {
|
||||
let authorization_header = config
|
||||
@@ -363,6 +411,7 @@ impl ComputeHook {
|
||||
tenant_id,
|
||||
shards,
|
||||
stripe_size,
|
||||
preferred_az: _preferred_az,
|
||||
} = reconfigure_request;
|
||||
|
||||
let compute_pageservers = shards
|
||||
@@ -503,24 +552,30 @@ impl ComputeHook {
|
||||
}
|
||||
|
||||
/// Synchronous phase: update the per-tenant state for the next intended notification
|
||||
fn notify_prepare(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
node_id: NodeId,
|
||||
stripe_size: ShardStripeSize,
|
||||
) -> MaybeSendResult {
|
||||
fn notify_prepare(&self, shard_update: ShardUpdate) -> MaybeSendResult {
|
||||
let mut state_locked = self.state.lock().unwrap();
|
||||
|
||||
use std::collections::hash_map::Entry;
|
||||
let tenant_shard_id = shard_update.tenant_shard_id;
|
||||
|
||||
let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
|
||||
Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
|
||||
tenant_shard_id,
|
||||
stripe_size,
|
||||
node_id,
|
||||
)),
|
||||
Entry::Vacant(e) => {
|
||||
let ShardUpdate {
|
||||
tenant_shard_id,
|
||||
node_id,
|
||||
stripe_size,
|
||||
preferred_az,
|
||||
} = shard_update;
|
||||
e.insert(ComputeHookTenant::new(
|
||||
tenant_shard_id,
|
||||
stripe_size,
|
||||
preferred_az.map(|az| az.into_owned()),
|
||||
node_id,
|
||||
))
|
||||
}
|
||||
Entry::Occupied(e) => {
|
||||
let tenant = e.into_mut();
|
||||
tenant.update(tenant_shard_id, stripe_size, node_id);
|
||||
tenant.update(shard_update);
|
||||
tenant
|
||||
}
|
||||
};
|
||||
@@ -608,13 +663,14 @@ impl ComputeHook {
|
||||
/// if something failed.
|
||||
pub(super) fn notify_background(
|
||||
self: &Arc<Self>,
|
||||
notifications: Vec<(TenantShardId, NodeId, ShardStripeSize)>,
|
||||
notifications: Vec<ShardUpdate>,
|
||||
result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
|
||||
cancel: &CancellationToken,
|
||||
) {
|
||||
let mut maybe_sends = Vec::new();
|
||||
for (tenant_shard_id, node_id, stripe_size) in notifications {
|
||||
let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
|
||||
for shard_update in notifications {
|
||||
let tenant_shard_id = shard_update.tenant_shard_id;
|
||||
let maybe_send_result = self.notify_prepare(shard_update);
|
||||
maybe_sends.push((tenant_shard_id, maybe_send_result))
|
||||
}
|
||||
|
||||
@@ -678,15 +734,14 @@ impl ComputeHook {
|
||||
/// periods, but we don't retry forever. The **caller** is responsible for handling failures and
|
||||
/// ensuring that they eventually call again to ensure that the compute is eventually notified of
|
||||
/// the proper pageserver nodes for a tenant.
|
||||
#[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), node_id))]
|
||||
pub(super) async fn notify(
|
||||
#[tracing::instrument(skip_all, fields(tenant_id=%shard_update.tenant_shard_id.tenant_id, shard_id=%shard_update.tenant_shard_id.shard_slug(), node_id))]
|
||||
pub(super) async fn notify<'a>(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
node_id: NodeId,
|
||||
stripe_size: ShardStripeSize,
|
||||
shard_update: ShardUpdate<'a>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(), NotifyError> {
|
||||
let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
|
||||
let tenant_shard_id = shard_update.tenant_shard_id;
|
||||
let maybe_send_result = self.notify_prepare(shard_update);
|
||||
self.notify_execute(maybe_send_result, tenant_shard_id, cancel)
|
||||
.await
|
||||
}
|
||||
@@ -739,6 +794,7 @@ pub(crate) mod tests {
|
||||
shard_number: ShardNumber(0),
|
||||
},
|
||||
ShardStripeSize(12345),
|
||||
None,
|
||||
NodeId(1),
|
||||
);
|
||||
|
||||
@@ -765,30 +821,32 @@ pub(crate) mod tests {
|
||||
// Writing the first shard of a multi-sharded situation (i.e. in a split)
|
||||
// resets the tenant state and puts it in an non-notifying state (need to
|
||||
// see all shards)
|
||||
tenant_state.update(
|
||||
TenantShardId {
|
||||
tenant_state.update(ShardUpdate {
|
||||
tenant_shard_id: TenantShardId {
|
||||
tenant_id,
|
||||
shard_count: ShardCount::new(2),
|
||||
shard_number: ShardNumber(1),
|
||||
},
|
||||
ShardStripeSize(32768),
|
||||
NodeId(1),
|
||||
);
|
||||
stripe_size: ShardStripeSize(32768),
|
||||
preferred_az: None,
|
||||
node_id: NodeId(1),
|
||||
});
|
||||
assert!(matches!(
|
||||
tenant_state.maybe_send(tenant_id, None),
|
||||
MaybeSendResult::Noop
|
||||
));
|
||||
|
||||
// Writing the second shard makes it ready to notify
|
||||
tenant_state.update(
|
||||
TenantShardId {
|
||||
tenant_state.update(ShardUpdate {
|
||||
tenant_shard_id: TenantShardId {
|
||||
tenant_id,
|
||||
shard_count: ShardCount::new(2),
|
||||
shard_number: ShardNumber(0),
|
||||
},
|
||||
ShardStripeSize(32768),
|
||||
NodeId(1),
|
||||
);
|
||||
stripe_size: ShardStripeSize(32768),
|
||||
preferred_az: None,
|
||||
node_id: NodeId(1),
|
||||
});
|
||||
|
||||
let send_result = tenant_state.maybe_send(tenant_id, None);
|
||||
let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
|
||||
|
||||
@@ -11,6 +11,7 @@ use diesel::Connection;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::controller_api::AvailabilityZone;
|
||||
use pageserver_api::controller_api::MetadataHealthRecord;
|
||||
use pageserver_api::controller_api::SafekeeperDescribeResponse;
|
||||
use pageserver_api::controller_api::ShardSchedulingPolicy;
|
||||
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
|
||||
use pageserver_api::models::TenantConfig;
|
||||
@@ -1241,6 +1242,18 @@ impl SafekeeperPersistence {
|
||||
availability_zone_id: &self.availability_zone_id,
|
||||
}
|
||||
}
|
||||
pub(crate) fn as_describe_response(&self) -> SafekeeperDescribeResponse {
|
||||
// omit the `active` flag on purpose: it is deprecated.
|
||||
SafekeeperDescribeResponse {
|
||||
id: NodeId(self.id as u64),
|
||||
region_id: self.region_id.clone(),
|
||||
version: self.version,
|
||||
host: self.host.clone(),
|
||||
port: self.port,
|
||||
http_port: self.http_port,
|
||||
availability_zone_id: self.availability_zone_id.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Insertable, AsChangeset)]
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
use crate::pageserver_client::PageserverClient;
|
||||
use crate::persistence::Persistence;
|
||||
use crate::service;
|
||||
use pageserver_api::controller_api::PlacementPolicy;
|
||||
use crate::{compute_hook, service};
|
||||
use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy};
|
||||
use pageserver_api::models::{
|
||||
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
|
||||
};
|
||||
use pageserver_api::shard::{ShardIdentity, TenantShardId};
|
||||
use pageserver_client::mgmt_api;
|
||||
use reqwest::StatusCode;
|
||||
use std::borrow::Cow;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
@@ -45,6 +46,7 @@ pub(super) struct Reconciler {
|
||||
pub(crate) reconciler_config: ReconcilerConfig,
|
||||
|
||||
pub(crate) config: TenantConfig,
|
||||
pub(crate) preferred_az: Option<AvailabilityZone>,
|
||||
|
||||
/// Observed state from the point of view of the reconciler.
|
||||
/// This gets updated as the reconciliation makes progress.
|
||||
@@ -834,9 +836,12 @@ impl Reconciler {
|
||||
let result = self
|
||||
.compute_hook
|
||||
.notify(
|
||||
self.tenant_shard_id,
|
||||
node.get_id(),
|
||||
self.shard.stripe_size,
|
||||
compute_hook::ShardUpdate {
|
||||
tenant_shard_id: self.tenant_shard_id,
|
||||
node_id: node.get_id(),
|
||||
stripe_size: self.shard.stripe_size,
|
||||
preferred_az: self.preferred_az.as_ref().map(Cow::Borrowed),
|
||||
},
|
||||
&self.cancel,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -18,7 +18,7 @@ use crate::{
|
||||
background_node_operations::{
|
||||
Drain, Fill, Operation, OperationError, OperationHandler, MAX_RECONCILES_PER_OPERATION,
|
||||
},
|
||||
compute_hook::NotifyError,
|
||||
compute_hook::{self, NotifyError},
|
||||
drain_utils::{self, TenantShardDrain, TenantShardIterator},
|
||||
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
|
||||
leadership::Leadership,
|
||||
@@ -46,10 +46,11 @@ use pageserver_api::{
|
||||
controller_api::{
|
||||
AvailabilityZone, MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability,
|
||||
NodeRegisterRequest, NodeSchedulingPolicy, NodeShard, NodeShardResponse, PlacementPolicy,
|
||||
ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse,
|
||||
TenantCreateRequest, TenantCreateResponse, TenantCreateResponseShard,
|
||||
TenantDescribeResponse, TenantDescribeResponseShard, TenantLocateResponse,
|
||||
TenantPolicyRequest, TenantShardMigrateRequest, TenantShardMigrateResponse,
|
||||
SafekeeperDescribeResponse, ShardSchedulingPolicy, ShardsPreferredAzsRequest,
|
||||
ShardsPreferredAzsResponse, TenantCreateRequest, TenantCreateResponse,
|
||||
TenantCreateResponseShard, TenantDescribeResponse, TenantDescribeResponseShard,
|
||||
TenantLocateResponse, TenantPolicyRequest, TenantShardMigrateRequest,
|
||||
TenantShardMigrateResponse,
|
||||
},
|
||||
models::{
|
||||
SecondaryProgress, TenantConfigPatchRequest, TenantConfigRequest,
|
||||
@@ -656,11 +657,14 @@ impl Service {
|
||||
// emit a compute notification for this. In the case where our observed state does not
|
||||
// yet match our intent, we will eventually reconcile, and that will emit a compute notification.
|
||||
if let Some(attached_at) = tenant_shard.stably_attached() {
|
||||
compute_notifications.push((
|
||||
*tenant_shard_id,
|
||||
attached_at,
|
||||
tenant_shard.shard.stripe_size,
|
||||
));
|
||||
compute_notifications.push(compute_hook::ShardUpdate {
|
||||
tenant_shard_id: *tenant_shard_id,
|
||||
node_id: attached_at,
|
||||
stripe_size: tenant_shard.shard.stripe_size,
|
||||
preferred_az: tenant_shard
|
||||
.preferred_az()
|
||||
.map(|az| Cow::Owned(az.clone())),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3568,6 +3572,11 @@ impl Service {
|
||||
.iter()
|
||||
.any(|i| i.generation.is_none() || i.generation_pageserver.is_none())
|
||||
{
|
||||
let shard_generations = generations
|
||||
.into_iter()
|
||||
.map(|i| (i.tenant_shard_id, (i.generation, i.generation_pageserver)))
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
// One or more shards has not been attached to a pageserver. Check if this is because it's configured
|
||||
// to be detached (409: caller should give up), or because it's meant to be attached but isn't yet (503: caller should retry)
|
||||
let locked = self.inner.read().unwrap();
|
||||
@@ -3578,6 +3587,28 @@ impl Service {
|
||||
PlacementPolicy::Attached(_) => {
|
||||
// This shard is meant to be attached: the caller is not wrong to try and
|
||||
// use this function, but we can't service the request right now.
|
||||
let Some(generation) = shard_generations.get(shard_id) else {
|
||||
// This can only happen if there is a split brain controller modifying the database. This should
|
||||
// never happen when testing, and if it happens in production we can only log the issue.
|
||||
debug_assert!(false);
|
||||
tracing::error!("Shard {shard_id} not found in generation state! Is another rogue controller running?");
|
||||
continue;
|
||||
};
|
||||
let (generation, generation_pageserver) = generation;
|
||||
if let Some(generation) = generation {
|
||||
if generation_pageserver.is_none() {
|
||||
// This is legitimate only in a very narrow window where the shard was only just configured into
|
||||
// Attached mode after being created in Secondary or Detached mode, and it has had its generation
|
||||
// set but not yet had a Reconciler run (reconciler is the only thing that sets generation_pageserver).
|
||||
tracing::warn!("Shard {shard_id} generation is set ({generation:?}) but generation_pageserver is None, reconciler not run yet?");
|
||||
}
|
||||
} else {
|
||||
// This should never happen: a shard with no generation is only permitted when it was created in some state
|
||||
// other than PlacementPolicy::Attached (and generation is always written to DB before setting Attached in memory)
|
||||
debug_assert!(false);
|
||||
tracing::error!("Shard {shard_id} generation is None, but it is in PlacementPolicy::Attached mode!");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
PlacementPolicy::Secondary | PlacementPolicy::Detached => {
|
||||
return Err(ApiError::Conflict(format!(
|
||||
@@ -4786,7 +4817,15 @@ impl Service {
|
||||
for (child_id, child_ps, stripe_size) in child_locations {
|
||||
if let Err(e) = self
|
||||
.compute_hook
|
||||
.notify(child_id, child_ps, stripe_size, &self.cancel)
|
||||
.notify(
|
||||
compute_hook::ShardUpdate {
|
||||
tenant_shard_id: child_id,
|
||||
node_id: child_ps,
|
||||
stripe_size,
|
||||
preferred_az: preferred_az_id.as_ref().map(Cow::Borrowed),
|
||||
},
|
||||
&self.cancel,
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::warn!("Failed to update compute of {}->{} during split, proceeding anyway to complete split ({e})",
|
||||
@@ -7158,15 +7197,24 @@ impl Service {
|
||||
|
||||
pub(crate) async fn safekeepers_list(
|
||||
&self,
|
||||
) -> Result<Vec<crate::persistence::SafekeeperPersistence>, DatabaseError> {
|
||||
self.persistence.list_safekeepers().await
|
||||
) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
|
||||
Ok(self
|
||||
.persistence
|
||||
.list_safekeepers()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|v| v.as_describe_response())
|
||||
.collect::<Vec<_>>())
|
||||
}
|
||||
|
||||
pub(crate) async fn get_safekeeper(
|
||||
&self,
|
||||
id: i64,
|
||||
) -> Result<crate::persistence::SafekeeperPersistence, DatabaseError> {
|
||||
self.persistence.safekeeper_get(id).await
|
||||
) -> Result<SafekeeperDescribeResponse, DatabaseError> {
|
||||
self.persistence
|
||||
.safekeeper_get(id)
|
||||
.await
|
||||
.map(|v| v.as_describe_response())
|
||||
}
|
||||
|
||||
pub(crate) async fn upsert_safekeeper(
|
||||
|
||||
@@ -1198,6 +1198,7 @@ impl TenantShard {
|
||||
detach,
|
||||
reconciler_config,
|
||||
config: self.config.clone(),
|
||||
preferred_az: self.preferred_az_id.clone(),
|
||||
observed: self.observed.clone(),
|
||||
original_observed: self.observed.clone(),
|
||||
compute_hook: compute_hook.clone(),
|
||||
|
||||
@@ -310,7 +310,7 @@ pub(crate) enum BlobDataParseResult {
|
||||
index_part_generation: Generation,
|
||||
s3_layers: HashSet<(LayerName, Generation)>,
|
||||
},
|
||||
/// The remains of a deleted Timeline (i.e. an initdb archive only)
|
||||
/// The remains of an uncleanly deleted Timeline or aborted timeline creation(e.g. an initdb archive only, or some layer without an index)
|
||||
Relic,
|
||||
Incorrect {
|
||||
errors: Vec<String>,
|
||||
@@ -346,7 +346,7 @@ pub(crate) async fn list_timeline_blobs(
|
||||
match res {
|
||||
ListTimelineBlobsResult::Ready(data) => Ok(data),
|
||||
ListTimelineBlobsResult::MissingIndexPart(_) => {
|
||||
// Retry if index is missing.
|
||||
// Retry if listing raced with removal of an index
|
||||
let data = list_timeline_blobs_impl(remote_client, id, root_target)
|
||||
.await?
|
||||
.into_data();
|
||||
@@ -358,7 +358,7 @@ pub(crate) async fn list_timeline_blobs(
|
||||
enum ListTimelineBlobsResult {
|
||||
/// Blob data is ready to be intepreted.
|
||||
Ready(RemoteTimelineBlobData),
|
||||
/// List timeline blobs has layer files but is missing [`IndexPart`].
|
||||
/// The listing contained an index but when we tried to fetch it, we couldn't
|
||||
MissingIndexPart(RemoteTimelineBlobData),
|
||||
}
|
||||
|
||||
@@ -467,19 +467,19 @@ async fn list_timeline_blobs_impl(
|
||||
match index_part_object.as_ref() {
|
||||
Some(selected) => index_part_keys.retain(|k| k != selected),
|
||||
None => {
|
||||
// It is possible that the branch gets deleted after we got some layer files listed
|
||||
// and we no longer have the index file in the listing.
|
||||
errors.push(
|
||||
// This case does not indicate corruption, but it should be very unusual. It can
|
||||
// happen if:
|
||||
// - timeline creation is in progress (first layer is written before index is written)
|
||||
// - timeline deletion happened while a stale pageserver was still attached, it might upload
|
||||
// a layer after the deletion is done.
|
||||
tracing::info!(
|
||||
"S3 list response got no index_part.json file but still has layer files"
|
||||
.to_string(),
|
||||
);
|
||||
return Ok(ListTimelineBlobsResult::MissingIndexPart(
|
||||
RemoteTimelineBlobData {
|
||||
blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
|
||||
unused_index_keys: index_part_keys,
|
||||
unknown_keys,
|
||||
},
|
||||
));
|
||||
return Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
|
||||
blob_data: BlobDataParseResult::Relic,
|
||||
unused_index_keys: index_part_keys,
|
||||
unknown_keys,
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ pytest_plugins = (
|
||||
"fixtures.compute_reconfigure",
|
||||
"fixtures.storage_controller_proxy",
|
||||
"fixtures.paths",
|
||||
"fixtures.compute_migrations",
|
||||
"fixtures.neon_fixtures",
|
||||
"fixtures.benchmark_fixture",
|
||||
"fixtures.pg_stats",
|
||||
|
||||
34
test_runner/fixtures/compute_migrations.py
Normal file
34
test_runner/fixtures/compute_migrations.py
Normal file
@@ -0,0 +1,34 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import pytest
|
||||
|
||||
from fixtures.paths import BASE_DIR
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Iterator
|
||||
from pathlib import Path
|
||||
|
||||
COMPUTE_MIGRATIONS_DIR = BASE_DIR / "compute_tools" / "src" / "migrations"
|
||||
COMPUTE_MIGRATIONS_TEST_DIR = COMPUTE_MIGRATIONS_DIR / "tests"
|
||||
|
||||
COMPUTE_MIGRATIONS = sorted(next(os.walk(COMPUTE_MIGRATIONS_DIR))[2])
|
||||
NUM_COMPUTE_MIGRATIONS = len(COMPUTE_MIGRATIONS)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def compute_migrations_dir() -> Iterator[Path]:
|
||||
"""
|
||||
Retrieve the path to the compute migrations directory.
|
||||
"""
|
||||
yield COMPUTE_MIGRATIONS_DIR
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def compute_migrations_test_dir() -> Iterator[Path]:
|
||||
"""
|
||||
Retrieve the path to the compute migrations test directory.
|
||||
"""
|
||||
yield COMPUTE_MIGRATIONS_TEST_DIR
|
||||
@@ -55,3 +55,17 @@ class EndpointHttpClient(requests.Session):
|
||||
res = self.get(f"http://localhost:{self.port}/metrics")
|
||||
res.raise_for_status()
|
||||
return res.text
|
||||
|
||||
def configure_failpoints(self, *args: tuple[str, str]) -> None:
|
||||
body: list[dict[str, str]] = []
|
||||
|
||||
for fp in args:
|
||||
body.append(
|
||||
{
|
||||
"name": fp[0],
|
||||
"action": fp[1],
|
||||
}
|
||||
)
|
||||
|
||||
res = self.post(f"http://localhost:{self.port}/failpoints", json=body)
|
||||
res.raise_for_status()
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user