mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-29 08:10:38 +00:00
Compare commits
29 Commits
synthetic_
...
write_mess
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
51f672b0bb | ||
|
|
1360361f60 | ||
|
|
000eb1b069 | ||
|
|
f51b48fa49 | ||
|
|
9f906ff236 | ||
|
|
c79dd8d458 | ||
|
|
ec4ecdd543 | ||
|
|
20a4d817ce | ||
|
|
5ebf7e5619 | ||
|
|
0692fffbf3 | ||
|
|
093570af20 | ||
|
|
eb403da814 | ||
|
|
f3ad635911 | ||
|
|
a8d7360881 | ||
|
|
b0311cfdeb | ||
|
|
412e0aa985 | ||
|
|
965b4f4ae2 | ||
|
|
95018672fa | ||
|
|
2caece2077 | ||
|
|
b8b8c19fb4 | ||
|
|
225add041f | ||
|
|
5d001b1e5a | ||
|
|
fe462de85b | ||
|
|
c0de7f5cd8 | ||
|
|
b220ba6cd1 | ||
|
|
7de373210d | ||
|
|
5c5b03ce08 | ||
|
|
d7d3f451f0 | ||
|
|
bc7d3c6476 |
@@ -32,8 +32,6 @@ storage:
|
||||
hosts:
|
||||
safekeeper-0.ap-southeast-1.aws.neon.tech:
|
||||
ansible_host: i-0d6f1dc5161eef894
|
||||
safekeeper-1.ap-southeast-1.aws.neon.tech:
|
||||
ansible_host: i-0e338adda8eb2d19f
|
||||
safekeeper-2.ap-southeast-1.aws.neon.tech:
|
||||
ansible_host: i-04fb63634e4679eb9
|
||||
safekeeper-3.ap-southeast-1.aws.neon.tech:
|
||||
|
||||
2
.github/ansible/prod.us-east-2.hosts.yaml
vendored
2
.github/ansible/prod.us-east-2.hosts.yaml
vendored
@@ -27,6 +27,8 @@ storage:
|
||||
ansible_host: i-062227ba7f119eb8c
|
||||
pageserver-1.us-east-2.aws.neon.tech:
|
||||
ansible_host: i-0b3ec0afab5968938
|
||||
pageserver-2.us-east-2.aws.neon.tech:
|
||||
ansible_host: i-0d7a1c4325e71421d
|
||||
|
||||
safekeepers:
|
||||
hosts:
|
||||
|
||||
@@ -1,6 +1,21 @@
|
||||
# Helm chart values for neon-proxy-scram.
|
||||
# This is a YAML-formatted file.
|
||||
|
||||
deploymentStrategy:
|
||||
type: RollingUpdate
|
||||
rollingUpdate:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 604800"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
image:
|
||||
repository: neondatabase/neon
|
||||
|
||||
|
||||
@@ -1,6 +1,21 @@
|
||||
# Helm chart values for neon-proxy-scram.
|
||||
# This is a YAML-formatted file.
|
||||
|
||||
deploymentStrategy:
|
||||
type: RollingUpdate
|
||||
rollingUpdate:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 604800"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
image:
|
||||
repository: neondatabase/neon
|
||||
|
||||
|
||||
@@ -1,6 +1,22 @@
|
||||
# Helm chart values for neon-proxy-scram.
|
||||
# This is a YAML-formatted file.
|
||||
|
||||
deploymentStrategy:
|
||||
type: RollingUpdate
|
||||
rollingUpdate:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 604800"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
|
||||
image:
|
||||
repository: neondatabase/neon
|
||||
|
||||
|
||||
@@ -1,6 +1,22 @@
|
||||
# Helm chart values for neon-proxy-scram.
|
||||
# This is a YAML-formatted file.
|
||||
|
||||
deploymentStrategy:
|
||||
type: RollingUpdate
|
||||
rollingUpdate:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 604800"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
|
||||
image:
|
||||
repository: neondatabase/neon
|
||||
|
||||
|
||||
@@ -1,6 +1,22 @@
|
||||
# Helm chart values for neon-proxy-scram.
|
||||
# This is a YAML-formatted file.
|
||||
|
||||
deploymentStrategy:
|
||||
type: RollingUpdate
|
||||
rollingUpdate:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 604800"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
|
||||
image:
|
||||
repository: neondatabase/neon
|
||||
|
||||
|
||||
@@ -1,6 +1,22 @@
|
||||
# Helm chart values for neon-proxy-scram.
|
||||
# This is a YAML-formatted file.
|
||||
|
||||
deploymentStrategy:
|
||||
type: RollingUpdate
|
||||
rollingUpdate:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 604800"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
|
||||
image:
|
||||
repository: neondatabase/neon
|
||||
|
||||
|
||||
2
.github/workflows/deploy-dev.yml
vendored
2
.github/workflows/deploy-dev.yml
vendored
@@ -67,7 +67,7 @@ jobs:
|
||||
./get_binaries.sh
|
||||
|
||||
ansible-galaxy collection install sivel.toiletwater
|
||||
ansible-playbook deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_STAGING_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
|
||||
ansible-playbook -v deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_STAGING_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
|
||||
rm -f neon_install.tar.gz .neon_current_version
|
||||
|
||||
- name: Cleanup ansible folder
|
||||
|
||||
2
.github/workflows/deploy-prod.yml
vendored
2
.github/workflows/deploy-prod.yml
vendored
@@ -68,7 +68,7 @@ jobs:
|
||||
./get_binaries.sh
|
||||
|
||||
ansible-galaxy collection install sivel.toiletwater
|
||||
ansible-playbook deploy.yaml -i prod.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_PRODUCTION_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
|
||||
ansible-playbook -v deploy.yaml -i prod.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_PRODUCTION_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
|
||||
rm -f neon_install.tar.gz .neon_current_version
|
||||
|
||||
deploy-proxy-prod-new:
|
||||
|
||||
20
Cargo.lock
generated
20
Cargo.lock
generated
@@ -854,6 +854,7 @@ dependencies = [
|
||||
"opentelemetry",
|
||||
"postgres",
|
||||
"regex",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tar",
|
||||
@@ -3054,6 +3055,7 @@ dependencies = [
|
||||
"hyper",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"pin-project-lite",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tempfile",
|
||||
@@ -3065,15 +3067,6 @@ dependencies = [
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "remove_dir_all"
|
||||
version = "0.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7"
|
||||
dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "reqwest"
|
||||
version = "0.11.14"
|
||||
@@ -3847,16 +3840,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tempfile"
|
||||
version = "3.3.0"
|
||||
version = "3.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4"
|
||||
checksum = "af18f7ae1acd354b992402e9ec5864359d693cd8a79dcbef59f76891701c1e95"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"fastrand",
|
||||
"libc",
|
||||
"redox_syscall",
|
||||
"remove_dir_all",
|
||||
"winapi",
|
||||
"rustix",
|
||||
"windows-sys 0.42.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -150,7 +150,7 @@ workspace_hack = { version = "0.1", path = "./workspace_hack/" }
|
||||
criterion = "0.4"
|
||||
rcgen = "0.10"
|
||||
rstest = "0.16"
|
||||
tempfile = "3.2"
|
||||
tempfile = "3.4"
|
||||
tonic-build = "0.8"
|
||||
|
||||
# This is only needed for proxy's tests.
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
ARG PG_VERSION
|
||||
ARG REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
|
||||
ARG IMAGE=rust
|
||||
ARG TAG=pinned
|
||||
@@ -11,7 +12,7 @@ FROM debian:bullseye-slim AS build-deps
|
||||
RUN apt update && \
|
||||
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev \
|
||||
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libssl-dev \
|
||||
libicu-dev
|
||||
libicu-dev libxslt1-dev
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
@@ -23,18 +24,24 @@ FROM build-deps AS pg-build
|
||||
ARG PG_VERSION
|
||||
COPY vendor/postgres-${PG_VERSION} postgres
|
||||
RUN cd postgres && \
|
||||
./configure CFLAGS='-O2 -g3' --enable-debug --with-openssl --with-uuid=ossp --with-icu && \
|
||||
./configure CFLAGS='-O2 -g3' --enable-debug --with-openssl --with-uuid=ossp --with-icu \
|
||||
--with-libxml --with-libxslt && \
|
||||
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s install && \
|
||||
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C contrib/ install && \
|
||||
# Install headers
|
||||
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/include install && \
|
||||
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/interfaces/libpq install && \
|
||||
# Enable some of contrib extensions
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/autoinc.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/bloom.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrowlocks.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/earthdistance.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/insert_username.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/intagg.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/moddatetime.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrowlocks.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgstattuple.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/earthdistance.control
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/refint.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/xml2.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
@@ -57,10 +64,11 @@ RUN wget https://gitlab.com/Oslandia/SFCGAL/-/archive/v1.3.10/SFCGAL-v1.3.10.tar
|
||||
DESTDIR=/sfcgal make install -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make clean && cp -R /sfcgal/* /
|
||||
|
||||
ENV PATH "/usr/local/pgsql/bin:$PATH"
|
||||
|
||||
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.2.tar.gz -O postgis.tar.gz && \
|
||||
mkdir postgis-src && cd postgis-src && tar xvzf ../postgis.tar.gz --strip-components=1 -C . && \
|
||||
./autogen.sh && \
|
||||
export PATH="/usr/local/pgsql/bin:$PATH" && \
|
||||
./configure --with-sfcgal=/usr/local/bin/sfcgal-config && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
cd extensions/postgis && \
|
||||
@@ -74,6 +82,15 @@ RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.2.tar.gz -O postg
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/address_standardizer.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/address_standardizer_data_us.control
|
||||
|
||||
RUN wget https://github.com/pgRouting/pgrouting/archive/v3.4.2.tar.gz -O pgrouting.tar.gz && \
|
||||
mkdir pgrouting-src && cd pgrouting-src && tar xvzf ../pgrouting.tar.gz --strip-components=1 -C . && \
|
||||
mkdir build && \
|
||||
cd build && \
|
||||
cmake .. && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrouting.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "plv8-build"
|
||||
@@ -178,6 +195,96 @@ RUN wget https://github.com/michelp/pgjwt/archive/9742dab1b2f297ad3811120db7b214
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgjwt.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "hypopg-pg-build"
|
||||
# compile hypopg extension
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS hypopg-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/HypoPG/hypopg/archive/refs/tags/1.3.1.tar.gz -O hypopg.tar.gz && \
|
||||
mkdir hypopg-src && cd hypopg-src && tar xvzf ../hypopg.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/hypopg.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg-hashids-pg-build"
|
||||
# compile pg_hashids extension
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS pg-hashids-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/iCyberon/pg_hashids/archive/refs/tags/v1.2.1.tar.gz -O pg_hashids.tar.gz && \
|
||||
mkdir pg_hashids-src && cd pg_hashids-src && tar xvzf ../pg_hashids.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_hashids.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "rust extensions"
|
||||
# This layer is used to build `pgx` deps
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS rust-extensions-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install -y curl libclang-dev cmake && \
|
||||
useradd -ms /bin/bash nonroot -b /home
|
||||
|
||||
ENV HOME=/home/nonroot
|
||||
ENV PATH="/home/nonroot/.cargo/bin:/usr/local/pgsql/bin/:$PATH"
|
||||
USER nonroot
|
||||
WORKDIR /home/nonroot
|
||||
ARG PG_VERSION
|
||||
|
||||
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && \
|
||||
chmod +x rustup-init && \
|
||||
./rustup-init -y --no-modify-path --profile minimal --default-toolchain stable && \
|
||||
rm rustup-init && \
|
||||
cargo install --git https://github.com/vadim2404/pgx --branch neon_abi_v0.6.1 --locked cargo-pgx && \
|
||||
/bin/bash -c 'cargo pgx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
|
||||
|
||||
USER root
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg-jsonschema-pg-build"
|
||||
# Compile "pg_jsonschema" extension
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM rust-extensions-build AS pg-jsonschema-pg-build
|
||||
|
||||
RUN git clone --depth=1 --single-branch --branch neon_abi_v0.1.4 https://github.com/vadim2404/pg_jsonschema/ && \
|
||||
cd pg_jsonschema && \
|
||||
cargo pgx install --release && \
|
||||
# it's needed to enable extension because it uses untrusted C language
|
||||
sed -i 's/superuser = false/superuser = true/g' /usr/local/pgsql/share/extension/pg_jsonschema.control && \
|
||||
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_jsonschema.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg-graphql-pg-build"
|
||||
# Compile "pg_graphql" extension
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM rust-extensions-build AS pg-graphql-pg-build
|
||||
|
||||
RUN git clone --depth=1 --single-branch --branch neon_abi_v1.1.0 https://github.com/vadim2404/pg_graphql && \
|
||||
cd pg_graphql && \
|
||||
cargo pgx install --release && \
|
||||
# it's needed to enable extension because it uses untrusted C language
|
||||
sed -i 's/superuser = false/superuser = true/g' /usr/local/pgsql/share/extension/pg_graphql.control && \
|
||||
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_graphql.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "neon-pg-ext-build"
|
||||
@@ -193,6 +300,10 @@ COPY --from=h3-pg-build /h3/usr /
|
||||
COPY --from=unit-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=vector-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pgjwt-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-jsonschema-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-graphql-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=hypopg-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-hashids-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY pgxn/ pgxn/
|
||||
|
||||
RUN make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
@@ -255,6 +366,7 @@ COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-deb
|
||||
# libicu67, locales for collations (including ICU)
|
||||
# libossp-uuid16 for extension ossp-uuid
|
||||
# libgeos, libgdal, libsfcgal1, libproj and libprotobuf-c1 for PostGIS
|
||||
# libxml2, libxslt1.1 for xml2
|
||||
RUN apt update && \
|
||||
apt install --no-install-recommends -y \
|
||||
locales \
|
||||
@@ -266,6 +378,8 @@ RUN apt update && \
|
||||
libproj19 \
|
||||
libprotobuf-c1 \
|
||||
libsfcgal1 \
|
||||
libxml2 \
|
||||
libxslt1.1 \
|
||||
gdb && \
|
||||
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
|
||||
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
|
||||
|
||||
@@ -10,7 +10,6 @@ RUN set -e \
|
||||
&& rm -f /etc/inittab \
|
||||
&& touch /etc/inittab
|
||||
|
||||
ADD vm-cgconfig.conf /etc/cgconfig.conf
|
||||
RUN set -e \
|
||||
&& echo "::sysinit:cgconfigparser -l /etc/cgconfig.conf -s 1664" >> /etc/inittab \
|
||||
&& echo "::respawn:su vm-informant -c '/usr/local/bin/vm-informant --auto-restart --cgroup=neon-postgres'" >> /etc/inittab
|
||||
@@ -26,6 +25,7 @@ RUN apt update && \
|
||||
RUN adduser vm-informant --disabled-password --no-create-home
|
||||
USER postgres
|
||||
|
||||
ADD vm-cgconfig.conf /etc/cgconfig.conf
|
||||
COPY --from=informant /etc/inittab /etc/inittab
|
||||
COPY --from=informant /usr/bin/vm-informant /usr/local/bin/vm-informant
|
||||
|
||||
|
||||
@@ -34,6 +34,11 @@ dnf install flex bison readline-devel zlib-devel openssl-devel \
|
||||
libseccomp-devel perl clang cmake postgresql postgresql-contrib protobuf-compiler \
|
||||
protobuf-devel
|
||||
```
|
||||
* On Arch based systems, these packages are needed:
|
||||
```bash
|
||||
pacman -S base-devel readline zlib libseccomp openssl clang \
|
||||
postgresql-libs cmake postgresql protobuf
|
||||
```
|
||||
|
||||
2. [Install Rust](https://www.rust-lang.org/tools/install)
|
||||
```
|
||||
|
||||
@@ -17,6 +17,7 @@ regex.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
tar.workspace = true
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
|
||||
tokio-postgres.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
@@ -65,6 +65,9 @@ fn main() -> Result<()> {
|
||||
let spec = matches.get_one::<String>("spec");
|
||||
let spec_path = matches.get_one::<String>("spec-path");
|
||||
|
||||
let compute_id = matches.get_one::<String>("compute-id");
|
||||
let control_plane_uri = matches.get_one::<String>("control-plane-uri");
|
||||
|
||||
// Try to use just 'postgres' if no path is provided
|
||||
let pgbin = matches.get_one::<String>("pgbin").unwrap();
|
||||
|
||||
@@ -77,8 +80,27 @@ fn main() -> Result<()> {
|
||||
let path = Path::new(sp);
|
||||
let file = File::open(path)?;
|
||||
serde_json::from_reader(file)?
|
||||
} else if let Some(id) = compute_id {
|
||||
if let Some(cp_base) = control_plane_uri {
|
||||
let cp_uri = format!("{cp_base}/management/api/v1/{id}/spec");
|
||||
let jwt: String = match std::env::var("NEON_CONSOLE_JWT") {
|
||||
Ok(v) => v,
|
||||
Err(_) => "".to_string(),
|
||||
};
|
||||
|
||||
reqwest::blocking::Client::new()
|
||||
.get(cp_uri)
|
||||
.header("Authorization", jwt)
|
||||
.send()?
|
||||
.json()?
|
||||
} else {
|
||||
panic!(
|
||||
"must specify --control-plane-uri \"{:#?}\" and --compute-id \"{:#?}\"",
|
||||
control_plane_uri, compute_id
|
||||
);
|
||||
}
|
||||
} else {
|
||||
panic!("cluster spec should be provided via --spec or --spec-path argument");
|
||||
panic!("compute spec should be provided via --spec or --spec-path argument");
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -227,6 +249,18 @@ fn cli() -> clap::Command {
|
||||
.long("spec-path")
|
||||
.value_name("SPEC_PATH"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("compute-id")
|
||||
.short('i')
|
||||
.long("compute-id")
|
||||
.value_name("COMPUTE_ID"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("control-plane-uri")
|
||||
.short('p')
|
||||
.long("control-plane-uri")
|
||||
.value_name("CONTROL_PLANE"),
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1,41 +1,5 @@
|
||||
# Synthetic size
|
||||
|
||||
## How to get the data
|
||||
|
||||
Pageserver provides a HTTP API for getting the synthetic size of a tenant
|
||||
along with the data that was used to calculate it. Usage examples:
|
||||
|
||||
1. This query returns the synthetic size of the tenant, along with the "raw" data
|
||||
That is returned in the `segments` and `timeline_inputs` fields.
|
||||
|
||||
```
|
||||
curl localhost:9898/v1/tenant/5e1de642394b00a0a583a088e8276b98/synthetic_size | jq
|
||||
```
|
||||
|
||||
2. If `inputs_only=true` is passed, the response will contain only the raw data.
|
||||
Actual synthetic size is not calculated.
|
||||
|
||||
```
|
||||
curl localhost:9898/v1/tenant/5e1de642394b00a0a583a088e8276b98/synthetic_size?inputs_only=true | jq
|
||||
|
||||
```
|
||||
|
||||
3. 'retention_period' is a cutoff (in bytes) that overrides the cutoff that is used in the size calculation.
|
||||
Note, that override is applied only if provided `retnention_period` is shorter than the real cutoff.
|
||||
|
||||
```
|
||||
curl localhost:9898/v1/tenant/5e1de642394b00a0a583a088e8276b98/synthetic_size?retention_period=1048576 | jq
|
||||
```
|
||||
|
||||
4. If header `Accept: text/html` is passed, the response will be in HTML format.
|
||||
The HTML contains a json with the same data as in the previous examples + SVG diagram of the tenant timelines.
|
||||
|
||||
```
|
||||
curl -H "Accept: text/html" localhost:9898/v1/tenant/5e1de642394b00a0a583a088e8276b98/synthetic_size > ./size.html | google-chrome ./size.html
|
||||
```
|
||||
|
||||
## Overview
|
||||
|
||||
Neon storage has copy-on-write branching, which makes it difficult to
|
||||
answer the question "how large is my database"? To give one reasonable
|
||||
answer, we calculate _synthetic size_ for a project.
|
||||
|
||||
@@ -98,6 +98,15 @@ impl RelTag {
|
||||
|
||||
name
|
||||
}
|
||||
|
||||
pub fn with_forknum(&self, forknum: u8) -> Self {
|
||||
RelTag {
|
||||
forknum,
|
||||
spcnode: self.spcnode,
|
||||
dbnode: self.dbnode,
|
||||
relnode: self.relnode,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
|
||||
@@ -21,7 +21,7 @@ toml_edit.workspace = true
|
||||
tracing.workspace = true
|
||||
metrics.workspace = true
|
||||
utils.workspace = true
|
||||
|
||||
pin-project-lite.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
|
||||
@@ -20,7 +20,10 @@ use aws_sdk_s3::{
|
||||
};
|
||||
use aws_smithy_http::body::SdkBody;
|
||||
use hyper::Body;
|
||||
use tokio::{io, sync::Semaphore};
|
||||
use tokio::{
|
||||
io::{self, AsyncRead},
|
||||
sync::Semaphore,
|
||||
};
|
||||
use tokio_util::io::ReaderStream;
|
||||
use tracing::debug;
|
||||
|
||||
@@ -102,7 +105,7 @@ pub struct S3Bucket {
|
||||
// Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
|
||||
// Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
|
||||
// The helps to ensure we don't exceed the thresholds.
|
||||
concurrency_limiter: Semaphore,
|
||||
concurrency_limiter: Arc<Semaphore>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
@@ -162,7 +165,7 @@ impl S3Bucket {
|
||||
client,
|
||||
bucket_name: aws_config.bucket_name.clone(),
|
||||
prefix_in_bucket,
|
||||
concurrency_limiter: Semaphore::new(aws_config.concurrency_limit.get()),
|
||||
concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -194,9 +197,10 @@ impl S3Bucket {
|
||||
}
|
||||
|
||||
async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
|
||||
let _guard = self
|
||||
let permit = self
|
||||
.concurrency_limiter
|
||||
.acquire()
|
||||
.clone()
|
||||
.acquire_owned()
|
||||
.await
|
||||
.context("Concurrency limiter semaphore got closed during S3 download")
|
||||
.map_err(DownloadError::Other)?;
|
||||
@@ -217,9 +221,10 @@ impl S3Bucket {
|
||||
let metadata = object_output.metadata().cloned().map(StorageMetadata);
|
||||
Ok(Download {
|
||||
metadata,
|
||||
download_stream: Box::pin(io::BufReader::new(
|
||||
download_stream: Box::pin(io::BufReader::new(RatelimitedAsyncRead::new(
|
||||
permit,
|
||||
object_output.body.into_async_read(),
|
||||
)),
|
||||
))),
|
||||
})
|
||||
}
|
||||
Err(SdkError::ServiceError {
|
||||
@@ -240,6 +245,32 @@ impl S3Bucket {
|
||||
}
|
||||
}
|
||||
|
||||
pin_project_lite::pin_project! {
|
||||
/// An `AsyncRead` adapter which carries a permit for the lifetime of the value.
|
||||
struct RatelimitedAsyncRead<S> {
|
||||
permit: tokio::sync::OwnedSemaphorePermit,
|
||||
#[pin]
|
||||
inner: S,
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncRead> RatelimitedAsyncRead<S> {
|
||||
fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self {
|
||||
RatelimitedAsyncRead { permit, inner }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncRead> AsyncRead for RatelimitedAsyncRead<S> {
|
||||
fn poll_read(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
buf: &mut io::ReadBuf<'_>,
|
||||
) -> std::task::Poll<std::io::Result<()>> {
|
||||
let this = self.project();
|
||||
this.inner.poll_read(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl RemoteStorage for S3Bucket {
|
||||
async fn list(&self) -> anyhow::Result<Vec<RemotePath>> {
|
||||
|
||||
@@ -4,13 +4,14 @@ use anyhow::{anyhow, Context};
|
||||
use hyper::header::{HeaderName, AUTHORIZATION};
|
||||
use hyper::http::HeaderValue;
|
||||
use hyper::{header::CONTENT_TYPE, Body, Request, Response, Server};
|
||||
use hyper::{Method, StatusCode};
|
||||
use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
|
||||
use once_cell::sync::Lazy;
|
||||
use routerify::ext::RequestExt;
|
||||
use routerify::RequestInfo;
|
||||
use routerify::{Middleware, Router, RouterBuilder, RouterService};
|
||||
use tokio::task::JoinError;
|
||||
use tracing::info;
|
||||
use tracing;
|
||||
|
||||
use std::future::Future;
|
||||
use std::net::TcpListener;
|
||||
@@ -27,7 +28,14 @@ static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
});
|
||||
|
||||
async fn logger(res: Response<Body>, info: RequestInfo) -> Result<Response<Body>, ApiError> {
|
||||
info!("{} {} {}", info.method(), info.uri().path(), res.status(),);
|
||||
// cannot factor out the Level to avoid the repetition
|
||||
// because tracing can only work with const Level
|
||||
// which is not the case here
|
||||
if info.method() == Method::GET && res.status() == StatusCode::OK {
|
||||
tracing::debug!("{} {} {}", info.method(), info.uri().path(), res.status());
|
||||
} else {
|
||||
tracing::info!("{} {} {}", info.method(), info.uri().path(), res.status());
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
@@ -203,7 +211,7 @@ pub fn serve_thread_main<S>(
|
||||
where
|
||||
S: Future<Output = ()> + Send + Sync,
|
||||
{
|
||||
info!("Starting an HTTP endpoint at {}", listener.local_addr()?);
|
||||
tracing::info!("Starting an HTTP endpoint at {}", listener.local_addr()?);
|
||||
|
||||
// Create a Service from the router above to handle incoming requests.
|
||||
let service = RouterService::new(router_builder.build().map_err(|err| anyhow!(err))?).unwrap();
|
||||
|
||||
@@ -45,3 +45,115 @@ pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Disable the default rust panic hook by using `set_hook`.
|
||||
///
|
||||
/// For neon binaries, the assumption is that tracing is configured before with [`init`], after
|
||||
/// that sentry is configured (if needed). sentry will install it's own on top of this, always
|
||||
/// processing the panic before we log it.
|
||||
///
|
||||
/// When the return value is dropped, the hook is reverted to std default hook (prints to stderr).
|
||||
/// If the assumptions about the initialization order are not held, use
|
||||
/// [`TracingPanicHookGuard::disarm`] but keep in mind, if tracing is stopped, then panics will be
|
||||
/// lost.
|
||||
#[must_use]
|
||||
pub fn replace_panic_hook_with_tracing_panic_hook() -> TracingPanicHookGuard {
|
||||
std::panic::set_hook(Box::new(tracing_panic_hook));
|
||||
TracingPanicHookGuard::new()
|
||||
}
|
||||
|
||||
/// Drop guard which restores the std panic hook on drop.
|
||||
///
|
||||
/// Tracing should not be used when it's not configured, but we cannot really latch on to any
|
||||
/// imaginary lifetime of tracing.
|
||||
pub struct TracingPanicHookGuard {
|
||||
act: bool,
|
||||
}
|
||||
|
||||
impl TracingPanicHookGuard {
|
||||
fn new() -> Self {
|
||||
TracingPanicHookGuard { act: true }
|
||||
}
|
||||
|
||||
/// Make this hook guard not do anything when dropped.
|
||||
pub fn forget(&mut self) {
|
||||
self.act = false;
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TracingPanicHookGuard {
|
||||
fn drop(&mut self) {
|
||||
if self.act {
|
||||
let _ = std::panic::take_hook();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Named symbol for our panic hook, which logs the panic.
|
||||
fn tracing_panic_hook(info: &std::panic::PanicInfo) {
|
||||
// following rust 1.66.1 std implementation:
|
||||
// https://github.com/rust-lang/rust/blob/90743e7298aca107ddaa0c202a4d3604e29bfeb6/library/std/src/panicking.rs#L235-L288
|
||||
let location = info.location();
|
||||
|
||||
let msg = match info.payload().downcast_ref::<&'static str>() {
|
||||
Some(s) => *s,
|
||||
None => match info.payload().downcast_ref::<String>() {
|
||||
Some(s) => &s[..],
|
||||
None => "Box<dyn Any>",
|
||||
},
|
||||
};
|
||||
|
||||
let thread = std::thread::current();
|
||||
let thread = thread.name().unwrap_or("<unnamed>");
|
||||
let backtrace = std::backtrace::Backtrace::capture();
|
||||
|
||||
let _entered = if let Some(location) = location {
|
||||
tracing::error_span!("panic", %thread, location = %PrettyLocation(location))
|
||||
} else {
|
||||
// very unlikely to hit here, but the guarantees of std could change
|
||||
tracing::error_span!("panic", %thread)
|
||||
}
|
||||
.entered();
|
||||
|
||||
if backtrace.status() == std::backtrace::BacktraceStatus::Captured {
|
||||
// this has an annoying extra '\n' in the end which anyhow doesn't do, but we cannot really
|
||||
// get rid of it as we cannot get in between of std::fmt::Formatter<'_>; we could format to
|
||||
// string, maybe even to a TLS one but tracing already does that.
|
||||
tracing::error!("{msg}\n\nStack backtrace:\n{backtrace}");
|
||||
} else {
|
||||
tracing::error!("{msg}");
|
||||
}
|
||||
|
||||
// ensure that we log something on the panic if this hook is left after tracing has been
|
||||
// unconfigured. worst case when teardown is racing the panic is to log the panic twice.
|
||||
tracing::dispatcher::get_default(|d| {
|
||||
if let Some(_none) = d.downcast_ref::<tracing::subscriber::NoSubscriber>() {
|
||||
let location = location.map(PrettyLocation);
|
||||
log_panic_to_stderr(thread, msg, location, &backtrace);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[cold]
|
||||
fn log_panic_to_stderr(
|
||||
thread: &str,
|
||||
msg: &str,
|
||||
location: Option<PrettyLocation<'_, '_>>,
|
||||
backtrace: &std::backtrace::Backtrace,
|
||||
) {
|
||||
eprintln!("panic while tracing is unconfigured: thread '{thread}' panicked at '{msg}', {location:?}\nStack backtrace:\n{backtrace}");
|
||||
}
|
||||
|
||||
struct PrettyLocation<'a, 'b>(&'a std::panic::Location<'b>);
|
||||
|
||||
impl std::fmt::Display for PrettyLocation<'_, '_> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}:{}:{}", self.0.file(), self.0.line(), self.0.column())
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for PrettyLocation<'_, '_> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
<Self as std::fmt::Display>::fmt(self, f)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -233,7 +233,7 @@ impl PostgresBackend {
|
||||
}
|
||||
|
||||
/// Write message into internal output buffer.
|
||||
pub fn write_message(&mut self, message: &BeMessage<'_>) -> io::Result<&mut Self> {
|
||||
pub fn write_message_noflush(&mut self, message: &BeMessage<'_>) -> io::Result<&mut Self> {
|
||||
BeMessage::write(&mut self.buf_out, message)?;
|
||||
Ok(self)
|
||||
}
|
||||
@@ -383,7 +383,7 @@ impl PostgresBackend {
|
||||
FeStartupPacket::SslRequest => {
|
||||
debug!("SSL requested");
|
||||
|
||||
self.write_message(&BeMessage::EncryptionResponse(have_tls))?;
|
||||
self.write_message_noflush(&BeMessage::EncryptionResponse(have_tls))?;
|
||||
if have_tls {
|
||||
self.start_tls().await?;
|
||||
self.state = ProtoState::Encrypted;
|
||||
@@ -391,11 +391,11 @@ impl PostgresBackend {
|
||||
}
|
||||
FeStartupPacket::GssEncRequest => {
|
||||
debug!("GSS requested");
|
||||
self.write_message(&BeMessage::EncryptionResponse(false))?;
|
||||
self.write_message_noflush(&BeMessage::EncryptionResponse(false))?;
|
||||
}
|
||||
FeStartupPacket::StartupMessage { .. } => {
|
||||
if have_tls && !matches!(self.state, ProtoState::Encrypted) {
|
||||
self.write_message(&BeMessage::ErrorResponse(
|
||||
self.write_message_noflush(&BeMessage::ErrorResponse(
|
||||
"must connect with TLS",
|
||||
None,
|
||||
))?;
|
||||
@@ -410,15 +410,17 @@ impl PostgresBackend {
|
||||
|
||||
match self.auth_type {
|
||||
AuthType::Trust => {
|
||||
self.write_message(&BeMessage::AuthenticationOk)?
|
||||
.write_message(&BeMessage::CLIENT_ENCODING)?
|
||||
self.write_message_noflush(&BeMessage::AuthenticationOk)?
|
||||
.write_message_noflush(&BeMessage::CLIENT_ENCODING)?
|
||||
// The async python driver requires a valid server_version
|
||||
.write_message(&BeMessage::server_version("14.1"))?
|
||||
.write_message(&BeMessage::ReadyForQuery)?;
|
||||
.write_message_noflush(&BeMessage::server_version("14.1"))?
|
||||
.write_message_noflush(&BeMessage::ReadyForQuery)?;
|
||||
self.state = ProtoState::Established;
|
||||
}
|
||||
AuthType::NeonJWT => {
|
||||
self.write_message(&BeMessage::AuthenticationCleartextPassword)?;
|
||||
self.write_message_noflush(
|
||||
&BeMessage::AuthenticationCleartextPassword,
|
||||
)?;
|
||||
self.state = ProtoState::Authentication;
|
||||
}
|
||||
}
|
||||
@@ -441,7 +443,7 @@ impl PostgresBackend {
|
||||
let (_, jwt_response) = m.split_last().context("protocol violation")?;
|
||||
|
||||
if let Err(e) = handler.check_auth_jwt(self, jwt_response) {
|
||||
self.write_message(&BeMessage::ErrorResponse(
|
||||
self.write_message_noflush(&BeMessage::ErrorResponse(
|
||||
&e.to_string(),
|
||||
Some(e.pg_error_code()),
|
||||
))?;
|
||||
@@ -449,9 +451,9 @@ impl PostgresBackend {
|
||||
}
|
||||
}
|
||||
}
|
||||
self.write_message(&BeMessage::AuthenticationOk)?
|
||||
.write_message(&BeMessage::CLIENT_ENCODING)?
|
||||
.write_message(&BeMessage::ReadyForQuery)?;
|
||||
self.write_message_noflush(&BeMessage::AuthenticationOk)?
|
||||
.write_message_noflush(&BeMessage::CLIENT_ENCODING)?
|
||||
.write_message_noflush(&BeMessage::ReadyForQuery)?;
|
||||
self.state = ProtoState::Established;
|
||||
}
|
||||
|
||||
@@ -486,30 +488,30 @@ impl PostgresBackend {
|
||||
if let Err(e) = handler.process_query(self, query_string).await {
|
||||
log_query_error(query_string, &e);
|
||||
let short_error = short_error(&e);
|
||||
self.write_message(&BeMessage::ErrorResponse(
|
||||
self.write_message_noflush(&BeMessage::ErrorResponse(
|
||||
&short_error,
|
||||
Some(e.pg_error_code()),
|
||||
))?;
|
||||
}
|
||||
self.write_message(&BeMessage::ReadyForQuery)?;
|
||||
self.write_message_noflush(&BeMessage::ReadyForQuery)?;
|
||||
}
|
||||
|
||||
FeMessage::Parse(m) => {
|
||||
*unnamed_query_string = m.query_string;
|
||||
self.write_message(&BeMessage::ParseComplete)?;
|
||||
self.write_message_noflush(&BeMessage::ParseComplete)?;
|
||||
}
|
||||
|
||||
FeMessage::Describe(_) => {
|
||||
self.write_message(&BeMessage::ParameterDescription)?
|
||||
.write_message(&BeMessage::NoData)?;
|
||||
self.write_message_noflush(&BeMessage::ParameterDescription)?
|
||||
.write_message_noflush(&BeMessage::NoData)?;
|
||||
}
|
||||
|
||||
FeMessage::Bind(_) => {
|
||||
self.write_message(&BeMessage::BindComplete)?;
|
||||
self.write_message_noflush(&BeMessage::BindComplete)?;
|
||||
}
|
||||
|
||||
FeMessage::Close(_) => {
|
||||
self.write_message(&BeMessage::CloseComplete)?;
|
||||
self.write_message_noflush(&BeMessage::CloseComplete)?;
|
||||
}
|
||||
|
||||
FeMessage::Execute(_) => {
|
||||
@@ -517,7 +519,7 @@ impl PostgresBackend {
|
||||
trace!("got execute {query_string:?}");
|
||||
if let Err(e) = handler.process_query(self, query_string).await {
|
||||
log_query_error(query_string, &e);
|
||||
self.write_message(&BeMessage::ErrorResponse(
|
||||
self.write_message_noflush(&BeMessage::ErrorResponse(
|
||||
&e.to_string(),
|
||||
Some(e.pg_error_code()),
|
||||
))?;
|
||||
@@ -529,7 +531,7 @@ impl PostgresBackend {
|
||||
}
|
||||
|
||||
FeMessage::Sync => {
|
||||
self.write_message(&BeMessage::ReadyForQuery)?;
|
||||
self.write_message_noflush(&BeMessage::ReadyForQuery)?;
|
||||
}
|
||||
|
||||
FeMessage::Terminate => {
|
||||
@@ -579,7 +581,7 @@ impl<'a> AsyncWrite for CopyDataWriter<'a> {
|
||||
// XXX: if the input is large, we should split it into multiple messages.
|
||||
// Not sure what the threshold should be, but the ultimate hard limit is that
|
||||
// the length cannot exceed u32.
|
||||
this.pgb.write_message(&BeMessage::CopyData(buf))?;
|
||||
this.pgb.write_message_noflush(&BeMessage::CopyData(buf))?;
|
||||
|
||||
Poll::Ready(Ok(buf.len()))
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@ use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
|
||||
use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
|
||||
use postgres_ffi::pg_constants::{PGDATA_SPECIAL_FILES, PGDATA_SUBDIRS, PG_HBA};
|
||||
use postgres_ffi::relfile_utils::{INIT_FORKNUM, MAIN_FORKNUM};
|
||||
use postgres_ffi::TransactionId;
|
||||
use postgres_ffi::XLogFileName;
|
||||
use postgres_ffi::PG_TLI;
|
||||
@@ -190,14 +191,31 @@ where
|
||||
{
|
||||
self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
|
||||
|
||||
// Gather and send relational files in each database if full backup is requested.
|
||||
if self.full_backup {
|
||||
for rel in self
|
||||
.timeline
|
||||
.list_rels(spcnode, dbnode, self.lsn, self.ctx)
|
||||
.await?
|
||||
{
|
||||
self.add_rel(rel).await?;
|
||||
// If full backup is requested, include all relation files.
|
||||
// Otherwise only include init forks of unlogged relations.
|
||||
let rels = self
|
||||
.timeline
|
||||
.list_rels(spcnode, dbnode, self.lsn, self.ctx)
|
||||
.await?;
|
||||
for &rel in rels.iter() {
|
||||
// Send init fork as main fork to provide well formed empty
|
||||
// contents of UNLOGGED relations. Postgres copies it in
|
||||
// `reinit.c` during recovery.
|
||||
if rel.forknum == INIT_FORKNUM {
|
||||
// I doubt we need _init fork itself, but having it at least
|
||||
// serves as a marker relation is unlogged.
|
||||
self.add_rel(rel, rel).await?;
|
||||
self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
|
||||
continue;
|
||||
}
|
||||
|
||||
if self.full_backup {
|
||||
if rel.forknum == MAIN_FORKNUM && rels.contains(&rel.with_forknum(INIT_FORKNUM))
|
||||
{
|
||||
// skip this, will include it when we reach the init fork
|
||||
continue;
|
||||
}
|
||||
self.add_rel(rel, rel).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -220,15 +238,16 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn add_rel(&mut self, tag: RelTag) -> anyhow::Result<()> {
|
||||
/// Add contents of relfilenode `src`, naming it as `dst`.
|
||||
async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> {
|
||||
let nblocks = self
|
||||
.timeline
|
||||
.get_rel_size(tag, self.lsn, false, self.ctx)
|
||||
.get_rel_size(src, self.lsn, false, self.ctx)
|
||||
.await?;
|
||||
|
||||
// If the relation is empty, create an empty file
|
||||
if nblocks == 0 {
|
||||
let file_name = tag.to_segfile_name(0);
|
||||
let file_name = dst.to_segfile_name(0);
|
||||
let header = new_tar_header(&file_name, 0)?;
|
||||
self.ar.append(&header, &mut io::empty()).await?;
|
||||
return Ok(());
|
||||
@@ -244,12 +263,12 @@ where
|
||||
for blknum in startblk..endblk {
|
||||
let img = self
|
||||
.timeline
|
||||
.get_rel_page_at_lsn(tag, blknum, self.lsn, false, self.ctx)
|
||||
.get_rel_page_at_lsn(src, blknum, self.lsn, false, self.ctx)
|
||||
.await?;
|
||||
segment_data.extend_from_slice(&img[..]);
|
||||
}
|
||||
|
||||
let file_name = tag.to_segfile_name(seg as u32);
|
||||
let file_name = dst.to_segfile_name(seg as u32);
|
||||
let header = new_tar_header(&file_name, segment_data.len() as u64)?;
|
||||
self.ar.append(&header, segment_data.as_slice()).await?;
|
||||
|
||||
|
||||
@@ -91,9 +91,9 @@ fn main() -> anyhow::Result<()> {
|
||||
// Initialize logging, which must be initialized before the custom panic hook is installed.
|
||||
logging::init(conf.log_format)?;
|
||||
|
||||
// disable the default rust panic hook by using `set_hook`. sentry will install it's own on top
|
||||
// of this, always processing the panic before we log it.
|
||||
std::panic::set_hook(Box::new(tracing_panic_hook));
|
||||
// mind the order required here: 1. logging, 2. panic_hook, 3. sentry.
|
||||
// disarming this hook on pageserver, because we never tear down tracing.
|
||||
logging::replace_panic_hook_with_tracing_panic_hook().forget();
|
||||
|
||||
// initialize sentry if SENTRY_DSN is provided
|
||||
let _sentry_guard = init_sentry(
|
||||
@@ -499,50 +499,6 @@ fn cli() -> Command {
|
||||
)
|
||||
}
|
||||
|
||||
/// Named symbol for our panic hook, which logs the panic.
|
||||
fn tracing_panic_hook(info: &std::panic::PanicInfo) {
|
||||
// following rust 1.66.1 std implementation:
|
||||
// https://github.com/rust-lang/rust/blob/90743e7298aca107ddaa0c202a4d3604e29bfeb6/library/std/src/panicking.rs#L235-L288
|
||||
let location = info.location();
|
||||
|
||||
let msg = match info.payload().downcast_ref::<&'static str>() {
|
||||
Some(s) => *s,
|
||||
None => match info.payload().downcast_ref::<String>() {
|
||||
Some(s) => &s[..],
|
||||
None => "Box<dyn Any>",
|
||||
},
|
||||
};
|
||||
|
||||
let thread = std::thread::current();
|
||||
let thread = thread.name().unwrap_or("<unnamed>");
|
||||
let backtrace = std::backtrace::Backtrace::capture();
|
||||
|
||||
struct PrettyLocation<'a, 'b>(&'a std::panic::Location<'b>);
|
||||
|
||||
impl std::fmt::Display for PrettyLocation<'_, '_> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}:{}:{}", self.0.file(), self.0.line(), self.0.column())
|
||||
}
|
||||
}
|
||||
|
||||
let _entered = if let Some(location) = location {
|
||||
tracing::error_span!("panic", %thread, location = %PrettyLocation(location))
|
||||
} else {
|
||||
// very unlikely to hit here, but the guarantees of std could change
|
||||
tracing::error_span!("panic", %thread)
|
||||
}
|
||||
.entered();
|
||||
|
||||
if backtrace.status() == std::backtrace::BacktraceStatus::Captured {
|
||||
// this has an annoying extra '\n' in the end which anyhow doesn't do, but we cannot really
|
||||
// get rid of it as we cannot get in between of std::fmt::Formatter<'_>; we could format to
|
||||
// string, maybe even to a TLS one but tracing already does that.
|
||||
tracing::error!("{msg}\n\nStack backtrace:\n{backtrace}");
|
||||
} else {
|
||||
tracing::error!("{msg}");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn verify_cli() {
|
||||
cli().debug_assert();
|
||||
|
||||
@@ -64,7 +64,7 @@ fn copyin_stream(pgb: &mut PostgresBackend) -> impl Stream<Item = io::Result<Byt
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
// We were requested to shut down.
|
||||
let msg = format!("pageserver is shutting down");
|
||||
let _ = pgb.write_message(&BeMessage::ErrorResponse(&msg, None));
|
||||
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None));
|
||||
Err(QueryError::Other(anyhow::anyhow!(msg)))
|
||||
}
|
||||
|
||||
@@ -80,13 +80,13 @@ fn copyin_stream(pgb: &mut PostgresBackend) -> impl Stream<Item = io::Result<Byt
|
||||
FeMessage::Terminate => {
|
||||
let msg = "client terminated connection with Terminate message during COPY";
|
||||
let query_error_error = QueryError::Disconnected(ConnectionError::Socket(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
|
||||
pgb.write_message(&BeMessage::ErrorResponse(msg, Some(query_error_error.pg_error_code())))?;
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error_error.pg_error_code())))?;
|
||||
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
|
||||
break;
|
||||
}
|
||||
m => {
|
||||
let msg = format!("unexpected message {m:?}");
|
||||
pgb.write_message(&BeMessage::ErrorResponse(&msg, None))?;
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None))?;
|
||||
Err(io::Error::new(io::ErrorKind::Other, msg))?;
|
||||
break;
|
||||
}
|
||||
@@ -97,7 +97,7 @@ fn copyin_stream(pgb: &mut PostgresBackend) -> impl Stream<Item = io::Result<Byt
|
||||
Ok(None) => {
|
||||
let msg = "client closed connection during COPY";
|
||||
let query_error_error = QueryError::Disconnected(ConnectionError::Socket(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
|
||||
pgb.write_message(&BeMessage::ErrorResponse(msg, Some(query_error_error.pg_error_code())))?;
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error_error.pg_error_code())))?;
|
||||
pgb.flush().await?;
|
||||
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
|
||||
}
|
||||
@@ -311,7 +311,7 @@ impl PageServerHandler {
|
||||
let timeline = tenant.get_timeline(timeline_id, true)?;
|
||||
|
||||
// switch client to COPYBOTH
|
||||
pgb.write_message(&BeMessage::CopyBothResponse)?;
|
||||
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
|
||||
pgb.flush().await?;
|
||||
|
||||
let metrics = PageRequestMetrics::new(&tenant_id, &timeline_id);
|
||||
@@ -380,7 +380,7 @@ impl PageServerHandler {
|
||||
})
|
||||
});
|
||||
|
||||
pgb.write_message(&BeMessage::CopyData(&response.serialize()))?;
|
||||
pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?;
|
||||
pgb.flush().await?;
|
||||
}
|
||||
Ok(())
|
||||
@@ -416,7 +416,7 @@ impl PageServerHandler {
|
||||
|
||||
// Import basebackup provided via CopyData
|
||||
info!("importing basebackup");
|
||||
pgb.write_message(&BeMessage::CopyInResponse)?;
|
||||
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
|
||||
pgb.flush().await?;
|
||||
|
||||
let mut copyin_stream = Box::pin(copyin_stream(pgb));
|
||||
@@ -468,7 +468,7 @@ impl PageServerHandler {
|
||||
|
||||
// Import wal provided via CopyData
|
||||
info!("importing wal");
|
||||
pgb.write_message(&BeMessage::CopyInResponse)?;
|
||||
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
|
||||
pgb.flush().await?;
|
||||
let mut copyin_stream = Box::pin(copyin_stream(pgb));
|
||||
let mut reader = tokio_util::io::StreamReader::new(&mut copyin_stream);
|
||||
@@ -678,7 +678,7 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
// switch client to COPYOUT
|
||||
pgb.write_message(&BeMessage::CopyOutResponse)?;
|
||||
pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
|
||||
pgb.flush().await?;
|
||||
|
||||
// Send a tarball of the latest layer on the timeline
|
||||
@@ -695,7 +695,7 @@ impl PageServerHandler {
|
||||
.await?;
|
||||
}
|
||||
|
||||
pgb.write_message(&BeMessage::CopyDone)?;
|
||||
pgb.write_message_noflush(&BeMessage::CopyDone)?;
|
||||
pgb.flush().await?;
|
||||
info!("basebackup complete");
|
||||
|
||||
@@ -812,7 +812,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
|
||||
// Check that the timeline exists
|
||||
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false, ctx)
|
||||
.await?;
|
||||
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
}
|
||||
// return pair of prev_lsn and last_lsn
|
||||
else if query_string.starts_with("get_last_record_rlsn ") {
|
||||
@@ -835,15 +835,15 @@ impl postgres_backend_async::Handler for PageServerHandler {
|
||||
|
||||
let end_of_timeline = timeline.get_last_record_rlsn();
|
||||
|
||||
pgb.write_message(&BeMessage::RowDescription(&[
|
||||
pgb.write_message_noflush(&BeMessage::RowDescription(&[
|
||||
RowDescriptor::text_col(b"prev_lsn"),
|
||||
RowDescriptor::text_col(b"last_lsn"),
|
||||
]))?
|
||||
.write_message(&BeMessage::DataRow(&[
|
||||
.write_message_noflush(&BeMessage::DataRow(&[
|
||||
Some(end_of_timeline.prev.to_string().as_bytes()),
|
||||
Some(end_of_timeline.last.to_string().as_bytes()),
|
||||
]))?
|
||||
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
}
|
||||
// same as basebackup, but result includes relational data as well
|
||||
else if query_string.starts_with("fullbackup ") {
|
||||
@@ -884,7 +884,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
|
||||
// Check that the timeline exists
|
||||
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true, ctx)
|
||||
.await?;
|
||||
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
} else if query_string.starts_with("import basebackup ") {
|
||||
// Import the `base` section (everything but the wal) of a basebackup.
|
||||
// Assumes the tenant already exists on this pageserver.
|
||||
@@ -929,10 +929,10 @@ impl postgres_backend_async::Handler for PageServerHandler {
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(()) => pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?,
|
||||
Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
|
||||
Err(e) => {
|
||||
error!("error importing base backup between {base_lsn} and {end_lsn}: {e:?}");
|
||||
pgb.write_message(&BeMessage::ErrorResponse(
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(
|
||||
&e.to_string(),
|
||||
Some(e.pg_error_code()),
|
||||
))?
|
||||
@@ -965,10 +965,10 @@ impl postgres_backend_async::Handler for PageServerHandler {
|
||||
.handle_import_wal(pgb, tenant_id, timeline_id, start_lsn, end_lsn, ctx)
|
||||
.await
|
||||
{
|
||||
Ok(()) => pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?,
|
||||
Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
|
||||
Err(e) => {
|
||||
error!("error importing WAL between {start_lsn} and {end_lsn}: {e:?}");
|
||||
pgb.write_message(&BeMessage::ErrorResponse(
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(
|
||||
&e.to_string(),
|
||||
Some(e.pg_error_code()),
|
||||
))?
|
||||
@@ -977,7 +977,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
|
||||
} else if query_string.to_ascii_lowercase().starts_with("set ") {
|
||||
// important because psycopg2 executes "SET datestyle TO 'ISO'"
|
||||
// on connect
|
||||
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
} else if query_string.starts_with("show ") {
|
||||
// show <tenant_id>
|
||||
let (_, params_raw) = query_string.split_at("show ".len());
|
||||
@@ -993,7 +993,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
|
||||
pgb.write_message(&BeMessage::RowDescription(&[
|
||||
pgb.write_message_noflush(&BeMessage::RowDescription(&[
|
||||
RowDescriptor::int8_col(b"checkpoint_distance"),
|
||||
RowDescriptor::int8_col(b"checkpoint_timeout"),
|
||||
RowDescriptor::int8_col(b"compaction_target_size"),
|
||||
@@ -1004,7 +1004,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
|
||||
RowDescriptor::int8_col(b"image_creation_threshold"),
|
||||
RowDescriptor::int8_col(b"pitr_interval"),
|
||||
]))?
|
||||
.write_message(&BeMessage::DataRow(&[
|
||||
.write_message_noflush(&BeMessage::DataRow(&[
|
||||
Some(tenant.get_checkpoint_distance().to_string().as_bytes()),
|
||||
Some(
|
||||
tenant
|
||||
@@ -1027,7 +1027,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
|
||||
Some(tenant.get_image_creation_threshold().to_string().as_bytes()),
|
||||
Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
|
||||
]))?
|
||||
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
} else {
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"unknown command {query_string}"
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
use std::ops::ControlFlow;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::metrics::TENANT_TASK_EVENTS;
|
||||
@@ -11,6 +11,7 @@ use crate::task_mgr;
|
||||
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
|
||||
use crate::tenant::mgr;
|
||||
use crate::tenant::{Tenant, TenantState};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::id::TenantId;
|
||||
|
||||
@@ -53,37 +54,55 @@ async fn compaction_loop(tenant_id: TenantId) {
|
||||
info!("starting");
|
||||
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
|
||||
async {
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
|
||||
let mut first = true;
|
||||
loop {
|
||||
trace!("waking up");
|
||||
|
||||
let tenant = tokio::select! {
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
_ = cancel.cancelled() => {
|
||||
info!("received cancellation request");
|
||||
return;
|
||||
return;
|
||||
},
|
||||
tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
|
||||
ControlFlow::Break(()) => return,
|
||||
ControlFlow::Continue(tenant) => tenant,
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
let mut sleep_duration = tenant.get_compaction_period();
|
||||
if sleep_duration == Duration::ZERO {
|
||||
info!("automatic compaction is disabled");
|
||||
// check again in 10 seconds, in case it's been enabled again.
|
||||
sleep_duration = Duration::from_secs(10);
|
||||
} else {
|
||||
// Run compaction
|
||||
if let Err(e) = tenant.compaction_iteration(&ctx).await {
|
||||
sleep_duration = wait_duration;
|
||||
error!("Compaction failed, retrying in {:?}: {e:?}", sleep_duration);
|
||||
let period = tenant.get_compaction_period();
|
||||
|
||||
// TODO: we shouldn't need to await to find tenant and this could be moved outside of
|
||||
// loop, #3501. There are also additional "allowed_errors" in tests.
|
||||
if first {
|
||||
first = false;
|
||||
if random_init_delay(period, &cancel).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let started_at = Instant::now();
|
||||
|
||||
let sleep_duration = if period == Duration::ZERO {
|
||||
info!("automatic compaction is disabled");
|
||||
// check again in 10 seconds, in case it's been enabled again.
|
||||
Duration::from_secs(10)
|
||||
} else {
|
||||
// Run compaction
|
||||
if let Err(e) = tenant.compaction_iteration(&ctx).await {
|
||||
error!("Compaction failed, retrying in {:?}: {e:?}", wait_duration);
|
||||
wait_duration
|
||||
} else {
|
||||
period
|
||||
}
|
||||
};
|
||||
|
||||
warn_when_period_overrun(started_at.elapsed(), period, "compaction");
|
||||
|
||||
// Sleep
|
||||
tokio::select! {
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
_ = cancel.cancelled() => {
|
||||
info!("received cancellation request during idling");
|
||||
break;
|
||||
},
|
||||
@@ -105,14 +124,16 @@ async fn gc_loop(tenant_id: TenantId) {
|
||||
info!("starting");
|
||||
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
|
||||
async {
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
// GC might require downloading, to find the cutoff LSN that corresponds to the
|
||||
// cutoff specified as time.
|
||||
let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
|
||||
let mut first = true;
|
||||
loop {
|
||||
trace!("waking up");
|
||||
|
||||
let tenant = tokio::select! {
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
_ = cancel.cancelled() => {
|
||||
info!("received cancellation request");
|
||||
return;
|
||||
},
|
||||
@@ -122,27 +143,38 @@ async fn gc_loop(tenant_id: TenantId) {
|
||||
},
|
||||
};
|
||||
|
||||
let gc_period = tenant.get_gc_period();
|
||||
let gc_horizon = tenant.get_gc_horizon();
|
||||
let mut sleep_duration = gc_period;
|
||||
if sleep_duration == Duration::ZERO {
|
||||
info!("automatic GC is disabled");
|
||||
// check again in 10 seconds, in case it's been enabled again.
|
||||
sleep_duration = Duration::from_secs(10);
|
||||
} else {
|
||||
// Run gc
|
||||
if gc_horizon > 0 {
|
||||
if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx).await
|
||||
{
|
||||
sleep_duration = wait_duration;
|
||||
error!("Gc failed, retrying in {:?}: {e:?}", sleep_duration);
|
||||
}
|
||||
let period = tenant.get_gc_period();
|
||||
|
||||
if first {
|
||||
first = false;
|
||||
if random_init_delay(period, &cancel).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let started_at = Instant::now();
|
||||
|
||||
let gc_horizon = tenant.get_gc_horizon();
|
||||
let sleep_duration = if period == Duration::ZERO || gc_horizon == 0 {
|
||||
info!("automatic GC is disabled");
|
||||
// check again in 10 seconds, in case it's been enabled again.
|
||||
Duration::from_secs(10)
|
||||
} else {
|
||||
// Run gc
|
||||
let res = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx).await;
|
||||
if let Err(e) = res {
|
||||
error!("Gc failed, retrying in {:?}: {e:?}", wait_duration);
|
||||
wait_duration
|
||||
} else {
|
||||
period
|
||||
}
|
||||
};
|
||||
|
||||
warn_when_period_overrun(started_at.elapsed(), period, "gc");
|
||||
|
||||
// Sleep
|
||||
tokio::select! {
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
_ = cancel.cancelled() => {
|
||||
info!("received cancellation request during idling");
|
||||
break;
|
||||
},
|
||||
@@ -197,3 +229,49 @@ async fn wait_for_active_tenant(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
#[error("cancelled")]
|
||||
pub(crate) struct Cancelled;
|
||||
|
||||
/// Provide a random delay for background task initialization.
|
||||
///
|
||||
/// This delay prevents a thundering herd of background tasks and will likely keep them running on
|
||||
/// different periods for more stable load.
|
||||
pub(crate) async fn random_init_delay(
|
||||
period: Duration,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(), Cancelled> {
|
||||
use rand::Rng;
|
||||
|
||||
let d = {
|
||||
let mut rng = rand::thread_rng();
|
||||
|
||||
// gen_range asserts that the range cannot be empty, which it could be because period can
|
||||
// be set to zero to disable gc or compaction, so lets set it to be at least 10s.
|
||||
let period = std::cmp::max(period, Duration::from_secs(10));
|
||||
|
||||
// semi-ok default as the source of jitter
|
||||
rng.gen_range(Duration::ZERO..=period)
|
||||
};
|
||||
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => Err(Cancelled),
|
||||
_ = tokio::time::sleep(d) => Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn warn_when_period_overrun(elapsed: Duration, period: Duration, task: &str) {
|
||||
// Duration::ZERO will happen because it's the "disable [bgtask]" value.
|
||||
if elapsed >= period && period != Duration::ZERO {
|
||||
// humantime does no significant digits clamping whereas Duration's debug is a bit more
|
||||
// intelligent. however it makes sense to keep the "configuration format" for period, even
|
||||
// though there's no way to output the actual config value.
|
||||
warn!(
|
||||
?elapsed,
|
||||
period = %humantime::format_duration(period),
|
||||
task,
|
||||
"task iteration took longer than the configured period"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ use tracing::*;
|
||||
use utils::id::TenantTimelineId;
|
||||
|
||||
use std::cmp::{max, min, Ordering};
|
||||
use std::collections::BinaryHeap;
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::ops::{Deref, Range};
|
||||
@@ -82,6 +83,25 @@ enum FlushLoopState {
|
||||
Exited,
|
||||
}
|
||||
|
||||
/// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct Hole {
|
||||
key_range: Range<Key>,
|
||||
coverage_size: usize,
|
||||
}
|
||||
|
||||
impl Ord for Hole {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
other.coverage_size.cmp(&self.coverage_size) // inverse order
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for Hole {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Timeline {
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: Arc<RwLock<TenantConfOpt>>,
|
||||
@@ -1770,15 +1790,9 @@ impl Timeline {
|
||||
let calculation = async {
|
||||
let cancel = cancel.child_token();
|
||||
let ctx = ctx.attached_child();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
// Run in a separate thread since this can do a lot of
|
||||
// synchronous file IO without .await inbetween
|
||||
// if there are no RemoteLayers that would require downloading.
|
||||
let h = tokio::runtime::Handle::current();
|
||||
h.block_on(self_calculation.calculate_logical_size(init_lsn, cancel, &ctx))
|
||||
})
|
||||
.await
|
||||
.context("Failed to spawn calculation result task")?
|
||||
self_calculation
|
||||
.calculate_logical_size(init_lsn, cancel, &ctx)
|
||||
.await
|
||||
};
|
||||
let timeline_state_cancellation = async {
|
||||
loop {
|
||||
@@ -1811,7 +1825,7 @@ impl Timeline {
|
||||
tokio::pin!(calculation);
|
||||
loop {
|
||||
tokio::select! {
|
||||
res = &mut calculation => { return res }
|
||||
res = &mut calculation => { return res }
|
||||
reason = timeline_state_cancellation => {
|
||||
debug!(reason = reason, "cancelling calculation");
|
||||
cancel.cancel();
|
||||
@@ -2947,6 +2961,47 @@ impl Timeline {
|
||||
},
|
||||
)?;
|
||||
|
||||
// Determine N largest holes where N is number of compacted layers.
|
||||
let max_holes = deltas_to_compact.len();
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
let layers = self.layers.read().unwrap(); // Is'n it better to hold original layers lock till here?
|
||||
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
|
||||
let min_hole_coverage_size = 3; // TODO: something more flexible?
|
||||
|
||||
// min-heap (reserve space for one more element added before eviction)
|
||||
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
|
||||
let mut prev: Option<Key> = None;
|
||||
for (next_key, _next_lsn, _size) in itertools::process_results(
|
||||
deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
|
||||
|iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0),
|
||||
)? {
|
||||
if let Some(prev_key) = prev {
|
||||
// just first fast filter
|
||||
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
|
||||
let key_range = prev_key..next_key;
|
||||
// Measuring hole by just subtraction of i128 representation of key range boundaries
|
||||
// has not so much sense, because largest holes will corresponds field1/field2 changes.
|
||||
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
|
||||
// That is why it is better to measure size of hole as number of covering image layers.
|
||||
let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
|
||||
if coverage_size >= min_hole_coverage_size {
|
||||
heap.push(Hole {
|
||||
key_range,
|
||||
coverage_size,
|
||||
});
|
||||
if heap.len() > max_holes {
|
||||
heap.pop(); // remove smallest hole
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
prev = Some(next_key.next());
|
||||
}
|
||||
drop(layers);
|
||||
let mut holes = heap.into_vec();
|
||||
holes.sort_unstable_by_key(|hole| hole.key_range.start);
|
||||
let mut next_hole = 0; // index of next hole in holes vector
|
||||
|
||||
// Merge the contents of all the input delta layers into a new set
|
||||
// of delta layers, based on the current partitioning.
|
||||
//
|
||||
@@ -3041,14 +3096,22 @@ impl Timeline {
|
||||
}
|
||||
if writer.is_some() {
|
||||
let written_size = writer.as_mut().unwrap().size();
|
||||
// check if key cause layer overflow...
|
||||
let contains_hole =
|
||||
next_hole < holes.len() && key >= holes[next_hole].key_range.end;
|
||||
// check if key cause layer overflow or contains hole...
|
||||
if is_dup_layer
|
||||
|| dup_end_lsn.is_valid()
|
||||
|| written_size + key_values_total_size > target_file_size
|
||||
|| contains_hole
|
||||
{
|
||||
// ... if so, flush previous layer and prepare to write new one
|
||||
new_layers.push(writer.take().unwrap().finish(prev_key.unwrap().next())?);
|
||||
writer = None;
|
||||
|
||||
if contains_hole {
|
||||
// skip hole
|
||||
next_hole += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Remember size of key value because at next iteration we will access next item
|
||||
@@ -3745,6 +3808,7 @@ impl Timeline {
|
||||
remote_layer.ongoing_download.close();
|
||||
} else {
|
||||
// Keep semaphore open. We'll drop the permit at the end of the function.
|
||||
info!("on-demand download failed: {:?}", result.as_ref().unwrap_err());
|
||||
}
|
||||
|
||||
// Don't treat it as an error if the task that triggered the download
|
||||
|
||||
@@ -41,9 +41,23 @@ impl Timeline {
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))]
|
||||
async fn eviction_task(self: Arc<Self>, cancel: CancellationToken) {
|
||||
use crate::tenant::tasks::random_init_delay;
|
||||
{
|
||||
let policy = self.get_eviction_policy();
|
||||
let period = match policy {
|
||||
EvictionPolicy::LayerAccessThreshold(lat) => lat.period,
|
||||
EvictionPolicy::NoEviction => Duration::from_secs(10),
|
||||
};
|
||||
if random_init_delay(period, &cancel).await.is_err() {
|
||||
info!("shutting down");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
let policy = self.get_eviction_policy();
|
||||
let cf = self.eviction_iteration(&policy, cancel.clone()).await;
|
||||
|
||||
match cf {
|
||||
ControlFlow::Break(()) => break,
|
||||
ControlFlow::Continue(sleep_until) => {
|
||||
@@ -78,13 +92,7 @@ impl Timeline {
|
||||
ControlFlow::Continue(()) => (),
|
||||
}
|
||||
let elapsed = start.elapsed();
|
||||
if elapsed > p.period {
|
||||
warn!(
|
||||
configured_period = %humantime::format_duration(p.period),
|
||||
last_period = %humantime::format_duration(elapsed),
|
||||
"this eviction period took longer than the configured period"
|
||||
);
|
||||
}
|
||||
crate::tenant::tasks::warn_when_period_overrun(elapsed, p.period, "eviction");
|
||||
ControlFlow::Continue(start + p.period)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,7 +37,7 @@ use crate::walrecord::*;
|
||||
use crate::ZERO_PAGE;
|
||||
use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
use postgres_ffi::pg_constants;
|
||||
use postgres_ffi::relfile_utils::{FSM_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
|
||||
use postgres_ffi::v14::xlog_utils::*;
|
||||
use postgres_ffi::v14::CheckPoint;
|
||||
@@ -762,7 +762,7 @@ impl<'a> WalIngest<'a> {
|
||||
)?;
|
||||
|
||||
for xnode in &parsed.xnodes {
|
||||
for forknum in MAIN_FORKNUM..=VISIBILITYMAP_FORKNUM {
|
||||
for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
|
||||
let rel = RelTag {
|
||||
forknum,
|
||||
spcnode: xnode.spcnode,
|
||||
|
||||
@@ -43,6 +43,7 @@ async fn flatten_err(
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let _logging_guard = logging::init().await?;
|
||||
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
|
||||
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
|
||||
|
||||
info!("Version: {GIT_VERSION}");
|
||||
|
||||
@@ -126,7 +126,12 @@ fn main() -> anyhow::Result<()> {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// important to keep the order of:
|
||||
// 1. init logging
|
||||
// 2. tracing panic hook
|
||||
// 3. sentry
|
||||
logging::init(LogFormat::from_config(&args.log_format)?)?;
|
||||
logging::replace_panic_hook_with_tracing_panic_hook().forget();
|
||||
info!("version: {GIT_VERSION}");
|
||||
|
||||
let args_workdir = &args.datadir;
|
||||
|
||||
@@ -424,12 +424,16 @@ async fn http1_handler(
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// initialize sentry if SENTRY_DSN is provided
|
||||
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
|
||||
|
||||
let args = Args::parse();
|
||||
|
||||
// important to keep the order of:
|
||||
// 1. init logging
|
||||
// 2. tracing panic hook
|
||||
// 3. sentry
|
||||
logging::init(LogFormat::from_config(&args.log_format)?)?;
|
||||
logging::replace_panic_hook_with_tracing_panic_hook().forget();
|
||||
// initialize sentry if SENTRY_DSN is provided
|
||||
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
|
||||
info!("version: {GIT_VERSION}");
|
||||
::metrics::set_build_info_metric(GIT_VERSION);
|
||||
|
||||
|
||||
@@ -1669,7 +1669,7 @@ class AbstractNeonCli(abc.ABC):
|
||||
timeout=timeout,
|
||||
)
|
||||
if not res.returncode:
|
||||
log.info(f"Run success: {res.stdout}")
|
||||
log.info(f"Run {res.args} success: {res.stdout}")
|
||||
elif check_return_code:
|
||||
# this way command output will be in recorded and shown in CI in failure message
|
||||
msg = f"""\
|
||||
@@ -2079,6 +2079,9 @@ class NeonPageserver(PgProtocol):
|
||||
".*compaction_loop.*Compaction failed, retrying in.*timeline is Stopping", # When compaction checks timeline state after acquiring layer_removal_cs
|
||||
".*query handler for 'pagestream.*failed: Timeline .* was not found", # postgres reconnects while timeline_delete doesn't hold the tenant's timelines.lock()
|
||||
".*query handler for 'pagestream.*failed: Timeline .* is not active", # timeline delete in progress
|
||||
".*task iteration took longer than the configured period.*",
|
||||
# this is until #3501
|
||||
".*Compaction failed, retrying in [^:]+: Cannot run compaction iteration on inactive tenant",
|
||||
]
|
||||
|
||||
def start(
|
||||
@@ -3460,6 +3463,14 @@ def wait_for_last_flush_lsn(
|
||||
return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn)
|
||||
|
||||
|
||||
def wait_for_wal_insert_lsn(
|
||||
env: NeonEnv, pg: Postgres, tenant: TenantId, timeline: TimelineId
|
||||
) -> Lsn:
|
||||
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
|
||||
last_flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_insert_lsn()")[0][0])
|
||||
return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn)
|
||||
|
||||
|
||||
def fork_at_current_lsn(
|
||||
env: NeonEnv,
|
||||
pg: Postgres,
|
||||
|
||||
@@ -3,8 +3,15 @@ from typing import List, Tuple
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, wait_for_last_flush_lsn
|
||||
from fixtures.types import Lsn
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
PageserverHttpClient,
|
||||
Postgres,
|
||||
wait_for_last_flush_lsn,
|
||||
wait_for_wal_insert_lsn,
|
||||
)
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
|
||||
|
||||
def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path):
|
||||
@@ -324,7 +331,7 @@ def test_single_branch_get_tenant_size_grows(
|
||||
# inserts is larger than gc_horizon. for example 0x20000 here hid the fact
|
||||
# that there next_gc_cutoff could be smaller than initdb_lsn, which will
|
||||
# obviously lead to issues when calculating the size.
|
||||
gc_horizon = 0x30000
|
||||
gc_horizon = 0x38000
|
||||
neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='0s', gc_period='0s', pitr_interval='0sec', gc_horizon={gc_horizon}}}"
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
@@ -334,29 +341,75 @@ def test_single_branch_get_tenant_size_grows(
|
||||
|
||||
http_client = env.pageserver.http_client()
|
||||
|
||||
collected_responses: List[Tuple[Lsn, int]] = []
|
||||
collected_responses: List[Tuple[str, Lsn, int]] = []
|
||||
|
||||
size_debug_file = open(test_output_dir / "size_debug.html", "w")
|
||||
|
||||
def check_size_change(current_lsn: Lsn, initdb_lsn: Lsn, gc_horizon: int, size: int, prev: int):
|
||||
if current_lsn - initdb_lsn > gc_horizon:
|
||||
def check_size_change(
|
||||
current_lsn: Lsn, initdb_lsn: Lsn, gc_horizon: int, size: int, prev_size: int
|
||||
):
|
||||
if current_lsn - initdb_lsn >= gc_horizon:
|
||||
assert (
|
||||
size >= prev
|
||||
size >= prev_size
|
||||
), "tenant_size may grow or not grow, because we only add gc_horizon amount of WAL to initial snapshot size"
|
||||
else:
|
||||
assert (
|
||||
size > prev
|
||||
size > prev_size
|
||||
), "tenant_size should grow, because we continue to add WAL to initial snapshot size"
|
||||
|
||||
with env.postgres.create_start(branch_name, tenant_id=tenant_id) as pg:
|
||||
initdb_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
def get_current_consistent_size(
|
||||
env: NeonEnv,
|
||||
pg: Postgres,
|
||||
size_debug_file, # apparently there is no public signature for open()...
|
||||
http_client: PageserverHttpClient,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Tuple[Lsn, int]:
|
||||
consistent = False
|
||||
size_debug = None
|
||||
|
||||
current_lsn = wait_for_wal_insert_lsn(env, pg, tenant_id, timeline_id)
|
||||
# We want to make sure we have a self-consistent set of values.
|
||||
# Size changes with WAL, so only if both before and after getting
|
||||
# the size of the tenant reports the same WAL insert LSN, we're OK
|
||||
# to use that (size, LSN) combination.
|
||||
# Note that 'wait_for_wal_flush_lsn' is not accurate enough: There
|
||||
# can be more wal after the flush LSN that can arrive on the
|
||||
# pageserver before we're requesting the page size.
|
||||
# Anyway, in general this is only one iteration, so in general
|
||||
# this is fine.
|
||||
while not consistent:
|
||||
size, sizes = http_client.tenant_size_and_modelinputs(tenant_id)
|
||||
size_debug = http_client.tenant_size_debug(tenant_id)
|
||||
|
||||
after_lsn = wait_for_wal_insert_lsn(env, pg, tenant_id, timeline_id)
|
||||
consistent = current_lsn == after_lsn
|
||||
current_lsn = after_lsn
|
||||
size_debug_file.write(size_debug)
|
||||
return (current_lsn, size)
|
||||
|
||||
with env.postgres.create_start(
|
||||
branch_name,
|
||||
tenant_id=tenant_id,
|
||||
### autovacuum is disabled to limit WAL logging.
|
||||
config_lines=["autovacuum=off"],
|
||||
) as pg:
|
||||
(initdb_lsn, size) = get_current_consistent_size(
|
||||
env, pg, size_debug_file, http_client, tenant_id, timeline_id
|
||||
)
|
||||
collected_responses.append(("INITDB", initdb_lsn, size))
|
||||
|
||||
with pg.cursor() as cur:
|
||||
cur.execute("CREATE TABLE t0 (i BIGINT NOT NULL)")
|
||||
cur.execute("CREATE TABLE t0 (i BIGINT NOT NULL) WITH (fillfactor = 40)")
|
||||
|
||||
(current_lsn, size) = get_current_consistent_size(
|
||||
env, pg, size_debug_file, http_client, tenant_id, timeline_id
|
||||
)
|
||||
collected_responses.append(("CREATE", current_lsn, size))
|
||||
|
||||
batch_size = 100
|
||||
|
||||
i = 0
|
||||
while True:
|
||||
for i in range(3):
|
||||
with pg.cursor() as cur:
|
||||
cur.execute(
|
||||
f"INSERT INTO t0(i) SELECT i FROM generate_series({batch_size} * %s, ({batch_size} * (%s + 1)) - 1) s(i)",
|
||||
@@ -365,27 +418,24 @@ def test_single_branch_get_tenant_size_grows(
|
||||
|
||||
i += 1
|
||||
|
||||
current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
(current_lsn, size) = get_current_consistent_size(
|
||||
env, pg, size_debug_file, http_client, tenant_id, timeline_id
|
||||
)
|
||||
|
||||
size, sizes = http_client.tenant_size_and_modelinputs(tenant_id)
|
||||
prev_size = collected_responses[-1][2]
|
||||
if size == 0:
|
||||
assert prev_size == 0
|
||||
else:
|
||||
# branch start shouldn't be past gc_horizon yet
|
||||
# thus the size should grow as we insert more data
|
||||
# "gc_horizon" is tuned so that it kicks in _after_ the
|
||||
# insert phase, but before the update phase ends.
|
||||
assert (
|
||||
current_lsn - initdb_lsn <= gc_horizon
|
||||
), "Tuning of GC window is likely out-of-date"
|
||||
assert size > prev_size
|
||||
|
||||
size_debug = http_client.tenant_size_debug(tenant_id)
|
||||
size_debug_file.write(size_debug)
|
||||
|
||||
if len(collected_responses) > 0:
|
||||
prev = collected_responses[-1][1]
|
||||
if size == 0:
|
||||
assert prev == 0
|
||||
else:
|
||||
# branch start shouldn't be past gc_horizon yet
|
||||
# thus the size should grow as we insert more data
|
||||
assert current_lsn - initdb_lsn <= gc_horizon
|
||||
assert size > prev
|
||||
|
||||
collected_responses.append((current_lsn, size))
|
||||
|
||||
if len(collected_responses) > 2:
|
||||
break
|
||||
collected_responses.append(("INSERT", current_lsn, size))
|
||||
|
||||
while True:
|
||||
with pg.cursor() as cur:
|
||||
@@ -397,18 +447,15 @@ def test_single_branch_get_tenant_size_grows(
|
||||
if updated == 0:
|
||||
break
|
||||
|
||||
current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
(current_lsn, size) = get_current_consistent_size(
|
||||
env, pg, size_debug_file, http_client, tenant_id, timeline_id
|
||||
)
|
||||
|
||||
size, sizes = http_client.tenant_size_and_modelinputs(tenant_id)
|
||||
prev_size = collected_responses[-1][2]
|
||||
|
||||
size_debug = http_client.tenant_size_debug(tenant_id)
|
||||
size_debug_file.write(size_debug)
|
||||
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev_size)
|
||||
|
||||
prev = collected_responses[-1][1]
|
||||
|
||||
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev)
|
||||
|
||||
collected_responses.append((current_lsn, size))
|
||||
collected_responses.append(("UPDATE", current_lsn, size))
|
||||
|
||||
while True:
|
||||
with pg.cursor() as cur:
|
||||
@@ -418,40 +465,47 @@ def test_single_branch_get_tenant_size_grows(
|
||||
if deleted == 0:
|
||||
break
|
||||
|
||||
current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
(current_lsn, size) = get_current_consistent_size(
|
||||
env, pg, size_debug_file, http_client, tenant_id, timeline_id
|
||||
)
|
||||
|
||||
size = http_client.tenant_size(tenant_id)
|
||||
prev = collected_responses[-1][1]
|
||||
prev_size = collected_responses[-1][2]
|
||||
|
||||
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev)
|
||||
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev_size)
|
||||
|
||||
collected_responses.append((current_lsn, size))
|
||||
collected_responses.append(("DELETE", current_lsn, size))
|
||||
|
||||
with pg.cursor() as cur:
|
||||
cur.execute("DROP TABLE t0")
|
||||
|
||||
current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
# The size of the tenant should still be as large as before we dropped
|
||||
# the table, because the drop operation can still be undone in the PITR
|
||||
# defined by gc_horizon.
|
||||
(current_lsn, size) = get_current_consistent_size(
|
||||
env, pg, size_debug_file, http_client, tenant_id, timeline_id
|
||||
)
|
||||
|
||||
size = http_client.tenant_size(tenant_id)
|
||||
prev = collected_responses[-1][1]
|
||||
prev_size = collected_responses[-1][2]
|
||||
|
||||
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev)
|
||||
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev_size)
|
||||
|
||||
collected_responses.append((current_lsn, size))
|
||||
collected_responses.append(("DROP", current_lsn, size))
|
||||
|
||||
# this isn't too many lines to forget for a while. observed while
|
||||
# developing these tests that locally the value is a bit more than what we
|
||||
# get in the ci.
|
||||
for lsn, size in collected_responses:
|
||||
log.info(f"collected: {lsn}, {size}")
|
||||
for phase, lsn, size in collected_responses:
|
||||
log.info(f"collected: {phase}, {lsn}, {size}")
|
||||
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
size_after = http_client.tenant_size(tenant_id)
|
||||
size_debug = http_client.tenant_size_debug(tenant_id)
|
||||
size_debug_file.write(size_debug)
|
||||
size_debug_file.close()
|
||||
|
||||
size_after = http_client.tenant_size(tenant_id)
|
||||
prev = collected_responses[-1][1]
|
||||
prev = collected_responses[-1][2]
|
||||
|
||||
assert size_after == prev, "size after restarting pageserver should not have changed"
|
||||
|
||||
|
||||
34
test_runner/regress/test_unlogged.py
Normal file
34
test_runner/regress/test_unlogged.py
Normal file
@@ -0,0 +1,34 @@
|
||||
from fixtures.neon_fixtures import NeonEnv, fork_at_current_lsn
|
||||
|
||||
|
||||
#
|
||||
# Test UNLOGGED tables/relations. Postgres copies init fork contents to main
|
||||
# fork to reset them during recovery. In Neon, pageserver directly sends init
|
||||
# fork contents as main fork during basebackup.
|
||||
#
|
||||
def test_unlogged(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_unlogged", "empty")
|
||||
pg = env.postgres.create_start("test_unlogged")
|
||||
|
||||
conn = pg.connect()
|
||||
cur = conn.cursor()
|
||||
|
||||
cur.execute("CREATE UNLOGGED TABLE iut (id int);")
|
||||
# create index to test unlogged index relation as well
|
||||
cur.execute("CREATE UNIQUE INDEX iut_idx ON iut (id);")
|
||||
cur.execute("INSERT INTO iut values (42);")
|
||||
|
||||
# create another compute to fetch inital empty contents from pageserver
|
||||
fork_at_current_lsn(env, pg, "test_unlogged_basebackup", "test_unlogged")
|
||||
pg2 = env.postgres.create_start(
|
||||
"test_unlogged_basebackup",
|
||||
)
|
||||
|
||||
conn2 = pg2.connect()
|
||||
cur2 = conn2.cursor()
|
||||
# after restart table should be empty but valid
|
||||
cur2.execute("PREPARE iut_plan (int) AS INSERT INTO iut VALUES ($1)")
|
||||
cur2.execute("EXECUTE iut_plan (43);")
|
||||
cur2.execute("SELECT * FROM iut")
|
||||
assert cur2.fetchall() == [(43,)]
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: f210ac524b...5fb2e0bba0
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 33f9763454...919851e781
Reference in New Issue
Block a user