Compare commits

...

13 Commits

Author SHA1 Message Date
Christian Schwarz
03bbce36e8 make tests fail due to allowed_errors on immediate gc before lease deadline 2025-07-01 11:54:43 +02:00
Erik Grinaker
e50b914a8e compute_tools: support gRPC base backups in compute_ctl (#12244)
## Problem

`compute_ctl` should support gRPC base backups.

Requires #12111.
Requires #12243.
Touches #11926.

## Summary of changes

Support `grpc://` connstrings for `compute_ctl` base backups.
2025-06-27 16:39:00 +00:00
Christian Schwarz
e33e109403 fix(pageserver): buffered writer cancellation error handling (#12376)
## Problem

The problem has been well described in already-commited PR #11853.
tl;dr: BufferedWriter is sensitive to cancellation, which the previous
approach was not.

The write path was most affected (ingest & compaction), which was mostly
fixed in #11853:
it introduced `PutError` and mapped instances of `PutError` that were
due to cancellation of underlying buffered writer into
`CreateImageLayersError::Cancelled`.

However, there is a long tail of remaining errors that weren't caught by
#11853 that result in `CompactionError::Other`s, which we log with great
noise.

## Solution

The stack trace logging for CompactionError::Other added in #11853
allows us to chop away at that long tail using the following pattern:
- look at the stack trace
- from leaf up, identify the place where we incorrectly map from the
distinguished variant X indicating cancellation to an `anyhow::Error`
- follow that anyhow further up, ensuring it stays the same anyhow all
the way up in the `CompactionError::Other`
- since it stayed one anyhow chain all the way up, root_cause() will
yield us X
- so, in `log_compaction_error`, add an additional `downcast_ref` check
for X

This PR specifically adds checks for
- the flush task cancelling (FlushTaskError, BlobWriterError)
- opening of the layer writer (GateError)

That should cover all the reports in issues 
- https://github.com/neondatabase/cloud/issues/29434
- https://github.com/neondatabase/neon/issues/12162

## Refs
- follow-up to #11853
- fixup of / fixes https://github.com/neondatabase/neon/issues/11762
- fixes https://github.com/neondatabase/neon/issues/12162
- refs https://github.com/neondatabase/cloud/issues/29434
2025-06-27 15:26:00 +00:00
Folke Behrens
0ee15002fc proxy: Move client connection accept and handshake to pglb (#12380)
* This must be a no-op.
* Move proxy::task_main to pglb::task_main.
* Move client accept, TLS and handshake to pglb.
* Keep auth and wake in proxy.
2025-06-27 15:20:23 +00:00
Arpad Müller
4c7956fa56 Fix hang deleting offloaded timelines (#12366)
We don't have cancellation support for timeline deletions. In other
words, timeline deletion might still go on in an older generation while
we are attaching it in a newer generation already, because the
cancellation simply hasn't reached the deletion code.

This has caused us to hit a situation with offloaded timelines in which
the timeline was in an unrecoverable state: always returning an accepted
response, but never a 404 like it should be.

The detailed description can be found in
[here](https://github.com/neondatabase/cloud/issues/30406#issuecomment-3008667859)
(private repo link).

TLDR:

1. we ask to delete timeline on old pageserver/generation, starts
process in background
2. the storcon migrates the tenant to a different pageserver.
- during attach, the pageserver still finds an index part, so it adds it
to `offloaded_timelines`
4. the timeline deletion finishes, removing the index part in S3
5. there is a retry of the timeline deletion endpoint, sent to the new
pageserver location. it is bound to fail however:
- as the index part is gone, we print `Timeline already deleted in
remote storage`.
- the problem is that we then return an accepted response code, and not
a 404.
- this confuses the code calling us. it thinks the timeline is not
deleted, so keeps retrying.
- this state never gets recovered from until a reset/detach, because of
the `offloaded_timelines` entry staying there.

This is where this PR fixes things: if no index part can be found, we
can safely assume that the timeline is gone in S3 (it's the last thing
to be deleted), so we can remove it from `offloaded_timelines` and
trigger a reupload of the manifest. Subsequent retries will pick that
up.

Why not improve the cancellation support? It is a more disruptive code
change, that might have its own risks. So we don't do it for now.

Fixes https://github.com/neondatabase/cloud/issues/30406
2025-06-27 15:14:55 +00:00
Heikki Linnakangas
5a82182c48 impr(ci): Refactor postgres Makefile targets to a separate makefile (#12363)
Mainly for general readability. Some notable changes:

- Postgres can be built without the rest of the repository, and in
particular without any of the Rust bits. Some CI scripts took advantage
of that, so let's make that more explicit by separating those parts.
Also add an explicit comment about that in the new postgres.mk file.

- Add a new PG_INSTALL_CACHED variable. If it's set, `make all` and
other top-Makefile targets skip checking if Postgres is up-to-date. This
is also to be used in CI scripts that build and cache Postgres as
separate steps. (It is currently only used in the macos walproposer-lib
rule, but stay tuned for more.)

- Introduce a POSTGRES_VERSIONS variable that lists all supported
PostgreSQL versions. Refactor a few Makefile rules to use that.
2025-06-27 14:49:52 +00:00
Arpad Müller
37e181af8a Update rust to 1.88.0 (#12364)
We keep the practice of keeping the compiler up to date, pointing to the
latest release. This is done by many other projects in the Rust
ecosystem as well.

[Announcement blog
post](https://blog.rust-lang.org/2025/06/26/Rust-1.88.0/)

Prior update was in https://github.com/neondatabase/neon/pull/11938
2025-06-27 13:51:59 +00:00
Peter Bendel
6f4198c78a treat strategy flag test_maintenance as boolean data type (#12373)
## Problem

In large oltp test run
https://github.com/neondatabase/neon/actions/runs/15905488707/job/44859116742
we see that the `Benchmark database maintenance` step is skipped in all
3 strategy variants, however it should be executed in two.

This is due to treating the `test_maintenance` boolean type in the
strategy in the condition of the `Benchmark database maintenance` step

## Summary of changes
Use a boolean condition instead of a string comparison

## Test run from this pull request branch

https://github.com/neondatabase/neon/actions/runs/15923605412
2025-06-27 13:49:26 +00:00
Vlad Lazar
cc1664ef93 pageserver: allow flush task cancelled error in sharding autosplit test (#12374)
## Problem

Test is failing due to compaction shutdown noise (see
https://github.com/neondatabase/neon/issues/12162).

## Summary of changes

Allow list the noise.
2025-06-27 13:13:11 +00:00
Vlad Lazar
ebb6e26a64 pageserver: handle multiple attached children in shard resolution (#12336)
## Problem

When resolving a shard during a split we might have multiple attached
shards with the old shard count (i.e. not all of them are marked in
progress and ignored). Hence, we can compute the desired shard number
based on the old shard count and misroute the request.

## Summary of Changes

Recompute the desired shard every time the shard count changes during
the iteration
2025-06-27 12:46:18 +00:00
Mikhail
ebc12a388c fix: endpoint_storage_addr as String (#12359)
It's not a SocketAddr as we use k8s DNS
https://github.com/neondatabase/cloud/issues/19011
2025-06-27 11:06:27 +00:00
Conrad Ludgate
abc1efd5a6 [proxy] fix connect_to_compute retry handling (#12351)
# Problem

In #12335 I moved the `authenticate` method outside of the
`connect_to_compute` loop. This triggered [e2e tests to become
flaky](https://github.com/neondatabase/cloud/pull/30533). This
highlighted an edge case we forgot to consider with that change.

When we connect to compute, the compute IP might be cached. This cache
hit might however be stale. Because we can't validate the IP is
associated with a specific compute-id☨, we will succeed the
connect_to_compute operation and fail when it comes to password
authentication☨☨. Before the change, we were invalidating the cache and
triggering wake_compute if the authentication failed.

Additionally, I noticed some faulty logic I introduced 1 year ago
https://github.com/neondatabase/neon/pull/8141/files#diff-5491e3afe62d8c5c77178149c665603b29d88d3ec2e47fc1b3bb119a0a970afaL145-R147

☨ We can when we roll out TLS, as the certificate common name includes
the compute-id.

☨☨ Technically password authentication could pass for the wrong compute,
but I think this would only happen in the very very rare event that the
IP got reused **and** the compute's endpoint happened to be a
branch/replica.

# Solution

1. Fix the broken logic
2. Simplify cache invalidation (I don't know why it was so convoluted)
3. Add a loop around connect_to_compute + authenticate to re-introduce
the wake_compute invalidation we accidentally removed.

I went with this approach to try and avoid interfering with
https://github.com/neondatabase/neon/compare/main...cloneable/proxy-pglb-connect-compute-split.
The changes made in commit 3 will move into `handle_client_request` I
suspect,
2025-06-27 10:36:27 +00:00
Dmitrii Kovalkov
6fa1562b57 pageserver: increase default max_size_entries limit for basebackup cache (#12343)
## Problem
Some pageservers hit `max_size_entries` limit in staging with only ~25
MiB storage used by basebackup cache. The limit is too strict. It should
be safe to relax it.

- Part of https://github.com/neondatabase/cloud/issues/29353

## Summary of changes
- Increase the default `max_size_entries` from 1000 to 10000
2025-06-27 09:18:18 +00:00
40 changed files with 1013 additions and 584 deletions

View File

@@ -4,6 +4,7 @@
!Cargo.lock
!Cargo.toml
!Makefile
!postgres.mk
!rust-toolchain.toml
!scripts/ninstall.sh
!docker-compose/run-tests.sh

View File

@@ -135,6 +135,12 @@ jobs:
name: pg_install--v17
path: pg_install/v17
# `actions/download-artifact` doesn't preserve permissions:
# https://github.com/actions/download-artifact?tab=readme-ov-file#permission-loss
- name: Make pg_install/v*/bin/* executable
run: |
chmod +x pg_install/v*/bin/*
- name: Cache walproposer-lib
id: cache_walproposer_lib
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
@@ -162,7 +168,7 @@ jobs:
- name: Build walproposer-lib (only for v17)
if: steps.cache_walproposer_lib.outputs.cache-hit != 'true'
run:
make walproposer-lib -j$(sysctl -n hw.ncpu)
make walproposer-lib -j$(sysctl -n hw.ncpu) PG_INSTALL_CACHED=1
- name: Upload "build/walproposer-lib" artifact
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2

View File

@@ -153,7 +153,7 @@ jobs:
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Benchmark database maintenance
if: ${{ matrix.test_maintenance == 'true' }}
if: ${{ matrix.test_maintenance }}
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}

6
Cargo.lock generated
View File

@@ -1316,6 +1316,7 @@ dependencies = [
"opentelemetry",
"opentelemetry_sdk",
"p256 0.13.2",
"pageserver_page_api",
"postgres",
"postgres_initdb",
"postgres_versioninfo",
@@ -1335,6 +1336,7 @@ dependencies = [
"tokio-postgres",
"tokio-stream",
"tokio-util",
"tonic 0.13.1",
"tower 0.5.2",
"tower-http",
"tower-otel",
@@ -4475,12 +4477,13 @@ dependencies = [
"bytes",
"futures",
"pageserver_api",
"postgres_ffi",
"postgres_ffi_types",
"prost 0.13.5",
"strum",
"strum_macros",
"thiserror 1.0.69",
"tokio",
"tokio-util",
"tonic 0.13.1",
"tonic-build",
"utils",
@@ -8679,7 +8682,6 @@ dependencies = [
"num-iter",
"num-rational",
"num-traits",
"once_cell",
"p256 0.13.2",
"parquet",
"prettyplease",

View File

@@ -199,7 +199,7 @@ tokio-postgres-rustls = "0.12.0"
tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]}
tokio-stream = "0.1"
tokio-tar = "0.3"
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
tokio-util = { version = "0.7.10", features = ["io", "io-util", "rt"] }
toml = "0.8"
toml_edit = "0.22"
tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "gzip", "prost", "router", "server", "tls-ring", "tls-native-roots", "zstd"] }

View File

@@ -40,6 +40,7 @@ COPY --chown=nonroot vendor/postgres-v16 vendor/postgres-v16
COPY --chown=nonroot vendor/postgres-v17 vendor/postgres-v17
COPY --chown=nonroot pgxn pgxn
COPY --chown=nonroot Makefile Makefile
COPY --chown=nonroot postgres.mk postgres.mk
COPY --chown=nonroot scripts/ninstall.sh scripts/ninstall.sh
ENV BUILD_TYPE=release

133
Makefile
View File

@@ -4,11 +4,14 @@ ROOT_PROJECT_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
# managers.
POSTGRES_INSTALL_DIR ?= $(ROOT_PROJECT_DIR)/pg_install/
# Supported PostgreSQL versions
POSTGRES_VERSIONS = v17 v16 v15 v14
# CARGO_BUILD_FLAGS: Extra flags to pass to `cargo build`. `--locked`
# and `--features testing` are popular examples.
#
# CARGO_PROFILE: You can also set to override the cargo profile to
# use. By default, it is derived from BUILD_TYPE.
# CARGO_PROFILE: Set to override the cargo profile to use. By default,
# it is derived from BUILD_TYPE.
# All intermediate build artifacts are stored here.
BUILD_DIR := build
@@ -95,95 +98,24 @@ CACHEDIR_TAG_CONTENTS := "Signature: 8a477f597d28d172789f06886806bc55"
# Top level Makefile to build Neon and PostgreSQL
#
.PHONY: all
all: neon postgres neon-pg-ext
all: neon postgres-install neon-pg-ext
### Neon Rust bits
#
# The 'postgres_ffi' depends on the Postgres headers.
.PHONY: neon
neon: postgres-headers walproposer-lib cargo-target-dir
neon: postgres-headers-install walproposer-lib cargo-target-dir
+@echo "Compiling Neon"
$(CARGO_CMD_PREFIX) cargo build $(CARGO_BUILD_FLAGS) $(CARGO_PROFILE)
.PHONY: cargo-target-dir
cargo-target-dir:
# https://github.com/rust-lang/cargo/issues/14281
mkdir -p target
test -e target/CACHEDIR.TAG || echo "$(CACHEDIR_TAG_CONTENTS)" > target/CACHEDIR.TAG
### PostgreSQL parts
# Some rules are duplicated for Postgres v14 and 15. We may want to refactor
# to avoid the duplication in the future, but it's tolerable for now.
#
$(BUILD_DIR)/%/config.status:
mkdir -p $(BUILD_DIR)
test -e $(BUILD_DIR)/CACHEDIR.TAG || echo "$(CACHEDIR_TAG_CONTENTS)" > $(BUILD_DIR)/CACHEDIR.TAG
+@echo "Configuring Postgres $* build"
@test -s $(ROOT_PROJECT_DIR)/vendor/postgres-$*/configure || { \
echo "\nPostgres submodule not found in $(ROOT_PROJECT_DIR)/vendor/postgres-$*/, execute "; \
echo "'git submodule update --init --recursive --depth 2 --progress .' in project root.\n"; \
exit 1; }
mkdir -p $(BUILD_DIR)/$*
VERSION=$*; \
EXTRA_VERSION=$$(cd $(ROOT_PROJECT_DIR)/vendor/postgres-$$VERSION && git rev-parse HEAD); \
(cd $(BUILD_DIR)/$$VERSION && \
env PATH="$(EXTRA_PATH_OVERRIDES):$$PATH" $(ROOT_PROJECT_DIR)/vendor/postgres-$$VERSION/configure \
CFLAGS='$(PG_CFLAGS)' LDFLAGS='$(PG_LDFLAGS)' \
$(PG_CONFIGURE_OPTS) --with-extra-version=" ($$EXTRA_VERSION)" \
--prefix=$(abspath $(POSTGRES_INSTALL_DIR))/$$VERSION > configure.log)
# nicer alias to run 'configure'
# Note: I've been unable to use templates for this part of our configuration.
# I'm not sure why it wouldn't work, but this is the only place (apart from
# the "build-all-versions" entry points) where direct mention of PostgreSQL
# versions is used.
.PHONY: postgres-configure-v17
postgres-configure-v17: $(BUILD_DIR)/v17/config.status
.PHONY: postgres-configure-v16
postgres-configure-v16: $(BUILD_DIR)/v16/config.status
.PHONY: postgres-configure-v15
postgres-configure-v15: $(BUILD_DIR)/v15/config.status
.PHONY: postgres-configure-v14
postgres-configure-v14: $(BUILD_DIR)/v14/config.status
# Install just the PostgreSQL header files into $(POSTGRES_INSTALL_DIR)/<version>/include
#
# This is implicitly included in the 'postgres-%' rule, but this can be handy if you
# want to just install the headers without building PostgreSQL, e.g. for building
# extensions.
.PHONY: postgres-headers-%
postgres-headers-%: postgres-configure-%
+@echo "Installing PostgreSQL $* headers"
$(MAKE) -C $(BUILD_DIR)/$*/src/include MAKELEVEL=0 install
# Compile and install PostgreSQL
.PHONY: postgres-%
postgres-%: postgres-configure-% \
postgres-headers-% # to prevent `make install` conflicts with neon's `postgres-headers`
+@echo "Compiling PostgreSQL $*"
$(MAKE) -C $(BUILD_DIR)/$* MAKELEVEL=0 install
+@echo "Compiling pg_prewarm $*"
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_prewarm install
+@echo "Compiling pg_buffercache $*"
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_buffercache install
+@echo "Compiling pg_visibility $*"
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_visibility install
+@echo "Compiling pageinspect $*"
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pageinspect install
+@echo "Compiling pg_trgm $*"
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_trgm install
+@echo "Compiling amcheck $*"
$(MAKE) -C $(BUILD_DIR)/$*/contrib/amcheck install
+@echo "Compiling test_decoding $*"
$(MAKE) -C $(BUILD_DIR)/$*/contrib/test_decoding install
.PHONY: postgres-check-%
postgres-check-%: postgres-%
$(MAKE) -C $(BUILD_DIR)/$* MAKELEVEL=0 check
.PHONY: neon-pg-ext-%
neon-pg-ext-%: postgres-%
neon-pg-ext-%: postgres-install-%
+@echo "Compiling neon-specific Postgres extensions for $*"
mkdir -p $(BUILD_DIR)/pgxn-$*
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \
@@ -222,39 +154,14 @@ ifeq ($(UNAME_S),Linux)
pg_crc32c.o
endif
# Shorthand to call neon-pg-ext-% target for all Postgres versions
.PHONY: neon-pg-ext
neon-pg-ext: \
neon-pg-ext-v14 \
neon-pg-ext-v15 \
neon-pg-ext-v16 \
neon-pg-ext-v17
# shorthand to build all Postgres versions
.PHONY: postgres
postgres: \
postgres-v14 \
postgres-v15 \
postgres-v16 \
postgres-v17
.PHONY: postgres-headers
postgres-headers: \
postgres-headers-v14 \
postgres-headers-v15 \
postgres-headers-v16 \
postgres-headers-v17
.PHONY: postgres-check
postgres-check: \
postgres-check-v14 \
postgres-check-v15 \
postgres-check-v16 \
postgres-check-v17
neon-pg-ext: $(foreach pg_version,$(POSTGRES_VERSIONS),neon-pg-ext-$(pg_version))
# This removes everything
.PHONY: distclean
distclean:
$(RM) -r $(POSTGRES_INSTALL_DIR)
$(RM) -r $(POSTGRES_INSTALL_DIR) $(BUILD_DIR)
$(CARGO_CMD_PREFIX) cargo clean
.PHONY: fmt
@@ -302,3 +209,19 @@ neon-pgindent: postgres-v17-pg-bsd-indent neon-pg-ext-v17
.PHONY: setup-pre-commit-hook
setup-pre-commit-hook:
ln -s -f $(ROOT_PROJECT_DIR)/pre-commit.py .git/hooks/pre-commit
# Targets for building PostgreSQL are defined in postgres.mk.
#
# But if the caller has indicated that PostgreSQL is already
# installed, by setting the PG_INSTALL_CACHED variable, skip it.
ifdef PG_INSTALL_CACHED
postgres-install: skip-install
$(foreach pg_version,$(POSTGRES_VERSIONS),postgres-install-$(pg_version)): skip-install
postgres-headers-install:
+@echo "Skipping installation of PostgreSQL headers because PG_INSTALL_CACHED is set"
skip-install:
+@echo "Skipping PostgreSQL installation because PG_INSTALL_CACHED is set"
else
include postgres.mk
endif

View File

@@ -165,6 +165,7 @@ RUN curl -fsSL \
&& rm sql_exporter.tar.gz
# protobuf-compiler (protoc)
# Keep the version the same as in compute/compute-node.Dockerfile
ENV PROTOC_VERSION=25.1
RUN curl -fsSL "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-$(uname -m | sed 's/aarch64/aarch_64/g').zip" -o "protoc.zip" \
&& unzip -q protoc.zip -d protoc \
@@ -179,7 +180,7 @@ RUN curl -sL "https://github.com/peak/s5cmd/releases/download/v${S5CMD_VERSION}/
&& mv s5cmd /usr/local/bin/s5cmd
# LLVM
ENV LLVM_VERSION=19
ENV LLVM_VERSION=20
RUN curl -fsSL 'https://apt.llvm.org/llvm-snapshot.gpg.key' | apt-key add - \
&& echo "deb http://apt.llvm.org/${DEBIAN_VERSION}/ llvm-toolchain-${DEBIAN_VERSION}-${LLVM_VERSION} main" > /etc/apt/sources.list.d/llvm.stable.list \
&& apt update \
@@ -292,7 +293,7 @@ WORKDIR /home/nonroot
# Rust
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
ENV RUSTC_VERSION=1.87.0
ENV RUSTC_VERSION=1.88.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
ARG RUSTFILT_VERSION=0.2.1

View File

@@ -115,6 +115,9 @@ ARG EXTENSIONS=all
FROM $BASE_IMAGE_SHA AS build-deps
ARG DEBIAN_VERSION
# Keep in sync with build-tools.Dockerfile
ENV PROTOC_VERSION=25.1
# Use strict mode for bash to catch errors early
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
@@ -149,8 +152,14 @@ RUN case $DEBIAN_VERSION in \
libclang-dev \
jsonnet \
$VERSION_INSTALLS \
&& apt clean && rm -rf /var/lib/apt/lists/* && \
useradd -ms /bin/bash nonroot -b /home
&& apt clean && rm -rf /var/lib/apt/lists/* \
&& useradd -ms /bin/bash nonroot -b /home \
# Install protoc from binary release, since Debian's versions are too old.
&& curl -fsSL "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-$(uname -m | sed 's/aarch64/aarch_64/g').zip" -o "protoc.zip" \
&& unzip -q protoc.zip -d protoc \
&& mv protoc/bin/protoc /usr/local/bin/protoc \
&& mv protoc/include/google /usr/local/include/google \
&& rm -rf protoc.zip protoc
#########################################################################################
#
@@ -1170,7 +1179,7 @@ COPY --from=pgrag-src /ext-src/ /ext-src/
# Install it using virtual environment, because Python 3.11 (the default version on Debian 12 (Bookworm)) complains otherwise
WORKDIR /ext-src/onnxruntime-src
RUN apt update && apt install --no-install-recommends --no-install-suggests -y \
python3 python3-pip python3-venv protobuf-compiler && \
python3 python3-pip python3-venv && \
apt clean && rm -rf /var/lib/apt/lists/* && \
python3 -m venv venv && \
. venv/bin/activate && \

View File

@@ -38,6 +38,7 @@ once_cell.workspace = true
opentelemetry.workspace = true
opentelemetry_sdk.workspace = true
p256 = { version = "0.13", features = ["pem"] }
pageserver_page_api.workspace = true
postgres.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["json"] }
@@ -53,6 +54,7 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
tokio-postgres.workspace = true
tokio-util.workspace = true
tokio-stream.workspace = true
tonic.workspace = true
tower-otel.workspace = true
tracing.workspace = true
tracing-opentelemetry.workspace = true

View File

@@ -1,4 +1,4 @@
use anyhow::{Context, Result};
use anyhow::{Context, Result, anyhow};
use chrono::{DateTime, Utc};
use compute_api::privilege::Privilege;
use compute_api::responses::{
@@ -15,12 +15,12 @@ use itertools::Itertools;
use nix::sys::signal::{Signal, kill};
use nix::unistd::Pid;
use once_cell::sync::Lazy;
use pageserver_page_api::{self as page_api, BaseBackupCompression};
use postgres;
use postgres::NoTls;
use postgres::error::SqlState;
use remote_storage::{DownloadError, RemotePath};
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::os::unix::fs::{PermissionsExt, symlink};
use std::path::Path;
use std::process::{Command, Stdio};
@@ -36,6 +36,7 @@ use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::measured_stream::MeasuredReader;
use utils::pid_file;
use utils::shard::{ShardCount, ShardIndex, ShardNumber};
use crate::configurator::launch_configurator;
use crate::disk_quota::set_disk_quota;
@@ -218,7 +219,8 @@ pub struct ParsedSpec {
pub pageserver_connstr: String,
pub safekeeper_connstrings: Vec<String>,
pub storage_auth_token: Option<String>,
pub endpoint_storage_addr: Option<SocketAddr>,
/// k8s dns name and port
pub endpoint_storage_addr: Option<String>,
pub endpoint_storage_token: Option<String>,
}
@@ -313,13 +315,10 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
.or(Err("invalid timeline id"))?
};
let endpoint_storage_addr: Option<SocketAddr> = spec
let endpoint_storage_addr: Option<String> = spec
.endpoint_storage_addr
.clone()
.or_else(|| spec.cluster.settings.find("neon.endpoint_storage_addr"))
.unwrap_or_default()
.parse()
.ok();
.or_else(|| spec.cluster.settings.find("neon.endpoint_storage_addr"));
let endpoint_storage_token = spec
.endpoint_storage_token
.clone()
@@ -998,13 +997,87 @@ impl ComputeNode {
Ok(())
}
// Get basebackup from the libpq connection to pageserver using `connstr` and
// unarchive it to `pgdata` directory overriding all its previous content.
/// Fetches a basebackup from the Pageserver using the compute state's Pageserver connstring and
/// unarchives it to `pgdata` directory, replacing any existing contents.
#[instrument(skip_all, fields(%lsn))]
fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
let spec = compute_state.pspec.as_ref().expect("spec must be set");
let start_time = Instant::now();
// Detect the protocol scheme. If the URL doesn't have a scheme, assume libpq.
let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
let scheme = match Url::parse(shard0_connstr) {
Ok(url) => url.scheme().to_lowercase().to_string(),
Err(url::ParseError::RelativeUrlWithoutBase) => "postgresql".to_string(),
Err(err) => return Err(anyhow!("invalid connstring URL: {err}")),
};
let started = Instant::now();
let (connected, size) = match scheme.as_str() {
"postgresql" | "postgres" => self.try_get_basebackup_libpq(spec, lsn)?,
"grpc" => self.try_get_basebackup_grpc(spec, lsn)?,
scheme => return Err(anyhow!("unknown URL scheme {scheme}")),
};
let mut state = self.state.lock().unwrap();
state.metrics.pageserver_connect_micros =
connected.duration_since(started).as_micros() as u64;
state.metrics.basebackup_bytes = size as u64;
state.metrics.basebackup_ms = started.elapsed().as_millis() as u64;
Ok(())
}
/// Fetches a basebackup via gRPC. The connstring must use grpc://. Returns the timestamp when
/// the connection was established, and the (compressed) size of the basebackup.
fn try_get_basebackup_grpc(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<(Instant, usize)> {
let shard0_connstr = spec
.pageserver_connstr
.split(',')
.next()
.unwrap()
.to_string();
let shard_index = match spec.pageserver_connstr.split(',').count() as u8 {
0 | 1 => ShardIndex::unsharded(),
count => ShardIndex::new(ShardNumber(0), ShardCount(count)),
};
let (reader, connected) = tokio::runtime::Handle::current().block_on(async move {
let mut client = page_api::Client::new(
shard0_connstr,
spec.tenant_id,
spec.timeline_id,
shard_index,
spec.storage_auth_token.clone(),
None, // NB: base backups use payload compression
)
.await?;
let connected = Instant::now();
let reader = client
.get_base_backup(page_api::GetBaseBackupRequest {
lsn: (lsn != Lsn(0)).then_some(lsn),
compression: BaseBackupCompression::Gzip,
replica: spec.spec.mode != ComputeMode::Primary,
full: false,
})
.await?;
anyhow::Ok((reader, connected))
})?;
let mut reader = MeasuredReader::new(tokio_util::io::SyncIoBridge::new(reader));
// Set `ignore_zeros` so that unpack() reads the entire stream and doesn't just stop at the
// end-of-archive marker. If the server errors, the tar::Builder drop handler will write an
// end-of-archive marker before the error is emitted, and we would not see the error.
let mut ar = tar::Archive::new(flate2::read::GzDecoder::new(&mut reader));
ar.set_ignore_zeros(true);
ar.unpack(&self.params.pgdata)?;
Ok((connected, reader.get_byte_count()))
}
/// Fetches a basebackup via libpq. The connstring must use postgresql://. Returns the timestamp
/// when the connection was established, and the (compressed) size of the basebackup.
fn try_get_basebackup_libpq(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<(Instant, usize)> {
let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
let mut config = postgres::Config::from_str(shard0_connstr)?;
@@ -1018,16 +1091,14 @@ impl ComputeNode {
}
config.application_name("compute_ctl");
if let Some(spec) = &compute_state.pspec {
config.options(&format!(
"-c neon.compute_mode={}",
spec.spec.mode.to_type_str()
));
}
config.options(&format!(
"-c neon.compute_mode={}",
spec.spec.mode.to_type_str()
));
// Connect to pageserver
let mut client = config.connect(NoTls)?;
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
let connected = Instant::now();
let basebackup_cmd = match lsn {
Lsn(0) => {
@@ -1064,16 +1135,13 @@ impl ComputeNode {
// Set `ignore_zeros` so that unpack() reads all the Copy data and
// doesn't stop at the end-of-archive marker. Otherwise, if the server
// sends an Error after finishing the tarball, we will not notice it.
// The tar::Builder drop handler will write an end-of-archive marker
// before emitting the error, and we would not see it otherwise.
let mut ar = tar::Archive::new(flate2::read::GzDecoder::new(&mut bufreader));
ar.set_ignore_zeros(true);
ar.unpack(&self.params.pgdata)?;
// Report metrics
let mut state = self.state.lock().unwrap();
state.metrics.pageserver_connect_micros = pageserver_connect_micros;
state.metrics.basebackup_bytes = measured_reader.get_byte_count() as u64;
state.metrics.basebackup_ms = start_time.elapsed().as_millis() as u64;
Ok(())
Ok((connected, measured_reader.get_byte_count()))
}
// Gets the basebackup in a retry loop

View File

@@ -420,7 +420,7 @@ impl Default for BasebackupCacheConfig {
cleanup_period: Duration::from_secs(60),
max_total_size_bytes: 1024 * 1024 * 1024, // 1 GiB
// max_entry_size_bytes: 16 * 1024 * 1024, // 16 MiB
max_size_entries: 1000,
max_size_entries: 10000,
prepare_channel_size: 100,
}
}

View File

@@ -86,6 +86,14 @@ pub enum GateError {
GateClosed,
}
impl GateError {
pub fn is_cancel(&self) -> bool {
match self {
GateError::GateClosed => true,
}
}
}
impl Default for Gate {
fn default() -> Self {
Self {

View File

@@ -9,12 +9,13 @@ anyhow.workspace = true
bytes.workspace = true
futures.workspace = true
pageserver_api.workspace = true
postgres_ffi.workspace = true
postgres_ffi_types.workspace = true
prost.workspace = true
strum.workspace = true
strum_macros.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tonic.workspace = true
utils.workspace = true
workspace_hack.workspace = true

View File

@@ -1,8 +1,7 @@
use std::convert::TryInto;
use bytes::Bytes;
use futures::TryStreamExt;
use futures::{Stream, StreamExt};
use anyhow::Result;
use futures::{Stream, StreamExt as _, TryStreamExt as _};
use tokio::io::AsyncRead;
use tokio_util::io::StreamReader;
use tonic::metadata::AsciiMetadataValue;
use tonic::metadata::errors::InvalidMetadataValue;
use tonic::transport::Channel;
@@ -12,8 +11,6 @@ use utils::id::TenantId;
use utils::id::TimelineId;
use utils::shard::ShardIndex;
use anyhow::Result;
use crate::model;
use crate::proto;
@@ -69,6 +66,7 @@ impl tonic::service::Interceptor for AuthInterceptor {
Ok(req)
}
}
#[derive(Clone)]
pub struct Client {
client: proto::PageServiceClient<
@@ -120,22 +118,15 @@ impl Client {
pub async fn get_base_backup(
&mut self,
req: model::GetBaseBackupRequest,
) -> Result<impl Stream<Item = Result<Bytes, tonic::Status>> + 'static, tonic::Status> {
let proto_req = proto::GetBaseBackupRequest::from(req);
let response_stream: Streaming<proto::GetBaseBackupResponseChunk> =
self.client.get_base_backup(proto_req).await?.into_inner();
// TODO: Consider dechunking internally
let domain_stream = response_stream.map(|chunk_res| {
chunk_res.and_then(|proto_chunk| {
proto_chunk.try_into().map_err(|e| {
tonic::Status::internal(format!("Failed to convert response chunk: {e}"))
})
})
});
Ok(domain_stream)
) -> Result<impl AsyncRead + use<>, tonic::Status> {
let req = proto::GetBaseBackupRequest::from(req);
let chunks = self.client.get_base_backup(req).await?.into_inner();
let reader = StreamReader::new(
chunks
.map_ok(|resp| resp.chunk)
.map_err(std::io::Error::other),
);
Ok(reader)
}
/// Returns the total size of a database, as # of bytes.

View File

@@ -18,8 +18,8 @@
use std::fmt::Display;
use bytes::Bytes;
use postgres_ffi::Oid;
// TODO: split out Lsn, RelTag, SlruKind, Oid and other basic types to a separate crate, to avoid
use postgres_ffi_types::Oid;
// TODO: split out Lsn, RelTag, SlruKind and other basic types to a separate crate, to avoid
// pulling in all of their other crate dependencies when building the client.
use utils::lsn::Lsn;

View File

@@ -355,9 +355,6 @@ impl Client for GrpcClient {
full: false,
compression: self.compression,
};
let stream = self.inner.get_base_backup(req).await?;
Ok(Box::pin(StreamReader::new(
stream.map_err(std::io::Error::other),
)))
Ok(Box::pin(self.inner.get_base_backup(req).await?))
}
}

View File

@@ -37,7 +37,7 @@ async fn main() -> anyhow::Result<()> {
not_modified_since: Lsn(23),
},
batch_key: 42,
message: format!("message {}", msg),
message: format!("message {msg}"),
}));
let Ok(res) = tokio::time::timeout(Duration::from_secs(10), fut).await else {
eprintln!("pipe seems full");

View File

@@ -11429,11 +11429,11 @@ mod tests {
if left != right {
eprintln!("---LEFT---");
for left in left.iter() {
eprintln!("{}", left);
eprintln!("{left}");
}
eprintln!("---RIGHT---");
for right in right.iter() {
eprintln!("{}", right);
eprintln!("{right}");
}
assert_eq!(left, right);
}

View File

@@ -2200,7 +2200,7 @@ impl TenantManager {
selector: ShardSelector,
) -> ShardResolveResult {
let tenants = self.tenants.read().unwrap();
let mut want_shard = None;
let mut want_shard: Option<ShardIndex> = None;
let mut any_in_progress = None;
match &*tenants {
@@ -2225,14 +2225,23 @@ impl TenantManager {
return ShardResolveResult::Found(tenant.clone());
}
ShardSelector::Page(key) => {
// First slot we see for this tenant, calculate the expected shard number
// for the key: we will use this for checking if this and subsequent
// slots contain the key, rather than recalculating the hash each time.
if want_shard.is_none() {
want_shard = Some(tenant.shard_identity.get_shard_number(&key));
// Each time we find an attached slot with a different shard count,
// recompute the expected shard number: during shard splits we might
// have multiple shards with the old shard count.
if want_shard.is_none()
|| want_shard.unwrap().shard_count != tenant.shard_identity.count
{
want_shard = Some(ShardIndex {
shard_number: tenant.shard_identity.get_shard_number(&key),
shard_count: tenant.shard_identity.count,
});
}
if Some(tenant.shard_identity.number) == want_shard {
if Some(ShardIndex {
shard_number: tenant.shard_identity.number,
shard_count: tenant.shard_identity.count,
}) == want_shard
{
return ShardResolveResult::Found(tenant.clone());
}
}
@@ -2338,6 +2347,17 @@ impl TenantManager {
let _gate_guard = tenant.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
if cfg!(feature = "testing")
&& tenant
.tenant_conf
.load()
.is_gc_blocked_by_lsn_lease_deadline()
{
warn!(
"test is requesting immediate GC but lease deadline is still on, test might be impacted"
);
}
fail::fail_point!("immediate_gc_task_pre");
#[allow(unused_mut)]

View File

@@ -17,14 +17,17 @@ use tracing::*;
use utils::backoff::exponential_backoff_duration;
use utils::completion::Barrier;
use utils::pausable_failpoint;
use utils::sync::gate::GateError;
use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::{self, BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
use crate::task_mgr::{self, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS, TaskKind};
use crate::tenant::blob_io::WriteBlobError;
use crate::tenant::throttle::Stats;
use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::compaction::CompactionOutcome;
use crate::tenant::{TenantShard, TenantState};
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
/// Semaphore limiting concurrent background tasks (across all tenants).
///
@@ -313,7 +316,20 @@ pub(crate) fn log_compaction_error(
let timeline = root_cause
.downcast_ref::<PageReconstructError>()
.is_some_and(|e| e.is_stopping());
let is_stopping = upload_queue || timeline;
let buffered_writer_flush_task_canelled = root_cause
.downcast_ref::<FlushTaskError>()
.is_some_and(|e| e.is_cancel());
let write_blob_cancelled = root_cause
.downcast_ref::<WriteBlobError>()
.is_some_and(|e| e.is_cancel());
let gate_closed = root_cause
.downcast_ref::<GateError>()
.is_some_and(|e| e.is_cancel());
let is_stopping = upload_queue
|| timeline
|| buffered_writer_flush_task_canelled
|| write_blob_cancelled
|| gate_closed;
if is_stopping {
Level::INFO

View File

@@ -241,8 +241,17 @@ impl DeleteTimelineFlow {
{
Ok(r) => r,
Err(DownloadError::NotFound) => {
// Deletion is already complete
// Deletion is already complete.
// As we came here, we will need to remove the timeline from the tenant though.
tracing::info!("Timeline already deleted in remote storage");
if let TimelineOrOffloaded::Offloaded(_) = &timeline {
// We only supoprt this for offloaded timelines, as we don't know which state non-offloaded timelines are in.
tracing::info!(
"Timeline with gone index part is offloaded timeline. Removing from tenant."
);
remove_maybe_offloaded_timeline_from_tenant(tenant, &timeline, &guard)
.await?;
}
return Ok(());
}
Err(e) => {

121
postgres.mk Normal file
View File

@@ -0,0 +1,121 @@
# Sub-makefile for compiling PostgreSQL as part of Neon. This is
# included from the main Makefile, and is not meant to be called
# directly.
#
# CI workflows and Dockerfiles can take advantage of the following
# properties for caching:
#
# - Compiling the targets in this file only builds the PostgreSQL sources
# under the vendor/ subdirectory, nothing else from the repository.
# - All outputs go to POSTGRES_INSTALL_DIR (by default 'pg_install',
# see parent Makefile)
# - intermediate build artifacts go to BUILD_DIR
#
#
# Variables passed from the parent Makefile that control what gets
# installed and where:
# - POSTGRES_VERSIONS
# - POSTGRES_INSTALL_DIR
# - BUILD_DIR
#
# Variables passed from the parent Makefile that affect the build
# process and the resulting binaries:
# - PG_CONFIGURE_OPTS
# - PG_CFLAGS
# - PG_LDFLAGS
# - EXTRA_PATH_OVERRIDES
###
### Main targets
###
### These are called from the main Makefile, and can also be called
### directly from command line
# Compile and install a specific PostgreSQL version
postgres-install-%: postgres-configure-% \
postgres-headers-install-% # to prevent `make install` conflicts with neon's `postgres-headers`
# Install the PostgreSQL header files into $(POSTGRES_INSTALL_DIR)/<version>/include
#
# This is implicitly part of the 'postgres-install-%' target, but this can be handy
# if you want to install just the headers without building PostgreSQL, e.g. for building
# extensions.
postgres-headers-install-%: postgres-configure-%
+@echo "Installing PostgreSQL $* headers"
$(MAKE) -C $(BUILD_DIR)/$*/src/include MAKELEVEL=0 install
# Run Postgres regression tests
postgres-check-%: postgres-install-%
$(MAKE) -C $(BUILD_DIR)/$* MAKELEVEL=0 check
###
### Shorthands for the main targets, for convenience
###
# Same as the above main targets, but for all supported PostgreSQL versions
# For example, 'make postgres-install' is equivalent to
# 'make postgres-install-v14 postgres-install-v15 postgres-install-v16 postgres-install-v17'
all_version_targets=postgres-install postgres-headers-install postgres-check
.PHONY: $(all_version_targets)
$(all_version_targets): postgres-%: $(foreach pg_version,$(POSTGRES_VERSIONS),postgres-%-$(pg_version))
.PHONY: postgres
postgres: postgres-install
.PHONY: postgres-headers
postgres-headers: postgres-headers-install
# 'postgres-v17' is an alias for 'postgres-install-v17' etc.
$(foreach pg_version,$(POSTGRES_VERSIONS),postgres-$(pg_version)): postgres-%: postgres-install-%
###
### Intermediate targets
###
### These are not intended to be called directly, but are dependencies for the
### main targets.
# Run 'configure'
$(BUILD_DIR)/%/config.status:
mkdir -p $(BUILD_DIR)
test -e $(BUILD_DIR)/CACHEDIR.TAG || echo "$(CACHEDIR_TAG_CONTENTS)" > $(BUILD_DIR)/CACHEDIR.TAG
+@echo "Configuring Postgres $* build"
@test -s $(ROOT_PROJECT_DIR)/vendor/postgres-$*/configure || { \
echo "\nPostgres submodule not found in $(ROOT_PROJECT_DIR)/vendor/postgres-$*/, execute "; \
echo "'git submodule update --init --recursive --depth 2 --progress .' in project root.\n"; \
exit 1; }
mkdir -p $(BUILD_DIR)/$*
VERSION=$*; \
EXTRA_VERSION=$$(cd $(ROOT_PROJECT_DIR)/vendor/postgres-$$VERSION && git rev-parse HEAD); \
(cd $(BUILD_DIR)/$$VERSION && \
env PATH="$(EXTRA_PATH_OVERRIDES):$$PATH" $(ROOT_PROJECT_DIR)/vendor/postgres-$$VERSION/configure \
CFLAGS='$(PG_CFLAGS)' LDFLAGS='$(PG_LDFLAGS)' \
$(PG_CONFIGURE_OPTS) --with-extra-version=" ($$EXTRA_VERSION)" \
--prefix=$(abspath $(POSTGRES_INSTALL_DIR))/$$VERSION > configure.log)
# nicer alias to run 'configure'.
#
# This tries to accomplish this rule:
#
# postgres-configure-%: $(BUILD_DIR)/%/config.status
#
# XXX: I'm not sure why the above rule doesn't work directly. But this accomplishses
# the same thing
$(foreach pg_version,$(POSTGRES_VERSIONS),postgres-configure-$(pg_version)): postgres-configure-%: FORCE $(BUILD_DIR)/%/config.status
# Compile and install PostgreSQL (and a few contrib modules used in tests)
postgres-install-%: postgres-configure-% \
postgres-headers-install-% # to prevent `make install` conflicts with neon's `postgres-headers-install`
+@echo "Compiling PostgreSQL $*"
$(MAKE) -C $(BUILD_DIR)/$* MAKELEVEL=0 install
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_prewarm install
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_buffercache install
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_visibility install
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pageinspect install
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_trgm install
$(MAKE) -C $(BUILD_DIR)/$*/contrib/amcheck install
$(MAKE) -C $(BUILD_DIR)/$*/contrib/test_decoding install
.PHONY: FORCE
FORCE:

View File

@@ -26,9 +26,10 @@ use utils::sentry_init::init_sentry;
use crate::context::RequestContext;
use crate::metrics::{Metrics, ThreadPoolMetrics};
use crate::pglb::TlsRequired;
use crate::pqproto::FeStartupPacket;
use crate::protocol2::ConnectionInfo;
use crate::proxy::{ErrorSource, TlsRequired, copy_bidirectional_client_compute};
use crate::proxy::{ErrorSource, copy_bidirectional_client_compute};
use crate::stream::{PqStream, Stream};
use crate::util::run_until_cancelled;

View File

@@ -392,7 +392,7 @@ pub async fn run() -> anyhow::Result<()> {
match auth_backend {
Either::Left(auth_backend) => {
if let Some(proxy_listener) = proxy_listener {
client_tasks.spawn(crate::proxy::task_main(
client_tasks.spawn(crate::pglb::task_main(
config,
auth_backend,
proxy_listener,

View File

@@ -30,7 +30,7 @@ use super::{Cache, timed_lru};
///
/// * There's an API for immediate invalidation (removal) of a cache entry;
/// It's useful in case we know for sure that the entry is no longer correct.
/// See [`timed_lru::LookupInfo`] & [`timed_lru::Cached`] for more information.
/// See [`timed_lru::Cached`] for more information.
///
/// * Expired entries are kept in the cache, until they are evicted by the LRU policy,
/// or by a successful lookup (i.e. the entry hasn't expired yet).
@@ -54,7 +54,7 @@ pub(crate) struct TimedLru<K, V> {
impl<K: Hash + Eq, V> Cache for TimedLru<K, V> {
type Key = K;
type Value = V;
type LookupInfo<Key> = LookupInfo<Key>;
type LookupInfo<Key> = Key;
fn invalidate(&self, info: &Self::LookupInfo<K>) {
self.invalidate_raw(info);
@@ -87,30 +87,24 @@ impl<K: Hash + Eq, V> TimedLru<K, V> {
/// Drop an entry from the cache if it's outdated.
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn invalidate_raw(&self, info: &LookupInfo<K>) {
let now = Instant::now();
fn invalidate_raw(&self, key: &K) {
// Do costly things before taking the lock.
let mut cache = self.cache.lock();
let raw_entry = match cache.raw_entry_mut().from_key(&info.key) {
let entry = match cache.raw_entry_mut().from_key(key) {
RawEntryMut::Vacant(_) => return,
RawEntryMut::Occupied(x) => x,
RawEntryMut::Occupied(x) => x.remove(),
};
// Remove the entry if it was created prior to lookup timestamp.
let entry = raw_entry.get();
let (created_at, expires_at) = (entry.created_at, entry.expires_at);
let should_remove = created_at <= info.created_at || expires_at <= now;
if should_remove {
raw_entry.remove();
}
drop(cache); // drop lock before logging
let Entry {
created_at,
expires_at,
..
} = entry;
debug!(
created_at = format_args!("{created_at:?}"),
expires_at = format_args!("{expires_at:?}"),
entry_removed = should_remove,
?created_at,
?expires_at,
"processed a cache entry invalidation event"
);
}
@@ -211,10 +205,10 @@ impl<K: Hash + Eq + Clone, V: Clone> TimedLru<K, V> {
}
pub(crate) fn insert_unit(&self, key: K, value: V) -> (Option<V>, Cached<&Self, ()>) {
let (created_at, old) = self.insert_raw(key.clone(), value);
let (_, old) = self.insert_raw(key.clone(), value);
let cached = Cached {
token: Some((self, LookupInfo { created_at, key })),
token: Some((self, key)),
value: (),
};
@@ -229,28 +223,9 @@ impl<K: Hash + Eq, V: Clone> TimedLru<K, V> {
K: Borrow<Q> + Clone,
Q: Hash + Eq + ?Sized,
{
self.get_raw(key, |key, entry| {
let info = LookupInfo {
created_at: entry.created_at,
key: key.clone(),
};
Cached {
token: Some((self, info)),
value: entry.value.clone(),
}
self.get_raw(key, |key, entry| Cached {
token: Some((self, key.clone())),
value: entry.value.clone(),
})
}
}
/// Lookup information for key invalidation.
pub(crate) struct LookupInfo<K> {
/// Time of creation of a cache [`Entry`].
/// We use this during invalidation lookups to prevent eviction of a newer
/// entry sharing the same key (it might've been inserted by a different
/// task after we got the entry we're trying to invalidate now).
created_at: Instant,
/// Search by this key.
key: K,
}

View File

@@ -236,7 +236,7 @@ impl AuthInfo {
&self,
ctx: &RequestContext,
compute: &mut ComputeConnection,
user_info: ComputeUserInfo,
user_info: &ComputeUserInfo,
) -> Result<PostgresSettings, PostgresError> {
// client config with stubbed connect info.
// TODO(conrad): should we rewrite this to bypass tokio-postgres2 entirely,
@@ -272,7 +272,7 @@ impl AuthInfo {
secret_key,
},
compute.hostname.to_string(),
user_info,
user_info.clone(),
);
Ok(PostgresSettings {

View File

@@ -11,11 +11,12 @@ use crate::config::{ProxyConfig, ProxyProtocolV2};
use crate::context::RequestContext;
use crate::error::ReportableError;
use crate::metrics::{Metrics, NumClientConnectionsGuard};
use crate::pglb::ClientRequestError;
use crate::pglb::handshake::{HandshakeData, handshake};
use crate::pglb::passthrough::ProxyPassthrough;
use crate::protocol2::{ConnectHeader, ConnectionInfo, read_proxy_protocol};
use crate::proxy::connect_compute::{TcpMechanism, connect_to_compute};
use crate::proxy::{ClientRequestError, ErrorSource, prepare_client_connection};
use crate::proxy::{ErrorSource, finish_client_init};
use crate::util::run_until_cancelled;
pub async fn task_main(
@@ -226,13 +227,13 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
.await?;
let pg_settings = auth_info
.authenticate(ctx, &mut node, user_info)
.authenticate(ctx, &mut node, &user_info)
.or_else(|e| async { Err(stream.throw_error(e, Some(ctx)).await) })
.await?;
let session = cancellation_handler.get_key();
prepare_client_connection(&pg_settings, *session.key(), &mut stream);
finish_client_init(&pg_settings, *session.key(), &mut stream);
let stream = stream.flush_and_into_inner().await?;
let session_id = ctx.session_id();

View File

@@ -8,10 +8,10 @@ use crate::config::TlsConfig;
use crate::context::RequestContext;
use crate::error::ReportableError;
use crate::metrics::Metrics;
use crate::pglb::TlsRequired;
use crate::pqproto::{
BeMessage, CancelKeyData, FeStartupPacket, ProtocolVersion, StartupMessageParams,
};
use crate::proxy::TlsRequired;
use crate::stream::{PqStream, Stream, StreamUpgradeError};
use crate::tls::PG_ALPN_PROTOCOL;

View File

@@ -2,3 +2,332 @@ pub mod copy_bidirectional;
pub mod handshake;
pub mod inprocess;
pub mod passthrough;
use std::sync::Arc;
use futures::FutureExt;
use smol_str::ToSmolStr;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info, warn};
use crate::auth;
use crate::cancellation::{self, CancellationHandler};
use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig};
use crate::context::RequestContext;
use crate::error::{ReportableError, UserFacingError};
use crate::metrics::{Metrics, NumClientConnectionsGuard};
pub use crate::pglb::copy_bidirectional::ErrorSource;
use crate::pglb::handshake::{HandshakeData, HandshakeError, handshake};
use crate::pglb::passthrough::ProxyPassthrough;
use crate::protocol2::{ConnectHeader, ConnectionInfo, ConnectionInfoExtra, read_proxy_protocol};
use crate::proxy::handle_client;
use crate::rate_limiter::EndpointRateLimiter;
use crate::stream::Stream;
use crate::util::run_until_cancelled;
pub const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
#[derive(Error, Debug)]
#[error("{ERR_INSECURE_CONNECTION}")]
pub struct TlsRequired;
impl ReportableError for TlsRequired {
fn get_error_kind(&self) -> crate::error::ErrorKind {
crate::error::ErrorKind::User
}
}
impl UserFacingError for TlsRequired {}
pub async fn task_main(
config: &'static ProxyConfig,
auth_backend: &'static auth::Backend<'static, ()>,
listener: tokio::net::TcpListener,
cancellation_token: CancellationToken,
cancellation_handler: Arc<CancellationHandler>,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
) -> anyhow::Result<()> {
scopeguard::defer! {
info!("proxy has shut down");
}
// When set for the server socket, the keepalive setting
// will be inherited by all accepted client sockets.
socket2::SockRef::from(&listener).set_keepalive(true)?;
let connections = tokio_util::task::task_tracker::TaskTracker::new();
let cancellations = tokio_util::task::task_tracker::TaskTracker::new();
while let Some(accept_result) =
run_until_cancelled(listener.accept(), &cancellation_token).await
{
let (socket, peer_addr) = accept_result?;
let conn_gauge = Metrics::get()
.proxy
.client_connections
.guard(crate::metrics::Protocol::Tcp);
let session_id = uuid::Uuid::new_v4();
let cancellation_handler = Arc::clone(&cancellation_handler);
let cancellations = cancellations.clone();
debug!(protocol = "tcp", %session_id, "accepted new TCP connection");
let endpoint_rate_limiter2 = endpoint_rate_limiter.clone();
connections.spawn(async move {
let (socket, conn_info) = match config.proxy_protocol_v2 {
ProxyProtocolV2::Required => {
match read_proxy_protocol(socket).await {
Err(e) => {
warn!("per-client task finished with an error: {e:#}");
return;
}
// our load balancers will not send any more data. let's just exit immediately
Ok((_socket, ConnectHeader::Local)) => {
debug!("healthcheck received");
return;
}
Ok((socket, ConnectHeader::Proxy(info))) => (socket, info),
}
}
// ignore the header - it cannot be confused for a postgres or http connection so will
// error later.
ProxyProtocolV2::Rejected => (
socket,
ConnectionInfo {
addr: peer_addr,
extra: None,
},
),
};
match socket.set_nodelay(true) {
Ok(()) => {}
Err(e) => {
error!(
"per-client task finished with an error: failed to set socket option: {e:#}"
);
return;
}
}
let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Tcp);
let res = handle_connection(
config,
auth_backend,
&ctx,
cancellation_handler,
socket,
ClientMode::Tcp,
endpoint_rate_limiter2,
conn_gauge,
cancellations,
)
.instrument(ctx.span())
.boxed()
.await;
match res {
Err(e) => {
ctx.set_error_kind(e.get_error_kind());
warn!(parent: &ctx.span(), "per-client task finished with an error: {e:#}");
}
Ok(None) => {
ctx.set_success();
}
Ok(Some(p)) => {
ctx.set_success();
let _disconnect = ctx.log_connect();
match p.proxy_pass().await {
Ok(()) => {}
Err(ErrorSource::Client(e)) => {
warn!(
?session_id,
"per-client task finished with an IO error from the client: {e:#}"
);
}
Err(ErrorSource::Compute(e)) => {
error!(
?session_id,
"per-client task finished with an IO error from the compute: {e:#}"
);
}
}
}
}
});
}
connections.close();
cancellations.close();
drop(listener);
// Drain connections
connections.wait().await;
cancellations.wait().await;
Ok(())
}
pub(crate) enum ClientMode {
Tcp,
Websockets { hostname: Option<String> },
}
/// Abstracts the logic of handling TCP vs WS clients
impl ClientMode {
pub fn allow_cleartext(&self) -> bool {
match self {
ClientMode::Tcp => false,
ClientMode::Websockets { .. } => true,
}
}
pub fn hostname<'a, S>(&'a self, s: &'a Stream<S>) -> Option<&'a str> {
match self {
ClientMode::Tcp => s.sni_hostname(),
ClientMode::Websockets { hostname } => hostname.as_deref(),
}
}
pub fn handshake_tls<'a>(&self, tls: Option<&'a TlsConfig>) -> Option<&'a TlsConfig> {
match self {
ClientMode::Tcp => tls,
// TLS is None here if using websockets, because the connection is already encrypted.
ClientMode::Websockets { .. } => None,
}
}
}
#[derive(Debug, Error)]
// almost all errors should be reported to the user, but there's a few cases where we cannot
// 1. Cancellation: we are not allowed to tell the client any cancellation statuses for security reasons
// 2. Handshake: handshake reports errors if it can, otherwise if the handshake fails due to protocol violation,
// we cannot be sure the client even understands our error message
// 3. PrepareClient: The client disconnected, so we can't tell them anyway...
pub(crate) enum ClientRequestError {
#[error("{0}")]
Cancellation(#[from] cancellation::CancelError),
#[error("{0}")]
Handshake(#[from] HandshakeError),
#[error("{0}")]
HandshakeTimeout(#[from] tokio::time::error::Elapsed),
#[error("{0}")]
PrepareClient(#[from] std::io::Error),
#[error("{0}")]
ReportedError(#[from] crate::stream::ReportedError),
}
impl ReportableError for ClientRequestError {
fn get_error_kind(&self) -> crate::error::ErrorKind {
match self {
ClientRequestError::Cancellation(e) => e.get_error_kind(),
ClientRequestError::Handshake(e) => e.get_error_kind(),
ClientRequestError::HandshakeTimeout(_) => crate::error::ErrorKind::RateLimit,
ClientRequestError::ReportedError(e) => e.get_error_kind(),
ClientRequestError::PrepareClient(_) => crate::error::ErrorKind::ClientDisconnect,
}
}
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn handle_connection<S: AsyncRead + AsyncWrite + Unpin + Send>(
config: &'static ProxyConfig,
auth_backend: &'static auth::Backend<'static, ()>,
ctx: &RequestContext,
cancellation_handler: Arc<CancellationHandler>,
client: S,
mode: ClientMode,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
conn_gauge: NumClientConnectionsGuard<'static>,
cancellations: tokio_util::task::task_tracker::TaskTracker,
) -> Result<Option<ProxyPassthrough<S>>, ClientRequestError> {
debug!(
protocol = %ctx.protocol(),
"handling interactive connection from client"
);
let metrics = &Metrics::get().proxy;
let proto = ctx.protocol();
let request_gauge = metrics.connection_requests.guard(proto);
let tls = config.tls_config.load();
let tls = tls.as_deref();
let record_handshake_error = !ctx.has_private_peer_addr();
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
let do_handshake = handshake(ctx, client, mode.handshake_tls(tls), record_handshake_error);
let (mut client, params) = match tokio::time::timeout(config.handshake_timeout, do_handshake)
.await??
{
HandshakeData::Startup(client, params) => (client, params),
HandshakeData::Cancel(cancel_key_data) => {
// spawn a task to cancel the session, but don't wait for it
cancellations.spawn({
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
let ctx = ctx.clone();
let cancel_span = tracing::span!(parent: None, tracing::Level::INFO, "cancel_session", session_id = ?ctx.session_id());
cancel_span.follows_from(tracing::Span::current());
async move {
cancellation_handler_clone
.cancel_session(
cancel_key_data,
ctx,
config.authentication_config.ip_allowlist_check_enabled,
config.authentication_config.is_vpc_acccess_proxy,
auth_backend.get_api(),
)
.await
.inspect_err(|e | debug!(error = ?e, "cancel_session failed")).ok();
}.instrument(cancel_span)
});
return Ok(None);
}
};
drop(pause);
ctx.set_db_options(params.clone());
let common_names = tls.map(|tls| &tls.common_names);
let (node, cancel_on_shutdown) = handle_client(
config,
auth_backend,
ctx,
cancellation_handler,
&mut client,
&mode,
endpoint_rate_limiter,
common_names,
&params,
)
.await?;
let client = client.flush_and_into_inner().await?;
let private_link_id = match ctx.extra() {
Some(ConnectionInfoExtra::Aws { vpce_id }) => Some(vpce_id.clone()),
Some(ConnectionInfoExtra::Azure { link_id }) => Some(link_id.to_smolstr()),
None => None,
};
Ok(Some(ProxyPassthrough {
client,
compute: node.stream,
aux: node.aux,
private_link_id,
_cancel_on_shutdown: cancel_on_shutdown,
_req: request_gauge,
_conn: conn_gauge,
_db_conn: node.guage,
}))
}

View File

@@ -112,7 +112,7 @@ where
let node_info = if !node_info.cached() || !err.should_retry_wake_compute() {
// If we just recieved this from cplane and didn't get it from cache, we shouldn't retry.
// Do not need to retrieve a new node_info, just return the old one.
if should_retry(&err, num_retries, compute.retry) {
if !should_retry(&err, num_retries, compute.retry) {
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Failed,

View File

@@ -5,323 +5,64 @@ pub(crate) mod connect_compute;
pub(crate) mod retry;
pub(crate) mod wake_compute;
use std::collections::HashSet;
use std::convert::Infallible;
use std::sync::Arc;
use futures::FutureExt;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use regex::Regex;
use serde::{Deserialize, Serialize};
use smol_str::{SmolStr, ToSmolStr, format_smolstr};
use thiserror::Error;
use smol_str::{SmolStr, format_smolstr};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info, warn};
use tokio::sync::oneshot;
use tracing::Instrument;
use crate::cancellation::{self, CancellationHandler};
use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig};
use crate::cache::Cache;
use crate::cancellation::CancellationHandler;
use crate::compute::ComputeConnection;
use crate::config::ProxyConfig;
use crate::context::RequestContext;
use crate::error::{ReportableError, UserFacingError};
use crate::metrics::{Metrics, NumClientConnectionsGuard};
use crate::control_plane::client::ControlPlaneClient;
pub use crate::pglb::copy_bidirectional::{ErrorSource, copy_bidirectional_client_compute};
use crate::pglb::handshake::{HandshakeData, HandshakeError, handshake};
use crate::pglb::passthrough::ProxyPassthrough;
use crate::pglb::{ClientMode, ClientRequestError};
use crate::pqproto::{BeMessage, CancelKeyData, StartupMessageParams};
use crate::protocol2::{ConnectHeader, ConnectionInfo, ConnectionInfoExtra, read_proxy_protocol};
use crate::proxy::connect_compute::{TcpMechanism, connect_to_compute};
use crate::proxy::retry::ShouldRetryWakeCompute;
use crate::rate_limiter::EndpointRateLimiter;
use crate::stream::{PqStream, Stream};
use crate::types::EndpointCacheKey;
use crate::util::run_until_cancelled;
use crate::{auth, compute};
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
#[derive(Error, Debug)]
#[error("{ERR_INSECURE_CONNECTION}")]
pub struct TlsRequired;
impl ReportableError for TlsRequired {
fn get_error_kind(&self) -> crate::error::ErrorKind {
crate::error::ErrorKind::User
}
}
impl UserFacingError for TlsRequired {}
pub async fn task_main(
config: &'static ProxyConfig,
auth_backend: &'static auth::Backend<'static, ()>,
listener: tokio::net::TcpListener,
cancellation_token: CancellationToken,
cancellation_handler: Arc<CancellationHandler>,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
) -> anyhow::Result<()> {
scopeguard::defer! {
info!("proxy has shut down");
}
// When set for the server socket, the keepalive setting
// will be inherited by all accepted client sockets.
socket2::SockRef::from(&listener).set_keepalive(true)?;
let connections = tokio_util::task::task_tracker::TaskTracker::new();
let cancellations = tokio_util::task::task_tracker::TaskTracker::new();
while let Some(accept_result) =
run_until_cancelled(listener.accept(), &cancellation_token).await
{
let (socket, peer_addr) = accept_result?;
let conn_gauge = Metrics::get()
.proxy
.client_connections
.guard(crate::metrics::Protocol::Tcp);
let session_id = uuid::Uuid::new_v4();
let cancellation_handler = Arc::clone(&cancellation_handler);
let cancellations = cancellations.clone();
debug!(protocol = "tcp", %session_id, "accepted new TCP connection");
let endpoint_rate_limiter2 = endpoint_rate_limiter.clone();
connections.spawn(async move {
let (socket, conn_info) = match config.proxy_protocol_v2 {
ProxyProtocolV2::Required => {
match read_proxy_protocol(socket).await {
Err(e) => {
warn!("per-client task finished with an error: {e:#}");
return;
}
// our load balancers will not send any more data. let's just exit immediately
Ok((_socket, ConnectHeader::Local)) => {
debug!("healthcheck received");
return;
}
Ok((socket, ConnectHeader::Proxy(info))) => (socket, info),
}
}
// ignore the header - it cannot be confused for a postgres or http connection so will
// error later.
ProxyProtocolV2::Rejected => (
socket,
ConnectionInfo {
addr: peer_addr,
extra: None,
},
),
};
match socket.set_nodelay(true) {
Ok(()) => {}
Err(e) => {
error!(
"per-client task finished with an error: failed to set socket option: {e:#}"
);
return;
}
}
let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Tcp);
let res = handle_client(
config,
auth_backend,
&ctx,
cancellation_handler,
socket,
ClientMode::Tcp,
endpoint_rate_limiter2,
conn_gauge,
cancellations,
)
.instrument(ctx.span())
.boxed()
.await;
match res {
Err(e) => {
ctx.set_error_kind(e.get_error_kind());
warn!(parent: &ctx.span(), "per-client task finished with an error: {e:#}");
}
Ok(None) => {
ctx.set_success();
}
Ok(Some(p)) => {
ctx.set_success();
let _disconnect = ctx.log_connect();
match p.proxy_pass().await {
Ok(()) => {}
Err(ErrorSource::Client(e)) => {
warn!(
?session_id,
"per-client task finished with an IO error from the client: {e:#}"
);
}
Err(ErrorSource::Compute(e)) => {
error!(
?session_id,
"per-client task finished with an IO error from the compute: {e:#}"
);
}
}
}
}
});
}
connections.close();
cancellations.close();
drop(listener);
// Drain connections
connections.wait().await;
cancellations.wait().await;
Ok(())
}
pub(crate) enum ClientMode {
Tcp,
Websockets { hostname: Option<String> },
}
/// Abstracts the logic of handling TCP vs WS clients
impl ClientMode {
pub(crate) fn allow_cleartext(&self) -> bool {
match self {
ClientMode::Tcp => false,
ClientMode::Websockets { .. } => true,
}
}
fn hostname<'a, S>(&'a self, s: &'a Stream<S>) -> Option<&'a str> {
match self {
ClientMode::Tcp => s.sni_hostname(),
ClientMode::Websockets { hostname } => hostname.as_deref(),
}
}
fn handshake_tls<'a>(&self, tls: Option<&'a TlsConfig>) -> Option<&'a TlsConfig> {
match self {
ClientMode::Tcp => tls,
// TLS is None here if using websockets, because the connection is already encrypted.
ClientMode::Websockets { .. } => None,
}
}
}
#[derive(Debug, Error)]
// almost all errors should be reported to the user, but there's a few cases where we cannot
// 1. Cancellation: we are not allowed to tell the client any cancellation statuses for security reasons
// 2. Handshake: handshake reports errors if it can, otherwise if the handshake fails due to protocol violation,
// we cannot be sure the client even understands our error message
// 3. PrepareClient: The client disconnected, so we can't tell them anyway...
pub(crate) enum ClientRequestError {
#[error("{0}")]
Cancellation(#[from] cancellation::CancelError),
#[error("{0}")]
Handshake(#[from] HandshakeError),
#[error("{0}")]
HandshakeTimeout(#[from] tokio::time::error::Elapsed),
#[error("{0}")]
PrepareClient(#[from] std::io::Error),
#[error("{0}")]
ReportedError(#[from] crate::stream::ReportedError),
}
impl ReportableError for ClientRequestError {
fn get_error_kind(&self) -> crate::error::ErrorKind {
match self {
ClientRequestError::Cancellation(e) => e.get_error_kind(),
ClientRequestError::Handshake(e) => e.get_error_kind(),
ClientRequestError::HandshakeTimeout(_) => crate::error::ErrorKind::RateLimit,
ClientRequestError::ReportedError(e) => e.get_error_kind(),
ClientRequestError::PrepareClient(_) => crate::error::ErrorKind::ClientDisconnect,
}
}
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
config: &'static ProxyConfig,
auth_backend: &'static auth::Backend<'static, ()>,
ctx: &RequestContext,
cancellation_handler: Arc<CancellationHandler>,
stream: S,
mode: ClientMode,
client: &mut PqStream<Stream<S>>,
mode: &ClientMode,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
conn_gauge: NumClientConnectionsGuard<'static>,
cancellations: tokio_util::task::task_tracker::TaskTracker,
) -> Result<Option<ProxyPassthrough<S>>, ClientRequestError> {
debug!(
protocol = %ctx.protocol(),
"handling interactive connection from client"
);
let metrics = &Metrics::get().proxy;
let proto = ctx.protocol();
let request_gauge = metrics.connection_requests.guard(proto);
let tls = config.tls_config.load();
let tls = tls.as_deref();
let record_handshake_error = !ctx.has_private_peer_addr();
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
let do_handshake = handshake(ctx, stream, mode.handshake_tls(tls), record_handshake_error);
let (mut stream, params) = match tokio::time::timeout(config.handshake_timeout, do_handshake)
.await??
{
HandshakeData::Startup(stream, params) => (stream, params),
HandshakeData::Cancel(cancel_key_data) => {
// spawn a task to cancel the session, but don't wait for it
cancellations.spawn({
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
let ctx = ctx.clone();
let cancel_span = tracing::span!(parent: None, tracing::Level::INFO, "cancel_session", session_id = ?ctx.session_id());
cancel_span.follows_from(tracing::Span::current());
async move {
cancellation_handler_clone
.cancel_session(
cancel_key_data,
ctx,
config.authentication_config.ip_allowlist_check_enabled,
config.authentication_config.is_vpc_acccess_proxy,
auth_backend.get_api(),
)
.await
.inspect_err(|e | debug!(error = ?e, "cancel_session failed")).ok();
}.instrument(cancel_span)
});
return Ok(None);
}
};
drop(pause);
ctx.set_db_options(params.clone());
let hostname = mode.hostname(stream.get_ref());
let common_names = tls.map(|tls| &tls.common_names);
common_names: Option<&HashSet<String>>,
params: &StartupMessageParams,
) -> Result<(ComputeConnection, oneshot::Sender<Infallible>), ClientRequestError> {
let hostname = mode.hostname(client.get_ref());
// Extract credentials which we're going to use for auth.
let result = auth_backend
.as_ref()
.map(|()| auth::ComputeUserInfoMaybeEndpoint::parse(ctx, &params, hostname, common_names))
.map(|()| auth::ComputeUserInfoMaybeEndpoint::parse(ctx, params, hostname, common_names))
.transpose();
let user_info = match result {
Ok(user_info) => user_info,
Err(e) => Err(stream.throw_error(e, Some(ctx)).await)?,
Err(e) => Err(client.throw_error(e, Some(ctx)).await)?,
};
let user = user_info.get_user().to_owned();
let user_info = match user_info
.authenticate(
ctx,
&mut stream,
client,
mode.allow_cleartext(),
&config.authentication_config,
endpoint_rate_limiter,
@@ -334,7 +75,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
let app = params.get("application_name");
let params_span = tracing::info_span!("", ?user, ?db, ?app);
return Err(stream
return Err(client
.throw_error(e, Some(ctx))
.instrument(params_span)
.await)?;
@@ -347,37 +88,67 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
};
let params_compat = creds.info.options.get(NeonOptions::PARAMS_COMPAT).is_some();
let mut auth_info = compute::AuthInfo::with_auth_keys(creds.keys);
auth_info.set_startup_params(&params, params_compat);
auth_info.set_startup_params(params, params_compat);
let res = connect_to_compute(
ctx,
&TcpMechanism {
locks: &config.connect_compute_locks,
},
&auth::Backend::ControlPlane(cplane, creds.info.clone()),
config.wake_compute_retry_config,
&config.connect_to_compute,
)
.await;
let mut node = match res {
Ok(node) => node,
Err(e) => Err(stream.throw_error(e, Some(ctx)).await)?,
let mut node;
let mut attempt = 0;
let connect = TcpMechanism {
locks: &config.connect_compute_locks,
};
let backend = auth::Backend::ControlPlane(cplane, creds.info);
let pg_settings = auth_info.authenticate(ctx, &mut node, creds.info).await;
let pg_settings = match pg_settings {
Ok(pg_settings) => pg_settings,
Err(e) => Err(stream.throw_error(e, Some(ctx)).await)?,
// NOTE: This is messy, but should hopefully be detangled with PGLB.
// We wanted to separate the concerns of **connect** to compute (a PGLB operation),
// from **authenticate** to compute (a NeonKeeper operation).
//
// This unfortunately removed retry handling for one error case where
// the compute was cached, and we connected, but the compute cache was actually stale
// and is associated with the wrong endpoint. We detect this when the **authentication** fails.
// As such, we retry once here if the `authenticate` function fails and the error is valid to retry.
let pg_settings = loop {
attempt += 1;
// TODO: callback to pglb
let res = connect_to_compute(
ctx,
&connect,
&backend,
config.wake_compute_retry_config,
&config.connect_to_compute,
)
.await;
match res {
Ok(n) => node = n,
Err(e) => return Err(client.throw_error(e, Some(ctx)).await)?,
}
let auth::Backend::ControlPlane(cplane, user_info) = &backend else {
unreachable!("ensured above");
};
let res = auth_info.authenticate(ctx, &mut node, user_info).await;
match res {
Ok(pg_settings) => break pg_settings,
Err(e) if attempt < 2 && e.should_retry_wake_compute() => {
tracing::warn!(error = ?e, "retrying wake compute");
#[allow(irrefutable_let_patterns)]
if let ControlPlaneClient::ProxyV1(cplane_proxy_v1) = &**cplane {
let key = user_info.endpoint_cache_key();
cplane_proxy_v1.caches.node_info.invalidate(&key);
}
}
Err(e) => Err(client.throw_error(e, Some(ctx)).await)?,
}
};
let session = cancellation_handler.get_key();
prepare_client_connection(&pg_settings, *session.key(), &mut stream);
let stream = stream.flush_and_into_inner().await?;
finish_client_init(&pg_settings, *session.key(), client);
let session_id = ctx.session_id();
let (cancel_on_shutdown, cancel) = tokio::sync::oneshot::channel();
let (cancel_on_shutdown, cancel) = oneshot::channel();
tokio::spawn(async move {
session
.maintain_cancel_key(
@@ -389,50 +160,32 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
.await;
});
let private_link_id = match ctx.extra() {
Some(ConnectionInfoExtra::Aws { vpce_id }) => Some(vpce_id.clone()),
Some(ConnectionInfoExtra::Azure { link_id }) => Some(link_id.to_smolstr()),
None => None,
};
Ok(Some(ProxyPassthrough {
client: stream,
compute: node.stream,
aux: node.aux,
private_link_id,
_cancel_on_shutdown: cancel_on_shutdown,
_req: request_gauge,
_conn: conn_gauge,
_db_conn: node.guage,
}))
Ok((node, cancel_on_shutdown))
}
/// Finish client connection initialization: confirm auth success, send params, etc.
pub(crate) fn prepare_client_connection(
pub(crate) fn finish_client_init(
settings: &compute::PostgresSettings,
cancel_key_data: CancelKeyData,
stream: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) {
// Forward all deferred notices to the client.
for notice in &settings.delayed_notice {
stream.write_raw(notice.as_bytes().len(), b'N', |buf| {
client.write_raw(notice.as_bytes().len(), b'N', |buf| {
buf.extend_from_slice(notice.as_bytes());
});
}
// Forward all postgres connection params to the client.
for (name, value) in &settings.params {
stream.write_message(BeMessage::ParameterStatus {
client.write_message(BeMessage::ParameterStatus {
name: name.as_bytes(),
value: value.as_bytes(),
});
}
stream.write_message(BeMessage::BackendKeyData(cancel_key_data));
stream.write_message(BeMessage::ReadyForQuery);
client.write_message(BeMessage::BackendKeyData(cancel_key_data));
client.write_message(BeMessage::ReadyForQuery);
}
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
@@ -442,7 +195,7 @@ impl NeonOptions {
// proxy options:
/// `PARAMS_COMPAT` allows opting in to forwarding all startup parameters from client to compute.
const PARAMS_COMPAT: &str = "proxy_params_compat";
pub const PARAMS_COMPAT: &str = "proxy_params_compat";
// cplane options:

View File

@@ -3,7 +3,7 @@ use std::io;
use tokio::time;
use crate::compute;
use crate::compute::{self, PostgresError};
use crate::config::RetryConfig;
pub(crate) trait CouldRetry {
@@ -115,6 +115,14 @@ impl ShouldRetryWakeCompute for compute::ConnectionError {
}
}
impl ShouldRetryWakeCompute for PostgresError {
fn should_retry_wake_compute(&self) -> bool {
match self {
PostgresError::Postgres(error) => error.should_retry_wake_compute(),
}
}
}
pub(crate) fn retry_after(num_retries: u32, config: RetryConfig) -> time::Duration {
config
.base_delay

View File

@@ -14,6 +14,9 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
use tokio_util::codec::{Decoder, Encoder};
use super::*;
use crate::config::TlsConfig;
use crate::context::RequestContext;
use crate::pglb::handshake::{HandshakeData, handshake};
enum Intercept {
None,

View File

@@ -3,6 +3,7 @@
mod mitm;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, bail};
@@ -10,26 +11,31 @@ use async_trait::async_trait;
use http::StatusCode;
use postgres_client::config::SslMode;
use postgres_client::tls::{MakeTlsConnect, NoTls};
use retry::{ShouldRetryWakeCompute, retry_after};
use rstest::rstest;
use rustls::crypto::ring;
use rustls::pki_types;
use tokio::io::DuplexStream;
use tokio::io::{AsyncRead, AsyncWrite, DuplexStream};
use tracing_test::traced_test;
use super::retry::CouldRetry;
use super::*;
use crate::auth::backend::{ComputeUserInfo, MaybeOwned};
use crate::config::{ComputeConfig, RetryConfig};
use crate::config::{ComputeConfig, RetryConfig, TlsConfig};
use crate::context::RequestContext;
use crate::control_plane::client::{ControlPlaneClient, TestControlPlaneClient};
use crate::control_plane::messages::{ControlPlaneErrorMessage, Details, MetricsAuxInfo, Status};
use crate::control_plane::{self, CachedNodeInfo, NodeInfo, NodeInfoCache};
use crate::error::ErrorKind;
use crate::proxy::connect_compute::ConnectMechanism;
use crate::error::{ErrorKind, ReportableError};
use crate::pglb::ERR_INSECURE_CONNECTION;
use crate::pglb::handshake::{HandshakeData, handshake};
use crate::pqproto::BeMessage;
use crate::proxy::NeonOptions;
use crate::proxy::connect_compute::{ConnectMechanism, connect_to_compute};
use crate::proxy::retry::{ShouldRetryWakeCompute, retry_after};
use crate::stream::{PqStream, Stream};
use crate::tls::client_config::compute_client_config_with_certs;
use crate::tls::server_config::CertResolver;
use crate::types::{BranchId, EndpointId, ProjectId};
use crate::{sasl, scram};
use crate::{auth, compute, sasl, scram};
/// Generate a set of TLS certificates: CA + server.
fn generate_certs(
@@ -374,6 +380,7 @@ fn connect_compute_total_wait() {
#[derive(Clone, Copy, Debug)]
enum ConnectAction {
Wake,
WakeCold,
WakeFail,
WakeRetry,
Connect,
@@ -504,6 +511,9 @@ impl TestControlPlaneClient for TestConnectMechanism {
*counter += 1;
match action {
ConnectAction::Wake => Ok(helper_create_cached_node_info(self.cache)),
ConnectAction::WakeCold => Ok(CachedNodeInfo::new_uncached(
helper_create_uncached_node_info(),
)),
ConnectAction::WakeFail => {
let err = control_plane::errors::ControlPlaneError::Message(Box::new(
ControlPlaneErrorMessage {
@@ -551,8 +561,8 @@ impl TestControlPlaneClient for TestConnectMechanism {
}
}
fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeInfo {
let node = NodeInfo {
fn helper_create_uncached_node_info() -> NodeInfo {
NodeInfo {
conn_info: compute::ConnectInfo {
host: "test".into(),
port: 5432,
@@ -566,7 +576,11 @@ fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeIn
compute_id: "compute".into(),
cold_start_info: crate::control_plane::messages::ColdStartInfo::Warm,
},
};
}
}
fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeInfo {
let node = helper_create_uncached_node_info();
let (_, node2) = cache.insert_unit("key".into(), Ok(node.clone()));
node2.map(|()| node)
}
@@ -742,7 +756,7 @@ async fn fail_no_wake_skips_cache_invalidation() {
let ctx = RequestContext::test();
let mech = TestConnectMechanism::new(vec![
ConnectAction::Wake,
ConnectAction::FailNoWake,
ConnectAction::RetryNoWake,
ConnectAction::Connect,
]);
let user = helper_create_connect_info(&mech);
@@ -788,7 +802,7 @@ async fn retry_no_wake_skips_invalidation() {
let ctx = RequestContext::test();
// Wake → RetryNoWake (retryable + NOT wakeable)
let mechanism = TestConnectMechanism::new(vec![Wake, RetryNoWake]);
let mechanism = TestConnectMechanism::new(vec![Wake, RetryNoWake, Fail]);
let user_info = helper_create_connect_info(&mechanism);
let cfg = config();
@@ -802,3 +816,44 @@ async fn retry_no_wake_skips_invalidation() {
"invalidating stalled compute node info cache entry"
));
}
#[tokio::test]
#[traced_test]
async fn retry_no_wake_error_fast() {
let _ = env_logger::try_init();
use ConnectAction::*;
let ctx = RequestContext::test();
// Wake → FailNoWake (not retryable + NOT wakeable)
let mechanism = TestConnectMechanism::new(vec![Wake, FailNoWake]);
let user_info = helper_create_connect_info(&mechanism);
let cfg = config();
connect_to_compute(&ctx, &mechanism, &user_info, cfg.retry, &cfg)
.await
.unwrap_err();
mechanism.verify();
// Because FailNoWake has wakeable=false, we must NOT see invalidate_cache
assert!(!logs_contain(
"invalidating stalled compute node info cache entry"
));
}
#[tokio::test]
#[traced_test]
async fn retry_cold_wake_skips_invalidation() {
let _ = env_logger::try_init();
use ConnectAction::*;
let ctx = RequestContext::test();
// WakeCold → FailNoWake (not retryable + NOT wakeable)
let mechanism = TestConnectMechanism::new(vec![WakeCold, Retry, Connect]);
let user_info = helper_create_connect_info(&mechanism);
let cfg = config();
connect_to_compute(&ctx, &mechanism, &user_info, cfg.retry, &cfg)
.await
.unwrap();
mechanism.verify();
}

View File

@@ -17,7 +17,8 @@ use crate::config::ProxyConfig;
use crate::context::RequestContext;
use crate::error::ReportableError;
use crate::metrics::Metrics;
use crate::proxy::{ClientMode, ErrorSource, handle_client};
use crate::pglb::{ClientMode, handle_connection};
use crate::proxy::ErrorSource;
use crate::rate_limiter::EndpointRateLimiter;
pin_project! {
@@ -142,7 +143,7 @@ pub(crate) async fn serve_websocket(
.client_connections
.guard(crate::metrics::Protocol::Ws);
let res = Box::pin(handle_client(
let res = Box::pin(handle_connection(
config,
auth_backend,
&ctx,

View File

@@ -1,5 +1,5 @@
[toolchain]
channel = "1.87.0"
channel = "1.88.0"
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html

View File

@@ -62,7 +62,8 @@ def test_sharding_autosplit(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
ps.allowed_errors.extend(
[
# We shut down pageservers while they might have some compaction work going on
".*Compaction failed.*shutting down.*"
".*Compaction failed.*shutting down.*",
".*flush task cancelled.*",
]
)

View File

@@ -896,6 +896,134 @@ def test_timeline_retain_lsn(
assert sum == pre_branch_sum
def test_timeline_offload_delete_race(neon_env_builder: NeonEnvBuilder):
"""
Regression test for https://github.com/neondatabase/cloud/issues/30406
"""
remote_storage_kind = s3_storage()
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_start()
# Turn off gc and compaction loops: we want to issue them manually for better reliability
tenant_id, root_timeline_id = env.create_tenant(
conf={
"gc_period": "0s",
"compaction_period": "0s",
"checkpoint_distance": f"{1024**2}",
}
)
origin_ps = env.get_tenant_pageserver(tenant_id)
assert origin_ps
origin_ps.allowed_errors.extend(
[
".*Timed out waiting for deletion queue flush.*",
".*Timed out waiting for flush to remote storage.*",
]
)
origin_ps_http = origin_ps.http_client()
# We are not sharding this tenant
tenant_shard_id = TenantShardId(tenant_id, 0, 0)
# Create a branch and archive it
child_timeline_id = env.create_branch("test_archived_branch_persisted", tenant_id)
with env.endpoints.create_start(
"test_archived_branch_persisted", tenant_id=tenant_id
) as endpoint:
endpoint.safe_psql_many(
[
"CREATE TABLE foo(key serial primary key, t text default 'data_content')",
"INSERT INTO foo SELECT FROM generate_series(1,512)",
]
)
last_flush_lsn_upload(env, endpoint, tenant_id, child_timeline_id)
assert_prefix_not_empty(
neon_env_builder.pageserver_remote_storage,
prefix=f"tenants/{str(tenant_id)}/",
)
assert_prefix_not_empty(
neon_env_builder.pageserver_remote_storage,
prefix=f"tenants/{str(tenant_id)}/tenant-manifest",
)
origin_ps_http.timeline_archival_config(
tenant_id,
child_timeline_id,
state=TimelineArchivalState.ARCHIVED,
)
def timeline_offloaded_api(timeline_id: TimelineId) -> bool:
return any(
timeline["timeline_id"] == str(timeline_id)
for timeline in origin_ps_http.timeline_and_offloaded_list(
tenant_id=tenant_id
).offloaded
)
def child_offloaded():
origin_ps_http.timeline_offload(tenant_id=tenant_id, timeline_id=child_timeline_id)
assert timeline_offloaded_api(child_timeline_id)
wait_until(child_offloaded)
# Delete the timeline from the origin pageserver, holding up the deletion queue so that it doesn't finish
failpoint_deletion_queue = "deletion-queue-before-execute-pause"
origin_ps_http.configure_failpoints((failpoint_deletion_queue, "pause"))
origin_ps_http.timeline_delete(tenant_id, child_timeline_id)
dest_ps = [ps for ps in env.pageservers if ps.id != origin_ps.id][0]
assert dest_ps
log.info(f"Migrating {tenant_id} {origin_ps.id}->{dest_ps.id}")
env.storage_controller.tenant_shard_migrate(tenant_shard_id, dest_ps_id=dest_ps.id)
log.info("unstuck the DELETE")
origin_ps_http.configure_failpoints((failpoint_deletion_queue, "off"))
def child_prefix_empty():
assert_prefix_empty(
neon_env_builder.pageserver_remote_storage,
prefix=f"tenants/{str(tenant_id)}/{str(child_timeline_id)}/",
)
wait_until(child_prefix_empty)
dest_ps_http = dest_ps.http_client()
# We can't use timeline_delete_wait_completed here as timeline status will return 404, but we want to return 404 from the deletion endpoint
def timeline_is_missing():
data = None
try:
data = dest_ps_http.timeline_delete(tenant_id, child_timeline_id)
log.info(f"timeline delete {data}")
except PageserverApiException as e:
log.debug(e)
if e.status_code == 404:
return
raise RuntimeError(f"Timeline exists {data}")
wait_until(timeline_is_missing)
# (dest_ps_http, tenant_id, child_timeline_id)
#
# Now ensure that scrubber doesn't have anything to clean up.
#
# Sleep some amount larger than min_age_secs
time.sleep(3)
# Ensure that min_age_secs has a deletion impeding effect
gc_summary = env.storage_scrubber.pageserver_physical_gc(min_age_secs=1, mode="full")
assert gc_summary["remote_storage_errors"] == 0
assert gc_summary["indices_deleted"] == 0
assert gc_summary["tenant_manifests_deleted"] == 0
def test_timeline_offload_generations(neon_env_builder: NeonEnvBuilder):
"""
Test for scrubber deleting old generations of manifests

View File

@@ -68,7 +68,6 @@ num-integer = { version = "0.1", features = ["i128"] }
num-iter = { version = "0.1", default-features = false, features = ["i128", "std"] }
num-rational = { version = "0.4", default-features = false, features = ["num-bigint-std", "std"] }
num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
p256 = { version = "0.13", features = ["jwk"] }
parquet = { version = "53", default-features = false, features = ["zstd"] }
prost = { version = "0.13", features = ["no-recursion-limit", "prost-derive"] }
@@ -97,7 +96,7 @@ time = { version = "0.3", features = ["macros", "serde-well-known"] }
tokio = { version = "1", features = ["full", "test-util"] }
tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "io-util", "rt"] }
toml_edit = { version = "0.22", features = ["serde"] }
tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "log"] }
tracing = { version = "0.1", features = ["log"] }
@@ -134,7 +133,6 @@ num-integer = { version = "0.1", features = ["i128"] }
num-iter = { version = "0.1", default-features = false, features = ["i128", "std"] }
num-rational = { version = "0.4", default-features = false, features = ["num-bigint-std", "std"] }
num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
parquet = { version = "53", default-features = false, features = ["zstd"] }
prettyplease = { version = "0.2", default-features = false, features = ["verbatim"] }
proc-macro2 = { version = "1" }