Compare commits

..

3 Commits

Author SHA1 Message Date
Tristan Partin
bbd646325c Add match arm for unused builtin resource managers
Although we don't currently handle these, they are much different from
an unrecognized resource manager, which the comment in the last match
arm refers to.
2024-09-16 10:47:43 -05:00
Tristan Partin
5e71d8fddc Use pg_upgrade to upgrade projects from one Postgres major version to the next 2024-09-12 12:51:28 +01:00
Tristan Partin
3d07b6a483 Use the async Postgres client for compute_ctl
Necessary for continued development of the pg_upgrade work.
2024-09-12 11:55:39 +01:00
83 changed files with 1789 additions and 3373 deletions

View File

@@ -62,7 +62,7 @@ jobs:
#
git config --global --add safe.directory ${{ github.workspace }}
git config --global --add safe.directory ${GITHUB_WORKSPACE}
for r in 14 15 16 17; do
for r in 14 15 16; do
git config --global --add safe.directory "${{ github.workspace }}/vendor/postgres-v$r"
git config --global --add safe.directory "${GITHUB_WORKSPACE}/vendor/postgres-v$r"
done
@@ -83,10 +83,6 @@ jobs:
id: pg_v16_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v16) >> $GITHUB_OUTPUT
- name: Set pg 17 revision for caching
id: pg_v17_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v17) >> $GITHUB_OUTPUT
# Set some environment variables used by all the steps.
#
# CARGO_FLAGS is extra options to pass to "cargo build", "cargo test" etc.
@@ -140,13 +136,6 @@ jobs:
path: pg_install/v16
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
- name: Cache postgres v17 build
id: cache_pg_17
uses: actions/cache@v4
with:
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
- name: Build postgres v14
if: steps.cache_pg_14.outputs.cache-hit != 'true'
run: mold -run make postgres-v14 -j$(nproc)
@@ -159,10 +148,6 @@ jobs:
if: steps.cache_pg_16.outputs.cache-hit != 'true'
run: mold -run make postgres-v16 -j$(nproc)
- name: Build postgres v17
if: steps.cache_pg_17.outputs.cache-hit != 'true'
run: mold -run make postgres-v17 -j$(nproc)
- name: Build neon extensions
run: mold -run make neon-pg-ext -j$(nproc)
@@ -225,7 +210,7 @@ jobs:
run: |
PQ_LIB_DIR=$(pwd)/pg_install/v16/lib
export PQ_LIB_DIR
LD_LIBRARY_PATH=$(pwd)/pg_install/v17/lib
LD_LIBRARY_PATH=$(pwd)/pg_install/v16/lib
export LD_LIBRARY_PATH
#nextest does not yet support running doctests

View File

@@ -211,7 +211,7 @@ jobs:
build-tag: ${{ needs.tag.outputs.build-tag }}
build-type: ${{ matrix.build-type }}
# Run tests on all Postgres versions in release builds and only on the latest version in debug builds
pg-versions: ${{ matrix.build-type == 'release' && '["v14", "v15", "v16", "v17"]' || '["v17"]' }}
pg-versions: ${{ matrix.build-type == 'release' && '["v14", "v15", "v16"]' || '["v16"]' }}
secrets: inherit
# Keep `benchmarks` job outside of `build-and-test-locally` workflow to make job failures non-blocking
@@ -548,7 +548,7 @@ jobs:
strategy:
fail-fast: false
matrix:
version: [ v14, v15, v16, v17 ]
version: [ v14, v15, v16 ]
arch: [ x64, arm64 ]
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }}
@@ -627,7 +627,7 @@ jobs:
- name: Build compute-tools image
# compute-tools are Postgres independent, so build it only once
if: matrix.version == 'v17'
if: matrix.version == 'v16'
uses: docker/build-push-action@v6
with:
target: compute-tools-image
@@ -649,7 +649,7 @@ jobs:
strategy:
matrix:
version: [ v14, v15, v16, v17 ]
version: [ v14, v15, v16 ]
steps:
- uses: docker/login-action@v3
@@ -671,7 +671,7 @@ jobs:
neondatabase/neon-test-extensions-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-arm64
- name: Create multi-arch compute-tools image
if: matrix.version == 'v17'
if: matrix.version == 'v16'
run: |
docker buildx imagetools create -t neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }} \
neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-x64 \
@@ -689,7 +689,7 @@ jobs:
neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}
- name: Push multi-arch compute-tools image to ECR
if: matrix.version == 'v17'
if: matrix.version == 'v16'
run: |
docker buildx imagetools create -t 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{ needs.tag.outputs.build-tag }} \
neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}
@@ -700,7 +700,7 @@ jobs:
strategy:
fail-fast: false
matrix:
version: [ v14, v15, v16, v17 ]
version: [ v14, v15, v16 ]
env:
VM_BUILDER_VERSION: v0.29.3
@@ -798,7 +798,7 @@ jobs:
runs-on: ubuntu-22.04
env:
VERSIONS: v14 v15 v16 v17
VERSIONS: v14 v15 v16
steps:
- uses: docker/login-action@v3
@@ -839,7 +839,7 @@ jobs:
done
done
docker buildx imagetools create -t neondatabase/neon-test-extensions-v16:latest \
neondatabase/neon-test-extensions-v16:${{ needs.tag.outputs.build-tag }}
neondatabase/neon-test-extensions-v16:${{ needs.tag.outputs.build-tag }}
- name: Login to prod ECR
uses: docker/login-action@v3
@@ -852,7 +852,7 @@ jobs:
- name: Copy all images to prod ECR
if: github.ref_name == 'release'|| github.ref_name == 'release-proxy'
run: |
for image in neon compute-tools {vm-,}compute-node-{v14,v15,v16,v17}; do
for image in neon compute-tools {vm-,}compute-node-{v14,v15,v16}; do
docker buildx imagetools create -t 093970136003.dkr.ecr.eu-central-1.amazonaws.com/${image}:${{ needs.tag.outputs.build-tag }} \
369495373322.dkr.ecr.eu-central-1.amazonaws.com/${image}:${{ needs.tag.outputs.build-tag }}
done
@@ -864,7 +864,7 @@ jobs:
with:
client_id: ${{ vars.AZURE_DEV_CLIENT_ID }}
image_tag: ${{ needs.tag.outputs.build-tag }}
images: neon compute-tools vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 vm-compute-node-v17 compute-node-v14 compute-node-v15 compute-node-v16 compute-node-v17
images: neon compute-tools vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 compute-node-v14 compute-node-v15 compute-node-v16
registry_name: ${{ vars.AZURE_DEV_REGISTRY_NAME }}
subscription_id: ${{ vars.AZURE_DEV_SUBSCRIPTION_ID }}
tenant_id: ${{ vars.AZURE_TENANT_ID }}
@@ -876,7 +876,7 @@ jobs:
with:
client_id: ${{ vars.AZURE_PROD_CLIENT_ID }}
image_tag: ${{ needs.tag.outputs.build-tag }}
images: neon compute-tools vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 vm-compute-node-v17 compute-node-v14 compute-node-v15 compute-node-v16 compute-node-v17
images: neon compute-tools vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 compute-node-v14 compute-node-v15 compute-node-v16
registry_name: ${{ vars.AZURE_PROD_REGISTRY_NAME }}
subscription_id: ${{ vars.AZURE_PROD_SUBSCRIPTION_ID }}
tenant_id: ${{ vars.AZURE_TENANT_ID }}
@@ -971,7 +971,7 @@ jobs:
#
git config --global --add safe.directory ${{ github.workspace }}
git config --global --add safe.directory ${GITHUB_WORKSPACE}
for r in 14 15 16 17; do
for r in 14 15 16; do
git config --global --add safe.directory "${{ github.workspace }}/vendor/postgres-v$r"
git config --global --add safe.directory "${GITHUB_WORKSPACE}/vendor/postgres-v$r"
done
@@ -1117,7 +1117,6 @@ jobs:
files_to_promote+=("s3://${BUCKET}/${s3_key}")
# TODO Add v17
for pg_version in v14 v15 v16; do
# We run less tests for debug builds, so we don't need to promote them
if [ "${build_type}" == "debug" ] && { [ "${arch}" == "ARM64" ] || [ "${pg_version}" != "v16" ] ; }; then

View File

@@ -72,10 +72,6 @@ jobs:
id: pg_v16_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v16) >> $GITHUB_OUTPUT
- name: Set pg 17 revision for caching
id: pg_v17_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v17) >> $GITHUB_OUTPUT
- name: Cache postgres v14 build
id: cache_pg_14
uses: actions/cache@v4
@@ -97,13 +93,6 @@ jobs:
path: pg_install/v16
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v17 build
id: cache_pg_17
uses: actions/cache@v4
with:
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Set extra env for macOS
run: |
echo 'LDFLAGS=-L/usr/local/opt/openssl@3/lib' >> $GITHUB_ENV
@@ -131,10 +120,6 @@ jobs:
if: steps.cache_pg_16.outputs.cache-hit != 'true'
run: make postgres-v16 -j$(sysctl -n hw.ncpu)
- name: Build postgres v17
if: steps.cache_pg_17.outputs.cache-hit != 'true'
run: make postgres-v17 -j$(sysctl -n hw.ncpu)
- name: Build neon extensions
run: make neon-pg-ext -j$(sysctl -n hw.ncpu)
@@ -181,7 +166,7 @@ jobs:
run: make walproposer-lib -j$(nproc)
- name: Produce the build stats
run: PQ_LIB_DIR=$(pwd)/pg_install/v17/lib cargo build --all --release --timings -j$(nproc)
run: PQ_LIB_DIR=$(pwd)/pg_install/v16/lib cargo build --all --release --timings -j$(nproc)
- name: Upload the build stats
id: upload-stats

8
.gitmodules vendored
View File

@@ -5,12 +5,8 @@
[submodule "vendor/postgres-v15"]
path = vendor/postgres-v15
url = https://github.com/neondatabase/postgres.git
branch = REL_15_STABLE_neon
branch = tristan957/15/pg_upgrade
[submodule "vendor/postgres-v16"]
path = vendor/postgres-v16
url = https://github.com/neondatabase/postgres.git
branch = REL_16_STABLE_neon
[submodule "vendor/postgres-v17"]
path = vendor/postgres-v17
url = https://github.com/neondatabase/postgres.git
branch = REL_17_STABLE_neon
branch = tristan957/pg_upgrade

2
Cargo.lock generated
View File

@@ -1220,6 +1220,7 @@ dependencies = [
"anyhow",
"async-compression",
"bytes",
"camino",
"cfg-if",
"chrono",
"clap",
@@ -1237,6 +1238,7 @@ dependencies = [
"reqwest 0.12.4",
"rlimit",
"rust-ini",
"scopeguard",
"serde",
"serde_json",
"signal-hook",

View File

@@ -5,8 +5,6 @@
ARG REPOSITORY=neondatabase
ARG IMAGE=build-tools
ARG TAG=pinned
ARG DEFAULT_PG_VERSION=17
ARG STABLE_PG_VERSION=16
# Build Postgres
FROM $REPOSITORY/$IMAGE:$TAG AS pg-build
@@ -15,7 +13,6 @@ WORKDIR /home/nonroot
COPY --chown=nonroot vendor/postgres-v14 vendor/postgres-v14
COPY --chown=nonroot vendor/postgres-v15 vendor/postgres-v15
COPY --chown=nonroot vendor/postgres-v16 vendor/postgres-v16
COPY --chown=nonroot vendor/postgres-v17 vendor/postgres-v17
COPY --chown=nonroot pgxn pgxn
COPY --chown=nonroot Makefile Makefile
COPY --chown=nonroot scripts/ninstall.sh scripts/ninstall.sh
@@ -31,19 +28,16 @@ FROM $REPOSITORY/$IMAGE:$TAG AS build
WORKDIR /home/nonroot
ARG GIT_VERSION=local
ARG BUILD_TAG
ARG STABLE_PG_VERSION
COPY --from=pg-build /home/nonroot/pg_install/v14/include/postgresql/server pg_install/v14/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v15/include/postgresql/server pg_install/v15/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v16/include/postgresql/server pg_install/v16/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v17/include/postgresql/server pg_install/v17/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v16/lib pg_install/v16/lib
COPY --from=pg-build /home/nonroot/pg_install/v17/lib pg_install/v17/lib
COPY --chown=nonroot . .
ARG ADDITIONAL_RUSTFLAGS
RUN set -e \
&& PQ_LIB_DIR=$(pwd)/pg_install/v${STABLE_PG_VERSION}/lib RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment ${ADDITIONAL_RUSTFLAGS}" cargo build \
&& PQ_LIB_DIR=$(pwd)/pg_install/v16/lib RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment ${ADDITIONAL_RUSTFLAGS}" cargo build \
--bin pg_sni_router \
--bin pageserver \
--bin pagectl \
@@ -58,7 +52,6 @@ RUN set -e \
# Build final image
#
FROM debian:bullseye-slim
ARG DEFAULT_PG_VERSION
WORKDIR /data
RUN set -e \
@@ -84,7 +77,6 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_scrubbe
COPY --from=pg-build /home/nonroot/pg_install/v14 /usr/local/v14/
COPY --from=pg-build /home/nonroot/pg_install/v15 /usr/local/v15/
COPY --from=pg-build /home/nonroot/pg_install/v16 /usr/local/v16/
COPY --from=pg-build /home/nonroot/pg_install/v17 /usr/local/v17/
COPY --from=pg-build /home/nonroot/postgres_install.tar.gz /data/
# By default, pageserver uses `.neon/` working directory in WORKDIR, so create one and fill it with the dummy config.
@@ -101,7 +93,7 @@ RUN mkdir -p /data/.neon/ && \
# When running a binary that links with libpq, default to using our most recent postgres version. Binaries
# that want a particular postgres version will select it explicitly: this is just a default.
ENV LD_LIBRARY_PATH=/usr/local/v${DEFAULT_PG_VERSION}/lib
ENV LD_LIBRARY_PATH=/usr/local/v16/lib
VOLUME ["/data"]

View File

@@ -79,7 +79,6 @@ RUN cd postgres && \
#
#########################################################################################
FROM build-deps AS postgis-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt update && \
apt install -y cmake gdal-bin libboost-dev libboost-thread-dev libboost-filesystem-dev \
@@ -88,11 +87,7 @@ RUN apt update && \
protobuf-c-compiler xsltproc
# SFCGAL > 1.3 requires CGAL > 5.2, Bullseye's libcgal-dev is 5.2
RUN case "${PG_VERSION}" in "v17") \
mkdir -p /sfcgal && \
echo "Postgis doensn't yet support PG17 (needs 3.4.3, if not higher)" && exit 0;; \
esac && \
wget https://gitlab.com/Oslandia/SFCGAL/-/archive/v1.3.10/SFCGAL-v1.3.10.tar.gz -O SFCGAL.tar.gz && \
RUN wget https://gitlab.com/Oslandia/SFCGAL/-/archive/v1.3.10/SFCGAL-v1.3.10.tar.gz -O SFCGAL.tar.gz && \
echo "4e39b3b2adada6254a7bdba6d297bb28e1a9835a9f879b74f37e2dab70203232 SFCGAL.tar.gz" | sha256sum --check && \
mkdir sfcgal-src && cd sfcgal-src && tar xzf ../SFCGAL.tar.gz --strip-components=1 -C . && \
cmake -DCMAKE_BUILD_TYPE=Release . && make -j $(getconf _NPROCESSORS_ONLN) && \
@@ -101,10 +96,7 @@ RUN case "${PG_VERSION}" in "v17") \
ENV PATH="/usr/local/pgsql/bin:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "Postgis doensn't yet support PG17 (needs 3.4.3, if not higher)" && exit 0;; \
esac && \
wget https://download.osgeo.org/postgis/source/postgis-3.3.3.tar.gz -O postgis.tar.gz && \
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.3.tar.gz -O postgis.tar.gz && \
echo "74eb356e3f85f14233791013360881b6748f78081cc688ff9d6f0f673a762d13 postgis.tar.gz" | sha256sum --check && \
mkdir postgis-src && cd postgis-src && tar xzf ../postgis.tar.gz --strip-components=1 -C . && \
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /before.txt &&\
@@ -130,10 +122,7 @@ RUN case "${PG_VERSION}" in "v17") \
cp /usr/local/pgsql/share/extension/address_standardizer.control /extensions/postgis && \
cp /usr/local/pgsql/share/extension/address_standardizer_data_us.control /extensions/postgis
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/pgRouting/pgrouting/archive/v3.4.2.tar.gz -O pgrouting.tar.gz && \
RUN wget https://github.com/pgRouting/pgrouting/archive/v3.4.2.tar.gz -O pgrouting.tar.gz && \
echo "cac297c07d34460887c4f3b522b35c470138760fe358e351ad1db4edb6ee306e pgrouting.tar.gz" | sha256sum --check && \
mkdir pgrouting-src && cd pgrouting-src && tar xzf ../pgrouting.tar.gz --strip-components=1 -C . && \
mkdir build && cd build && \
@@ -153,19 +142,12 @@ RUN case "${PG_VERSION}" in "v17") \
#
#########################################################################################
FROM build-deps AS plv8-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
apt update && \
RUN apt update && \
apt install -y ninja-build python3-dev libncurses5 binutils clang
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.10.tar.gz -O plv8.tar.gz && \
RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.10.tar.gz -O plv8.tar.gz && \
echo "7096c3290928561f0d4901b7a52794295dc47f6303102fae3f8e42dd575ad97d plv8.tar.gz" | sha256sum --check && \
mkdir plv8-src && cd plv8-src && tar xzf ../plv8.tar.gz --strip-components=1 -C . && \
# generate and copy upgrade scripts
@@ -190,13 +172,9 @@ RUN case "${PG_VERSION}" in "v17") \
#
#########################################################################################
FROM build-deps AS h3-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
case "$(uname -m)" in \
RUN case "$(uname -m)" in \
"x86_64") \
export CMAKE_CHECKSUM=739d372726cb23129d57a539ce1432453448816e345e1545f6127296926b6754 \
;; \
@@ -214,11 +192,7 @@ RUN case "${PG_VERSION}" in "v17") \
&& /tmp/cmake-install.sh --skip-license --prefix=/usr/local/ \
&& rm /tmp/cmake-install.sh
RUN case "${PG_VERSION}" in "v17") \
mkdir -p /h3/usr/ && \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/uber/h3/archive/refs/tags/v4.1.0.tar.gz -O h3.tar.gz && \
RUN wget https://github.com/uber/h3/archive/refs/tags/v4.1.0.tar.gz -O h3.tar.gz && \
echo "ec99f1f5974846bde64f4513cf8d2ea1b8d172d2218ab41803bf6a63532272bc h3.tar.gz" | sha256sum --check && \
mkdir h3-src && cd h3-src && tar xzf ../h3.tar.gz --strip-components=1 -C . && \
mkdir build && cd build && \
@@ -228,10 +202,7 @@ RUN case "${PG_VERSION}" in "v17") \
cp -R /h3/usr / && \
rm -rf build
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.1.3.tar.gz -O h3-pg.tar.gz && \
RUN wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.1.3.tar.gz -O h3-pg.tar.gz && \
echo "5c17f09a820859ffe949f847bebf1be98511fb8f1bd86f94932512c00479e324 h3-pg.tar.gz" | sha256sum --check && \
mkdir h3-pg-src && cd h3-pg-src && tar xzf ../h3-pg.tar.gz --strip-components=1 -C . && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
@@ -247,13 +218,9 @@ RUN case "${PG_VERSION}" in "v17") \
#
#########################################################################################
FROM build-deps AS unit-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -O postgresql-unit.tar.gz && \
RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -O postgresql-unit.tar.gz && \
echo "411d05beeb97e5a4abf17572bfcfbb5a68d98d1018918feff995f6ee3bb03e79 postgresql-unit.tar.gz" | sha256sum --check && \
mkdir postgresql-unit-src && cd postgresql-unit-src && tar xzf ../postgresql-unit.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -272,7 +239,6 @@ RUN case "${PG_VERSION}" in "v17") \
#
#########################################################################################
FROM build-deps AS vector-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY patches/pgvector.patch /pgvector.patch
@@ -280,10 +246,7 @@ COPY patches/pgvector.patch /pgvector.patch
# By default, pgvector Makefile uses `-march=native`. We don't want that,
# because we build the images on different machines than where we run them.
# Pass OPTFLAGS="" to remove it.
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.7.2.tar.gz -O pgvector.tar.gz && \
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.7.2.tar.gz -O pgvector.tar.gz && \
echo "617fba855c9bcb41a2a9bc78a78567fd2e147c72afd5bf9d37b31b9591632b30 pgvector.tar.gz" | sha256sum --check && \
mkdir pgvector-src && cd pgvector-src && tar xzf ../pgvector.tar.gz --strip-components=1 -C . && \
patch -p1 < /pgvector.patch && \
@@ -298,14 +261,10 @@ RUN case "${PG_VERSION}" in "v17") \
#
#########################################################################################
FROM build-deps AS pgjwt-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# 9742dab1b2f297ad3811120db7b21451bca2d3c9 made on 13/11/2021
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/michelp/pgjwt/archive/9742dab1b2f297ad3811120db7b21451bca2d3c9.tar.gz -O pgjwt.tar.gz && \
RUN wget https://github.com/michelp/pgjwt/archive/9742dab1b2f297ad3811120db7b21451bca2d3c9.tar.gz -O pgjwt.tar.gz && \
echo "cfdefb15007286f67d3d45510f04a6a7a495004be5b3aecb12cda667e774203f pgjwt.tar.gz" | sha256sum --check && \
mkdir pgjwt-src && cd pgjwt-src && tar xzf ../pgjwt.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -318,13 +277,9 @@ RUN case "${PG_VERSION}" in "v17") \
#
#########################################################################################
FROM build-deps AS hypopg-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/HypoPG/hypopg/archive/refs/tags/1.4.0.tar.gz -O hypopg.tar.gz && \
RUN wget https://github.com/HypoPG/hypopg/archive/refs/tags/1.4.0.tar.gz -O hypopg.tar.gz && \
echo "0821011743083226fc9b813c1f2ef5897a91901b57b6bea85a78e466187c6819 hypopg.tar.gz" | sha256sum --check && \
mkdir hypopg-src && cd hypopg-src && tar xzf ../hypopg.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -338,13 +293,9 @@ RUN case "${PG_VERSION}" in "v17") \
#
#########################################################################################
FROM build-deps AS pg-hashids-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/iCyberon/pg_hashids/archive/refs/tags/v1.2.1.tar.gz -O pg_hashids.tar.gz && \
RUN wget https://github.com/iCyberon/pg_hashids/archive/refs/tags/v1.2.1.tar.gz -O pg_hashids.tar.gz && \
echo "74576b992d9277c92196dd8d816baa2cc2d8046fe102f3dcd7f3c3febed6822a pg_hashids.tar.gz" | sha256sum --check && \
mkdir pg_hashids-src && cd pg_hashids-src && tar xzf ../pg_hashids.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
@@ -358,15 +309,11 @@ RUN case "${PG_VERSION}" in "v17") \
#
#########################################################################################
FROM build-deps AS rum-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY patches/rum.patch /rum.patch
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/postgrespro/rum/archive/refs/tags/1.3.13.tar.gz -O rum.tar.gz && \
RUN wget https://github.com/postgrespro/rum/archive/refs/tags/1.3.13.tar.gz -O rum.tar.gz && \
echo "6ab370532c965568df6210bd844ac6ba649f53055e48243525b0b7e5c4d69a7d rum.tar.gz" | sha256sum --check && \
mkdir rum-src && cd rum-src && tar xzf ../rum.tar.gz --strip-components=1 -C . && \
patch -p1 < /rum.patch && \
@@ -381,13 +328,9 @@ RUN case "${PG_VERSION}" in "v17") \
#
#########################################################################################
FROM build-deps AS pgtap-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/theory/pgtap/archive/refs/tags/v1.2.0.tar.gz -O pgtap.tar.gz && \
RUN wget https://github.com/theory/pgtap/archive/refs/tags/v1.2.0.tar.gz -O pgtap.tar.gz && \
echo "9c7c3de67ea41638e14f06da5da57bac6f5bd03fea05c165a0ec862205a5c052 pgtap.tar.gz" | sha256sum --check && \
mkdir pgtap-src && cd pgtap-src && tar xzf ../pgtap.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -401,13 +344,9 @@ RUN case "${PG_VERSION}" in "v17") \
#
#########################################################################################
FROM build-deps AS ip4r-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/RhodiumToad/ip4r/archive/refs/tags/2.4.2.tar.gz -O ip4r.tar.gz && \
RUN wget https://github.com/RhodiumToad/ip4r/archive/refs/tags/2.4.2.tar.gz -O ip4r.tar.gz && \
echo "0f7b1f159974f49a47842a8ab6751aecca1ed1142b6d5e38d81b064b2ead1b4b ip4r.tar.gz" | sha256sum --check && \
mkdir ip4r-src && cd ip4r-src && tar xzf ../ip4r.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -421,13 +360,9 @@ RUN case "${PG_VERSION}" in "v17") \
#
#########################################################################################
FROM build-deps AS prefix-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/dimitri/prefix/archive/refs/tags/v1.2.10.tar.gz -O prefix.tar.gz && \
RUN wget https://github.com/dimitri/prefix/archive/refs/tags/v1.2.10.tar.gz -O prefix.tar.gz && \
echo "4342f251432a5f6fb05b8597139d3ccde8dcf87e8ca1498e7ee931ca057a8575 prefix.tar.gz" | sha256sum --check && \
mkdir prefix-src && cd prefix-src && tar xzf ../prefix.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -441,13 +376,9 @@ RUN case "${PG_VERSION}" in "v17") \
#
#########################################################################################
FROM build-deps AS hll-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/citusdata/postgresql-hll/archive/refs/tags/v2.18.tar.gz -O hll.tar.gz && \
RUN wget https://github.com/citusdata/postgresql-hll/archive/refs/tags/v2.18.tar.gz -O hll.tar.gz && \
echo "e2f55a6f4c4ab95ee4f1b4a2b73280258c5136b161fe9d059559556079694f0e hll.tar.gz" | sha256sum --check && \
mkdir hll-src && cd hll-src && tar xzf ../hll.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -461,13 +392,9 @@ RUN case "${PG_VERSION}" in "v17") \
#
#########################################################################################
FROM build-deps AS plpgsql-check-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/okbob/plpgsql_check/archive/refs/tags/v2.5.3.tar.gz -O plpgsql_check.tar.gz && \
RUN wget https://github.com/okbob/plpgsql_check/archive/refs/tags/v2.5.3.tar.gz -O plpgsql_check.tar.gz && \
echo "6631ec3e7fb3769eaaf56e3dfedb829aa761abf163d13dba354b4c218508e1c0 plpgsql_check.tar.gz" | sha256sum --check && \
mkdir plpgsql_check-src && cd plpgsql_check-src && tar xzf ../plpgsql_check.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
@@ -486,10 +413,7 @@ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ARG PG_VERSION
ENV PATH="/usr/local/pgsql/bin:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
case "${PG_VERSION}" in \
RUN case "${PG_VERSION}" in \
"v14" | "v15") \
export TIMESCALEDB_VERSION=2.10.1 \
export TIMESCALEDB_CHECKSUM=6fca72a6ed0f6d32d2b3523951ede73dc5f9b0077b38450a029a5f411fdb8c73 \
@@ -522,10 +446,7 @@ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ARG PG_VERSION
ENV PATH="/usr/local/pgsql/bin:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
case "${PG_VERSION}" in \
RUN case "${PG_VERSION}" in \
"v14") \
export PG_HINT_PLAN_VERSION=14_1_4_1 \
export PG_HINT_PLAN_CHECKSUM=c3501becf70ead27f70626bce80ea401ceac6a77e2083ee5f3ff1f1444ec1ad1 \
@@ -538,9 +459,6 @@ RUN case "${PG_VERSION}" in "v17") \
export PG_HINT_PLAN_VERSION=16_1_6_0 \
export PG_HINT_PLAN_CHECKSUM=fc85a9212e7d2819d4ae4ac75817481101833c3cfa9f0fe1f980984e12347d00 \
;; \
"v17") \
echo "TODO: PG17 pg_hint_plan support" && exit 0 \
;; \
*) \
echo "Export the valid PG_HINT_PLAN_VERSION variable" && exit 1 \
;; \
@@ -560,14 +478,10 @@ RUN case "${PG_VERSION}" in "v17") \
#
#########################################################################################
FROM build-deps AS pg-cron-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/citusdata/pg_cron/archive/refs/tags/v1.6.0.tar.gz -O pg_cron.tar.gz && \
RUN wget https://github.com/citusdata/pg_cron/archive/refs/tags/v1.6.0.tar.gz -O pg_cron.tar.gz && \
echo "383a627867d730222c272bfd25cd5e151c578d73f696d32910c7db8c665cc7db pg_cron.tar.gz" | sha256sum --check && \
mkdir pg_cron-src && cd pg_cron-src && tar xzf ../pg_cron.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
@@ -581,13 +495,9 @@ RUN case "${PG_VERSION}" in "v17") \
#
#########################################################################################
FROM build-deps AS rdkit-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
apt-get update && \
RUN apt-get update && \
apt-get install -y \
cmake \
libboost-iostreams1.74-dev \
@@ -597,10 +507,7 @@ RUN case "${PG_VERSION}" in "v17") \
libeigen3-dev
ENV PATH="/usr/local/pgsql/bin/:/usr/local/pgsql/:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/rdkit/rdkit/archive/refs/tags/Release_2023_03_3.tar.gz -O rdkit.tar.gz && \
RUN wget https://github.com/rdkit/rdkit/archive/refs/tags/Release_2023_03_3.tar.gz -O rdkit.tar.gz && \
echo "bdbf9a2e6988526bfeb8c56ce3cdfe2998d60ac289078e2215374288185e8c8d rdkit.tar.gz" | sha256sum --check && \
mkdir rdkit-src && cd rdkit-src && tar xzf ../rdkit.tar.gz --strip-components=1 -C . && \
cmake \
@@ -637,14 +544,10 @@ RUN case "${PG_VERSION}" in "v17") \
#
#########################################################################################
FROM build-deps AS pg-uuidv7-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/fboulnois/pg_uuidv7/archive/refs/tags/v1.0.1.tar.gz -O pg_uuidv7.tar.gz && \
RUN wget https://github.com/fboulnois/pg_uuidv7/archive/refs/tags/v1.0.1.tar.gz -O pg_uuidv7.tar.gz && \
echo "0d0759ab01b7fb23851ecffb0bce27822e1868a4a5819bfd276101c716637a7a pg_uuidv7.tar.gz" | sha256sum --check && \
mkdir pg_uuidv7-src && cd pg_uuidv7-src && tar xzf ../pg_uuidv7.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
@@ -658,14 +561,10 @@ RUN case "${PG_VERSION}" in "v17") \
#
#########################################################################################
FROM build-deps AS pg-roaringbitmap-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions is not supported yet by pg_roaringbitmap. Quit" && exit 0;; \
esac && \
wget https://github.com/ChenHuajun/pg_roaringbitmap/archive/refs/tags/v0.5.4.tar.gz -O pg_roaringbitmap.tar.gz && \
RUN wget https://github.com/ChenHuajun/pg_roaringbitmap/archive/refs/tags/v0.5.4.tar.gz -O pg_roaringbitmap.tar.gz && \
echo "b75201efcb1c2d1b014ec4ae6a22769cc7a224e6e406a587f5784a37b6b5a2aa pg_roaringbitmap.tar.gz" | sha256sum --check && \
mkdir pg_roaringbitmap-src && cd pg_roaringbitmap-src && tar xzf ../pg_roaringbitmap.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
@@ -679,14 +578,10 @@ RUN case "${PG_VERSION}" in "v17") \
#
#########################################################################################
FROM build-deps AS pg-semver-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "v17 is not supported yet by pg_semver. Quit" && exit 0;; \
esac && \
wget https://github.com/theory/pg-semver/archive/refs/tags/v0.32.1.tar.gz -O pg_semver.tar.gz && \
RUN wget https://github.com/theory/pg-semver/archive/refs/tags/v0.32.1.tar.gz -O pg_semver.tar.gz && \
echo "fbdaf7512026d62eec03fad8687c15ed509b6ba395bff140acd63d2e4fbe25d7 pg_semver.tar.gz" | sha256sum --check && \
mkdir pg_semver-src && cd pg_semver-src && tar xzf ../pg_semver.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
@@ -725,14 +620,10 @@ RUN case "${PG_VERSION}" in \
#
#########################################################################################
FROM build-deps AS pg-anon-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "postgresql_anonymizer does not yet support PG17" && exit 0;; \
esac && \
wget https://github.com/neondatabase/postgresql_anonymizer/archive/refs/tags/neon_1.1.1.tar.gz -O pg_anon.tar.gz && \
RUN wget https://github.com/neondatabase/postgresql_anonymizer/archive/refs/tags/neon_1.1.1.tar.gz -O pg_anon.tar.gz && \
echo "321ea8d5c1648880aafde850a2c576e4a9e7b9933a34ce272efc839328999fa9 pg_anon.tar.gz" | sha256sum --check && \
mkdir pg_anon-src && cd pg_anon-src && tar xzf ../pg_anon.tar.gz --strip-components=1 -C . && \
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /before.txt &&\
@@ -750,7 +641,6 @@ RUN case "${PG_VERSION}" in "v17") \
#
#########################################################################################
FROM build-deps AS rust-extensions-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt-get update && \
@@ -761,11 +651,9 @@ ENV HOME=/home/nonroot
ENV PATH="/home/nonroot/.cargo/bin:/usr/local/pgsql/bin/:$PATH"
USER nonroot
WORKDIR /home/nonroot
ARG PG_VERSION
RUN case "${PG_VERSION}" in "v17") \
echo "v17 is not supported yet by pgrx. Quit" && exit 0;; \
esac && \
curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && \
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && \
chmod +x rustup-init && \
./rustup-init -y --no-modify-path --profile minimal --default-toolchain stable && \
rm rustup-init && \
@@ -784,10 +672,7 @@ USER root
FROM rust-extensions-build AS pg-jsonschema-pg-build
ARG PG_VERSION
RUN case "${PG_VERSION}" in "v17") \
echo "pg_jsonschema does not yet have a release that supports pg17" && exit 0;; \
esac && \
wget https://github.com/supabase/pg_jsonschema/archive/refs/tags/v0.3.1.tar.gz -O pg_jsonschema.tar.gz && \
RUN wget https://github.com/supabase/pg_jsonschema/archive/refs/tags/v0.3.1.tar.gz -O pg_jsonschema.tar.gz && \
echo "61df3db1ed83cf24f6aa39c826f8818bfa4f0bd33b587fd6b2b1747985642297 pg_jsonschema.tar.gz" | sha256sum --check && \
mkdir pg_jsonschema-src && cd pg_jsonschema-src && tar xzf ../pg_jsonschema.tar.gz --strip-components=1 -C . && \
# see commit 252b3685a27a0f4c31a0f91e983c6314838e89e8
@@ -809,10 +694,7 @@ RUN case "${PG_VERSION}" in "v17") \
FROM rust-extensions-build AS pg-graphql-pg-build
ARG PG_VERSION
RUN case "${PG_VERSION}" in "v17") \
echo "pg_graphql does not yet have a release that supports pg17 as of now" && exit 0;; \
esac && \
wget https://github.com/supabase/pg_graphql/archive/refs/tags/v1.5.7.tar.gz -O pg_graphql.tar.gz && \
RUN wget https://github.com/supabase/pg_graphql/archive/refs/tags/v1.5.7.tar.gz -O pg_graphql.tar.gz && \
echo "2b3e567a5b31019cb97ae0e33263c1bcc28580be5a444ac4c8ece5c4be2aea41 pg_graphql.tar.gz" | sha256sum --check && \
mkdir pg_graphql-src && cd pg_graphql-src && tar xzf ../pg_graphql.tar.gz --strip-components=1 -C . && \
sed -i 's/pgrx = "=0.11.3"/pgrx = { version = "0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
@@ -832,10 +714,7 @@ FROM rust-extensions-build AS pg-tiktoken-pg-build
ARG PG_VERSION
# 26806147b17b60763039c6a6878884c41a262318 made on 26/09/2023
RUN case "${PG_VERSION}" in "v17") \
echo "pg_tiktoken does not have versions, nor support for pg17" && exit 0;; \
esac && \
wget https://github.com/kelvich/pg_tiktoken/archive/26806147b17b60763039c6a6878884c41a262318.tar.gz -O pg_tiktoken.tar.gz && \
RUN wget https://github.com/kelvich/pg_tiktoken/archive/26806147b17b60763039c6a6878884c41a262318.tar.gz -O pg_tiktoken.tar.gz && \
echo "e64e55aaa38c259512d3e27c572da22c4637418cf124caba904cd50944e5004e pg_tiktoken.tar.gz" | sha256sum --check && \
mkdir pg_tiktoken-src && cd pg_tiktoken-src && tar xzf ../pg_tiktoken.tar.gz --strip-components=1 -C . && \
# TODO update pgrx version in the pg_tiktoken repo and remove this line
@@ -854,10 +733,7 @@ RUN case "${PG_VERSION}" in "v17") \
FROM rust-extensions-build AS pg-pgx-ulid-build
ARG PG_VERSION
RUN case "${PG_VERSION}" in "v17") \
echo "pgx_ulid does not support pg17 as of the latest version (0.1.5)" && exit 0;; \
esac && \
wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.1.5.tar.gz -O pgx_ulid.tar.gz && \
RUN wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.1.5.tar.gz -O pgx_ulid.tar.gz && \
echo "9d1659a2da65af0133d5451c454de31b37364e3502087dadf579f790bc8bef17 pgx_ulid.tar.gz" | sha256sum --check && \
mkdir pgx_ulid-src && cd pgx_ulid-src && tar xzf ../pgx_ulid.tar.gz --strip-components=1 -C . && \
sed -i 's/pgrx = "^0.11.2"/pgrx = { version = "=0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
@@ -872,14 +748,10 @@ RUN case "${PG_VERSION}" in "v17") \
#########################################################################################
FROM build-deps AS wal2json-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "We'll need to update wal2json to 2.6+ for pg17 support" && exit 0;; \
esac && \
wget https://github.com/eulerto/wal2json/archive/refs/tags/wal2json_2_5.tar.gz && \
RUN wget https://github.com/eulerto/wal2json/archive/refs/tags/wal2json_2_5.tar.gz && \
echo "b516653575541cf221b99cf3f8be9b6821f6dbcfc125675c85f35090f824f00e wal2json_2_5.tar.gz" | sha256sum --check && \
mkdir wal2json-src && cd wal2json-src && tar xzf ../wal2json_2_5.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
@@ -892,14 +764,10 @@ RUN case "${PG_VERSION}" in "v17") \
#
#########################################################################################
FROM build-deps AS pg-ivm-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "We'll need to update pg_ivm to 1.9+ for pg17 support" && exit 0;; \
esac && \
wget https://github.com/sraoss/pg_ivm/archive/refs/tags/v1.7.tar.gz -O pg_ivm.tar.gz && \
RUN wget https://github.com/sraoss/pg_ivm/archive/refs/tags/v1.7.tar.gz -O pg_ivm.tar.gz && \
echo "ebfde04f99203c7be4b0e873f91104090e2e83e5429c32ac242d00f334224d5e pg_ivm.tar.gz" | sha256sum --check && \
mkdir pg_ivm-src && cd pg_ivm-src && tar xzf ../pg_ivm.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
@@ -913,14 +781,10 @@ RUN case "${PG_VERSION}" in "v17") \
#
#########################################################################################
FROM build-deps AS pg-partman-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "pg_partman doesn't support PG17 yet" && exit 0;; \
esac && \
wget https://github.com/pgpartman/pg_partman/archive/refs/tags/v5.0.1.tar.gz -O pg_partman.tar.gz && \
RUN wget https://github.com/pgpartman/pg_partman/archive/refs/tags/v5.0.1.tar.gz -O pg_partman.tar.gz && \
echo "75b541733a9659a6c90dbd40fccb904a630a32880a6e3044d0c4c5f4c8a65525 pg_partman.tar.gz" | sha256sum --check && \
mkdir pg_partman-src && cd pg_partman-src && tar xzf ../pg_partman.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
@@ -990,8 +854,8 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \
case "${PG_VERSION}" in \
"v14" | "v15") \
;; \
"v16" | "v17") \
echo "Skipping HNSW for PostgreSQL ${PG_VERSION}" && exit 0 \
"v16") \
echo "Skipping HNSW for PostgreSQL 16" && exit 0 \
;; \
*) \
echo "unexpected PostgreSQL version" && exit 1 \
@@ -1035,7 +899,7 @@ FROM neon-pg-ext-build AS postgres-cleanup-layer
COPY --from=neon-pg-ext-build /usr/local/pgsql /usr/local/pgsql
# Remove binaries from /bin/ that we won't use (or would manually copy & install otherwise)
RUN cd /usr/local/pgsql/bin && rm -f ecpg raster2pgsql shp2pgsql pgtopo_export pgtopo_import pgsql2shp
RUN cd /usr/local/pgsql/bin && rm ecpg raster2pgsql shp2pgsql pgtopo_export pgtopo_import pgsql2shp
# Remove headers that we won't need anymore - we've completed installation of all extensions
RUN rm -r /usr/local/pgsql/include
@@ -1054,10 +918,7 @@ RUN rm /usr/local/pgsql/lib/lib*.a
FROM neon-pg-ext-build AS neon-pg-ext-test
ARG PG_VERSION
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
mkdir /ext-src
RUN mkdir /ext-src
#COPY --from=postgis-build /postgis.tar.gz /ext-src/
#COPY --from=postgis-build /sfcgal/* /usr
@@ -1095,39 +956,18 @@ COPY --from=pg-anon-pg-build /pg_anon.tar.gz /ext-src
COPY patches/pg_anon.patch /ext-src
COPY --from=pg-ivm-build /pg_ivm.tar.gz /ext-src
COPY --from=pg-partman-build /pg_partman.tar.gz /ext-src
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
cd /ext-src/ && for f in *.tar.gz; \
RUN cd /ext-src/ && for f in *.tar.gz; \
do echo $f; dname=$(echo $f | sed 's/\.tar.*//')-src; \
rm -rf $dname; mkdir $dname; tar xzf $f --strip-components=1 -C $dname \
|| exit 1; rm -f $f; done
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
cd /ext-src/rum-src && patch -p1 <../rum.patch
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
cd /ext-src/pgvector-src && patch -p1 <../pgvector.patch
RUN cd /ext-src/pgvector-src && patch -p1 <../pgvector.patch
RUN cd /ext-src/rum-src && patch -p1 <../rum.patch
# cmake is required for the h3 test
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
apt-get update && apt-get install -y cmake
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
cd /ext-src/pg_hint_plan-src && patch -p1 < /ext-src/pg_hint_plan.patch
RUN apt-get update && apt-get install -y cmake
RUN cd /ext-src/pg_hint_plan-src && patch -p1 < /ext-src/pg_hint_plan.patch
COPY --chmod=755 docker-compose/run-tests.sh /run-tests.sh
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
patch -p1 </ext-src/pg_anon.patch
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
patch -p1 </ext-src/pg_cron.patch
RUN patch -p1 </ext-src/pg_anon.patch
RUN patch -p1 </ext-src/pg_cron.patch
ENV PATH=/usr/local/pgsql/bin:$PATH
ENV PGHOST=compute
ENV PGPORT=55433

View File

@@ -119,8 +119,6 @@ $(POSTGRES_INSTALL_DIR)/build/%/config.status:
# I'm not sure why it wouldn't work, but this is the only place (apart from
# the "build-all-versions" entry points) where direct mention of PostgreSQL
# versions is used.
.PHONY: postgres-configure-v17
postgres-configure-v17: $(POSTGRES_INSTALL_DIR)/build/v17/config.status
.PHONY: postgres-configure-v16
postgres-configure-v16: $(POSTGRES_INSTALL_DIR)/build/v16/config.status
.PHONY: postgres-configure-v15
@@ -217,31 +215,29 @@ neon-pg-clean-ext-%:
# they depend on openssl and other libraries that are not included in our
# Rust build.
.PHONY: walproposer-lib
walproposer-lib: neon-pg-ext-v17
walproposer-lib: neon-pg-ext-v16
+@echo "Compiling walproposer-lib"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/walproposer-lib
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v17/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v16/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
-C $(POSTGRES_INSTALL_DIR)/build/walproposer-lib \
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile walproposer-lib
cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpgport.a $(POSTGRES_INSTALL_DIR)/build/walproposer-lib
cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpgcommon.a $(POSTGRES_INSTALL_DIR)/build/walproposer-lib
cp $(POSTGRES_INSTALL_DIR)/v16/lib/libpgport.a $(POSTGRES_INSTALL_DIR)/build/walproposer-lib
cp $(POSTGRES_INSTALL_DIR)/v16/lib/libpgcommon.a $(POSTGRES_INSTALL_DIR)/build/walproposer-lib
ifeq ($(UNAME_S),Linux)
$(AR) d $(POSTGRES_INSTALL_DIR)/build/walproposer-lib/libpgport.a \
pg_strong_random.o
$(AR) d $(POSTGRES_INSTALL_DIR)/build/walproposer-lib/libpgcommon.a \
checksum_helper.o \
cryptohash_openssl.o \
pg_crc32c.o \
hmac_openssl.o \
cryptohash_openssl.o \
scram-common.o \
md5_common.o \
parse_manifest.o \
scram-common.o
ifeq ($(UNAME_S),Linux)
$(AR) d $(POSTGRES_INSTALL_DIR)/build/walproposer-lib/libpgcommon.a \
pg_crc32c.o
checksum_helper.o
endif
.PHONY: walproposer-lib-clean
walproposer-lib-clean:
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v17/bin/pg_config \
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v16/bin/pg_config \
-C $(POSTGRES_INSTALL_DIR)/build/walproposer-lib \
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile clean
@@ -249,44 +245,38 @@ walproposer-lib-clean:
neon-pg-ext: \
neon-pg-ext-v14 \
neon-pg-ext-v15 \
neon-pg-ext-v16 \
neon-pg-ext-v17
neon-pg-ext-v16
.PHONY: neon-pg-clean-ext
neon-pg-clean-ext: \
neon-pg-clean-ext-v14 \
neon-pg-clean-ext-v15 \
neon-pg-clean-ext-v16 \
neon-pg-clean-ext-v17
neon-pg-clean-ext-v16
# shorthand to build all Postgres versions
.PHONY: postgres
postgres: \
postgres-v14 \
postgres-v15 \
postgres-v16 \
postgres-v17
postgres-v16
.PHONY: postgres-headers
postgres-headers: \
postgres-headers-v14 \
postgres-headers-v15 \
postgres-headers-v16 \
postgres-headers-v17
postgres-headers-v16
.PHONY: postgres-clean
postgres-clean: \
postgres-clean-v14 \
postgres-clean-v15 \
postgres-clean-v16 \
postgres-clean-v17
postgres-clean-v16
.PHONY: postgres-check
postgres-check: \
postgres-check-v14 \
postgres-check-v15 \
postgres-check-v16 \
postgres-check-v17
postgres-check-v16
# This doesn't remove the effects of 'configure'.
.PHONY: clean
@@ -331,13 +321,13 @@ postgres-%-pgindent: postgres-%-pg-bsd-indent postgres-%-typedefs.list
rm -f pg*.BAK
# Indent pxgn/neon.
.PHONY: neon-pgindent
neon-pgindent: postgres-v17-pg-bsd-indent neon-pg-ext-v17
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v17/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
FIND_TYPEDEF=$(ROOT_PROJECT_DIR)/vendor/postgres-v17/src/tools/find_typedef \
INDENT=$(POSTGRES_INSTALL_DIR)/build/v17/src/tools/pg_bsd_indent/pg_bsd_indent \
PGINDENT_SCRIPT=$(ROOT_PROJECT_DIR)/vendor/postgres-v17/src/tools/pgindent/pgindent \
-C $(POSTGRES_INSTALL_DIR)/build/neon-v17 \
.PHONY: pgindent
neon-pgindent: postgres-v16-pg-bsd-indent neon-pg-ext-v16
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v16/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
FIND_TYPEDEF=$(ROOT_PROJECT_DIR)/vendor/postgres-v16/src/tools/find_typedef \
INDENT=$(POSTGRES_INSTALL_DIR)/build/v16/src/tools/pg_bsd_indent/pg_bsd_indent \
PGINDENT_SCRIPT=$(ROOT_PROJECT_DIR)/vendor/postgres-v16/src/tools/pgindent/pgindent \
-C $(POSTGRES_INSTALL_DIR)/build/neon-v16 \
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile pgindent

View File

@@ -12,6 +12,7 @@ testing = []
[dependencies]
anyhow.workspace = true
async-compression.workspace = true
camino.workspace = true
chrono.workspace = true
cfg-if.workspace = true
clap.workspace = true
@@ -24,6 +25,7 @@ num_cpus.workspace = true
opentelemetry.workspace = true
postgres.workspace = true
regex.workspace = true
scopeguard.workspace = true
serde.workspace = true
serde_json.workspace = true
signal-hook.workspace = true

View File

@@ -43,7 +43,7 @@ use std::{thread, time::Duration};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use clap::{Arg, ArgAction};
use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals};
@@ -51,13 +51,12 @@ use tracing::{error, info, warn};
use url::Url;
use compute_api::responses::ComputeStatus;
use compute_api::spec::ComputeSpec;
use compute_api::spec::{ComputeMode, ComputeSpec};
use compute_tools::compute::{
forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID,
};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::get_pg_version;
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
@@ -70,13 +69,14 @@ use rlimit::{setrlimit, Resource};
// in-case of not-set environment var
const BUILD_TAG_DEFAULT: &str = "latest";
fn main() -> Result<()> {
#[tokio::main]
async fn main() -> Result<()> {
let (build_tag, clap_args) = init()?;
// enable core dumping for all child processes
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
let (pg_handle, start_pg_result) = {
let (http_handle, (pg_handle, start_pg_result)) = {
// Enter startup tracing context
let _startup_context_guard = startup_context_from_env();
@@ -84,13 +84,61 @@ fn main() -> Result<()> {
let cli_spec = try_spec_from_cli(&clap_args, &cli_args)?;
let wait_spec_result = wait_spec(build_tag, cli_args, cli_spec)?;
let compute = Arc::new(ComputeNode {
connstr: Url::parse(cli_args.connstr).context("cannot parse connstr as a URL")?,
pgdata: cli_args.pgdata.to_string(),
pgroot: cli_args.pgroot.to_string(),
pgversion: cli_args.pgversion.to_string(),
http_port: cli_args.http_port,
live_config_allowed: cli_spec.live_config_allowed,
state: Mutex::new({
let mut state = ComputeState::new();
start_postgres(&clap_args, wait_spec_result)?
if let Some(spec) = cli_spec.spec {
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
info!("new pspec.spec: {:?}", pspec.spec);
state.pspec = Some(pspec);
}
state
}),
state_changed: Condvar::new(),
ext_remote_storage: cli_args.ext_remote_storage.map(|s| s.to_string()),
ext_download_progress: RwLock::new(HashMap::new()),
build_tag: build_tag.clone(),
});
// If this is a pooled VM, prewarm before starting HTTP server and becoming
// available for binding. Prewarming helps Postgres start quicker later,
// because QEMU will already have its memory allocated from the host, and
// the necessary binaries will already be cached.
if compute.state.lock().unwrap().pspec.is_none() {
compute.prewarm_postgres()?;
}
// Launch http service first, so that we can serve control-plane requests
// while configuration is still in progress.
let http_handle = launch_http_server(cli_args.http_port, &compute)
.expect("cannot launch http endpoint thread");
wait_spec(&compute)?;
(
http_handle,
start_postgres(
compute,
#[cfg(target_os = "linux")]
&clap_args,
cli_args.resize_swap_on_bind,
)
.await?,
)
// Startup is finished, exit the startup tracing span
};
let _ = http_handle.join();
// PostgreSQL is now running, if startup was successful. Wait until it exits.
let wait_pg_result = wait_postgres(pg_handle)?;
@@ -120,11 +168,14 @@ fn init() -> Result<(String, clap::ArgMatches)> {
}
fn process_cli(matches: &clap::ArgMatches) -> Result<ProcessCliResult> {
let pgbin_default = "postgres";
let pgbin = matches
.get_one::<String>("pgbin")
let pgroot = matches
.get_one::<String>("pgroot")
.map(|s| s.as_str())
.unwrap_or(pgbin_default);
.expect("pgroot is required");
let pgversion = matches
.get_one::<String>("pgversion")
.map(|s| s.as_str())
.expect("pgversion is required");
let ext_remote_storage = matches
.get_one::<String>("remote-ext-config")
@@ -155,7 +206,8 @@ fn process_cli(matches: &clap::ArgMatches) -> Result<ProcessCliResult> {
Ok(ProcessCliResult {
connstr,
pgdata,
pgbin,
pgroot,
pgversion,
ext_remote_storage,
http_port,
spec_json,
@@ -167,7 +219,8 @@ fn process_cli(matches: &clap::ArgMatches) -> Result<ProcessCliResult> {
struct ProcessCliResult<'clap> {
connstr: &'clap str,
pgdata: &'clap str,
pgbin: &'clap str,
pgroot: &'clap str,
pgversion: &'clap str,
ext_remote_storage: Option<&'clap str>,
http_port: u16,
spec_json: Option<&'clap String>,
@@ -285,61 +338,8 @@ struct CliSpecParams {
live_config_allowed: bool,
}
fn wait_spec(
build_tag: String,
ProcessCliResult {
connstr,
pgdata,
pgbin,
ext_remote_storage,
resize_swap_on_bind,
http_port,
..
}: ProcessCliResult,
CliSpecParams {
spec,
live_config_allowed,
}: CliSpecParams,
) -> Result<WaitSpecResult> {
let mut new_state = ComputeState::new();
let spec_set;
if let Some(spec) = spec {
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
info!("new pspec.spec: {:?}", pspec.spec);
new_state.pspec = Some(pspec);
spec_set = true;
} else {
spec_set = false;
}
let compute_node = ComputeNode {
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
pgversion: get_pg_version(pgbin),
live_config_allowed,
state: Mutex::new(new_state),
state_changed: Condvar::new(),
ext_remote_storage: ext_remote_storage.map(|s| s.to_string()),
ext_download_progress: RwLock::new(HashMap::new()),
build_tag,
};
let compute = Arc::new(compute_node);
// If this is a pooled VM, prewarm before starting HTTP server and becoming
// available for binding. Prewarming helps Postgres start quicker later,
// because QEMU will already have its memory allocated from the host, and
// the necessary binaries will already be cached.
if !spec_set {
compute.prewarm_postgres()?;
}
// Launch http service first, so that we can serve control-plane requests
// while configuration is still in progress.
let _http_handle =
launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");
if !spec_set {
fn wait_spec(compute: &Arc<ComputeNode>) -> Result<()> {
if compute.state.lock().unwrap().pspec.is_none() {
// No spec provided, hang waiting for it.
info!("no compute spec provided, waiting");
@@ -369,28 +369,13 @@ fn wait_spec(
launch_lsn_lease_bg_task_for_static(&compute);
Ok(WaitSpecResult {
compute,
http_port,
resize_swap_on_bind,
})
Ok(())
}
struct WaitSpecResult {
async fn start_postgres(
compute: Arc<ComputeNode>,
// passed through from ProcessCliResult
http_port: u16,
#[cfg(target_os = "linux")] matches: &clap::ArgMatches,
resize_swap_on_bind: bool,
}
fn start_postgres(
// need to allow unused because `matches` is only used if target_os = "linux"
#[allow(unused_variables)] matches: &clap::ArgMatches,
WaitSpecResult {
compute,
http_port,
resize_swap_on_bind,
}: WaitSpecResult,
) -> Result<(Option<PostgresHandle>, StartPostgresResult)> {
// We got all we need, update the state.
let mut state = compute.state.lock().unwrap();
@@ -441,27 +426,32 @@ fn start_postgres(
}
}
let extension_server_port: u16 = http_port;
compute.prepare_compute().await?;
// Start Postgres
let mut pg = None;
if !prestartup_failed {
pg = match compute.start_compute(extension_server_port) {
Ok(pg) => Some(pg),
Err(err) => {
error!("could not start the compute node: {:#}", err);
let mut state = compute.state.lock().unwrap();
state.error = Some(format!("{:?}", err));
state.status = ComputeStatus::Failed;
// Notify others that Postgres failed to start. In case of configuring the
// empty compute, it's likely that API handler is still waiting for compute
// state change. With this we will notify it that compute is in Failed state,
// so control plane will know about it earlier and record proper error instead
// of timeout.
compute.state_changed.notify_all();
drop(state); // unlock
delay_exit = true;
None
match compute.get_mode() {
ComputeMode::Upgrade => {}
_ => {
// Start Postgres
pg = match compute.start_compute() {
Ok(pg) => Some(pg),
Err(err) => {
error!("could not start the compute node: {:#}", err);
let mut state = compute.state.lock().unwrap();
state.error = Some(format!("{:?}", err));
state.status = ComputeStatus::Failed;
// Notify others that Postgres failed to start. In case of configuring the
// empty compute, it's likely that API handler is still waiting for compute
// state change. With this we will notify it that compute is in Failed state,
// so control plane will know about it earlier and record proper error instead
// of timeout.
compute.state_changed.notify_all();
drop(state); // unlock
delay_exit = true;
None
}
}
}
};
} else {
@@ -686,11 +676,17 @@ fn cli() -> clap::Command {
.required(true),
)
.arg(
Arg::new("pgbin")
.short('b')
.long("pgbin")
.default_value("postgres")
.value_name("POSTGRES_PATH"),
Arg::new("pgroot")
.short('R')
.long("pgroot")
.value_name("POSTGRES_ROOT")
.required(true),
)
.arg(
Arg::new("pgversion")
.long("pgversion")
.value_name("POSTGRES_VERSION")
.required(true),
)
.arg(
Arg::new("spec")
@@ -750,6 +746,11 @@ fn cli() -> clap::Command {
.long("resize-swap-on-bind")
.action(clap::ArgAction::SetTrue),
)
.arg(
Arg::new("no-postgres")
.long("no-postgres")
.action(ArgAction::SetTrue),
)
}
/// When compute_ctl is killed, send also termination signal to sync-safekeepers

View File

@@ -4,7 +4,7 @@ use compute_api::{
};
use futures::Stream;
use postgres::{Client, NoTls};
use std::{path::Path, process::Stdio, result::Result, sync::Arc};
use std::{process::Stdio, result::Result, sync::Arc};
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::Command,
@@ -55,9 +55,7 @@ pub async fn get_database_schema(
compute: &Arc<ComputeNode>,
dbname: &str,
) -> Result<impl Stream<Item = Result<bytes::Bytes, std::io::Error>>, SchemaDumpError> {
let pgbin = &compute.pgbin;
let basepath = Path::new(pgbin).parent().unwrap();
let pgdump = basepath.join("pg_dump");
let pgdump = compute.get_my_pg_binary("pg_dump");
let mut connstr = compute.connstr.clone();
connstr.set_path(dbname);
let mut cmd = Command::new(pgdump)

View File

@@ -1,9 +1,9 @@
use std::collections::HashMap;
use std::env;
use std::fs;
use std::io::BufRead;
use std::os::unix::fs::{symlink, PermissionsExt};
use std::path::Path;
use std::path::PathBuf;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::AtomicU32;
@@ -13,6 +13,8 @@ use std::thread;
use std::time::Instant;
use anyhow::{Context, Result};
use bytes::{Buf, BufMut};
use camino::Utf8Path;
use chrono::{DateTime, Utc};
use futures::future::join_all;
use futures::stream::FuturesUnordered;
@@ -20,13 +22,15 @@ use futures::StreamExt;
use nix::unistd::Pid;
use postgres::error::SqlState;
use postgres::{Client, NoTls};
use tokio;
use tokio_postgres;
use tracing::{debug, error, info, instrument, warn};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::zstd::create_zst_tarball;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec};
use utils::measured_stream::MeasuredReader;
use nix::sys::signal::{kill, Signal};
@@ -47,8 +51,9 @@ pub struct ComputeNode {
// Url type maintains proper escaping
pub connstr: url::Url,
pub pgdata: String,
pub pgbin: String,
pub pgroot: String,
pub pgversion: String,
pub http_port: u16,
/// We should only allow live re- / configuration of the compute node if
/// it uses 'pull model', i.e. it can go to control-plane and fetch
/// the latest configuration. Otherwise, there could be a case:
@@ -309,6 +314,13 @@ impl ComputeNode {
self.state.lock().unwrap().status
}
/// Get the mode of this compute.
pub fn get_mode(&self) -> ComputeMode {
let state = self.state.lock().unwrap();
state.pspec.as_ref().unwrap().spec.mode
}
// Remove `pgdata` directory and create it again with right permissions.
fn create_pgdata(&self) -> Result<()> {
// Ignore removal error, likely it is a 'No such file or directory (os error 2)'.
@@ -320,15 +332,44 @@ impl ComputeNode {
Ok(())
}
/// Get path pointing to requested binary directory.
pub fn get_pg_bindir(&self, version: &str) -> PathBuf {
Path::new(&self.pgroot)
.join(format!("v{}", version))
.join("bin")
}
/// Get path to requested Postgres binary.
pub fn get_pg_binary(&self, version: &str, binary: &str) -> String {
self.get_pg_bindir(version)
.join(binary)
.into_os_string()
.into_string()
.expect(&format!(
"path to {}-{} cannot be represented as UTF-8",
binary, version
))
}
/// Get path to Postgres binary directory of this compute.
pub fn get_my_pg_bindir(&self) -> PathBuf {
self.get_pg_bindir(&self.pgversion)
}
/// Get path to specified Postgres binary of this compute.
pub fn get_my_pg_binary(&self, binary: &str) -> String {
self.get_pg_binary(&self.pgversion, binary)
}
// Get basebackup from the libpq connection to pageserver using `connstr` and
// unarchive it to `pgdata` directory overriding all its previous content.
#[instrument(skip_all, fields(%lsn))]
fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
async fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
let spec = compute_state.pspec.as_ref().expect("spec must be set");
let start_time = Instant::now();
let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
let mut config = postgres::Config::from_str(shard0_connstr)?;
let mut config = tokio_postgres::Config::from_str(shard0_connstr)?;
// Use the storage auth token from the config file, if given.
// Note: this overrides any password set in the connection string.
@@ -340,7 +381,12 @@ impl ComputeNode {
}
// Connect to pageserver
let mut client = config.connect(NoTls)?;
let (client, connection) = config.connect(tokio_postgres::NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
let basebackup_cmd = match lsn {
@@ -352,8 +398,18 @@ impl ComputeNode {
),
};
let copyreader = client.copy_out(basebackup_cmd.as_str())?;
let mut measured_reader = MeasuredReader::new(copyreader);
let mut copystream = std::pin::pin!(client.copy_out(basebackup_cmd.as_str()).await?);
let mut buf = bytes::BytesMut::with_capacity(1024);
while let Some(i) = copystream.next().await {
match i {
Ok(b) => {
buf.put(b);
}
Err(e) => return Err(anyhow::anyhow!(e)),
}
}
let basebackup_bytes = buf.len();
// Check the magic number to see if it's a gzip or not. Even though
// we might explicitly ask for gzip, an old pageserver with no implementation
@@ -366,11 +422,9 @@ impl ComputeNode {
// and 0x1f and 0x8b are unlikely first characters for any filename. Moreover,
// we send the "global" directory first from the pageserver, so it definitely
// won't be recognized as gzip.
let mut bufreader = std::io::BufReader::new(&mut measured_reader);
let gzip = {
let peek = bufreader.fill_buf().unwrap();
peek[0] == 0x1f && peek[1] == 0x8b
};
let gzip = buf[0] == 0x1f && buf[1] == 0x8b;
let mut bufreader = buf.reader();
// Read the archive directly from the `CopyOutReader`
//
@@ -390,14 +444,14 @@ impl ComputeNode {
// Report metrics
let mut state = self.state.lock().unwrap();
state.metrics.pageserver_connect_micros = pageserver_connect_micros;
state.metrics.basebackup_bytes = measured_reader.get_byte_count() as u64;
state.metrics.basebackup_bytes = basebackup_bytes as u64;
state.metrics.basebackup_ms = start_time.elapsed().as_millis() as u64;
Ok(())
}
// Gets the basebackup in a retry loop
#[instrument(skip_all, fields(%lsn))]
pub fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
pub async fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
let mut retry_period_ms = 500.0;
let mut attempts = 0;
const DEFAULT_ATTEMPTS: u16 = 10;
@@ -410,7 +464,7 @@ impl ComputeNode {
#[cfg(not(feature = "testing"))]
let max_attempts = DEFAULT_ATTEMPTS;
loop {
let result = self.try_get_basebackup(compute_state, lsn);
let result = self.try_get_basebackup(compute_state, lsn).await;
match result {
Ok(_) => {
return result;
@@ -431,10 +485,14 @@ impl ComputeNode {
}
}
pub async fn check_safekeepers_synced_async(
// Fast path for sync_safekeepers. If they're already synced we get the lsn
// in one roundtrip. If not, we should do a full sync_safekeepers.
pub async fn check_safekeepers_synced(
&self,
compute_state: &ComputeState,
) -> Result<Option<Lsn>> {
let start_time = Utc::now();
// Construct a connection config for each safekeeper
let pspec: ParsedSpec = compute_state
.pspec
@@ -503,20 +561,7 @@ impl ComputeNode {
return Ok(None);
}
Ok(check_if_synced(responses))
}
// Fast path for sync_safekeepers. If they're already synced we get the lsn
// in one roundtrip. If not, we should do a full sync_safekeepers.
pub fn check_safekeepers_synced(&self, compute_state: &ComputeState) -> Result<Option<Lsn>> {
let start_time = Utc::now();
// Run actual work with new tokio runtime
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create rt");
let result = rt.block_on(self.check_safekeepers_synced_async(compute_state));
let lsn = check_if_synced(responses);
// Record runtime
self.state.lock().unwrap().metrics.sync_sk_check_ms = Utc::now()
@@ -524,7 +569,8 @@ impl ComputeNode {
.to_std()
.unwrap()
.as_millis() as u64;
result
Ok(lsn)
}
// Run `postgres` in a special mode with `--sync-safekeepers` argument
@@ -533,7 +579,8 @@ impl ComputeNode {
pub fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
let start_time = Utc::now();
let mut sync_handle = maybe_cgexec(&self.pgbin)
let postgres = self.get_my_pg_binary("postgres");
let mut sync_handle = maybe_cgexec(&postgres)
.args(["--sync-safekeepers"])
.env("PGDATA", &self.pgdata) // we cannot use -D in this mode
.envs(if let Some(storage_auth_token) = &storage_auth_token {
@@ -589,11 +636,7 @@ impl ComputeNode {
/// Do all the preparations like PGDATA directory creation, configuration,
/// safekeepers sync, basebackup, etc.
#[instrument(skip_all)]
pub fn prepare_pgdata(
&self,
compute_state: &ComputeState,
extension_server_port: u16,
) -> Result<()> {
pub async fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = &pspec.spec;
let pgdata_path = Path::new(&self.pgdata);
@@ -603,7 +646,7 @@ impl ComputeNode {
config::write_postgres_conf(
&pgdata_path.join("postgresql.conf"),
&pspec.spec,
Some(extension_server_port),
Some(self.http_port),
)?;
// Syncing safekeepers is only safe with primary nodes: if a primary
@@ -612,7 +655,8 @@ impl ComputeNode {
let lsn = match spec.mode {
ComputeMode::Primary => {
info!("checking if safekeepers are synced");
let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state) {
let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state).await
{
lsn
} else {
info!("starting safekeepers syncing");
@@ -630,18 +674,24 @@ impl ComputeNode {
info!("Initializing standby from latest Pageserver LSN");
Lsn(0)
}
ComputeMode::Upgrade => {
info!("Starting upgrade node at latest Pageserver LSN");
Lsn(0)
}
};
info!(
"getting basebackup@{} from pageserver {}",
lsn, &pspec.pageserver_connstr
);
self.get_basebackup(compute_state, lsn).with_context(|| {
format!(
"failed to get basebackup@{} from pageserver {}",
lsn, &pspec.pageserver_connstr
)
})?;
self.get_basebackup(compute_state, lsn)
.await
.with_context(|| {
format!(
"failed to get basebackup@{} from pageserver {}",
lsn, &pspec.pageserver_connstr
)
})?;
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path)?;
@@ -689,7 +739,7 @@ impl ComputeNode {
symlink("/dev/shm/", pgdata_path.join("pg_dynshmem"))?;
match spec.mode {
ComputeMode::Primary => {}
ComputeMode::Primary | ComputeMode::Upgrade => {}
ComputeMode::Replica | ComputeMode::Static(..) => {
add_standby_signal(pgdata_path)?;
}
@@ -708,7 +758,7 @@ impl ComputeNode {
// Run initdb to completion
info!("running initdb");
let initdb_bin = Path::new(&self.pgbin).parent().unwrap().join("initdb");
let initdb_bin = self.get_my_pg_binary("initdb");
Command::new(initdb_bin)
.args(["-D", pgdata])
.output()
@@ -724,7 +774,8 @@ impl ComputeNode {
// Start postgres
info!("starting postgres");
let mut pg = maybe_cgexec(&self.pgbin)
let postgres = self.get_my_pg_binary("postgres");
let mut pg = maybe_cgexec(&postgres)
.args(["-D", pgdata])
.spawn()
.expect("cannot start postgres process");
@@ -757,7 +808,8 @@ impl ComputeNode {
let pgdata_path = Path::new(&self.pgdata);
// Run postgres as a child process.
let mut pg = maybe_cgexec(&self.pgbin)
let postgres = self.get_my_pg_binary("postgres");
let mut pg = maybe_cgexec(&postgres)
.args(["-D", &self.pgdata])
.envs(if let Some(storage_auth_token) = &storage_auth_token {
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
@@ -778,6 +830,152 @@ impl ComputeNode {
Ok((pg, logs_handle))
}
pub async fn upgrade(&self, pg_version: &str) -> Result<()> {
let old_bindir = self.get_my_pg_bindir();
let new_bindir = self.get_pg_bindir(pg_version);
let old_datadir = Utf8Path::new(&self.pgdata);
let parent_dir = old_datadir.parent().unwrap();
let new_datadir = parent_dir.join("new-pgdata");
// Delete the new data directory before attempting, just in case it exists
let _ = std::fs::remove_dir_all(&new_datadir);
// Step 1: Create new cluster
info!(
"Running initdb to start a cluster upgrade from v{} to v{}",
self.pgversion, pg_version
);
let initdb_bin = self.get_pg_binary(pg_version, "initdb");
let mut initdb_cmd = Command::new(&initdb_bin);
initdb_cmd
.args(["--pgdata", new_datadir.as_str()])
.args(["--username", "cloud_admin"])
.args(["--encoding", "utf8"])
.args(["--auth-local", "trust"])
.env_clear()
.stdout(Stdio::inherit())
.stderr(Stdio::inherit());
match initdb_cmd.status() {
Ok(status) => {
if !status.success() {
return Err(anyhow::anyhow!("failed to initialize the new database"));
}
info!("Initialized v{} database", pg_version);
}
Err(_) => {
return Err(anyhow::anyhow!(
"failed to spawn initdb for the new database"
))
}
};
// Step 2: Run pg_upgrade
info!(
"Running pg_upgrade to upgrade from v{} to v{}",
self.pgversion, pg_version
);
let pg_upgrade_bin = self.get_pg_binary(pg_version, "pg_upgrade");
let mut pg_upgrade_cmd = Command::new(&pg_upgrade_bin);
let mut child = pg_upgrade_cmd
.args([
"--old-bindir",
&old_bindir.into_os_string().into_string().unwrap(),
])
.args(["--old-datadir", old_datadir.as_str()])
.args(["--old-options", "-c neon.safekeepers=''"])
.args([
"--new-bindir",
&new_bindir.into_os_string().into_string().unwrap(),
])
.args(["--new-datadir", new_datadir.as_str()])
.args(["--new-options", "-c neon.safekeepers=''"])
.args(["--username", "cloud_admin"])
.arg("--no-transfer")
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.spawn()?;
let status = child.wait()?;
if status.success() {
info!("pg_upgrade was successful");
} else {
return Err(anyhow::anyhow!(
"pg_upgrade failed with exit code {}",
status.code().unwrap()
));
}
/* Step 3: Delete the script that pg_upgrade generates, which is created in the current
* working directory
*/
// TODO: We should write a patch for upstream to not generate this file
if cfg!(windows) {
let _ = std::fs::remove_file("delete_old_cluster.bat");
} else {
let _ = std::fs::remove_file("delete_old_cluster.sh");
}
/* Step 4: Re-prepare the pgdata directory to work with the latest basebackup from the
* pageserver
*/
{
let state = self.state.lock().unwrap().clone();
self.prepare_pgdata(&state).await?;
}
// Step 5: Create tarball minus things like pg_dynshm, etc.
info!("Creating tarball of upgraded initdb directory, minus some files");
let initdb_tar_path = parent_dir.join("initdb.tar.zst");
let _ = std::fs::remove_file(&initdb_tar_path);
create_zst_tarball(&new_datadir, &initdb_tar_path).await?;
// Step 6: Write new postgresql.conf file for upgraded initdb
std::fs::copy(
old_datadir.join("postgresql.conf"),
new_datadir.join("postgresql.conf"),
)?;
// Step 7: Write the tarball into the Postgres WAL
info!("Writing {} to WAL", initdb_tar_path.as_str());
let postgres_bin = self.get_my_pg_binary("postgres");
let mut wal_log_cmd = Command::new(&postgres_bin);
child = wal_log_cmd
.args(["--wal-log", initdb_tar_path.as_str()])
.env_clear()
.env("PGDATA", old_datadir.as_str())
.env("NEON_PURPOSE", "upgrade")
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.spawn()
.expect("postgres --wal-log failed to start");
match child.wait() {
Ok(s) => {
if !s.success() {
return Err(anyhow::anyhow!("Could not WAL log upgrade tarball"));
}
}
Err(e) => return Err(e.into()),
};
// Step 8: Sync the safekeepers to push WAL record to Neon
self.sync_safekeepers(None)?;
/* Note that whether any errors occur after this are unimportant. ALWAYS return success
* after this point. The compute will be terminated immediately after the upgrade. Remember
* that this is an upgrade-only compute, and it will not accept connections from users.
*/
Ok(())
}
/// Do post configuration of the already started Postgres. This function spawns a background thread to
/// configure the database after applying the compute spec. Currently, it upgrades the neon extension
/// version. In the future, it may upgrade all 3rd-party extensions.
@@ -895,8 +1093,8 @@ impl ComputeNode {
// `pg_ctl` for start / stop.
#[instrument(skip_all)]
fn pg_reload_conf(&self) -> Result<()> {
let pgctl_bin = Path::new(&self.pgbin).parent().unwrap().join("pg_ctl");
Command::new(pgctl_bin)
let pgctl_bin = self.get_my_pg_binary("pg_ctl");
Command::new(&pgctl_bin)
.args(["reload", "-D", &self.pgdata])
.output()
.expect("cannot run pg_ctl process");
@@ -980,43 +1178,25 @@ impl ComputeNode {
Ok(())
}
/// Prepares the compute for Postgres operations, including downloading
/// remote extensions and preparing the pgdata directory.
///
/// The caller must hold the state mutex.
#[instrument(skip_all)]
pub fn start_compute(
&self,
extension_server_port: u16,
) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> {
let compute_state = self.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
pub async fn prepare_compute(&self) -> Result<()> {
let state = self.state.lock().unwrap().clone();
let pspec = state.pspec.as_ref().expect("spec must be set");
info!(
"starting compute for project {}, operation {}, tenant {}, timeline {}",
"preparing compute for project {}, operation {}, tenant {}, timeline {}",
pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
pspec.spec.operation_uuid.as_deref().unwrap_or("None"),
pspec.tenant_id,
pspec.timeline_id,
);
// tune pgbouncer
if let Some(pgbouncer_settings) = &pspec.spec.pgbouncer_settings {
info!("tuning pgbouncer");
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create rt");
// Spawn a thread to do the tuning,
// so that we don't block the main thread that starts Postgres.
let pgbouncer_settings = pgbouncer_settings.clone();
let _handle = thread::spawn(move || {
let res = rt.block_on(tune_pgbouncer(pgbouncer_settings));
if let Err(err) = res {
error!("error while tuning pgbouncer: {err:?}");
}
});
}
info!(
"start_compute spec.remote_extensions {:?}",
"prepare_compute spec.remote_extensions {:?}",
pspec.spec.remote_extensions
);
@@ -1024,7 +1204,8 @@ impl ComputeNode {
// remote shared_preload_libraries before postgres start (if any)
if let Some(remote_extensions) = &pspec.spec.remote_extensions {
// First, create control files for all availale extensions
extension_server::create_control_files(remote_extensions, &self.pgbin);
let postgres_bin = self.get_my_pg_binary("postgres");
extension_server::create_control_files(remote_extensions, &postgres_bin);
let library_load_start_time = Utc::now();
let remote_ext_metrics = self.prepare_preload_libraries(&pspec.spec)?;
@@ -1046,25 +1227,42 @@ impl ComputeNode {
info!("{:?}", remote_ext_metrics);
}
self.prepare_pgdata(&compute_state, extension_server_port)?;
self.prepare_pgdata(&state).await?;
self.set_status(ComputeStatus::Prepared);
Ok(())
}
#[instrument(skip_all)]
pub fn start_compute(&self) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> {
let compute_state = self.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let start_time = Utc::now();
let pg_process = self.start_postgres(pspec.storage_auth_token.clone())?;
let config_time = Utc::now();
if pspec.spec.mode == ComputeMode::Primary && !pspec.spec.skip_pg_catalog_updates {
let pgdata_path = Path::new(&self.pgdata);
// temporarily reset max_cluster_size in config
// to avoid the possibility of hitting the limit, while we are applying config:
// creating new extensions, roles, etc...
config::with_compute_ctl_tmp_override(pgdata_path, "neon.max_cluster_size=-1", || {
if pspec.spec.mode == ComputeMode::Primary {
if !pspec.spec.skip_pg_catalog_updates {
let pgdata_path = Path::new(&self.pgdata);
// temporarily reset max_cluster_size in config
// to avoid the possibility of hitting the limit, while we are applying config:
// creating new extensions, roles, etc...
config::with_compute_ctl_tmp_override(
pgdata_path,
"neon.max_cluster_size=-1",
|| {
self.pg_reload_conf()?;
self.apply_config(&compute_state)?;
Ok(())
},
)?;
self.pg_reload_conf()?;
self.apply_config(&compute_state)?;
Ok(())
})?;
self.pg_reload_conf()?;
}
self.post_apply_config()?;
}
let startup_end_time = Utc::now();
@@ -1145,9 +1343,11 @@ impl ComputeNode {
core_path.display()
);
let postgres_bin = self.get_my_pg_binary("postgres");
// Try first with gdb
let backtrace = Command::new("gdb")
.args(["--batch", "-q", "-ex", "bt", &self.pgbin])
.args(["--batch", "-q", "-ex", "bt", &postgres_bin])
.arg(&core_path)
.output();
@@ -1280,11 +1480,12 @@ LIMIT 100",
// then we try to download it here
info!("downloading new extension {ext_archive_name}");
let postgres_bin = self.get_my_pg_binary("postgres");
let download_size = extension_server::download_extension(
&real_ext_name,
&ext_path,
ext_remote_storage,
&self.pgbin,
&postgres_bin,
)
.await
.map_err(DownloadError::Other);

View File

@@ -74,7 +74,7 @@ pub fn write_postgres_conf(
}
match spec.mode {
ComputeMode::Primary => {}
ComputeMode::Primary | ComputeMode::Upgrade => {}
ComputeMode::Static(lsn) => {
// hot_standby is 'on' by default, but let's be explicit
writeln!(file, "hot_standby=on")?;

View File

@@ -75,7 +75,6 @@ use anyhow::Result;
use anyhow::{bail, Context};
use bytes::Bytes;
use compute_api::spec::RemoteExtSpec;
use regex::Regex;
use remote_storage::*;
use reqwest::StatusCode;
use std::path::Path;
@@ -103,35 +102,6 @@ fn get_pg_config(argument: &str, pgbin: &str) -> String {
.to_string()
}
pub fn get_pg_version(pgbin: &str) -> String {
// pg_config --version returns a (platform specific) human readable string
// such as "PostgreSQL 15.4". We parse this to v14/v15/v16 etc.
let human_version = get_pg_config("--version", pgbin);
return parse_pg_version(&human_version).to_string();
}
fn parse_pg_version(human_version: &str) -> &str {
// Normal releases have version strings like "PostgreSQL 15.4". But there
// are also pre-release versions like "PostgreSQL 17devel" or "PostgreSQL
// 16beta2" or "PostgreSQL 17rc1". And with the --with-extra-version
// configure option, you can tack any string to the version number,
// e.g. "PostgreSQL 15.4foobar".
match Regex::new(r"^PostgreSQL (?<major>\d+).+")
.unwrap()
.captures(human_version)
{
Some(captures) if captures.len() == 2 => match &captures["major"] {
"14" => return "v14",
"15" => return "v15",
"16" => return "v16",
"17" => return "v17",
_ => {}
},
_ => {}
}
panic!("Unsuported postgres version {human_version}");
}
// download the archive for a given extension,
// unzip it, and place files in the appropriate locations (share/lib)
pub async fn download_extension(
@@ -256,42 +226,3 @@ async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Res
),
}
}
#[cfg(test)]
mod tests {
use super::parse_pg_version;
#[test]
fn test_parse_pg_version() {
assert_eq!(parse_pg_version("PostgreSQL 15.4"), "v15");
assert_eq!(parse_pg_version("PostgreSQL 15.14"), "v15");
assert_eq!(
parse_pg_version("PostgreSQL 15.4 (Ubuntu 15.4-0ubuntu0.23.04.1)"),
"v15"
);
assert_eq!(parse_pg_version("PostgreSQL 14.15"), "v14");
assert_eq!(parse_pg_version("PostgreSQL 14.0"), "v14");
assert_eq!(
parse_pg_version("PostgreSQL 14.9 (Debian 14.9-1.pgdg120+1"),
"v14"
);
assert_eq!(parse_pg_version("PostgreSQL 16devel"), "v16");
assert_eq!(parse_pg_version("PostgreSQL 16beta1"), "v16");
assert_eq!(parse_pg_version("PostgreSQL 16rc2"), "v16");
assert_eq!(parse_pg_version("PostgreSQL 16extra"), "v16");
}
#[test]
#[should_panic]
fn test_parse_pg_unsupported_version() {
parse_pg_version("PostgreSQL 13.14");
}
#[test]
#[should_panic]
fn test_parse_pg_incorrect_version_format() {
parse_pg_version("PostgreSQL 14");
}
}

View File

@@ -10,6 +10,7 @@ use crate::catalog::{get_database_schema, get_dbs_and_roles};
use crate::compute::forward_termination_signal;
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_api::requests::ConfigurationRequest;
use compute_api::requests::UpgradeRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
use anyhow::Result;
@@ -137,6 +138,20 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
(&Method::POST, "/upgrade") => {
info!("serving /upgrade POST request");
match handle_upgrade_request(req, compute).await {
Ok(_) => Response::builder()
.status(StatusCode::ACCEPTED)
.body(Body::from("Starting upgrade"))
.unwrap(),
Err((e, status)) => {
error!("error handling /upgrade request: {e}");
render_json_error(&format!("{}", e), status)
}
}
}
(&Method::GET, "/dbs_and_roles") => {
info!("serving /dbs_and_roles GET request",);
match get_dbs_and_roles(compute).await {
@@ -397,6 +412,59 @@ async fn handle_terminate_request(compute: &Arc<ComputeNode>) -> Result<(), (Str
Ok(())
}
async fn handle_upgrade_request(
req: Request<Body>,
compute: &Arc<ComputeNode>,
) -> Result<(), (anyhow::Error, StatusCode)> {
let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
let body_str = String::from_utf8(body_bytes.to_vec()).unwrap();
let body = match serde_json::from_str::<UpgradeRequest>(&body_str) {
Ok(r) => r,
Err(e) => return Err((Into::into(e), StatusCode::BAD_REQUEST)),
};
// No sense in trying to upgrade to the same version.
let curr_version = compute.pgversion.clone();
let new_version = body.pg_version;
if curr_version == new_version {
return Err((
anyhow::anyhow!("cannot upgrade endpoint to the same version"),
StatusCode::UNPROCESSABLE_ENTITY,
));
}
// Check that we are in the running state before trying to upgrade.
match compute.get_status() {
ComputeStatus::Prepared => (),
ComputeStatus::Upgrading => {
return Err((
anyhow::anyhow!("upgrade already in progress"),
StatusCode::CONFLICT,
));
}
_ => {
return Err((
anyhow::anyhow!("expected compute to be in the prepared state"),
StatusCode::CONFLICT,
));
}
}
compute.set_status(ComputeStatus::Upgrading);
let c = compute.clone();
tokio::spawn(async move {
if let Err(e) = c.upgrade(&new_version).await {
error!("Failed to upgrade database: {}", e);
}
c.set_status(ComputeStatus::Running);
});
Ok(())
}
// Main Hyper HTTP server function that runs it and blocks waiting on it forever.
#[tokio::main]
async fn serve(port: u16, state: Arc<ComputeNode>) {

View File

@@ -212,6 +212,21 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
/upgrade:
post:
tags:
- Upgrade
summary: Upgrade project to another Postgres major version.
operationId: upgradePostgres
responses:
202:
description: Upgrade request in progress.
409:
description: Upgrade already in progress.
422:
description: Upgrade request could not be processed.
500:
description: Upgrade request failed.
/terminate:
post:

View File

@@ -798,14 +798,19 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
.get_one::<bool>("hot-standby")
.copied()
.unwrap_or(false);
let upgrade_only = sub_args
.get_one::<bool>("upgrade-only")
.copied()
.unwrap_or(false);
let allow_multiple = sub_args.get_flag("allow-multiple");
let mode = match (lsn, hot_standby) {
(Some(lsn), false) => ComputeMode::Static(lsn),
(None, true) => ComputeMode::Replica,
(None, false) => ComputeMode::Primary,
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
let mode = match (lsn, hot_standby, upgrade_only) {
(Some(lsn), false, false) => ComputeMode::Static(lsn),
(None, true, false) => ComputeMode::Replica,
(None, false, false) => ComputeMode::Primary,
(None, false, true) => ComputeMode::Upgrade,
// Seeing this message means we aren't setting conflicts_with on clap arguments.
_ => anyhow::bail!("Invalid command line invocation"),
};
match (mode, hot_standby) {
@@ -1501,7 +1506,8 @@ fn cli() -> Command {
let lsn_arg = Arg::new("lsn")
.long("lsn")
.help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
.required(false);
.required(false)
.conflicts_with("hot-standby");
let hot_standby_arg = Arg::new("hot-standby")
.value_parser(value_parser!(bool))
@@ -1718,6 +1724,18 @@ fn cli() -> Command {
.arg(hot_standby_arg.clone())
.arg(update_catalog)
.arg(allow_multiple.clone())
.arg(
Arg::new("upgrade-only")
.help("Mark this compute as an upgrade compute")
.long("upgrade-only")
.action(ArgAction::SetTrue)
.conflicts_with_all(&[
"config-only",
"hot-standby",
// Perhaps we could offer upgrades at a specific LSN in the future.
"lsn",
])
)
)
.subcommand(Command::new("start")
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")

View File

@@ -182,6 +182,7 @@ impl ComputeControlPlane {
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<()> {
// TODO: It really feels like I need to do some protection here
if matches!(mode, ComputeMode::Primary) {
// this check is not complete, as you could have a concurrent attempt at
// creating another primary, both reading the state before checking it here,
@@ -393,6 +394,7 @@ impl Endpoint {
conf.append("recovery_prefetch", "off");
}
}
ComputeMode::Upgrade => {}
}
Ok(conf)
@@ -502,7 +504,7 @@ impl Endpoint {
/// Map safekeepers ids to the actual connection strings.
fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
let mut safekeeper_connstrings = Vec::new();
if self.mode == ComputeMode::Primary {
if matches!(self.mode, ComputeMode::Primary | ComputeMode::Upgrade) {
for sk_id in sk_ids {
let sk = self
.env
@@ -624,13 +626,16 @@ impl Endpoint {
self.endpoint_path().join("spec.json").to_str().unwrap(),
])
.args([
"--pgbin",
self.env
.pg_bin_dir(self.pg_version)?
.join("postgres")
.to_str()
"--pgroot",
&self
.env
.pg_distrib_dir
.clone()
.into_os_string()
.into_string()
.unwrap(),
])
.args(["--pgversion", &self.pg_version.to_string()])
.stdin(std::process::Stdio::null())
.stderr(logfile.try_clone()?)
.stdout(logfile);
@@ -674,7 +679,7 @@ impl Endpoint {
}
// keep retrying
}
ComputeStatus::Running => {
ComputeStatus::Running | ComputeStatus::Prepared => {
// All good!
break;
}
@@ -688,6 +693,7 @@ impl Endpoint {
);
}
ComputeStatus::Empty
| ComputeStatus::Upgrading
| ComputeStatus::ConfigurationPending
| ComputeStatus::Configuration
| ComputeStatus::TerminationPending

View File

@@ -342,7 +342,7 @@ impl LocalEnv {
#[allow(clippy::manual_range_patterns)]
match pg_version {
14 | 15 | 16 | 17 => Ok(path.join(format!("v{pg_version}"))),
14 | 15 | 16 => Ok(path.join(format!("v{pg_version}"))),
_ => bail!("Unsupported postgres version: {}", pg_version),
}
}

View File

@@ -28,7 +28,6 @@ use utils::{
auth::{encode_from_key_file, Claims, Scope},
id::{NodeId, TenantId},
};
use whoami::username;
pub struct StorageController {
env: LocalEnv,
@@ -184,7 +183,7 @@ impl StorageController {
/// to other versions if that one isn't found. Some automated tests create circumstances
/// where only one version is available in pg_distrib_dir, such as `test_remote_extensions`.
async fn get_pg_dir(&self, dir_name: &str) -> anyhow::Result<Utf8PathBuf> {
let prefer_versions = [STORAGE_CONTROLLER_POSTGRES_VERSION, 16, 15, 14];
let prefer_versions = [STORAGE_CONTROLLER_POSTGRES_VERSION, 15, 14];
for v in prefer_versions {
let path = Utf8PathBuf::from_path_buf(self.env.pg_dir(v, dir_name)?).unwrap();
@@ -212,16 +211,7 @@ impl StorageController {
/// Readiness check for our postgres process
async fn pg_isready(&self, pg_bin_dir: &Utf8Path, postgres_port: u16) -> anyhow::Result<bool> {
let bin_path = pg_bin_dir.join("pg_isready");
let args = [
"-h",
"localhost",
"-U",
&username(),
"-d",
DB_NAME,
"-p",
&format!("{}", postgres_port),
];
let args = ["-h", "localhost", "-p", &format!("{}", postgres_port)];
let exitcode = Command::new(bin_path).args(args).spawn()?.wait().await?;
Ok(exitcode.success())
@@ -235,11 +225,7 @@ impl StorageController {
///
/// Returns the database url
pub async fn setup_database(&self, postgres_port: u16) -> anyhow::Result<String> {
let database_url = format!(
"postgresql://{}@localhost:{}/{DB_NAME}",
&username(),
postgres_port
);
let database_url = format!("postgresql://localhost:{}/{DB_NAME}", postgres_port);
let pg_bin_dir = self.get_pg_bin_dir().await?;
let createdb_path = pg_bin_dir.join("createdb");
@@ -249,10 +235,6 @@ impl StorageController {
"localhost",
"-p",
&format!("{}", postgres_port),
"-U",
&username(),
"-O",
&username(),
DB_NAME,
])
.output()
@@ -289,7 +271,7 @@ impl StorageController {
// But tokio-postgres fork doesn't have this upstream commit:
// https://github.com/sfackler/rust-postgres/commit/cb609be758f3fb5af537f04b584a2ee0cebd5e79
// => we should rebase our fork => TODO https://github.com/neondatabase/neon/issues/8399
.user(&username())
.user(&whoami::username())
.dbname(DB_NAME)
.connect(tokio_postgres::NoTls)
.await
@@ -346,12 +328,6 @@ impl StorageController {
let pg_log_path = pg_data_path.join("postgres.log");
if !tokio::fs::try_exists(&pg_data_path).await? {
let initdb_args = ["-D", pg_data_path.as_ref(), "--username", &username()];
tracing::info!(
"Initializing storage controller database with args: {:?}",
initdb_args
);
// Initialize empty database
let initdb_path = pg_bin_dir.join("initdb");
let mut child = Command::new(&initdb_path)
@@ -359,7 +335,7 @@ impl StorageController {
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
])
.args(initdb_args)
.args(["-D", pg_data_path.as_ref()])
.spawn()
.expect("Failed to spawn initdb");
let status = child.wait().await?;
@@ -388,14 +364,8 @@ impl StorageController {
pg_data_path.as_ref(),
"-l",
pg_log_path.as_ref(),
"-U",
&username(),
"start",
];
tracing::info!(
"Starting storage controller database with args: {:?}",
db_start_args
);
background_process::start_process(
"storage_controller_db",

View File

@@ -12,3 +12,9 @@ use serde::Deserialize;
pub struct ConfigurationRequest {
pub spec: ComputeSpec,
}
/// Request body of the /upgrade API
#[derive(Deserialize, Debug)]
pub struct UpgradeRequest {
pub pg_version: String,
}

View File

@@ -44,10 +44,15 @@ pub enum ComputeStatus {
// Compute node has spec and initial startup and
// configuration is in progress.
Init,
// Compute has been prepared, meaning that remote extensions have been
// downloaded and the data directory has been prepared.
Prepared,
// Compute is configured and running.
Running,
// New spec is being applied.
Configuration,
// Compute is upgrading Postgres.
Upgrading,
// Either startup or configuration failed,
// compute will exit soon or is waiting for
// control-plane to terminate it.

View File

@@ -199,6 +199,11 @@ pub enum ComputeMode {
/// Future versions may want to distinguish between replicas with hot standby
/// feedback and other kinds of replication configurations.
Replica,
/// An upgrade-only node
///
/// This node will not accept remote Postgres connections. It's only
/// purpose is to upgrade a timeline.
Upgrade,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize)]

View File

@@ -1,8 +1,8 @@
use anyhow::{bail, Result};
use byteorder::{ByteOrder, BE};
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::Oid;
use postgres_ffi::RepOriginId;
use postgres_ffi::{Oid, TransactionId};
use serde::{Deserialize, Serialize};
use std::{fmt, ops::Range};
@@ -350,17 +350,7 @@ impl Key {
// 02 00000000 00000000 00000000 00 00000000
//
// TwoPhaseFile:
//
// 02 00000000 00000000 00XXXXXX XX XXXXXXXX
//
// \______XID_________/
//
// The 64-bit XID is stored a little awkwardly in field6, field5 and
// field4. PostgreSQL v16 and below only stored a 32-bit XID, which
// fit completely in field6, but starting with PostgreSQL v17, a full
// 64-bit XID is used. Most pageserver code that accesses
// TwoPhaseFiles now deals with 64-bit XIDs even on v16, the high bits
// are just unused.
// 02 00000000 00000000 00000000 00 XID
//
// ControlFile:
// 03 00000000 00000000 00000000 00 00000000
@@ -592,36 +582,35 @@ pub const TWOPHASEDIR_KEY: Key = Key {
};
#[inline(always)]
pub fn twophase_file_key(xid: u64) -> Key {
pub fn twophase_file_key(xid: TransactionId) -> Key {
Key {
field1: 0x02,
field2: 0,
field3: 0,
field4: ((xid & 0xFFFFFF0000000000) >> 40) as u32,
field5: ((xid & 0x000000FF00000000) >> 32) as u8,
field6: (xid & 0x00000000FFFFFFFF) as u32,
field4: 0,
field5: 0,
field6: xid,
}
}
#[inline(always)]
pub fn twophase_key_range(xid: u64) -> Range<Key> {
// 64-bit XIDs really should not overflow
pub fn twophase_key_range(xid: TransactionId) -> Range<Key> {
let (next_xid, overflowed) = xid.overflowing_add(1);
Key {
field1: 0x02,
field2: 0,
field3: 0,
field4: ((xid & 0xFFFFFF0000000000) >> 40) as u32,
field5: ((xid & 0x000000FF00000000) >> 32) as u8,
field6: (xid & 0x00000000FFFFFFFF) as u32,
field4: 0,
field5: 0,
field6: xid,
}..Key {
field1: 0x02,
field2: 0,
field3: u32::from(overflowed),
field4: ((next_xid & 0xFFFFFF0000000000) >> 40) as u32,
field5: ((next_xid & 0x000000FF00000000) >> 32) as u8,
field6: (next_xid & 0x00000000FFFFFFFF) as u32,
field3: 0,
field4: 0,
field5: u8::from(overflowed),
field6: next_xid,
}
}

View File

@@ -56,7 +56,7 @@ fn main() -> anyhow::Result<()> {
PathBuf::from("pg_install")
};
for pg_version in &["v14", "v15", "v16", "v17"] {
for pg_version in &["v14", "v15", "v16"] {
let mut pg_install_dir_versioned = pg_install_dir.join(pg_version);
if pg_install_dir_versioned.is_relative() {
let cwd = env::current_dir().context("Failed to get current_dir")?;

View File

@@ -57,7 +57,6 @@ macro_rules! for_all_postgres_versions {
$macro!(v14);
$macro!(v15);
$macro!(v16);
$macro!(v17);
};
}
@@ -92,7 +91,6 @@ macro_rules! dispatch_pgversion {
14 : v14,
15 : v15,
16 : v16,
17 : v17,
]
)
};
@@ -123,7 +121,6 @@ macro_rules! enum_pgversion_dispatch {
V14 : v14,
V15 : v15,
V16 : v16,
V17 : v17,
]
)
};
@@ -153,7 +150,6 @@ macro_rules! enum_pgversion {
V14 : v14,
V15 : v15,
V16 : v16,
V17 : v17,
]
}
};
@@ -166,7 +162,6 @@ macro_rules! enum_pgversion {
V14 : v14,
V15 : v15,
V16 : v16,
V17 : v17,
]
}
};

View File

@@ -152,9 +152,6 @@ pub const XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
pub const XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED: u8 = (1 << 1) as u8;
pub const XLH_DELETE_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
// From heapam_xlog.h
pub const XLOG_HEAP2_REWRITE: u8 = 0x00;
// From replication/message.h
pub const XLOG_LOGICAL_MESSAGE: u8 = 0x00;
@@ -170,7 +167,16 @@ pub const RM_RELMAP_ID: u8 = 7;
pub const RM_STANDBY_ID: u8 = 8;
pub const RM_HEAP2_ID: u8 = 9;
pub const RM_HEAP_ID: u8 = 10;
pub const RM_BTREE_ID: u8 = 11;
pub const RM_HASH_ID: u8 = 12;
pub const RM_GIN_ID: u8 = 13;
pub const RM_GIST_ID: u8 = 14;
pub const RM_SEQ_ID: u8 = 15;
pub const RM_SPGIST_ID: u8 = 16;
pub const RM_BRIN_ID: u8 = 17;
pub const RM_COMMIT_TS_ID: u8 = 18;
pub const RM_REPLORIGIN_ID: u8 = 19;
pub const RM_GENERIC_ID: u8 = 20;
pub const RM_LOGICALMSG_ID: u8 = 21;
// from neon_rmgr.h
@@ -184,9 +190,27 @@ pub const XLOG_NEON_HEAP_UPDATE: u8 = 0x20;
pub const XLOG_NEON_HEAP_HOT_UPDATE: u8 = 0x30;
pub const XLOG_NEON_HEAP_LOCK: u8 = 0x40;
pub const XLOG_NEON_HEAP_MULTI_INSERT: u8 = 0x50;
pub const XLOG_NEON_FILE: u8 = 0x60;
pub const XLOG_NEON_HEAP_VISIBLE: u8 = 0x40;
#[repr(C)]
#[derive(Debug)]
pub enum XlNeonFileFiletype {
UPGRADE_TARBALL,
}
impl TryFrom<u8> for XlNeonFileFiletype {
type Error = ();
fn try_from(value: u8) -> Result<Self, Self::Error> {
match value {
0 => Ok(XlNeonFileFiletype::UPGRADE_TARBALL),
_ => Err(()),
}
}
}
// from xlogreader.h
pub const XLR_INFO_MASK: u8 = 0x0F;
pub const XLR_RMGR_INFO_MASK: u8 = 0xF0;
@@ -222,20 +246,15 @@ pub const INVALID_TRANSACTION_ID: u32 = 0;
pub const FIRST_BOOTSTRAP_OBJECT_ID: u32 = 12000;
pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384;
/* pg_control.h */
pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00;
pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10;
pub const XLOG_PARAMETER_CHANGE: u8 = 0x60;
pub const XLOG_END_OF_RECOVERY: u8 = 0x90;
pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
pub const XLP_LONG_HEADER: u16 = 0x0002;
/* From xlog.h */
pub const XLOG_REPLORIGIN_SET: u8 = 0x00;
pub const XLOG_REPLORIGIN_DROP: u8 = 0x10;
/* xlog_internal.h */
pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
pub const XLP_LONG_HEADER: u16 = 0x0002;
/* From replication/slot.h */
pub const REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN: usize = 4*4 /* offset of `slotdata` in ReplicationSlotOnDisk */
+ 64 /* NameData */ + 4*4;
@@ -253,6 +272,33 @@ pub const VM_HEAPBLOCKS_PER_PAGE: u32 =
/* From origin.c */
pub const REPLICATION_STATE_MAGIC: u32 = 0x1257DADE;
// List of subdirectories inside pgdata.
// Copied from src/bin/initdb/initdb.c
pub const PGDATA_SUBDIRS: [&str; 22] = [
"global",
"pg_wal/archive_status",
"pg_commit_ts",
"pg_dynshmem",
"pg_notify",
"pg_serial",
"pg_snapshots",
"pg_subtrans",
"pg_twophase",
"pg_multixact",
"pg_multixact/members",
"pg_multixact/offsets",
"base",
"base/1",
"pg_replslot",
"pg_tblspc",
"pg_stat",
"pg_stat_tmp",
"pg_xact",
"pg_logical",
"pg_logical/snapshots",
"pg_logical/mappings",
];
// Don't include postgresql.conf as it is inconvenient on node start:
// we need postgresql.conf before basebackup to synchronize safekeepers
// so no point in overwriting it during backup restore. Rest of the files

View File

@@ -5,33 +5,6 @@ pub const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */
pub const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */
pub const SIZEOF_RELMAPFILE: usize = 512; /* sizeof(RelMapFile) in relmapper.c */
// List of subdirectories inside pgdata.
// Copied from src/bin/initdb/initdb.c
pub const PGDATA_SUBDIRS: [&str; 22] = [
"global",
"pg_wal/archive_status",
"pg_commit_ts",
"pg_dynshmem",
"pg_notify",
"pg_serial",
"pg_snapshots",
"pg_subtrans",
"pg_twophase",
"pg_multixact",
"pg_multixact/members",
"pg_multixact/offsets",
"base",
"base/1",
"pg_replslot",
"pg_tblspc",
"pg_stat",
"pg_stat_tmp",
"pg_xact",
"pg_logical",
"pg_logical/snapshots",
"pg_logical/mappings",
];
pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
(bimg_info & BKPIMAGE_IS_COMPRESSED) != 0
}

View File

@@ -11,8 +11,6 @@ pub const BKPIMAGE_COMPRESS_ZSTD: u8 = 0x10; /* page image is compressed */
pub const SIZEOF_RELMAPFILE: usize = 512; /* sizeof(RelMapFile) in relmapper.c */
pub use super::super::v14::bindings::PGDATA_SUBDIRS;
pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
const ANY_COMPRESS_FLAG: u8 = BKPIMAGE_COMPRESS_PGLZ | BKPIMAGE_COMPRESS_LZ4 | BKPIMAGE_COMPRESS_ZSTD;

View File

@@ -11,8 +11,6 @@ pub const BKPIMAGE_COMPRESS_ZSTD: u8 = 0x10; /* page image is compressed */
pub const SIZEOF_RELMAPFILE: usize = 524; /* sizeof(RelMapFile) in relmapper.c */
pub use super::super::v14::bindings::PGDATA_SUBDIRS;
pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
const ANY_COMPRESS_FLAG: u8 = BKPIMAGE_COMPRESS_PGLZ | BKPIMAGE_COMPRESS_LZ4 | BKPIMAGE_COMPRESS_ZSTD;

View File

@@ -1,55 +0,0 @@
pub const XACT_XINFO_HAS_DROPPED_STATS: u32 = 1u32 << 8;
pub const XLOG_DBASE_CREATE_FILE_COPY: u8 = 0x00;
pub const XLOG_DBASE_CREATE_WAL_LOG: u8 = 0x10;
pub const XLOG_DBASE_DROP: u8 = 0x20;
pub const BKPIMAGE_APPLY: u8 = 0x02; /* page image should be restored during replay */
pub const BKPIMAGE_COMPRESS_PGLZ: u8 = 0x04; /* page image is compressed */
pub const BKPIMAGE_COMPRESS_LZ4: u8 = 0x08; /* page image is compressed */
pub const BKPIMAGE_COMPRESS_ZSTD: u8 = 0x10; /* page image is compressed */
pub const SIZEOF_RELMAPFILE: usize = 524; /* sizeof(RelMapFile) in relmapper.c */
// List of subdirectories inside pgdata.
// Copied from src/bin/initdb/initdb.c
pub const PGDATA_SUBDIRS: [&str; 23] = [
"global",
"pg_wal/archive_status",
"pg_wal/summaries",
"pg_commit_ts",
"pg_dynshmem",
"pg_notify",
"pg_serial",
"pg_snapshots",
"pg_subtrans",
"pg_twophase",
"pg_multixact",
"pg_multixact/members",
"pg_multixact/offsets",
"base",
"base/1",
"pg_replslot",
"pg_tblspc",
"pg_stat",
"pg_stat_tmp",
"pg_xact",
"pg_logical",
"pg_logical/snapshots",
"pg_logical/mappings",
];
pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
const ANY_COMPRESS_FLAG: u8 = BKPIMAGE_COMPRESS_PGLZ | BKPIMAGE_COMPRESS_LZ4 | BKPIMAGE_COMPRESS_ZSTD;
(bimg_info & ANY_COMPRESS_FLAG) != 0
}
pub const XLOG_HEAP2_PRUNE_ON_ACCESS: u8 = 0x10;
pub const XLOG_HEAP2_PRUNE_VACUUM_SCAN: u8 = 0x20;
pub const XLOG_HEAP2_PRUNE_VACUUM_CLEANUP: u8 = 0x30;
pub const XLOG_OVERWRITE_CONTRECORD: u8 = 0xD0;
pub const XLOG_CHECKPOINT_REDO: u8 = 0xE0;

View File

@@ -53,7 +53,7 @@ impl Conf {
#[allow(clippy::manual_range_patterns)]
match self.pg_version {
14 | 15 | 16 | 17 => Ok(path.join(format!("v{}", self.pg_version))),
14 | 15 | 16 => Ok(path.join(format!("v{}", self.pg_version))),
_ => bail!("Unsupported postgres version: {}", self.pg_version),
}
}

View File

@@ -5,8 +5,6 @@ use std::{env, path::PathBuf, process::Command};
use anyhow::{anyhow, Context};
const WALPROPOSER_PG_VERSION: &str = "v17";
fn main() -> anyhow::Result<()> {
// Tell cargo to invalidate the built crate whenever the wrapper changes
println!("cargo:rerun-if-changed=bindgen_deps.h");
@@ -38,10 +36,7 @@ fn main() -> anyhow::Result<()> {
// Rebuild crate when libwalproposer.a changes
println!("cargo:rerun-if-changed={walproposer_lib_search_str}/libwalproposer.a");
let pg_config_bin = pg_install_abs
.join(WALPROPOSER_PG_VERSION)
.join("bin")
.join("pg_config");
let pg_config_bin = pg_install_abs.join("v16").join("bin").join("pg_config");
let inc_server_path: String = if pg_config_bin.exists() {
let output = Command::new(pg_config_bin)
.arg("--includedir-server")
@@ -58,7 +53,7 @@ fn main() -> anyhow::Result<()> {
.into()
} else {
let server_path = pg_install_abs
.join(WALPROPOSER_PG_VERSION)
.join("v16")
.join("include")
.join("postgresql")
.join("server")

View File

@@ -79,24 +79,16 @@ pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
return None;
}
let keys: Vec<&str> = split[0].split('-').collect();
let lsn_and_opt_generation: Vec<&str> = split[1].split('v').collect();
let lsns: Vec<&str> = lsn_and_opt_generation[0].split('-').collect();
let the_lsns: [&str; 2];
/*
* Generations add a -vX-XXXXXX postfix, which causes issues when we try to
* parse 'vX' as an LSN.
*/
let is_delta = if lsns.len() == 1 || lsns[1].is_empty() {
the_lsns = [lsns[0], lsns[0]];
let mut lsns: Vec<&str> = split[1].split('-').collect();
let is_delta = if lsns.len() == 1 {
lsns.push(lsns[0]);
false
} else {
the_lsns = [lsns[0], lsns[1]];
true
};
let key_range = Key::from_hex(keys[0]).unwrap()..Key::from_hex(keys[1]).unwrap();
let lsn_range = Lsn::from_hex(the_lsns[0]).unwrap()..Lsn::from_hex(the_lsns[1]).unwrap();
let lsn_range = Lsn::from_hex(lsns[0]).unwrap()..Lsn::from_hex(lsns[1]).unwrap();
let holes = Vec::new();
Some(LayerFile {
key_range,

View File

@@ -30,8 +30,9 @@ use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::dispatch_pgversion;
use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
use postgres_ffi::pg_constants::{PGDATA_SPECIAL_FILES, PG_HBA};
use postgres_ffi::pg_constants::{PGDATA_SPECIAL_FILES, PGDATA_SUBDIRS, PG_HBA};
use postgres_ffi::relfile_utils::{INIT_FORKNUM, MAIN_FORKNUM};
use postgres_ffi::TransactionId;
use postgres_ffi::XLogFileName;
use postgres_ffi::PG_TLI;
use postgres_ffi::{BLCKSZ, RELSEG_SIZE, WAL_SEGMENT_SIZE};
@@ -254,11 +255,8 @@ where
let lazy_slru_download = self.timeline.get_lazy_slru_download() && !self.full_backup;
let pgversion = self.timeline.pg_version;
let subdirs = dispatch_pgversion!(pgversion, &pgv::bindings::PGDATA_SUBDIRS[..]);
// Create pgdata subdirs structure
for dir in subdirs.iter() {
for dir in PGDATA_SUBDIRS.iter() {
let header = new_tar_header_dir(dir)?;
self.ar
.append(&header, &mut io::empty())
@@ -608,7 +606,7 @@ where
//
// Extract twophase state files
//
async fn add_twophase_file(&mut self, xid: u64) -> Result<(), BasebackupError> {
async fn add_twophase_file(&mut self, xid: TransactionId) -> Result<(), BasebackupError> {
let img = self
.timeline
.get_twophase_file(xid, self.lsn, self.ctx)
@@ -619,11 +617,7 @@ where
buf.extend_from_slice(&img[..]);
let crc = crc32c::crc32c(&img[..]);
buf.put_u32_le(crc);
let path = if self.timeline.pg_version < 17 {
format!("pg_twophase/{:>08X}", xid)
} else {
format!("pg_twophase/{:>016X}", xid)
};
let path = format!("pg_twophase/{:>08X}", xid);
let header = new_tar_header(&path, buf.len() as u64)?;
self.ar
.append(&header, &buf[..])

View File

@@ -281,7 +281,7 @@ impl PageServerConf {
#[allow(clippy::manual_range_patterns)]
match pg_version {
14 | 15 | 16 | 17 => Ok(path.join(format!("v{pg_version}"))),
14 | 15 | 16 => Ok(path.join(format!("v{pg_version}"))),
_ => bail!("Unsupported postgres version: {}", pg_version),
}
}

View File

@@ -506,6 +506,10 @@ async fn import_file(
return Ok(None);
}
if file_name == "pg_internal.init" {
return Ok(None);
}
if file_path.starts_with("global") {
let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
let dbnode = 0;
@@ -580,11 +584,9 @@ async fn import_file(
import_slru(modification, slru, file_path, reader, len, ctx).await?;
debug!("imported multixact members slru");
} else if file_path.starts_with("pg_twophase") {
let bytes = read_all_bytes(reader).await?;
let xid = u32::from_str_radix(file_name.as_ref(), 16)?;
// In PostgreSQL v17, this is a 64-bit FullTransactionid. In previous versions,
// it's a 32-bit TransactionId, which fits in u64 anyway.
let xid = u64::from_str_radix(file_name.as_ref(), 16)?;
let bytes = read_all_bytes(reader).await?;
modification
.put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]), ctx)
.await?;

View File

@@ -633,7 +633,7 @@ impl Timeline {
pub(crate) async fn get_twophase_file(
&self,
xid: u64,
xid: TransactionId,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
@@ -646,19 +646,11 @@ impl Timeline {
&self,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<HashSet<u64>, PageReconstructError> {
) -> Result<HashSet<TransactionId>, PageReconstructError> {
// fetch directory entry
let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
if self.pg_version >= 17 {
Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
} else {
Ok(TwoPhaseDirectory::des(&buf)?
.xids
.iter()
.map(|x| u64::from(*x))
.collect())
}
Ok(TwoPhaseDirectory::des(&buf)?.xids)
}
pub(crate) async fn get_control_file(
@@ -910,13 +902,9 @@ impl Timeline {
// Then pg_twophase
result.add_key(TWOPHASEDIR_KEY);
let mut xids: Vec<u64> = self
.list_twophase_files(lsn, ctx)
.await?
.iter()
.cloned()
.collect();
let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
let twophase_dir = TwoPhaseDirectory::des(&buf)?;
let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
xids.sort_unstable();
for xid in xids {
result.add_key(twophase_file_key(xid));
@@ -1139,15 +1127,9 @@ impl<'a> DatadirModification<'a> {
// Create AuxFilesDirectory
self.init_aux_dir()?;
let buf = if self.tline.pg_version >= 17 {
TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
xids: HashSet::new(),
})
} else {
TwoPhaseDirectory::ser(&TwoPhaseDirectory {
xids: HashSet::new(),
})
}?;
let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
xids: HashSet::new(),
})?;
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, 0));
self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
@@ -1339,31 +1321,22 @@ impl<'a> DatadirModification<'a> {
pub async fn put_twophase_file(
&mut self,
xid: u64,
xid: TransactionId,
img: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// Add it to the directory entry
let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
let newdirbuf = if self.tline.pg_version >= 17 {
let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
if !dir.xids.insert(xid) {
anyhow::bail!("twophase file for xid {} already exists", xid);
}
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, dir.xids.len()));
Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
} else {
let xid = xid as u32;
let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
if !dir.xids.insert(xid) {
anyhow::bail!("twophase file for xid {} already exists", xid);
}
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, dir.xids.len()));
Bytes::from(TwoPhaseDirectory::ser(&dir)?)
};
self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
let mut dir = TwoPhaseDirectory::des(&buf)?;
if !dir.xids.insert(xid) {
anyhow::bail!("twophase file for xid {} already exists", xid);
}
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, dir.xids.len()));
self.put(
TWOPHASEDIR_KEY,
Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
);
self.put(twophase_file_key(xid), Value::Image(img));
Ok(())
@@ -1666,32 +1639,22 @@ impl<'a> DatadirModification<'a> {
/// This method is used for marking truncated SLRU files
pub async fn drop_twophase_file(
&mut self,
xid: u64,
xid: TransactionId,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// Remove it from the directory entry
let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
let newdirbuf = if self.tline.pg_version >= 17 {
let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
let mut dir = TwoPhaseDirectory::des(&buf)?;
if !dir.xids.remove(&xid) {
warn!("twophase file for xid {} does not exist", xid);
}
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, dir.xids.len()));
Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
} else {
let xid: u32 = u32::try_from(xid)?;
let mut dir = TwoPhaseDirectory::des(&buf)?;
if !dir.xids.remove(&xid) {
warn!("twophase file for xid {} does not exist", xid);
}
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, dir.xids.len()));
Bytes::from(TwoPhaseDirectory::ser(&dir)?)
};
self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
if !dir.xids.remove(&xid) {
warn!("twophase file for xid {} does not exist", xid);
}
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, dir.xids.len()));
self.put(
TWOPHASEDIR_KEY,
Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
);
// Delete it
self.delete(twophase_key_range(xid));
@@ -2161,21 +2124,11 @@ struct DbDirectory {
dbdirs: HashMap<(Oid, Oid), bool>,
}
// The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
// pg_twophase files was expanded from 32-bit XIDs to 64-bit XIDs. Previously, the files
// were named like "pg_twophase/000002E5", now they're like
// "pg_twophsae/0000000A000002E4".
#[derive(Debug, Serialize, Deserialize)]
struct TwoPhaseDirectory {
xids: HashSet<TransactionId>,
}
#[derive(Debug, Serialize, Deserialize)]
struct TwoPhaseDirectoryV17 {
xids: HashSet<u64>,
}
#[derive(Debug, Serialize, Deserialize, Default)]
struct RelDirectory {
// Set of relations that exist. (relfilenode, forknum)

View File

@@ -8470,127 +8470,4 @@ mod tests {
Ok(())
}
// Regression test for https://github.com/neondatabase/neon/issues/9012
// Create an image arrangement where we have to read at different LSN ranges
// from a delta layer. This is achieved by overlapping an image layer on top of
// a delta layer. Like so:
//
// A B
// +----------------+ -> delta_layer
// | | ^ lsn
// | =========|-> nested_image_layer |
// | C | |
// +----------------+ |
// ======== -> baseline_image_layer +-------> key
//
//
// When querying the key range [A, B) we need to read at different LSN ranges
// for [A, C) and [C, B). This test checks that the described edge case is handled correctly.
#[tokio::test]
async fn test_vectored_read_with_nested_image_layer() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_vectored_read_with_nested_image_layer").await?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
let mut key = Key::from_hex("110000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}
let mut expected_key_values = HashMap::new();
let baseline_image_layer_lsn = Lsn(0x10);
let mut baseline_img_layer = Vec::new();
for i in 0..5 {
let key = get_key(i);
let value = format!("value {i}@{baseline_image_layer_lsn}");
let removed = expected_key_values.insert(key, value.clone());
assert!(removed.is_none());
baseline_img_layer.push((key, Bytes::from(value)));
}
let nested_image_layer_lsn = Lsn(0x50);
let mut nested_img_layer = Vec::new();
for i in 5..10 {
let key = get_key(i);
let value = format!("value {i}@{nested_image_layer_lsn}");
let removed = expected_key_values.insert(key, value.clone());
assert!(removed.is_none());
nested_img_layer.push((key, Bytes::from(value)));
}
let mut delta_layer_spec = Vec::default();
let delta_layer_start_lsn = Lsn(0x20);
let mut delta_layer_end_lsn = delta_layer_start_lsn;
for i in 0..10 {
let key = get_key(i);
let key_in_nested = nested_img_layer
.iter()
.any(|(key_with_img, _)| *key_with_img == key);
let lsn = {
if key_in_nested {
Lsn(nested_image_layer_lsn.0 + 0x10)
} else {
delta_layer_start_lsn
}
};
let delta = format!("@{lsn}");
delta_layer_spec.push((
key,
lsn,
Value::WalRecord(NeonWalRecord::wal_append(&delta)),
));
delta_layer_end_lsn = std::cmp::max(delta_layer_start_lsn, lsn);
expected_key_values
.get_mut(&key)
.expect("An image exists for each key")
.push_str(delta.as_str());
}
delta_layer_end_lsn = Lsn(delta_layer_end_lsn.0 + 1);
assert!(
nested_image_layer_lsn > delta_layer_start_lsn
&& nested_image_layer_lsn < delta_layer_end_lsn
);
let tline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
baseline_image_layer_lsn,
DEFAULT_PG_VERSION,
&ctx,
vec![DeltaLayerTestDesc::new_with_inferred_key_range(
delta_layer_start_lsn..delta_layer_end_lsn,
delta_layer_spec,
)], // delta layers
vec![
(baseline_image_layer_lsn, baseline_img_layer),
(nested_image_layer_lsn, nested_img_layer),
], // image layers
delta_layer_end_lsn,
)
.await?;
let keyspace = KeySpace::single(get_key(0)..get_key(10));
let results = tline
.get_vectored(keyspace, delta_layer_end_lsn, &ctx)
.await
.expect("No vectored errors");
for (key, res) in results {
let value = res.expect("No key errors");
let expected_value = expected_key_values.remove(&key).expect("No unknown keys");
assert_eq!(value, Bytes::from(expected_value));
}
Ok(())
}
}

View File

@@ -279,7 +279,7 @@ pub(crate) enum LayerId {
/// Layer wrapper for the read path. Note that it is valid
/// to use these layers even after external operations have
/// been performed on them (compaction, freeze, etc.).
#[derive(Debug, Clone)]
#[derive(Debug)]
pub(crate) enum ReadableLayer {
PersistentLayer(Layer),
InMemoryLayer(Arc<InMemoryLayer>),
@@ -292,8 +292,6 @@ struct ReadDesc {
layer_id: LayerId,
/// Lsn range for the read, used for selecting the next read
lsn_range: Range<Lsn>,
/// This read's index in [`LayerKeyspace::reads`];
read_id: LayerKeyspaceReadId,
}
/// Data structure which maintains a fringe of layers for the
@@ -312,13 +310,9 @@ pub(crate) struct LayerFringe {
#[derive(Debug)]
struct LayerKeyspace {
layer: ReadableLayer,
next_read_id: LayerKeyspaceReadId,
reads: HashMap<LayerKeyspaceReadId, (Range<Lsn>, KeySpace)>,
target_keyspace: KeySpaceRandomAccum,
}
#[derive(PartialEq, Eq, Hash, Debug, Clone, Copy)]
struct LayerKeyspaceReadId(usize);
impl LayerFringe {
pub(crate) fn new() -> Self {
LayerFringe {
@@ -333,24 +327,22 @@ impl LayerFringe {
None => return None,
};
let mut entry = match self.layers.entry(read_desc.layer_id) {
Entry::Occupied(o) => o,
Entry::Vacant(_) => unreachable!("fringe internals are always consistent"),
};
let removed = self.layers.remove_entry(&read_desc.layer_id);
let (lsn_range, keyspace) = entry
.get_mut()
.reads
.remove(&read_desc.read_id)
.expect("fringe internals are always consistent");
let layer = entry.get().layer.clone();
if entry.get().reads.is_empty() {
entry.remove();
match removed {
Some((
_,
LayerKeyspace {
layer,
mut target_keyspace,
},
)) => Some((
layer,
target_keyspace.consume_keyspace(),
read_desc.lsn_range,
)),
None => unreachable!("fringe internals are always consistent"),
}
Some((layer, keyspace, lsn_range))
}
pub(crate) fn update(
@@ -363,31 +355,18 @@ impl LayerFringe {
let entry = self.layers.entry(layer_id.clone());
match entry {
Entry::Occupied(mut entry) => {
let read_id = {
let r = &mut entry.get_mut().next_read_id;
let read_id = *r;
*r = LayerKeyspaceReadId(r.0 + 1);
read_id
};
self.planned_reads_by_lsn.push(ReadDesc {
lsn_range: lsn_range.clone(),
layer_id: layer_id.clone(),
read_id,
});
let replaced = entry.get_mut().reads.insert(read_id, (lsn_range, keyspace));
assert!(replaced.is_none());
entry.get_mut().target_keyspace.add_keyspace(keyspace);
}
Entry::Vacant(entry) => {
let read_id = LayerKeyspaceReadId(0);
self.planned_reads_by_lsn.push(ReadDesc {
lsn_range: lsn_range.clone(),
lsn_range,
layer_id: layer_id.clone(),
read_id,
});
let mut accum = KeySpaceRandomAccum::new();
accum.add_keyspace(keyspace);
entry.insert(LayerKeyspace {
layer,
next_read_id: LayerKeyspaceReadId(1),
reads: [(read_id, (lsn_range, keyspace))].into(),
target_keyspace: accum,
});
}
}

View File

@@ -14,7 +14,7 @@ mod walreceiver;
use anyhow::{anyhow, bail, ensure, Context, Result};
use arc_swap::ArcSwap;
use bytes::Bytes;
use camino::Utf8Path;
use camino::{Utf8Path, Utf8PathBuf};
use chrono::{DateTime, Utc};
use enumset::EnumSet;
use fail::fail_point;
@@ -4357,6 +4357,10 @@ impl Timeline {
)
}
pub fn get_path(&self) -> Utf8PathBuf {
self.conf.timelines_path(&self.tenant_shard_id)
}
/// Detach this timeline from its ancestor by copying all of ancestors layers as this
/// Timelines layers up to the ancestor_lsn.
///

View File

@@ -24,6 +24,7 @@
use std::time::Duration;
use std::time::SystemTime;
use camino::Utf8PathBuf;
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
@@ -33,8 +34,10 @@ use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
use utils::failpoint_support;
use utils::rate_limit::RateLimit;
use utils::zstd::extract_zst_tarball;
use crate::context::RequestContext;
use crate::import_datadir;
use crate::metrics::WAL_INGEST;
use crate::pgdatadir_mapping::{DatadirModification, Version};
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
@@ -69,7 +72,9 @@ impl CheckPoint {
}
}
pub struct WalIngest {
pub struct WalIngest<'t> {
timeline: &'t Timeline,
timeline_path: Utf8PathBuf,
shard: ShardIdentity,
checkpoint: CheckPoint,
checkpoint_modified: bool,
@@ -82,12 +87,12 @@ struct WarnIngestLag {
timestamp_invalid_msg_ratelimit: RateLimit,
}
impl WalIngest {
impl<'t> WalIngest<'t> {
pub async fn new(
timeline: &Timeline,
timeline: &'t Timeline,
startpoint: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<WalIngest> {
) -> anyhow::Result<WalIngest<'t>> {
// Fetch the latest checkpoint into memory, so that we can compare with it
// quickly in `ingest_record` and update it when it changes.
let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
@@ -100,6 +105,8 @@ impl WalIngest {
});
Ok(WalIngest {
timeline,
timeline_path: timeline.get_path(),
shard: *timeline.get_shard_identity(),
checkpoint,
checkpoint_modified: false,
@@ -237,26 +244,6 @@ impl WalIngest {
.await?;
}
}
} else if pg_version == 17 {
if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG {
debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY {
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
debug!("XLOG_DBASE_CREATE_FILE_COPY");
let createdb = XlCreateDatabase::decode(&mut buf);
self.ingest_xlog_dbase_create(modification, &createdb, ctx)
.await?;
} else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
.await?;
}
}
}
}
pg_constants::RM_TBLSPC_ID => {
@@ -266,11 +253,7 @@ impl WalIngest {
let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::CLOG_ZEROPAGE {
let pageno = if pg_version < 17 {
buf.get_u32_le()
} else {
buf.get_u64_le() as u32
};
let pageno = buf.get_u32_le();
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
@@ -284,7 +267,7 @@ impl WalIngest {
.await?;
} else {
assert!(info == pg_constants::CLOG_TRUNCATE);
let xlrec = XlClogTruncate::decode(&mut buf, pg_version);
let xlrec = XlClogTruncate::decode(&mut buf);
self.ingest_clog_truncate_record(modification, &xlrec, ctx)
.await?;
}
@@ -323,21 +306,12 @@ impl WalIngest {
parsed_xact.xid,
lsn,
);
let xid: u64 = if pg_version >= 17 {
self.adjust_to_full_transaction_id(parsed_xact.xid)?
} else {
parsed_xact.xid as u64
};
modification.drop_twophase_file(xid, ctx).await?;
} else if info == pg_constants::XLOG_XACT_PREPARE {
let xid: u64 = if pg_version >= 17 {
self.adjust_to_full_transaction_id(decoded.xl_xid)?
} else {
decoded.xl_xid as u64
};
modification
.put_twophase_file(xid, Bytes::copy_from_slice(&buf[..]), ctx)
.drop_twophase_file(parsed_xact.xid, ctx)
.await?;
} else if info == pg_constants::XLOG_XACT_PREPARE {
modification
.put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx)
.await?;
}
}
@@ -345,11 +319,7 @@ impl WalIngest {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
let pageno = if pg_version < 17 {
buf.get_u32_le()
} else {
buf.get_u64_le() as u32
};
let pageno = buf.get_u32_le();
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
@@ -362,11 +332,7 @@ impl WalIngest {
)
.await?;
} else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
let pageno = if pg_version < 17 {
buf.get_u32_le()
} else {
buf.get_u64_le() as u32
};
let pageno = buf.get_u32_le();
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
@@ -395,20 +361,6 @@ impl WalIngest {
pg_constants::RM_XLOG_ID => {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_PARAMETER_CHANGE {
if let CheckPoint::V17(cp) = &mut self.checkpoint {
let rec = v17::XlParameterChange::decode(&mut buf);
cp.wal_level = rec.wal_level;
self.checkpoint_modified = true;
}
} else if info == pg_constants::XLOG_END_OF_RECOVERY {
if let CheckPoint::V17(cp) = &mut self.checkpoint {
let rec = v17::XlEndOfRecovery::decode(&mut buf);
cp.wal_level = rec.wal_level;
self.checkpoint_modified = true;
}
}
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
if info == pg_constants::XLOG_NEXTOID {
let next_oid = buf.get_u32_le();
@@ -452,24 +404,12 @@ impl WalIngest {
if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
&& info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let oldest_active_xid = if pg_version >= 17 {
let mut oldest_active_full_xid = cp.nextXid.value;
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
if xid < oldest_active_full_xid {
oldest_active_full_xid = xid;
}
let mut oldest_active_xid = cp.nextXid.value as u32;
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
oldest_active_xid = xid;
}
oldest_active_full_xid as u32
} else {
let mut oldest_active_xid = cp.nextXid.value as u32;
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
let narrow_xid = xid as u32;
if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
oldest_active_xid = narrow_xid;
}
}
oldest_active_xid
};
}
cp.oldestActiveXid = oldest_active_xid;
} else {
cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
@@ -525,6 +465,17 @@ impl WalIngest {
modification.drop_replorigin(xlrec.node_id).await?
}
}
pg_constants::RM_BTREE_ID
| pg_constants::RM_HASH_ID
| pg_constants::RM_GIN_ID
| pg_constants::RM_GIST_ID
| pg_constants::RM_SEQ_ID
| pg_constants::RM_SPGIST_ID
| pg_constants::RM_BRIN_ID
| pg_constants::RM_COMMIT_TS_ID
| pg_constants::RM_GENERIC_ID => {
// No special handling currently for these resource managers
}
_x => {
// TODO: should probably log & fail here instead of blindly
// doing something without understanding the protocol
@@ -582,25 +533,6 @@ impl WalIngest {
Ok(modification.len() > prev_len)
}
/// This is the same as AdjustToFullTransactionId(xid) in PostgreSQL
fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64> {
let next_full_xid =
enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, cp, { cp.nextXid.value });
let next_xid = (next_full_xid) as u32;
let mut epoch = (next_full_xid >> 32) as u32;
if xid > next_xid {
// Wraparound occurred, must be from a prev epoch.
if epoch == 0 {
bail!("apparent XID wraparound with prepared transaction XID {xid}, nextXid is {next_full_xid}");
}
epoch -= 1;
}
Ok((epoch as u64) << 32 | xid as u64)
}
/// Do not store this block, but observe it for the purposes of updating our relation size state.
async fn observe_decoded_block(
&mut self,
@@ -901,73 +833,6 @@ impl WalIngest {
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
}
}
17 => {
if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
if info == pg_constants::XLOG_HEAP_INSERT {
let xlrec = v17::XlHeapInsert::decode(buf);
assert_eq!(0, buf.remaining());
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
} else if info == pg_constants::XLOG_HEAP_DELETE {
let xlrec = v17::XlHeapDelete::decode(buf);
if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
} else if info == pg_constants::XLOG_HEAP_UPDATE
|| info == pg_constants::XLOG_HEAP_HOT_UPDATE
{
let xlrec = v17::XlHeapUpdate::decode(buf);
// the size of tuple data is inferred from the size of the record.
// we can't validate the remaining number of bytes without parsing
// the tuple data.
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
}
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
// non-HOT update where the new tuple goes to different page than
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
// set.
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
} else if info == pg_constants::XLOG_HEAP_LOCK {
let xlrec = v17::XlHeapLock::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
let xlrec = v17::XlHeapMultiInsert::decode(buf);
let offset_array_len =
if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
// the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
0
} else {
size_of::<u16>() * xlrec.ntuples as usize
};
assert_eq!(offset_array_len, buf.remaining());
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
} else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
let xlrec = v17::XlHeapLockUpdated::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
} else {
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
}
}
_ => {}
}
@@ -1076,26 +941,53 @@ impl WalIngest {
assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
match pg_version {
16 | 17 => {
15 => {
let info = decoded.xl_info;
match info {
pg_constants::XLOG_NEON_FILE => {
info!(
"tristan: last_record_lsn={}",
self.timeline.get_last_record_lsn()
);
let xlrec = v16::rm_neon::XlNeonFile::decode(buf);
let pgdata_path = self.timeline_path.join("new-pgdata");
extract_zst_tarball(&pgdata_path, &*xlrec.data).await?;
let lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?;
info!("LSN from pg_upgraded controlfile: {lsn}");
Box::pin(import_datadir::import_timeline_from_postgres_datadir(
self.timeline,
&pgdata_path,
lsn,
ctx,
))
.await?;
}
_ => return Err(anyhow::anyhow!("Unknown XLOG xl_info field: {}", info)),
}
}
16 => {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
match info {
pg_constants::XLOG_NEON_HEAP_INSERT => {
let xlrec = v17::rm_neon::XlNeonHeapInsert::decode(buf);
let xlrec = v16::rm_neon::XlNeonHeapInsert::decode(buf);
assert_eq!(0, buf.remaining());
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
}
pg_constants::XLOG_NEON_HEAP_DELETE => {
let xlrec = v17::rm_neon::XlNeonHeapDelete::decode(buf);
let xlrec = v16::rm_neon::XlNeonHeapDelete::decode(buf);
if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
}
pg_constants::XLOG_NEON_HEAP_UPDATE
| pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
let xlrec = v17::rm_neon::XlNeonHeapUpdate::decode(buf);
let xlrec = v16::rm_neon::XlNeonHeapUpdate::decode(buf);
// the size of tuple data is inferred from the size of the record.
// we can't validate the remaining number of bytes without parsing
// the tuple data.
@@ -1111,7 +1003,7 @@ impl WalIngest {
}
}
pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
let xlrec = v17::rm_neon::XlNeonHeapMultiInsert::decode(buf);
let xlrec = v16::rm_neon::XlNeonHeapMultiInsert::decode(buf);
let offset_array_len =
if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
@@ -1127,7 +1019,7 @@ impl WalIngest {
}
}
pg_constants::XLOG_NEON_HEAP_LOCK => {
let xlrec = v17::rm_neon::XlNeonHeapLock::decode(buf);
let xlrec = v16::rm_neon::XlNeonHeapLock::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;

View File

@@ -174,7 +174,6 @@ impl DecodedWALRecord {
}
15 => info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY,
16 => info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY,
17 => info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY,
_ => {
panic!("Unsupported postgres version {pg_version}")
}
@@ -342,47 +341,16 @@ pub mod v14 {
}
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlParameterChange {
pub max_connections: i32,
pub max_worker_processes: i32,
pub max_wal_senders: i32,
pub max_prepared_xacts: i32,
pub max_locks_per_xact: i32,
pub wal_level: i32,
pub wal_log_hints: bool,
pub track_commit_timestamp: bool,
pub _padding: [u8; 2],
}
impl XlParameterChange {
pub fn decode(buf: &mut Bytes) -> XlParameterChange {
XlParameterChange {
max_connections: buf.get_i32_le(),
max_worker_processes: buf.get_i32_le(),
max_wal_senders: buf.get_i32_le(),
max_prepared_xacts: buf.get_i32_le(),
max_locks_per_xact: buf.get_i32_le(),
wal_level: buf.get_i32_le(),
wal_log_hints: buf.get_u8() != 0,
track_commit_timestamp: buf.get_u8() != 0,
_padding: [buf.get_u8(), buf.get_u8()],
}
}
}
}
pub mod v15 {
pub use super::v14::{
XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapLockUpdated, XlHeapMultiInsert, XlHeapUpdate,
XlParameterChange,
};
}
pub mod v16 {
pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert, XlParameterChange};
pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert};
use bytes::{Buf, Bytes};
use postgres_ffi::{OffsetNumber, TransactionId};
@@ -558,35 +526,27 @@ pub mod v16 {
}
}
}
}
}
pub mod v17 {
pub use super::v14::XlHeapLockUpdated;
use bytes::{Buf, Bytes};
pub use postgres_ffi::{TimeLineID, TimestampTz};
#[repr(C)]
#[derive(Debug)]
pub struct XlNeonFile {
pub filetype: u8,
pub size: u32,
pub data: Bytes,
}
pub use super::v16::rm_neon;
pub use super::v16::{
XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapMultiInsert, XlHeapUpdate, XlParameterChange,
};
impl XlNeonFile {
pub fn decode(buf: &mut Bytes) -> Self {
let filetype = buf.get_u8();
// Skip the padding
buf.advance(std::mem::size_of::<u8>() * 3);
let size = buf.get_u32_le();
#[repr(C)]
#[derive(Debug)]
pub struct XlEndOfRecovery {
pub end_time: TimestampTz,
pub this_time_line_id: TimeLineID,
pub prev_time_line_id: TimeLineID,
pub wal_level: i32,
}
impl XlEndOfRecovery {
pub fn decode(buf: &mut Bytes) -> XlEndOfRecovery {
XlEndOfRecovery {
end_time: buf.get_i64_le(),
this_time_line_id: buf.get_u32_le(),
prev_time_line_id: buf.get_u32_le(),
wal_level: buf.get_i32_le(),
Self {
filetype,
size,
data: buf.copy_to_bytes(buf.remaining()),
}
}
}
}
@@ -809,13 +769,9 @@ pub struct XlClogTruncate {
}
impl XlClogTruncate {
pub fn decode(buf: &mut Bytes, pg_version: u32) -> XlClogTruncate {
pub fn decode(buf: &mut Bytes) -> XlClogTruncate {
XlClogTruncate {
pageno: if pg_version < 17 {
buf.get_u32_le()
} else {
buf.get_u64_le() as u32
},
pageno: buf.get_u32_le(),
oldest_xid: buf.get_u32_le(),
oldest_xid_db: buf.get_u32_le(),
}

View File

@@ -23,7 +23,18 @@ SHLIB_LINK_INTERNAL = $(libpq)
SHLIB_LINK = -lcurl
EXTENSION = neon
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql neon--1.3--1.4.sql neon--1.4--1.3.sql
DATA = \
neon--1.0.sql \
neon--1.0--1.1.sql \
neon--1.1--1.2.sql \
neon--1.2--1.3.sql \
neon--1.3--1.4.sql \
neon--1.4--1.5.sql \
neon--1.5--1.4.sql \
neon--1.4--1.3.sql \
neon--1.3--1.2.sql \
neon--1.2--1.1.sql \
neon--1.1--1.0.sql
PGFILEDESC = "neon - cloud storage for PostgreSQL"
EXTRA_CLEAN = \

View File

@@ -1,12 +0,0 @@
#ifndef NEON_BITMAP_H
#define NEON_BITMAP_H
/*
* Utilities for manipulating bits8* as bitmaps.
*/
#define BITMAP_ISSET(bm, bit) ((bm)[(bit) >> 3] & (1 << ((bit) & 7)))
#define BITMAP_SET(bm, bit) (bm)[(bit) >> 3] |= (1 << ((bit) & 7))
#define BITMAP_CLR(bm, bit) (bm)[(bit) >> 3] &= ~(1 << ((bit) & 7))
#endif //NEON_BITMAP_H

View File

@@ -27,7 +27,6 @@
#include "pagestore_client.h"
#include "common/hashfn.h"
#include "pgstat.h"
#include "port/pg_iovec.h"
#include "postmaster/bgworker.h"
#include RELFILEINFO_HDR
#include "storage/buf_internals.h"
@@ -41,7 +40,6 @@
#include "utils/guc.h"
#include "hll.h"
#include "bitmap.h"
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "Assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
@@ -471,99 +469,6 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
return found;
}
/*
* Check if page is present in the cache.
* Returns true if page is found in local cache.
*/
int
lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
int nblocks, bits8 *bitmap)
{
BufferTag tag;
FileCacheEntry *entry;
uint32 chunk_offs;
int found = 0;
uint32 hash;
int i = 0;
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return 0;
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forkNum;
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
tag.blockNum = (blkno + i) & ~(BLOCKS_PER_CHUNK - 1);
hash = get_hash_value(lfc_hash, &tag);
chunk_offs = (blkno + i) & (BLOCKS_PER_CHUNK - 1);
LWLockAcquire(lfc_lock, LW_SHARED);
while (true)
{
int this_chunk = Min(nblocks, BLOCKS_PER_CHUNK - chunk_offs);
if (LFC_ENABLED())
{
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
if (entry != NULL)
{
for (; chunk_offs < BLOCKS_PER_CHUNK && i < nblocks; chunk_offs++, i++)
{
if ((entry->bitmap[chunk_offs >> 5] &
(1 << (chunk_offs & 31))) != 0)
{
BITMAP_SET(bitmap, i);
found++;
}
}
}
else
{
i += this_chunk;
}
}
else
{
return found;
}
/*
* Break out of the iteration before doing expensive stuff for
* a next iteration
*/
if (i + 1 >= nblocks)
break;
/*
* Prepare for the next iteration. We don't unlock here, as that'd
* probably be more expensive than the gains it'd get us.
*/
tag.blockNum = (blkno + i) & ~(BLOCKS_PER_CHUNK - 1);
hash = get_hash_value(lfc_hash, &tag);
chunk_offs = (blkno + i) & (BLOCKS_PER_CHUNK - 1);
}
LWLockRelease(lfc_lock);
#if USE_ASSERT_CHECKING
do {
int count = 0;
for (int j = 0; j < nblocks; j++)
{
if (BITMAP_ISSET(bitmap, j))
count++;
}
Assert(count == found);
} while (false);
#endif
return found;
}
/*
* Evict a page (if present) from the local file cache
*/
@@ -643,171 +548,91 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
}
/*
* Try to read pages from local cache.
* Returns the number of pages read from the local cache, and sets bits in
* 'read' for the pages which were read. This may scribble over buffers not
* marked in 'read', so be careful with operation ordering.
*
* In case of error local file cache is disabled (lfc->limit is set to zero),
* and -1 is returned. Note that 'read' and the buffers may be touched and in
* an otherwise invalid state.
*
* If the mask argument is supplied, bits will be set at the offsets of pages
* that were present and read from the LFC.
* Try to read page from local cache.
* Returns true if page is found in local cache.
* In case of error local file cache is disabled (lfc->limit is set to zero).
*/
int
lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
void **buffers, BlockNumber nblocks, bits8 *mask)
bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
char *buffer)
{
BufferTag tag;
FileCacheEntry *entry;
ssize_t rc;
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
bool result = true;
uint32 hash;
uint64 generation;
uint32 entry_offset;
int blocks_read = 0;
int buf_offset = 0;
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return 0;
return false;
if (!lfc_ensure_opened())
return 0;
return false;
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forkNum;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
hash = get_hash_value(lfc_hash, &tag);
/*
* For every chunk that has blocks we're interested in, we
* 1. get the chunk header
* 2. Check if the chunk actually has the blocks we're interested in
* 3. Read the blocks we're looking for (in one preadv), assuming they exist
* 4. Update the statistics for the read call.
*
* If there is an error, we do an early return.
*/
while (nblocks > 0)
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (!LFC_ENABLED())
{
struct iovec iov[PG_IOV_MAX];
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
int iteration_hits = 0;
int iteration_misses = 0;
Assert(blocks_in_chunk > 0);
for (int i = 0; i < blocks_in_chunk; i++)
{
iov[i].iov_base = buffers[buf_offset + i];
iov[i].iov_len = BLCKSZ;
}
tag.blockNum = blkno - chunk_offs;
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
/* We can return the blocks we've read before LFC got disabled;
* assuming we read any. */
if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
return blocks_read;
}
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
/* Approximate working set for the blocks assumed in this entry */
for (int i = 0; i < blocks_in_chunk; i++)
{
tag.blockNum = blkno + i;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
}
if (entry == NULL)
{
/* Pages are not cached */
lfc_ctl->misses += blocks_in_chunk;
pgBufferUsage.file_cache.misses += blocks_in_chunk;
LWLockRelease(lfc_lock);
buf_offset += blocks_in_chunk;
nblocks -= blocks_in_chunk;
blkno += blocks_in_chunk;
continue;
}
/* Unlink entry from LRU list to pin it for the duration of IO operation */
if (entry->access_count++ == 0)
dlist_delete(&entry->list_node);
generation = lfc_ctl->generation;
entry_offset = entry->offset;
LWLockRelease(lfc_lock);
for (int i = 0; i < blocks_in_chunk; i++)
{
/*
* If the page is valid, we consider it "read".
* All other pages will be fetched separately by the next cache
*/
if (entry->bitmap[(chunk_offs + i) / 32] & (1 << ((chunk_offs + i) % 32)))
{
BITMAP_SET(mask, buf_offset + i);
iteration_hits++;
}
else
iteration_misses++;
}
Assert(iteration_hits + iteration_misses > 0);
if (iteration_hits != 0)
{
rc = preadv(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
if (rc != (BLCKSZ * blocks_in_chunk))
{
lfc_disable("read");
return -1;
}
}
/* Place entry to the head of LRU list */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (lfc_ctl->generation == generation)
{
CriticalAssert(LFC_ENABLED());
lfc_ctl->hits += iteration_hits;
lfc_ctl->misses += iteration_misses;
pgBufferUsage.file_cache.hits += iteration_hits;
pgBufferUsage.file_cache.misses += iteration_misses;
CriticalAssert(entry->access_count > 0);
if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
}
else
{
/* generation mismatch, assume error condition */
LWLockRelease(lfc_lock);
return -1;
}
LWLockRelease(lfc_lock);
buf_offset += blocks_in_chunk;
nblocks -= blocks_in_chunk;
blkno += blocks_in_chunk;
blocks_read += iteration_hits;
return false;
}
return blocks_read;
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
/* Approximate working set */
tag.blockNum = blkno;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0)
{
/* Page is not cached */
lfc_ctl->misses += 1;
pgBufferUsage.file_cache.misses += 1;
LWLockRelease(lfc_lock);
return false;
}
/* Unlink entry from LRU list to pin it for the duration of IO operation */
if (entry->access_count++ == 0)
dlist_delete(&entry->list_node);
generation = lfc_ctl->generation;
entry_offset = entry->offset;
LWLockRelease(lfc_lock);
rc = pread(lfc_desc, buffer, BLCKSZ, ((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
if (rc != BLCKSZ)
{
lfc_disable("read");
return false;
}
/* Place entry to the head of LRU list */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (lfc_ctl->generation == generation)
{
CriticalAssert(LFC_ENABLED());
lfc_ctl->hits += 1;
pgBufferUsage.file_cache.hits += 1;
CriticalAssert(entry->access_count > 0);
if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
}
else
result = false;
LWLockRelease(lfc_lock);
return result;
}
/*
@@ -815,17 +640,20 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
* If cache is full then evict some other page.
*/
void
lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
const void *const *buffers, BlockNumber nblocks)
#if PG_MAJORVERSION_NUM < 16
lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, char *buffer)
#else
lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, const void *buffer)
#endif
{
BufferTag tag;
FileCacheEntry *entry;
ssize_t rc;
bool found;
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
uint32 hash;
uint64 generation;
uint32 entry_offset;
int buf_offset = 0;
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return;
@@ -833,142 +661,110 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (!lfc_ensure_opened())
return;
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forkNum;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
CopyNRelFileInfoToBufTag(tag, rinfo);
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
hash = get_hash_value(lfc_hash, &tag);
/*
* For every chunk that has blocks we're interested in, we
* 1. get the chunk header
* 2. Check if the chunk actually has the blocks we're interested in
* 3. Read the blocks we're looking for (in one preadv), assuming they exist
* 4. Update the statistics for the read call.
*
* If there is an error, we do an early return.
*/
while (nblocks > 0)
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (!LFC_ENABLED())
{
struct iovec iov[PG_IOV_MAX];
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
Assert(blocks_in_chunk > 0);
LWLockRelease(lfc_lock);
return;
}
for (int i = 0; i < blocks_in_chunk; i++)
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
if (found)
{
/*
* Unlink entry from LRU list to pin it for the duration of IO
* operation
*/
if (entry->access_count++ == 0)
dlist_delete(&entry->list_node);
}
else
{
/*
* We have two choices if all cache pages are pinned (i.e. used in IO
* operations):
*
* 1) Wait until some of this operation is completed and pages is
* unpinned.
*
* 2) Allocate one more chunk, so that specified cache size is more
* recommendation than hard limit.
*
* As far as probability of such event (that all pages are pinned) is
* considered to be very very small: there are should be very large
* number of concurrent IO operations and them are limited by
* max_connections, we prefer not to complicate code and use second
* approach.
*/
if (lfc_ctl->used >= lfc_ctl->limit && !dlist_is_empty(&lfc_ctl->lru))
{
iov[i].iov_base = unconstify(void *, buffers[buf_offset + i]);
iov[i].iov_len = BLCKSZ;
/* Cache overflow: evict least recently used chunk */
FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->lru));
CriticalAssert(victim->access_count == 0);
entry->offset = victim->offset; /* grab victim's chunk */
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
neon_log(DEBUG2, "Swap file cache page");
}
else if (!dlist_is_empty(&lfc_ctl->holes))
{
/* We can reuse a hole that was left behind when the LFC was shrunk previously */
FileCacheEntry *hole = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->holes));
uint32 offset = hole->offset;
bool found;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
hash = get_hash_value(lfc_hash, &tag);
hash_search_with_hash_value(lfc_hash, &hole->key, hole->hash, HASH_REMOVE, &found);
CriticalAssert(found);
lfc_ctl->used += 1;
entry->offset = offset; /* reuse the hole */
}
else
{
lfc_ctl->used += 1;
entry->offset = lfc_ctl->size++; /* allocate new chunk at end
* of file */
}
entry->access_count = 1;
entry->hash = hash;
memset(entry->bitmap, 0, sizeof entry->bitmap);
}
generation = lfc_ctl->generation;
entry_offset = entry->offset;
lfc_ctl->writes += 1;
LWLockRelease(lfc_lock);
rc = pwrite(lfc_desc, buffer, BLCKSZ, ((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
if (rc != BLCKSZ)
{
lfc_disable("write");
}
else
{
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (!LFC_ENABLED())
if (lfc_ctl->generation == generation)
{
LWLockRelease(lfc_lock);
return;
CriticalAssert(LFC_ENABLED());
/* Place entry to the head of LRU list */
CriticalAssert(entry->access_count > 0);
if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
entry->bitmap[chunk_offs >> 5] |= (1 << (chunk_offs & 31));
}
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
if (found)
{
/*
* Unlink entry from LRU list to pin it for the duration of IO
* operation
*/
if (entry->access_count++ == 0)
dlist_delete(&entry->list_node);
}
else
{
/*
* We have two choices if all cache pages are pinned (i.e. used in IO
* operations):
*
* 1) Wait until some of this operation is completed and pages is
* unpinned.
*
* 2) Allocate one more chunk, so that specified cache size is more
* recommendation than hard limit.
*
* As far as probability of such event (that all pages are pinned) is
* considered to be very very small: there are should be very large
* number of concurrent IO operations and them are limited by
* max_connections, we prefer not to complicate code and use second
* approach.
*/
if (lfc_ctl->used >= lfc_ctl->limit && !dlist_is_empty(&lfc_ctl->lru))
{
/* Cache overflow: evict least recently used chunk */
FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->lru));
CriticalAssert(victim->access_count == 0);
entry->offset = victim->offset; /* grab victim's chunk */
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
neon_log(DEBUG2, "Swap file cache page");
}
else if (!dlist_is_empty(&lfc_ctl->holes))
{
/* We can reuse a hole that was left behind when the LFC was shrunk previously */
FileCacheEntry *hole = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->holes));
uint32 offset = hole->offset;
bool found;
hash_search_with_hash_value(lfc_hash, &hole->key, hole->hash, HASH_REMOVE, &found);
CriticalAssert(found);
lfc_ctl->used += 1;
entry->offset = offset; /* reuse the hole */
}
else
{
lfc_ctl->used += 1;
entry->offset = lfc_ctl->size++; /* allocate new chunk at end
* of file */
}
entry->access_count = 1;
entry->hash = hash;
memset(entry->bitmap, 0, sizeof entry->bitmap);
}
generation = lfc_ctl->generation;
entry_offset = entry->offset;
lfc_ctl->writes += blocks_in_chunk;
LWLockRelease(lfc_lock);
rc = pwritev(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
if (rc != BLCKSZ * blocks_in_chunk)
{
lfc_disable("write");
}
else
{
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (lfc_ctl->generation == generation)
{
CriticalAssert(LFC_ENABLED());
/* Place entry to the head of LRU list */
CriticalAssert(entry->access_count > 0);
if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
for (int i = 0; i < blocks_in_chunk; i++)
{
entry->bitmap[(chunk_offs + i) >> 5] |=
(1 << ((chunk_offs + i) & 31));
}
}
LWLockRelease(lfc_lock);
}
blkno += blocks_in_chunk;
buf_offset += blocks_in_chunk;
nblocks -= blocks_in_chunk;
}
}

View File

@@ -537,11 +537,7 @@ pageserver_connect(shardno_t shard_no, int elevel)
/* No more polling needed; connection succeeded */
shard->last_connect_time = GetCurrentTimestamp();
#if PG_MAJORVERSION_NUM >= 17
shard->wes_read = CreateWaitEventSet(NULL, 3);
#else
shard->wes_read = CreateWaitEventSet(TopMemoryContext, 3);
#endif
AddWaitEventToSet(shard->wes_read, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(shard->wes_read, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,

View File

@@ -7,3 +7,7 @@ LANGUAGE C PARALLEL SAFE;
GRANT EXECUTE ON FUNCTION approximate_working_set_size_seconds(integer) TO pg_monitor;
CREATE FUNCTION wal_log_file(path text)
RETURNS pg_lsn
AS 'MODULE_PATHNAME', 'wal_log_file'
LANGUAGE C STRICT PARALLEL UNSAFE;

View File

@@ -0,0 +1,8 @@
\echo Use "ALTER EXTENSION neon UPDATE TO '1.5'" to load this file. \quit
CREATE FUNCTION wal_log_file(path text)
RETURNS pg_lsn
AS 'MODULE_PATHNAME', 'wal_log_file'
LANGUAGE C STRICT PARALLEL UNSAFE;
GRANT EXECUTE ON FUNCTION wal_log_file TO pg_monitor;

View File

@@ -0,0 +1 @@
DROP FUNCTION IF EXISTS wal_log_file(text) CASCADE;

View File

@@ -11,6 +11,8 @@
#include "postgres.h"
#include "fmgr.h"
#include <sys/stat.h>
#include "miscadmin.h"
#include "access/subtrans.h"
#include "access/twophase.h"
@@ -29,11 +31,19 @@
#include "tcop/tcopprot.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "access/xloginsert.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
#include "utils/guc.h"
#include "utils/guc_tables.h"
#include "utils/timeout.h"
#include "utils/wait_event.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#if PG_MAJORVERSION_NUM >= 15
#include "access/neon_xlog.h"
#endif
#include "extension_server.h"
#include "neon.h"
@@ -629,10 +639,17 @@ ReportSearchPath(void)
void
_PG_init(void)
{
const char *purpose;
purpose = getenv("NEON_PURPOSE");
if (purpose && strcmp(purpose, "upgrade") == 0)
return;
/*
* Also load 'neon_rmgr'. This makes it unnecessary to list both 'neon'
* and 'neon_rmgr' in shared_preload_libraries.
*/
#if PG_VERSION_NUM >= 160000
load_file("$libdir/neon_rmgr", false);
#endif
@@ -676,6 +693,9 @@ _PG_init(void)
PG_FUNCTION_INFO_V1(pg_cluster_size);
PG_FUNCTION_INFO_V1(backpressure_lsns);
PG_FUNCTION_INFO_V1(backpressure_throttling_time);
#if PG_MAJORVERSION_NUM >= 15
PG_FUNCTION_INFO_V1(wal_log_file);
#endif
Datum
pg_cluster_size(PG_FUNCTION_ARGS)
@@ -721,3 +741,200 @@ backpressure_throttling_time(PG_FUNCTION_ARGS)
{
PG_RETURN_UINT64(BackpressureThrottlingTime());
}
#if PG_MAJORVERSION_NUM >= 15
Datum
wal_log_file(PG_FUNCTION_ARGS)
{
int rc;
int fd;
ssize_t n;
text *path;
size_t off;
char *data;
short nargs;
struct stat st;
XLogRecPtr lsn;
size_t path_len;
xl_neon_file xlrec;
char file[MAXPGPATH];
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
bool wal_debug;
#endif
path = PG_GETARG_TEXT_PP(0);
path_len = VARSIZE(path) - VARHDRSZ;
memcpy(file, VARDATA(path), path_len);
file[path_len] = '\0';
/* Get the size of the file. Note that stat(2) follows symlinks. */
rc = stat(file, &st);
if (rc != 0)
ereport(ERROR,
(errmsg("failed to get size of file (%s): %m", file)));
xlrec.size = (size_t) st.st_size;
fd = open(file, O_RDONLY);
if (fd == -1)
ereport(ERROR,
(errmsg("could not open %s: %m", file)));
/* If the file is too large, error out. */
data = palloc(xlrec.size);
/* Copy the file contents */
off = 0;
while (true) {
n = read(fd, data + off, xlrec.size - off);
if (n == EOF)
{
close(fd);
ereport(ERROR,
(errmsg("failed to read %s: %m", file)));
}
off += n;
if (xlrec.size - off == 0)
break;
}
close(fd);
xlrec.filetype = XL_NEON_FILE_UPGRADE_TARBALL;
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfNeonFile);
XLogRegisterData((char *) data, xlrec.size);
/*
* We must turn debugging off on anything where the Neon RMGR is not
* registered. Stash the original value for restoration later.
*/
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
wal_debug = XLOG_DEBUG;
XLOG_DEBUG = false;
#endif
lsn = XLogInsert(RM_NEON_ID, XLOG_NEON_FILE);
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
XLOG_DEBUG = wal_debug;
#endif
PG_RETURN_LSN(lsn);
}
#endif
#if PG_MAJORVERSION_NUM >= 15
/*
* Entry point for `postgres --wal-log`.
*/
PGDLLEXPORT void
WalLog(int argc, char *argv[])
{
int rc;
int fd;
off_t off;
struct stat st;
XLogRecPtr lsn;
void *data;
xl_neon_file xlrec;
/* TODO: should this be PATH_MAX? should we require an absolute path? */
char file[MAXPGPATH];
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
bool wal_debug;
#endif
if (argc != 3)
ereport(ERROR, errmsg("wrong number of arguments passed to --wal-log"));
if (!realpath(argv[2], file))
ereport(ERROR, errmsg("failed to resolve path: %m"));
ereport(LOG, errmsg("writing %s to WAL", file));
ChangeToDataDir();
CreateDataDirLockFile(false);
LocalProcessControlFile(false);
InitializeMaxBackends();
CreateSharedMemoryAndSemaphores();
InitializeTimeouts();
InitProcess();
BaseInit();
CreateAuxProcessResourceOwner();
StartupXLOG();
/* Get the size of the file. Note that stat(2) follows symlinks. */
rc = stat(file, &st);
if (rc != 0)
ereport(ERROR,
(errmsg("failed to get size of file (%s): %m", file)));
xlrec.size = (size_t) st.st_size;
fd = open(file, O_RDONLY);
if (fd == -1)
ereport(ERROR,
(errmsg("could not open %s: %m", file)));
/* If the file is too large, error out. */
data = palloc(xlrec.size);
/* Copy the file contents */
off = 0;
while (true) {
ssize_t n;
n = read(fd, data + off, xlrec.size - off);
if (n == EOF)
{
close(fd);
ereport(ERROR,
(errmsg("failed to read %s: %m", file)));
}
off += n;
if (xlrec.size - off == 0)
break;
}
close(fd);
xlrec.filetype = XL_NEON_FILE_UPGRADE_TARBALL;
/* ereport(LOG, errmsg("Current LSN: %X/%X" , LSN_FORMAT_ARGS(GetXLogWriteRecPtr()))); */
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfNeonFile);
XLogRegisterData(data, xlrec.size);
/*
* We must turn debugging off on anything where the Neon RMGR is not
* registered. Stash the original value for restoration later.
*/
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
wal_debug = XLOG_DEBUG;
XLOG_DEBUG = false;
#endif
lsn = XLogInsert(RM_NEON_ID, XLOG_NEON_FILE);
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
XLOG_DEBUG = wal_debug;
#endif
exit(0);
}
#endif

View File

@@ -6,11 +6,7 @@
#ifndef NEON_PGVERSIONCOMPAT_H
#define NEON_PGVERSIONCOMPAT_H
#if PG_MAJORVERSION_NUM < 17
#define NRelFileInfoBackendIsTemp(rinfo) (rinfo.backend != InvalidBackendId)
#else
#define NRelFileInfoBackendIsTemp(rinfo) (rinfo.backend != INVALID_PROC_NUMBER)
#endif
#define RelFileInfoEquals(a, b) ( \
NInfoGetSpcOid(a) == NInfoGetSpcOid(b) && \
@@ -54,7 +50,7 @@
#define CopyNRelFileInfoToBufTag(tag, rinfo) \
do { \
(tag).rnode = (rinfo); \
} while (false)
} while (false);
#define BufTagGetNRelFileInfo(tag) tag.rnode
@@ -102,7 +98,7 @@
(tag).spcOid = (rinfo).spcOid; \
(tag).dbOid = (rinfo).dbOid; \
(tag).relNumber = (rinfo).relNumber; \
} while (false)
} while (false);
#define BufTagGetNRelFileInfo(tag) \
((RelFileLocator) { \
@@ -117,10 +113,4 @@
#define DropRelationAllLocalBuffers DropRelationAllLocalBuffers
#endif
#if PG_MAJORVERSION_NUM < 17
#define ProcNumber BackendId
#define INVALID_PROC_NUMBER InvalidBackendId
#define AmAutoVacuumWorkerProcess() (IsAutoVacuumWorkerProcess())
#endif
#endif /* NEON_PGVERSIONCOMPAT_H */

View File

@@ -6,6 +6,8 @@
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* contrib/neon/pagestore_client.h
*
*-------------------------------------------------------------------------
*/
#ifndef pageserver_h
@@ -185,7 +187,7 @@ extern char *nm_to_string(NeonMessage *msg);
* API
*/
typedef uint16 shardno_t;
typedef unsigned shardno_t;
typedef struct
{
@@ -209,7 +211,7 @@ extern int neon_protocol_version;
extern shardno_t get_shard_number(BufferTag* tag);
extern const f_smgr *smgr_neon(ProcNumber backend, NRelFileInfo rinfo);
extern const f_smgr *smgr_neon(BackendId backend, NRelFileInfo rinfo);
extern void smgr_init_neon(void);
extern void readahead_buffer_resize(int newsize, void *extra);
@@ -231,13 +233,8 @@ extern void neon_zeroextend(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, int nbuffers, bool skipFsync);
#endif
#if PG_MAJORVERSION_NUM >=17
extern bool neon_prefetch(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, int nblocks);
#else
extern bool neon_prefetch(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum);
#endif
/*
* LSN values associated with each request to the pageserver
@@ -272,11 +269,19 @@ typedef struct
} neon_request_lsns;
#if PG_MAJORVERSION_NUM < 16
extern void neon_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
char *buffer);
extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno,
neon_request_lsns request_lsns, char *buffer);
extern void neon_write(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync);
#else
extern void neon_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
void *buffer);
extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno,
neon_request_lsns request_lsns, void *buffer);
extern void neon_write(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, const void *buffer, bool skipFsync);
#endif
extern void neon_writeback(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, BlockNumber nblocks);
@@ -294,34 +299,17 @@ extern void update_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockN
extern void forget_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum);
/* functions for local file cache */
extern void lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, const void *const *buffers,
BlockNumber nblocks);
/* returns number of blocks read, with one bit set in *read for each */
extern int lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, void **buffers,
BlockNumber nblocks, bits8 *mask);
extern bool lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno);
extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, int nblocks, bits8 *bitmap);
#if PG_MAJORVERSION_NUM < 16
extern void lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
char *buffer);
#else
extern void lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
const void *buffer);
#endif
extern bool lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, char *buffer);
extern bool lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_init(void);
static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
void *buffer)
{
bits8 rv = 0;
return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1;
}
static inline void
lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
const void *buffer)
{
return lfc_writev(rinfo, forkNum, blkno, &buffer, 1);
}
#endif

File diff suppressed because it is too large Load Diff

View File

@@ -78,11 +78,16 @@ neon_smgr_shmem_startup(void)
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();
if (relsize_hash_size <= 0)
return;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
relsize_ctl = (RelSizeHashControl *) ShmemInitStruct("relsize_hash", sizeof(RelSizeHashControl), &found);
if (!found)
{
elog(LOG, "neon_relsize: %d", relsize_hash_size);
relsize_lock = (LWLockId) GetNamedLWLockTranche("neon_relsize");
elog(LOG, "neon_relsize");
info.keysize = sizeof(RelTag);
info.entrysize = sizeof(RelSizeEntry);
relsize_hash = ShmemInitHash("neon_relsize",

View File

@@ -81,7 +81,6 @@ static void nwp_register_gucs(void);
static void assign_neon_safekeepers(const char *newval, void *extra);
static void nwp_prepare_shmem(void);
static uint64 backpressure_lag_impl(void);
static uint64 startup_backpressure_wrap(void);
static bool backpressure_throttling_impl(void);
static void walprop_register_bgworker(void);
@@ -91,7 +90,7 @@ static void walprop_pg_init_bgworker(void);
static TimestampTz walprop_pg_get_current_timestamp(WalProposer *wp);
static void walprop_pg_load_libpqwalreceiver(void);
static process_interrupts_callback_t PrevProcessInterruptsCallback = NULL;
static process_interrupts_callback_t PrevProcessInterruptsCallback;
static shmem_startup_hook_type prev_shmem_startup_hook_type;
#if PG_VERSION_NUM >= 150000
static shmem_request_hook_type prev_shmem_request_hook = NULL;
@@ -179,7 +178,7 @@ pg_init_walproposer(void)
nwp_prepare_shmem();
delay_backend_us = &startup_backpressure_wrap;
delay_backend_us = &backpressure_lag_impl;
PrevProcessInterruptsCallback = ProcessInterruptsCallback;
ProcessInterruptsCallback = backpressure_throttling_impl;
@@ -353,22 +352,6 @@ backpressure_lag_impl(void)
return 0;
}
/*
* We don't apply backpressure when we're the postmaster, or the startup
* process, because in postmaster we can't apply backpressure, and in
* the startup process we can't afford to slow down.
*/
static uint64
startup_backpressure_wrap(void)
{
if (AmStartupProcess() || !IsUnderPostmaster)
return 0;
delay_backend_us = &backpressure_lag_impl;
return backpressure_lag_impl();
}
/*
* WalproposerShmemSize --- report amount of shared memory space needed
*/
@@ -418,13 +401,12 @@ WalproposerShmemInit_SyncSafekeeper(void)
static bool
backpressure_throttling_impl(void)
{
uint64 lag;
int64 lag;
TimestampTz start,
stop;
bool retry = false;
if (PointerIsValid(PrevProcessInterruptsCallback))
retry = PrevProcessInterruptsCallback();
bool retry = PrevProcessInterruptsCallback
? PrevProcessInterruptsCallback()
: false;
/*
* Don't throttle read only transactions or wal sender. Do throttle CREATE
@@ -620,12 +602,7 @@ walprop_pg_init_walsender(void)
/* Create replication slot for WAL proposer if not exists */
if (SearchNamedReplicationSlot(WAL_PROPOSER_SLOT_NAME, false) == NULL)
{
#if PG_MAJORVERSION_NUM >= 17
ReplicationSlotCreate(WAL_PROPOSER_SLOT_NAME, false, RS_PERSISTENT,
false, false, false);
#else
ReplicationSlotCreate(WAL_PROPOSER_SLOT_NAME, false, RS_PERSISTENT, false);
#endif
ReplicationSlotReserveWal();
/* Write this slot to disk */
ReplicationSlotMarkDirty();
@@ -1532,11 +1509,7 @@ walprop_pg_init_event_set(WalProposer *wp)
wpg_log(FATAL, "double-initialization of event set");
/* for each sk, we have socket plus potentially socket for neon walreader */
#if PG_MAJORVERSION_NUM >= 17
waitEvents = CreateWaitEventSet(NULL, 2 + 2 * wp->n_safekeepers);
#else
waitEvents = CreateWaitEventSet(TopMemoryContext, 2 + 2 * wp->n_safekeepers);
#endif
AddWaitEventToSet(waitEvents, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(waitEvents, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,

View File

@@ -76,6 +76,8 @@ neon_rm_redo(XLogReaderState *record)
case XLOG_NEON_HEAP_MULTI_INSERT:
redo_neon_heap_multi_insert(record);
break;
case XLOG_NEON_FILE:
break;
default:
elog(PANIC, "neon_rm_redo: unknown op code %u", info);
}

View File

@@ -1,7 +1,6 @@
#include "postgres.h"
#if PG_MAJORVERSION_NUM >= 16
#include "access/heapam_xlog.h"
#include "access/neon_xlog.h"
#include "replication/decode.h"
@@ -10,10 +9,6 @@
#include "neon_rmgr.h"
#endif /* PG >= 16 */
#if PG_MAJORVERSION_NUM == 16
/* individual record(group)'s handlers */
static void DecodeNeonInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeNeonUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -62,6 +57,8 @@ neon_rm_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (SnapBuildProcessChange(builder, xid, buf->origptr))
DecodeNeonMultiInsert(ctx, buf);
break;
case XLOG_NEON_FILE:
break;
default:
elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
break;
@@ -404,398 +401,6 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
header->t_infomask2 = xlhdr.t_infomask2;
header->t_hoff = xlhdr.t_hoff;
}
#endif
#if PG_MAJORVERSION_NUM == 17
/* individual record(group)'s handlers */
static void DecodeNeonInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeNeonUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeNeonDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeNeonMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
/* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, HeapTuple tuple);
void
neon_rm_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
uint8 info = XLogRecGetInfo(buf->record) & XLOG_NEON_OPMASK;
TransactionId xid = XLogRecGetXid(buf->record);
SnapBuild *builder = ctx->snapshot_builder;
ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
/*
* If we don't have snapshot or we are just fast-forwarding, there is no
* point in decoding data changes.
*/
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
ctx->fast_forward)
return;
switch (info)
{
case XLOG_NEON_HEAP_INSERT:
if (SnapBuildProcessChange(builder, xid, buf->origptr))
DecodeNeonInsert(ctx, buf);
break;
case XLOG_NEON_HEAP_DELETE:
if (SnapBuildProcessChange(builder, xid, buf->origptr))
DecodeNeonDelete(ctx, buf);
break;
case XLOG_NEON_HEAP_UPDATE:
case XLOG_NEON_HEAP_HOT_UPDATE:
if (SnapBuildProcessChange(builder, xid, buf->origptr))
DecodeNeonUpdate(ctx, buf);
break;
case XLOG_NEON_HEAP_LOCK:
break;
case XLOG_NEON_HEAP_MULTI_INSERT:
if (SnapBuildProcessChange(builder, xid, buf->origptr))
DecodeNeonMultiInsert(ctx, buf);
break;
default:
elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
break;
}
}
static inline bool
FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
if (ctx->callbacks.filter_by_origin_cb == NULL)
return false;
return filter_by_origin_cb_wrapper(ctx, origin_id);
}
/*
* Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
*
* Deletes can contain the new tuple.
*/
static void
DecodeNeonInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
Size datalen;
char *tupledata;
Size tuplelen;
XLogReaderState *r = buf->record;
xl_neon_heap_insert *xlrec;
ReorderBufferChange *change;
RelFileLocator target_locator;
xlrec = (xl_neon_heap_insert *) XLogRecGetData(r);
/*
* Ignore insert records without new tuples (this does happen when
* raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
*/
if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
return;
/* only interested in our database */
XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
if (target_locator.dbOid != ctx->slot->data.database)
return;
/* output plugin doesn't look for this origin, no need to queue */
if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
return;
change = ReorderBufferGetChange(ctx->reorder);
if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
change->action = REORDER_BUFFER_CHANGE_INSERT;
else
change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
change->origin_id = XLogRecGetOrigin(r);
memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
tupledata = XLogRecGetBlockData(r, 0, &datalen);
tuplelen = datalen - SizeOfHeapHeader;
change->data.tp.newtuple =
ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
change->data.tp.clear_toast_afterwards = true;
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
change,
xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
}
/*
* Parse XLOG_HEAP_DELETE from wal into proper tuplebufs.
*
* Deletes can possibly contain the old primary key.
*/
static void
DecodeNeonDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
XLogReaderState *r = buf->record;
xl_neon_heap_delete *xlrec;
ReorderBufferChange *change;
RelFileLocator target_locator;
xlrec = (xl_neon_heap_delete *) XLogRecGetData(r);
/* only interested in our database */
XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
if (target_locator.dbOid != ctx->slot->data.database)
return;
/* output plugin doesn't look for this origin, no need to queue */
if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
return;
change = ReorderBufferGetChange(ctx->reorder);
if (xlrec->flags & XLH_DELETE_IS_SUPER)
change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT;
else
change->action = REORDER_BUFFER_CHANGE_DELETE;
change->origin_id = XLogRecGetOrigin(r);
memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
/* old primary key stored */
if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
{
Size datalen = XLogRecGetDataLen(r) - SizeOfNeonHeapHeader;
Size tuplelen = datalen - SizeOfNeonHeapHeader;
Assert(XLogRecGetDataLen(r) > (SizeOfNeonHeapDelete + SizeOfNeonHeapHeader));
change->data.tp.oldtuple =
ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
DecodeXLogTuple((char *) xlrec + SizeOfNeonHeapDelete,
datalen, change->data.tp.oldtuple);
}
change->data.tp.clear_toast_afterwards = true;
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
change, false);
}
/*
* Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout
* in the record, from wal into proper tuplebufs.
*
* Updates can possibly contain a new tuple and the old primary key.
*/
static void
DecodeNeonUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
XLogReaderState *r = buf->record;
xl_neon_heap_update *xlrec;
ReorderBufferChange *change;
char *data;
RelFileLocator target_locator;
xlrec = (xl_neon_heap_update *) XLogRecGetData(r);
/* only interested in our database */
XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
if (target_locator.dbOid != ctx->slot->data.database)
return;
/* output plugin doesn't look for this origin, no need to queue */
if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
return;
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_UPDATE;
change->origin_id = XLogRecGetOrigin(r);
memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
{
Size datalen;
Size tuplelen;
data = XLogRecGetBlockData(r, 0, &datalen);
tuplelen = datalen - SizeOfNeonHeapHeader;
change->data.tp.newtuple =
ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
}
if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
{
Size datalen;
Size tuplelen;
/* caution, remaining data in record is not aligned */
data = XLogRecGetData(r) + SizeOfNeonHeapUpdate;
datalen = XLogRecGetDataLen(r) - SizeOfNeonHeapUpdate;
tuplelen = datalen - SizeOfNeonHeapHeader;
change->data.tp.oldtuple =
ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
}
change->data.tp.clear_toast_afterwards = true;
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
change, false);
}
/*
* Decode XLOG_HEAP2_MULTI_INSERT_insert record into multiple tuplebufs.
*
* Currently MULTI_INSERT will always contain the full tuples.
*/
static void
DecodeNeonMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
XLogReaderState *r = buf->record;
xl_neon_heap_multi_insert *xlrec;
int i;
char *data;
char *tupledata;
Size tuplelen;
RelFileLocator rlocator;
xlrec = (xl_neon_heap_multi_insert *) XLogRecGetData(r);
/*
* Ignore insert records without new tuples. This happens when a
* multi_insert is done on a catalog or on a non-persistent relation.
*/
if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
return;
/* only interested in our database */
XLogRecGetBlockTag(r, 0, &rlocator, NULL, NULL);
if (rlocator.dbOid != ctx->slot->data.database)
return;
/* output plugin doesn't look for this origin, no need to queue */
if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
return;
/*
* We know that this multi_insert isn't for a catalog, so the block should
* always have data even if a full-page write of it is taken.
*/
tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
Assert(tupledata != NULL);
data = tupledata;
for (i = 0; i < xlrec->ntuples; i++)
{
ReorderBufferChange *change;
xl_neon_multi_insert_tuple *xlhdr;
int datalen;
HeapTuple tuple;
HeapTupleHeader header;
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_INSERT;
change->origin_id = XLogRecGetOrigin(r);
memcpy(&change->data.tp.rlocator, &rlocator, sizeof(RelFileLocator));
xlhdr = (xl_neon_multi_insert_tuple *) SHORTALIGN(data);
data = ((char *) xlhdr) + SizeOfNeonMultiInsertTuple;
datalen = xlhdr->datalen;
change->data.tp.newtuple =
ReorderBufferGetTupleBuf(ctx->reorder, datalen);
tuple = change->data.tp.newtuple;
header = tuple->t_data;
/* not a disk based tuple */
ItemPointerSetInvalid(&tuple->t_self);
/*
* We can only figure this out after reassembling the transactions.
*/
tuple->t_tableOid = InvalidOid;
tuple->t_len = datalen + SizeofHeapTupleHeader;
memset(header, 0, SizeofHeapTupleHeader);
memcpy((char *) tuple->t_data + SizeofHeapTupleHeader,
(char *) data,
datalen);
header->t_infomask = xlhdr->t_infomask;
header->t_infomask2 = xlhdr->t_infomask2;
header->t_hoff = xlhdr->t_hoff;
/*
* Reset toast reassembly state only after the last row in the last
* xl_multi_insert_tuple record emitted by one heap_multi_insert()
* call.
*/
if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
(i + 1) == xlrec->ntuples)
change->data.tp.clear_toast_afterwards = true;
else
change->data.tp.clear_toast_afterwards = false;
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
buf->origptr, change, false);
/* move to the next xl_neon_multi_insert_tuple entry */
data += datalen;
}
Assert(data == tupledata + tuplelen);
}
/*
* Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
* (but not by heap_multi_insert) into a tuplebuf.
*
* The size 'len' and the pointer 'data' in the record need to be
* computed outside as they are record specific.
*/
static void
DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
{
xl_neon_heap_header xlhdr;
int datalen = len - SizeOfNeonHeapHeader;
HeapTupleHeader header;
Assert(datalen >= 0);
tuple->t_len = datalen + SizeofHeapTupleHeader;
header = tuple->t_data;
/* not a disk based tuple */
ItemPointerSetInvalid(&tuple->t_self);
/* we can only figure this out after reassembling the transactions */
tuple->t_tableOid = InvalidOid;
/* data is not stored aligned, copy to aligned storage */
memcpy((char *) &xlhdr,
data,
SizeOfNeonHeapHeader);
memset(header, 0, SizeofHeapTupleHeader);
memcpy(((char *) tuple->t_data) + SizeofHeapTupleHeader,
data + SizeOfNeonHeapHeader,
datalen);
header->t_infomask = xlhdr.t_infomask;
header->t_infomask2 = xlhdr.t_infomask2;
header->t_hoff = xlhdr.t_hoff;
}
#endif

View File

@@ -134,6 +134,16 @@ neon_rm_desc(StringInfo buf, XLogReaderState *record)
xlrec->ntuples, &offset_elem_desc, NULL);
}
}
else if (info == XLOG_NEON_FILE)
{
const xl_neon_file *xlrec = (xl_neon_file *) rec;
switch ((xl_neon_file_filetype) xlrec->filetype)
{
case XL_NEON_FILE_UPGRADE_TARBALL:
appendStringInfo(buf, "filetype: upgrade tarball, size: %zu", xlrec->size);
break;
}
}
}
const char *
@@ -173,6 +183,9 @@ neon_rm_identify(uint8 info)
case XLOG_NEON_HEAP_MULTI_INSERT | XLOG_NEON_INIT_PAGE:
id = "MULTI_INSERT+INIT";
break;
case XLOG_NEON_FILE:
id = "FILE";
break;
}
return id;

View File

@@ -68,13 +68,8 @@ static void inmem_close(SMgrRelation reln, ForkNumber forknum);
static void inmem_create(SMgrRelation reln, ForkNumber forknum, bool isRedo);
static bool inmem_exists(SMgrRelation reln, ForkNumber forknum);
static void inmem_unlink(NRelFileInfoBackend rinfo, ForkNumber forknum, bool isRedo);
#if PG_MAJORVERSION_NUM >= 17
static bool inmem_prefetch(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, int nblocks);
#else
static bool inmem_prefetch(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum);
#endif
#if PG_MAJORVERSION_NUM < 16
static void inmem_extend(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync);
@@ -98,9 +93,7 @@ static BlockNumber inmem_nblocks(SMgrRelation reln, ForkNumber forknum);
static void inmem_truncate(SMgrRelation reln, ForkNumber forknum,
BlockNumber nblocks);
static void inmem_immedsync(SMgrRelation reln, ForkNumber forknum);
#if PG_MAJORVERSION_NUM >= 17
static void inmem_registersync(SMgrRelation reln, ForkNumber forknum);
#endif
/*
* inmem_init() -- Initialize private state
@@ -197,14 +190,6 @@ inmem_close(SMgrRelation reln, ForkNumber forknum)
{
}
#if PG_MAJORVERSION_NUM >= 17
static bool
inmem_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
int nblocks)
{
return true;
}
#else
/*
* inmem_prefetch() -- Initiate asynchronous read of the specified block of a relation
*/
@@ -213,7 +198,6 @@ inmem_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
{
return true;
}
#endif
/*
* inmem_writeback() -- Tell the kernel to write pages back to storage.
@@ -227,13 +211,11 @@ inmem_writeback(SMgrRelation reln, ForkNumber forknum,
/*
* inmem_read() -- Read the specified block from a relation.
*/
#if PG_MAJORVERSION_NUM < 16
static void
inmem_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
#if PG_MAJORVERSION_NUM < 16
char *buffer)
#else
static void
inmem_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
void *buffer)
#endif
{
@@ -246,18 +228,6 @@ inmem_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
memcpy(buffer, page_body[pg], BLCKSZ);
}
#if PG_MAJORVERSION_NUM >= 17
static void
inmem_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
void **buffers, BlockNumber nblocks)
{
for (int i = 0; i < nblocks; i++)
{
inmem_read(reln, forknum, blkno, buffers[i]);
}
}
#endif
/*
* inmem_write() -- Write the supplied block at the appropriate location.
*
@@ -310,18 +280,6 @@ inmem_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
memcpy(page_body[pg], buffer, BLCKSZ);
}
#if PG_MAJORVERSION_NUM >= 17
static void
inmem_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
const void **buffers, BlockNumber nblocks, bool skipFsync)
{
for (int i = 0; i < nblocks; i++)
{
inmem_write(reln, forknum, blkno, buffers[i], skipFsync);
}
}
#endif
/*
* inmem_nblocks() -- Get the number of blocks stored in a relation.
*/
@@ -357,13 +315,6 @@ inmem_immedsync(SMgrRelation reln, ForkNumber forknum)
{
}
#if PG_MAJORVERSION_NUM >= 17
static void
inmem_registersync(SMgrRelation reln, ForkNumber forknum)
{
}
#endif
static const struct f_smgr inmem_smgr =
{
.smgr_init = inmem_init,
@@ -377,39 +328,23 @@ static const struct f_smgr inmem_smgr =
#if PG_MAJORVERSION_NUM >= 16
.smgr_zeroextend = inmem_zeroextend,
#endif
#if PG_MAJORVERSION_NUM >= 17
.smgr_prefetch = inmem_prefetch,
.smgr_readv = inmem_readv,
.smgr_writev = inmem_writev,
#else
.smgr_prefetch = inmem_prefetch,
.smgr_read = inmem_read,
.smgr_write = inmem_write,
#endif
.smgr_writeback = inmem_writeback,
.smgr_nblocks = inmem_nblocks,
.smgr_truncate = inmem_truncate,
.smgr_immedsync = inmem_immedsync,
#if PG_MAJORVERSION_NUM >= 17
.smgr_registersync = inmem_registersync,
#endif
.smgr_start_unlogged_build = NULL,
.smgr_finish_unlogged_build_phase_1 = NULL,
.smgr_end_unlogged_build = NULL,
.smgr_read_slru_segment = NULL,
};
const f_smgr *
smgr_inmem(ProcNumber backend, NRelFileInfo rinfo)
smgr_inmem(BackendId backend, NRelFileInfo rinfo)
{
Assert(InRecovery);
// // What does this code do?
// if (backend != INVALID_PROC_NUMBER)
// return smgr_standard(backend, rinfo);
// else
return &inmem_smgr;
if (backend != InvalidBackendId)
return smgr_standard(backend, rinfo);
else
return &inmem_smgr;
}
void

View File

@@ -11,7 +11,7 @@
#ifndef INMEM_SMGR_H
#define INMEM_SMGR_H
extern const f_smgr *smgr_inmem(ProcNumber backend, NRelFileInfo rinfo);
extern const f_smgr *smgr_inmem(BackendId backend, NRelFileInfo rinfo);
extern void smgr_init_inmem(void);
#endif /* INMEM_SMGR_H */

View File

@@ -100,9 +100,6 @@
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/dsm.h"
#if PG_MAJORVERSION_NUM >= 17
#include "storage/dsm_registry.h"
#endif
#include "storage/ipc.h"
#include "storage/pg_shmem.h"
#include "storage/pmsignal.h"
@@ -140,7 +137,7 @@ static BufferTag target_redo_tag;
static XLogReaderState *reader_state;
#define TRACE LOG
#define TRACE DEBUG5
#ifdef HAVE_LIBSECCOMP
@@ -520,10 +517,6 @@ CreateFakeSharedMemoryAndSemaphores()
/*
* Set up xlog, clog, and buffers
*/
#if PG_MAJORVERSION_NUM >= 17
DSMRegistryShmemInit();
VarsupShmemInit();
#endif
XLOGShmemInit();
CLOGShmemInit();
CommitTsShmemInit();
@@ -573,10 +566,7 @@ CreateFakeSharedMemoryAndSemaphores()
/*
* Set up other modules that need some shared memory space
*/
#if PG_MAJORVERSION_NUM < 17
/* "snapshot too old" was removed in PG17, and with it the SnapMgr */
SnapMgrInit();
#endif
BTreeShmemInit();
SyncScanShmemInit();
/* Skip due to the 'pg_notify' directory check */
@@ -752,7 +742,7 @@ BeginRedoForBlock(StringInfo input_message)
target_redo_tag.forkNum,
target_redo_tag.blockNum);
reln = smgropen(rinfo, INVALID_PROC_NUMBER, RELPERSISTENCE_PERMANENT);
reln = smgropen(rinfo, InvalidBackendId, RELPERSISTENCE_PERMANENT);
if (reln->smgr_cached_nblocks[forknum] == InvalidBlockNumber ||
reln->smgr_cached_nblocks[forknum] < blknum + 1)
{

View File

@@ -311,9 +311,7 @@ async fn auth_quirks(
let (allowed_ips, maybe_secret) = api.get_allowed_ips_and_secret(ctx, &info).await?;
// check allowed list
if config.ip_allowlist_check_enabled
&& !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips)
{
if !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips) {
return Err(auth::AuthError::ip_address_not_allowed(ctx.peer_addr()));
}
@@ -605,7 +603,6 @@ mod tests {
rate_limiter_enabled: true,
rate_limiter: AuthRateLimiter::new(&RateBucketInfo::DEFAULT_AUTH_SET),
rate_limit_ip_subnet: 64,
ip_allowlist_check_enabled: true,
});
async fn read_message(r: &mut (impl AsyncRead + Unpin), b: &mut BytesMut) -> PgMessage {

View File

@@ -538,17 +538,4 @@ mod tests {
));
Ok(())
}
#[test]
fn test_connection_blocker() {
fn check(v: serde_json::Value) -> bool {
let peer_addr = IpAddr::from([127, 0, 0, 1]);
let ip_list: Vec<IpPattern> = serde_json::from_value(v).unwrap();
check_peer_addr_is_in_list(&peer_addr, &ip_list)
}
assert!(check(json!([])));
assert!(check(json!(["127.0.0.1"])));
assert!(!check(json!(["255.255.255.255"])));
}
}

View File

@@ -224,7 +224,6 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
rate_limiter_enabled: false,
rate_limiter: BucketRateLimiter::new(vec![]),
rate_limit_ip_subnet: 64,
ip_allowlist_check_enabled: true,
},
require_client_ip: false,
handshake_timeout: Duration::from_secs(10),

View File

@@ -224,10 +224,6 @@ struct ProxyCliArgs {
/// Whether to retry the wake_compute request
#[clap(long, default_value = config::RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)]
wake_compute_retry: String,
/// Configure if this is a private access proxy for the POC: In that case the proxy will ignore the IP allowlist
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
is_private_access_proxy: bool,
}
#[derive(clap::Args, Clone, Copy, Debug)]
@@ -686,7 +682,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
rate_limiter_enabled: args.auth_rate_limit_enabled,
rate_limiter: AuthRateLimiter::new(args.auth_rate_limit.clone()),
rate_limit_ip_subnet: args.auth_rate_limit_ip_subnet,
ip_allowlist_check_enabled: !args.is_private_access_proxy,
};
let config = Box::leak(Box::new(ProxyConfig {

View File

@@ -64,7 +64,6 @@ pub struct AuthenticationConfig {
pub rate_limiter_enabled: bool,
pub rate_limiter: AuthRateLimiter,
pub rate_limit_ip_subnet: u8,
pub ip_allowlist_check_enabled: bool,
}
impl TlsConfig {

View File

@@ -50,9 +50,7 @@ impl PoolingBackend {
.as_ref()
.map(|()| user_info.clone());
let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?;
if config.ip_allowlist_check_enabled
&& !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips)
{
if !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips) {
return Err(AuthError::ip_address_not_allowed(ctx.peer_addr()));
}
if !self

View File

@@ -580,15 +580,15 @@ where
* because safekeepers parse WAL headers and the format
* may change between versions.
*/
if msg.pg_version / 10000 != self.state.server.pg_version / 10000
&& self.state.server.pg_version != UNKNOWN_SERVER_VERSION
{
bail!(
"incompatible server version {}, expected {}",
msg.pg_version,
self.state.server.pg_version
);
}
// if msg.pg_version / 10000 != self.state.server.pg_version / 10000
// && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
// {
// bail!(
// "incompatible server version {}, expected {}",
// msg.pg_version,
// self.state.server.pg_version
// );
// }
if msg.tenant_id != self.state.tenant_id {
bail!(

12
test.sh Executable file
View File

@@ -0,0 +1,12 @@
#!/bin/sh
cargo neon endpoint stop ep-main 2>/dev/null
kill $(pgrep compute_ctl)
cargo neon stop 2>/dev/null
rm -rf .neon
cargo neon init
cargo neon start
cargo neon tenant create --pg-version 15 --set-default
cargo neon endpoint create --pg-version 15 --upgrade-only
cargo neon endpoint start ep-main
curl -i -X POST http://localhost:55433/upgrade -d '{"pg_version": "16"}'

View File

@@ -13,7 +13,7 @@ DEFAULT_WAL_SEG_SIZE = 16 * 1024 * 1024
class Lsn:
"""
Datatype for an LSN. Internally it is a 64-bit integer, but the string
representation is like "1/0123abcd". See also pg_lsn datatype in Postgres
representation is like "1/123abcd". See also pg_lsn datatype in Postgres
"""
def __init__(self, x: Union[int, str]):

View File

@@ -933,11 +933,8 @@ class NeonEnvBuilder:
for directory_to_clean in reversed(directories_to_clean):
if not os.listdir(directory_to_clean):
log.info(f"Removing empty directory {directory_to_clean}")
try:
directory_to_clean.rmdir()
except Exception as e:
log.error(f"Error removing empty directory {directory_to_clean}: {e}")
log.debug(f"Removing empty directory {directory_to_clean}")
directory_to_clean.rmdir()
def cleanup_remote_storage(self):
for x in [self.pageserver_remote_storage, self.safekeepers_remote_storage]:
@@ -3426,7 +3423,6 @@ class VanillaPostgres(PgProtocol):
assert not self.running
with open(os.path.join(self.pgdatadir, "postgresql.conf"), "a") as conf_file:
conf_file.write("\n".join(options))
conf_file.write("\n")
def edit_hba(self, hba: List[str]):
"""Prepend hba lines into pg_hba.conf file."""
@@ -3480,7 +3476,6 @@ def vanilla_pg(
pg_bin = PgBin(test_output_dir, pg_distrib_dir, pg_version)
port = port_distributor.get_port()
with VanillaPostgres(pgdatadir, pg_bin, port) as vanilla_pg:
vanilla_pg.configure(["shared_preload_libraries='neon_rmgr'"])
yield vanilla_pg

View File

@@ -16,7 +16,6 @@ class PgVersion(str, enum.Enum):
V14 = "14"
V15 = "15"
V16 = "16"
V17 = "17"
# Instead of making version an optional parameter in methods, we can use this fake entry
# to explicitly rely on the default server version (could be different from pg_version fixture value)
NOT_SET = "<-POSTRGRES VERSION IS NOT SET->"

View File

@@ -1,7 +0,0 @@
{
"public_extensions": [],
"library_index": {
"TODO": "We still need PG17 extensions"
},
"extension_data": {}
}

View File

@@ -21,7 +21,7 @@ from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
timeline_delete_wait_completed,
)
from fixtures.pg_version import PgVersion, skip_on_postgres
from fixtures.pg_version import PgVersion
from fixtures.remote_storage import RemoteStorageKind, S3Storage, s3_storage
from fixtures.workload import Workload
@@ -156,9 +156,6 @@ ingest_lag_log_line = ".*ingesting record with timestamp lagging more than wait_
@check_ondisk_data_compatibility_if_enabled
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(after="test_create_snapshot")
@skip_on_postgres(
PgVersion.V17, "There are no snapshots yet"
) # TODO: revert this once we have snapshots
def test_backward_compatibility(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
@@ -206,9 +203,6 @@ def test_backward_compatibility(
@check_ondisk_data_compatibility_if_enabled
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(after="test_create_snapshot")
@skip_on_postgres(
PgVersion.V17, "There are no snapshots yet"
) # TODO: revert this once we have snapshots
def test_forward_compatibility(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,

View File

@@ -44,8 +44,6 @@ def test_remote_extensions(
):
if pg_version == PgVersion.V16:
pytest.skip("TODO: PG16 extension building")
if pg_version == PgVersion.V17:
pytest.skip("TODO: PG17 extension building")
# setup mock http server
# that expects request for anon.tar.zst

View File

@@ -20,19 +20,16 @@ def test_postgres_version(base_dir: Path, pg_bin: PgBin, pg_version: PgVersion):
output = f.read().strip()
# `postgres --version` prints something like "postgres (PostgreSQL) 15.6 (85d809c124a898847a97d66a211f7d5ef4f8e0cb)".
# beta- and release candidate releases would use '17beta1' and '18rc2' instead of .-separated numbers.
pattern = (
r"postgres \(PostgreSQL\) (?P<version>\d+(?:beta|rc|\.)\d+) \((?P<commit>[0-9a-f]{40})\)"
)
pattern = r"postgres \(PostgreSQL\) (?P<version>\d+\.\d+) \((?P<commit>[0-9a-f]{40})\)"
match = re.search(pattern, output, re.IGNORECASE)
assert match is not None, f"Can't parse {output} with {pattern}"
version = match.group("version")
commit = match.group("commit")
if "." in version:
assert (
pg_version.v_prefixed in expected_revisions
), f"Released PostgreSQL version `{pg_version.v_prefixed}` doesn't exist in `vendor/revisions.json`, please update it if these changes are intentional"
msg = f"Unexpected Postgres {pg_version} version: `{output}`, please update `vendor/revisions.json` if these changes are intentional"
assert [version, commit] == expected_revisions[pg_version.v_prefixed], msg
assert (
pg_version.v_prefixed in expected_revisions
), f"Version `{pg_version.v_prefixed}` doesn't exist in `vendor/revisions.json`, please update it if these changes are intentional"
msg = f"Unexpected Postgres {pg_version} version: `{output}`, please update `vendor/revisions.json` if these changes are intentional"
assert [version, commit] == expected_revisions[pg_version.v_prefixed], msg

View File

@@ -118,9 +118,6 @@ def test_ancestor_detach_branched_from(
truncated_layers = 0
elif branchpoint == Branchpoint.AFTER_L0:
branch_at = Lsn(last_lsn + 8)
# make sure the branch point is not on a page header
if 0 < (branch_at.lsn_int % 8192) < 40:
branch_at += 40
rows = 8192
# as there is no 8 byte walrecord, nothing should get copied from the straddling layer
truncated_layers = 0

View File

@@ -1,32 +1,19 @@
import os
from pathlib import Path
from fixtures.common_types import TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
PgBin,
fork_at_current_lsn,
import_timeline_from_vanilla_postgres,
)
from fixtures.neon_fixtures import NeonEnv, fork_at_current_lsn
#
# Test branching, when a transaction is in prepared state
#
def twophase_test_on_timeline(env: NeonEnv):
endpoint = env.endpoints.create_start(
"test_twophase", config_lines=["max_prepared_transactions=5"]
)
def test_twophase(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start("main", config_lines=["max_prepared_transactions=5"])
conn = endpoint.connect()
cur = conn.cursor()
# FIXME: Switch to the next WAL segment, to work around the bug fixed in
# https://github.com/neondatabase/neon/pull/8914. When that is merged, this can be
# removed.
cur.execute("select pg_switch_wal()")
cur.execute("CREATE TABLE foo (t text)")
# Prepare a transaction that will insert a row
@@ -66,7 +53,7 @@ def twophase_test_on_timeline(env: NeonEnv):
assert len(twophase_files) == 2
# Create a branch with the transaction in prepared state
fork_at_current_lsn(env, endpoint, "test_twophase_prepared", "test_twophase")
fork_at_current_lsn(env, endpoint, "test_twophase_prepared", "main")
# Start compute on the new branch
endpoint2 = env.endpoints.create_start(
@@ -93,50 +80,3 @@ def twophase_test_on_timeline(env: NeonEnv):
# Only one committed insert is visible on the original branch
cur.execute("SELECT * FROM foo")
assert cur.fetchall() == [("three",)]
def test_twophase(neon_simple_env: NeonEnv):
"""
Test branching, when a transaction is in prepared state
"""
env = neon_simple_env
env.neon_cli.create_branch("test_twophase")
twophase_test_on_timeline(env)
def test_twophase_nonzero_epoch(
neon_simple_env: NeonEnv,
test_output_dir: Path,
pg_bin: PgBin,
vanilla_pg,
):
"""
Same as 'test_twophase' test, but with a non-zero XID epoch, i.e. after 4 billion XIDs
have been consumed. (This is to ensure that we correctly use the full 64-bit XIDs in
pg_twophase filenames with PostgreSQL v17.)
"""
env = neon_simple_env
# Reset the vanilla Postgres instance with a higher XID epoch
pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, "pg_resetwal")
cmd = [pg_resetwal_path, "--epoch=1000000000", "-D", str(vanilla_pg.pgdatadir)]
pg_bin.run_capture(cmd)
timeline_id = TimelineId.generate()
# Import the cluster to Neon
vanilla_pg.start()
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
import_timeline_from_vanilla_postgres(
test_output_dir,
env,
pg_bin,
env.initial_tenant,
timeline_id,
"test_twophase",
vanilla_pg.connstr(),
)
vanilla_pg.stop() # don't need the original server anymore
twophase_test_on_timeline(env)

View File

@@ -0,0 +1,54 @@
from time import sleep
import pytest
import requests
from fixtures.neon_fixtures import NeonEnv
from fixtures.pg_version import PgVersion
def test_upgrade(pg_version: PgVersion, neon_simple_env: NeonEnv):
env = neon_simple_env
upgrade_to: PgVersion
if pg_version == PgVersion.V14:
upgrade_to = PgVersion.V15
elif pg_version == PgVersion.V15:
upgrade_to = PgVersion.V16
else:
pytest.skip("Nothing to upgrade")
env.neon_cli.create_timeline("test_upgrade")
endpoint = env.endpoints.create_start("test_upgrade")
resp = requests.post(
f"http://localhost:{endpoint.http_port}/upgrade",
json={
"pg_version": upgrade_to,
},
)
assert resp.status_code == 202
while True:
resp = requests.get(f"http://localhost:{endpoint.http_port}/status")
assert resp.status_code == 200
data = resp.json()
if data["status"] == "upgrading":
sleep(1)
continue
elif data["status"] == "running":
break
else:
pytest.fail(f"Unexpected compute state during upgrade: {data['status']}")
endpoint.stop_and_destroy()
def test_upgrade_bad_request(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_timeline("test_upgrade_bad_request")
endpoint = env.endpoints.create_start("test_upgrade_bad_request")
resp = requests.post(f"http://localhost:{endpoint.http_port}/upgrade")
assert resp.status_code == 400
# TODO: Use postgres versions that are out of range.

1
vendor/postgres-v17 vendored

Submodule vendor/postgres-v17 deleted from 9156d63ce2