Compare commits


18 Commits

Author SHA1 Message Date
Bojan Serafimov
edc77e8a24 Add pyo3 prototype 2022-08-30 11:28:22 -04:00
Bojan Serafimov
324c09d19f Cleanup 2022-08-18 15:17:30 -04:00
Bojan Serafimov
1501dbd5a5 Cleanup 2022-08-18 15:16:14 -04:00
Bojan Serafimov
4e91ff31ca Fmt 2022-08-18 13:20:01 -04:00
Bojan Serafimov
fe1b471cfc Cleanup 2022-08-18 13:14:05 -04:00
Bojan Serafimov
3e0b57e766 Convert bin to lib 2022-08-18 12:41:46 -04:00
Bojan Serafimov
7bff7c4014 Add library 2022-08-18 12:40:14 -04:00
Bojan Serafimov
11a1f2a1d8 Initial experiment 2022-08-18 12:27:43 -04:00
Rory de Zoete
2db675a2f2 Re-enable test dependency for deploy (#2300)
Co-authored-by: Rory de Zoete <rdezoete@Rorys-Mac-Studio.fritz.box>
2022-08-18 15:18:59 +02:00
Anton Galitsyn
77a2bdf3d7 on safekeeper registration pass availability zone param (#2292) 2022-08-18 15:05:40 +03:00
Arthur Petukhovsky
976576ae59 Fix walreceiver and safekeeper bugs (#2295)
- There was an issue with zero commit_lsn `reason: LaggingWal { current_commit_lsn: 0/0, new_commit_lsn: 1/6FD90D38, threshold: 10485760 } }`. The problem was in `send_wal.rs`, where we initialized `end_pos = Lsn(0)` and in some cases sent it to the pageserver.
- IDENTIFY_SYSTEM previously returned `flush_lsn` as the physical end of WAL. Now it returns `flush_lsn` (as it was) to walproposer and `commit_lsn` to everyone else, including the pageserver.
- There was an issue with backoff where the connection was cancelled right after initialization: `connected!` -> `safekeeper_handle_db: Connection cancelled` -> `Backoff: waiting 3 seconds`. The problem was in sleeping before establishing the connection. This is fixed by reworking the retry logic.
- There was an issue with getting the `NoKeepAlives` reason in a loop. The issue is probably the same as the previous one.
- There was an issue with filtering safekeepers based on retry attempts, which could filter some safekeepers indefinitely. This is fixed by using retry cooldown duration instead of retry attempts.
- Some `send_wal.rs` connections failed with errors without context. This is fixed by adding a timeline to safekeeper errors.

New retry logic works like this:
- Every candidate has a `next_retry_at` timestamp and is not considered for connection until that moment
- When walreceiver connection is closed, we update `next_retry_at` using exponential backoff, increasing the cooldown on every disconnect.
- When `last_record_lsn` was advanced using the WAL from the safekeeper, we reset the retry cooldown and exponential backoff, allowing walreceiver to reconnect to the same safekeeper instantly (see the sketch below).
2022-08-18 13:38:23 +03:00
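A minimal sketch of the retry-cooldown scheme described above, using hypothetical names and constants (the actual walreceiver code in this comparison differs in detail):

use std::time::{Duration, Instant};

/// Per-safekeeper retry state (hypothetical struct; illustrative only).
struct RetryState {
    next_retry_at: Option<Instant>,
    attempts: u32,
}

const BASE_COOLDOWN: Duration = Duration::from_secs(1);
const MAX_COOLDOWN: Duration = Duration::from_secs(30);

impl RetryState {
    /// A candidate safekeeper is only considered once its cooldown expires.
    fn is_eligible(&self, now: Instant) -> bool {
        self.next_retry_at.map_or(true, |t| now >= t)
    }

    /// On disconnect: exponential backoff, capped at MAX_COOLDOWN.
    fn on_disconnect(&mut self, now: Instant) {
        let cooldown = BASE_COOLDOWN * 2u32.saturating_pow(self.attempts);
        self.next_retry_at = Some(now + cooldown.min(MAX_COOLDOWN));
        self.attempts += 1;
    }

    /// On WAL progress (last_record_lsn advanced): reset both the cooldown
    /// and the backoff, so walreceiver may reconnect to the same safekeeper
    /// immediately.
    fn on_progress(&mut self) {
        self.next_retry_at = None;
        self.attempts = 0;
    }
}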
Anastasia Lubennikova
1a07ddae5f fix cargo test 2022-08-18 13:25:00 +03:00
Heikki Linnakangas
9bc12f7444 Move auto-generated 'bindings' to a separate inner module.
Re-export only things that are used by other modules.

In the future, I'm imagining that we run bindgen twice, for Postgres
v14 and v15. The two sets of bindings would go into separate
'bindings_v14' and 'bindings_v15' modules.

Rearrange postgres_ffi modules.

Move function to avoid Postgres version dependency in timelines.rs.
Move function to generate a logical-message WAL record to postgres_ffi.
2022-08-18 13:25:00 +03:00
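For orientation, a hedged sketch of the module layout this change moves toward — the postgres_ffi! macro visible later in this comparison, instantiated once per Postgres version, with only the needed symbols re-exported (placeholder types stand in for the bindgen output):

macro_rules! postgres_ffi {
    ($version:ident) => {
        pub mod $version {
            pub mod bindings {
                // In the real crate, the bindgen output is include!()d here.
                pub struct CheckPoint;
                pub struct ControlFileData;
            }
            // Re-export only what other modules actually use.
            pub use bindings::{CheckPoint, ControlFileData};
        }
    };
}

postgres_ffi!(v14);
// The imagined future step from this commit message:
// postgres_ffi!(v15);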
Rory de Zoete
92bdf04758 Fix: Always build images (#2296)
* Always build images

* Remove unused

Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>
2022-08-18 09:41:24 +02:00
Kirill Bulatov
67e091c906 Rework init in pageserver CLI (#2272)
* Do not create initial tenant and timeline (adjust Python tests for that)
* Rework config handling during init, add --update-config to manage local config updates
2022-08-17 23:24:47 +03:00
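For illustration, a hypothetical invocation assembled from the flags visible in this change set (not a command quoted from the docs): after this rework, a local init might look like `pageserver --init -D <workdir> -c "id=10" --update-config`, where `--update-config` appears to persist the given overrides into `pageserver.toml`, while tenant and timeline creation happen separately through the HTTP API.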
Alexander Bayandin
dc102197df workflows/benchmarking: increase timeout (#2294) 2022-08-17 17:16:26 +01:00
Rory de Zoete
262cdf8344 Update cachepot endpoint (#2290)
* Update cachepot endpoint

* Update dockerfile & remove env

* Update image building process

* Cannot use metadata endpoint for this

* Update workflow

* Conform to kaniko syntax

* Update syntax

* Update approach

* Update dockerfiles

* Force update

* Update dockerfiles

* Update dockerfile

* Cleanup dockerfiles

* Update s3 test location

* Revert s3 experiment

* Add more debug

* Specify aws region

* Remove debug, add prefix

* Remove one more debug

Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>
2022-08-17 18:02:03 +02:00
Kirill Bulatov
3b819ee159 Remove extra type aliases (#2280) 2022-08-17 17:51:53 +03:00
66 changed files with 1386 additions and 807 deletions


@@ -1,7 +1,8 @@
#!/bin/sh
# get instance id from meta-data service
# fetch params from meta-data service
INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)
AZ_ID=$(curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone)
# store fqdn hostname in var
HOST=$(hostname -f)
@@ -14,7 +15,8 @@ cat <<EOF | tee /tmp/payload
"port": 6500,
"http_port": 7676,
"region_id": {{ console_region_id }},
"instance_id": "${INSTANCE_ID}"
"instance_id": "${INSTANCE_ID}",
"availability_zone_id": "${AZ_ID}"
}
EOF


@@ -106,7 +106,7 @@ jobs:
mkdir -p perf-report-staging
# Set --sparse-ordering option of pytest-order plugin to ensure tests are running in the order they appear in the file,
# it's important for test_perf_pgbench.py::test_pgbench_remote_* tests
./scripts/pytest test_runner/performance/ -v -m "remote_cluster" --sparse-ordering --skip-interfering-proc-check --out-dir perf-report-staging --timeout 3600
./scripts/pytest test_runner/performance/ -v -m "remote_cluster" --sparse-ordering --skip-interfering-proc-check --out-dir perf-report-staging --timeout 5400
- name: Submit result
env:


@@ -415,32 +415,9 @@ jobs:
}
}"
dockerfile-check:
if: github.event_name != 'workflow_dispatch'
runs-on: dev
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:latest
outputs:
value: ${{ steps.dockerfile-check.outputs.any_changed }}
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Get specific changed files
id: dockerfile-check
uses: tj-actions/changed-files@802732316a11c01531ea72773ec7998155238e31 # v25
with:
files: |
Dockerfile
Dockerfile.compute-tools
./vendor/postgres/Dockerfile
neon-image:
# force building for all 3 images
if: needs.dockerfile-check.outputs.value != 'true'
runs-on: dev
needs: [ dockerfile-check ]
container: gcr.io/kaniko-project/executor:v1.9.0-debug
environment: dev
steps:
- name: Checkout
@@ -452,15 +429,12 @@ jobs:
- name: Configure ECR login
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build console
- name: Kaniko build neon
run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:$GITHUB_RUN_ID
compute-tools-image:
if: needs.dockerfile-check.outputs.value != 'true'
runs-on: dev
needs: [ dockerfile-check ]
container: gcr.io/kaniko-project/executor:v1.9.0-debug
environment: dev
steps:
- name: Checkout
@@ -469,15 +443,12 @@ jobs:
- name: Configure ECR login
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build console
- name: Kaniko build compute tools
run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --dockerfile Dockerfile.compute-tools --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:$GITHUB_RUN_ID
compute-node-image:
if: needs.dockerfile-check.outputs.value != 'true'
runs-on: dev
needs: [ dockerfile-check ]
container: gcr.io/kaniko-project/executor:v1.9.0-debug
environment: dev
steps:
- name: Checkout
@@ -489,7 +460,7 @@ jobs:
- name: Configure ECR login
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build console
- name: Kaniko build compute node
working-directory: ./vendor/postgres/
run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:$GITHUB_RUN_ID
@@ -512,7 +483,6 @@ jobs:
runs-on: dev
needs: [ promote-images, tag ]
container: golang:1.19-bullseye
environment: dev
steps:
- name: Install Crane & ECR helper
@@ -597,7 +567,7 @@ jobs:
#container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:latest
# We need both storage **and** compute images for deploy, because control plane picks the compute version based on the storage version.
# If it notices a fresh storage it may bump the compute version. And if compute image failed to build it may break things badly
needs: [ push-docker-hub, calculate-deploy-targets, tag ]
needs: [ push-docker-hub, calculate-deploy-targets, tag, other-tests, pg_regress-tests ]
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
@@ -652,7 +622,7 @@ jobs:
runs-on: dev
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:latest
# Compute image isn't strictly required for proxy deploy, but let's still wait for it to run all deploy jobs consistently.
needs: [ push-docker-hub, calculate-deploy-targets, tag ]
needs: [ push-docker-hub, calculate-deploy-targets, tag, other-tests, pg_regress-tests ]
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'

.gitignore (vendored, 1 line changed)

@@ -1,4 +1,5 @@
/target
/bindings/python/neon-dev-utils/target
/tmp_check
/tmp_install
/tmp_check_cli

Cargo.lock (generated, 23 lines changed)

@@ -48,9 +48,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.58"
version = "1.0.62"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb07d2053ccdbe10e2af2995a2f116c1330396493dc1269f6a91d0ae82e19704"
checksum = "1485d4d2cc45e7b201ee3767015c96faa5904387c9d87c6efdd0fb511f12d305"
dependencies = [
"backtrace",
]
@@ -1409,6 +1409,17 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "integration_tests"
version = "0.1.0"
dependencies = [
"anyhow",
"pg_bin",
"tokio",
"tokio-postgres",
"utils",
]
[[package]]
name = "ipnet"
version = "2.5.0"
@@ -1978,6 +1989,14 @@ dependencies = [
"indexmap",
]
[[package]]
name = "pg_bin"
version = "0.1.0"
dependencies = [
"tokio-postgres",
"utils",
]
[[package]]
name = "phf"
version = "0.10.1"


@@ -7,8 +7,13 @@ members = [
"safekeeper",
"workspace_hack",
"neon_local",
"integration_tests",
"libs/*",
]
exclude = [
"bindings/python/neon-dev-utils",
]
[profile.release]
# This is useful for profiling and, to some extent, debug.


@@ -1,8 +1,6 @@
# Build Postgres
FROM neondatabase/rust:1.58 AS pg-build
WORKDIR /pg
USER root
FROM 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned AS pg-build
WORKDIR /home/nonroot
COPY vendor/postgres vendor/postgres
COPY Makefile Makefile
@@ -11,27 +9,30 @@ ENV BUILD_TYPE release
RUN set -e \
&& mold -run make -j $(nproc) -s postgres \
&& rm -rf tmp_install/build \
&& tar -C tmp_install -czf /postgres_install.tar.gz .
&& tar -C tmp_install -czf /home/nonroot/postgres_install.tar.gz .
# Build zenith binaries
FROM neondatabase/rust:1.58 AS build
FROM 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned AS build
WORKDIR /home/nonroot
ARG GIT_VERSION=local
# Enable https://github.com/paritytech/cachepot to cache Rust crates' compilation results in Docker builds.
# Set up cachepot to use an AWS S3 bucket for cache results, to reuse it between `docker build` invocations.
# cachepot falls back to local filesystem if S3 is misconfigured, not failing the build.
# cachepot falls back to local filesystem if S3 is misconfigured, not failing the build
ARG RUSTC_WRAPPER=cachepot
ARG CACHEPOT_BUCKET=zenith-rust-cachepot
ARG AWS_ACCESS_KEY_ID
ARG AWS_SECRET_ACCESS_KEY
ENV AWS_REGION=eu-central-1
ENV CACHEPOT_S3_KEY_PREFIX=cachepot
ARG CACHEPOT_BUCKET=neon-github-dev
#ARG AWS_ACCESS_KEY_ID
#ARG AWS_SECRET_ACCESS_KEY
COPY --from=pg-build /pg/tmp_install/include/postgresql/server tmp_install/include/postgresql/server
COPY --from=pg-build /home/nonroot/tmp_install/include/postgresql/server tmp_install/include/postgresql/server
COPY . .
# Show build caching stats to check if it was used in the end.
# Has to be the part of the same RUN since cachepot daemon is killed in the end of this RUN, losing the compilation stats.
RUN set -e \
&& sudo -E "PATH=$PATH" mold -run cargo build --release \
&& mold -run cargo build --release \
&& cachepot -s
# Build final image
@@ -40,8 +41,8 @@ FROM debian:bullseye-slim
WORKDIR /data
RUN set -e \
&& apt-get update \
&& apt-get install -y \
&& apt update \
&& apt install -y \
libreadline-dev \
libseccomp-dev \
openssl \
@@ -50,17 +51,14 @@ RUN set -e \
&& useradd -d /data zenith \
&& chown -R zenith:zenith /data
COPY --from=build --chown=zenith:zenith /home/runner/target/release/pageserver /usr/local/bin
COPY --from=build --chown=zenith:zenith /home/runner/target/release/safekeeper /usr/local/bin
COPY --from=build --chown=zenith:zenith /home/runner/target/release/proxy /usr/local/bin
COPY --from=build --chown=zenith:zenith /home/nonroot/target/release/pageserver /usr/local/bin
COPY --from=build --chown=zenith:zenith /home/nonroot/target/release/safekeeper /usr/local/bin
COPY --from=build --chown=zenith:zenith /home/nonroot/target/release/proxy /usr/local/bin
COPY --from=pg-build /pg/tmp_install/ /usr/local/
COPY --from=pg-build /postgres_install.tar.gz /data/
COPY docker-entrypoint.sh /docker-entrypoint.sh
COPY --from=pg-build /home/nonroot/tmp_install/ /usr/local/
COPY --from=pg-build /home/nonroot/postgres_install.tar.gz /data/
VOLUME ["/data"]
USER zenith
EXPOSE 6400
ENTRYPOINT ["/docker-entrypoint.sh"]
CMD ["pageserver"]


@@ -1,22 +1,25 @@
# First transient image to build compute_tools binaries
# NB: keep in sync with rust image version in .github/workflows/build_and_test.yml
FROM neondatabase/rust:1.58 AS rust-build
FROM 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned AS rust-build
WORKDIR /home/nonroot
# Enable https://github.com/paritytech/cachepot to cache Rust crates' compilation results in Docker builds.
# Set up cachepot to use an AWS S3 bucket for cache results, to reuse it between `docker build` invocations.
# cachepot falls back to local filesystem if S3 is misconfigured, not failing the build.
ARG RUSTC_WRAPPER=cachepot
ARG CACHEPOT_BUCKET=zenith-rust-cachepot
ARG AWS_ACCESS_KEY_ID
ARG AWS_SECRET_ACCESS_KEY
ENV AWS_REGION=eu-central-1
ENV CACHEPOT_S3_KEY_PREFIX=cachepot
ARG CACHEPOT_BUCKET=neon-github-dev
#ARG AWS_ACCESS_KEY_ID
#ARG AWS_SECRET_ACCESS_KEY
COPY . .
RUN set -e \
&& sudo -E "PATH=$PATH" mold -run cargo build -p compute_tools --release \
&& mold -run cargo build -p compute_tools --release \
&& cachepot -s
# Final image that only has one binary
FROM debian:buster-slim
FROM debian:bullseye-slim
COPY --from=rust-build /home/runner/target/release/compute_ctl /usr/local/bin/compute_ctl
COPY --from=rust-build /home/nonroot/target/release/compute_ctl /usr/local/bin/compute_ctl

bindings/python/neon-dev-utils/Cargo.lock (generated, new file, 264 lines)

@@ -0,0 +1,264 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "autocfg"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "indoc"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47741a8bc60fb26eb8d6e0238bbb26d8575ff623fdc97b1a2c00c050b9684ed8"
dependencies = [
"indoc-impl",
"proc-macro-hack",
]
[[package]]
name = "indoc-impl"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce046d161f000fffde5f432a0d034d0341dc152643b2598ed5bfce44c4f3a8f0"
dependencies = [
"proc-macro-hack",
"proc-macro2",
"quote",
"syn",
"unindent",
]
[[package]]
name = "instant"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
dependencies = [
"cfg-if",
]
[[package]]
name = "libc"
version = "0.2.132"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8371e4e5341c3a96db127eb2465ac681ced4c433e01dd0e938adbef26ba93ba5"
[[package]]
name = "lock_api"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f80bf5aacaf25cbfc8210d1cfb718f2bf3b11c4c54e5afe36c236853a8ec390"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]]
name = "neon-dev-utils"
version = "0.1.0"
dependencies = [
"pyo3",
]
[[package]]
name = "once_cell"
version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "074864da206b4973b84eb91683020dbefd6a8c3f0f38e054d93954e891935e4e"
[[package]]
name = "parking_lot"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216"
dependencies = [
"cfg-if",
"instant",
"libc",
"redox_syscall",
"smallvec",
"winapi",
]
[[package]]
name = "paste"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45ca20c77d80be666aef2b45486da86238fabe33e38306bd3118fe4af33fa880"
dependencies = [
"paste-impl",
"proc-macro-hack",
]
[[package]]
name = "paste-impl"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d95a7db200b97ef370c8e6de0088252f7e0dfff7d047a28528e47456c0fc98b6"
dependencies = [
"proc-macro-hack",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]]
name = "proc-macro2"
version = "1.0.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a2ca2c61bc9f3d74d2886294ab7b9853abd9c1ad903a3ac7815c58989bb7bab"
dependencies = [
"unicode-ident",
]
[[package]]
name = "pyo3"
version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d41d50a7271e08c7c8a54cd24af5d62f73ee3a6f6a314215281ebdec421d5752"
dependencies = [
"cfg-if",
"indoc",
"libc",
"parking_lot",
"paste",
"pyo3-build-config",
"pyo3-macros",
"unindent",
]
[[package]]
name = "pyo3-build-config"
version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "779239fc40b8e18bc8416d3a37d280ca9b9fb04bda54b98037bb6748595c2410"
dependencies = [
"once_cell",
]
[[package]]
name = "pyo3-macros"
version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b247e8c664be87998d8628e86f282c25066165f1f8dda66100c48202fdb93a"
dependencies = [
"pyo3-macros-backend",
"quote",
"syn",
]
[[package]]
name = "pyo3-macros-backend"
version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a8c2812c412e00e641d99eeb79dd478317d981d938aa60325dfa7157b607095"
dependencies = [
"proc-macro2",
"pyo3-build-config",
"quote",
"syn",
]
[[package]]
name = "quote"
version = "1.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179"
dependencies = [
"proc-macro2",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
dependencies = [
"bitflags",
]
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "smallvec"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fd0db749597d91ff862fd1d55ea87f7855a744a8425a64695b6fca237d1dad1"
[[package]]
name = "syn"
version = "1.0.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58dbef6ec655055e20b86b15a8cc6d439cca19b667537ac6a1369572d151ab13"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "unicode-ident"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4f5b37a154999a8f3f98cc23a628d850e154479cd94decf3414696e12e31aaf"
[[package]]
name = "unindent"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58ee9362deb4a96cef4d437d1ad49cffc9b9e92d202b6995674e928ce684f112"
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"


@@ -0,0 +1,16 @@
[package]
name = "neon-dev-utils"
version = "0.1.0"
edition = "2021"
[lib]
name = "neon_dev_utils"
# "cdylib" is necessary to produce a shared library for Python to import from.
#
# Downstream Rust code (including code in `bin/`, `examples/`, and `tests/`) will not be able
# to `use neon_dev_utils;` unless the "rlib" or "lib" crate type is also included, e.g.:
# crate-type = ["cdylib", "rlib"]
crate-type = ["cdylib"]
[dependencies]
pyo3 = { version = "0.15.1", features = ["extension-module"] }


@@ -0,0 +1,31 @@
[[package]]
name = "maturin"
version = "0.13.2"
description = "Build and publish crates with pyo3, rust-cpython and cffi bindings as well as rust binaries as python packages"
category = "dev"
optional = false
python-versions = ">=3.7"
[package.dependencies]
tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""}
[package.extras]
zig = ["ziglang (>=0.9.0,<0.10.0)"]
patchelf = ["patchelf"]
[[package]]
name = "tomli"
version = "2.0.1"
description = "A lil' TOML parser"
category = "dev"
optional = false
python-versions = ">=3.7"
[metadata]
lock-version = "1.1"
python-versions = "^3.10"
content-hash = "4e177514d6cf74b58bcd8ca30ef300c10a833b3e6b1d809aa57337ee20efeb47"
[metadata.files]
maturin = []
tomli = []


@@ -0,0 +1,15 @@
[tool.poetry]
name = "neon-dev-utils"
version = "0.1.0"
description = "Python bindings for common neon development utils"
authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies]
python = "^3.10"
[tool.poetry.dev-dependencies]
maturin = "^0.13.2"
[build-system]
requires = ["maturin>=0.13.2", "poetry-core>=1.0.0"]
build-backend = "maturin"


@@ -0,0 +1,17 @@
use pyo3::prelude::*;
/// Formats the sum of two numbers as string.
#[pyfunction]
fn sum_as_string(a: usize, b: usize) -> PyResult<String> {
Ok((a + b).to_string())
}
/// A Python module implemented in Rust. The name of this function must match
/// the `lib.name` setting in the `Cargo.toml`, else Python will not be able to
/// import the module.
#[pymodule]
fn neon_dev_utils(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(sum_as_string, m)?)?;
Ok(())
}
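An assumed consumption path, inferred from the maturin build backend declared in the pyproject.toml above (not documented in this change): build and install the module into the active virtualenv with `maturin develop`, then call it from Python as `import neon_dev_utils; neon_dev_utils.sum_as_string(1, 2)` — the import name matching the `#[pymodule]` function.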


@@ -24,7 +24,7 @@ use crate::safekeeper::SafekeeperNode;
// This data structure represents the neon_local CLI config
//
// It is deserialized from the .neon/config file, or the config file passed
// to 'zenith init --config=<path>' option. See control_plane/simple.conf for
// to 'neon_local init --config=<path>' option. See control_plane/simple.conf for
// an example.
//
#[serde_as]
@@ -320,7 +320,7 @@ impl LocalEnv {
if !repopath.exists() {
bail!(
"Zenith config is not found in {}. You need to run 'zenith init' first",
"Zenith config is not found in {}. You need to run 'neon_local init' first",
repopath.to_str().unwrap()
);
}
@@ -337,12 +337,12 @@ impl LocalEnv {
}
pub fn persist_config(&self, base_path: &Path) -> anyhow::Result<()> {
// Currently, the user first passes a config file with 'zenith init --config=<path>'
// Currently, the user first passes a config file with 'neon_local init --config=<path>'
// We read that in, in `create_config`, and fill any missing defaults. Then it's saved
// to .neon/config. TODO: We lose any formatting and comments along the way, which is
// a bit sad.
let mut conf_content = r#"# This file describes a local deployment of the page server
# and safekeeper node. It is read by the 'zenith' command-line
# and safekeeper node. It is read by the 'neon_local' command-line
# utility.
"#
.to_string();
@@ -382,7 +382,7 @@ impl LocalEnv {
}
//
// Initialize a new Zenith repository
// Initialize a new Neon repository
//
pub fn init(&mut self) -> anyhow::Result<()> {
// check if config already exists


@@ -51,7 +51,7 @@ impl ResponseErrorMessageExt for Response {
Err(SafekeeperHttpError::Response(
match self.json::<HttpErrorBody>() {
Ok(err_body) => format!("Error: {}", err_body.msg),
Err(_) => format!("Http error ({}) at {url}.", status.as_u16()),
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
},
))
}


@@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::fs::File;
use std::io::{BufReader, Write};
use std::num::NonZeroU64;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::Duration;
use std::{io, result, thread};
@@ -102,23 +102,19 @@ impl PageServerNode {
/// Construct libpq connection string for connecting to the pageserver.
fn pageserver_connection_config(password: &str, listen_addr: &str) -> Config {
format!("postgresql://no_user:{}@{}/no_db", password, listen_addr)
format!("postgresql://no_user:{password}@{listen_addr}/no_db")
.parse()
.unwrap()
}
pub fn init(
pub fn initialize(
&self,
create_tenant: Option<ZTenantId>,
initial_timeline_id: Option<ZTimelineId>,
config_overrides: &[&str],
) -> anyhow::Result<ZTimelineId> {
let mut cmd = Command::new(self.env.pageserver_bin()?);
let id = format!("id={}", self.env.pageserver.id);
// FIXME: the paths should be shell-escaped to handle paths with spaces, quotes etc.
let base_data_dir_param = self.env.base_data_dir.display().to_string();
let pg_distrib_dir_param =
format!("pg_distrib_dir='{}'", self.env.pg_distrib_dir.display());
let authg_type_param = format!("auth_type='{}'", self.env.pageserver.auth_type);
@@ -138,67 +134,52 @@ impl PageServerNode {
.collect::<Vec<_>>()
.join(",")
);
let mut args = Vec::with_capacity(20);
args.push("--init");
args.extend(["-D", &base_data_dir_param]);
args.extend(["-c", &pg_distrib_dir_param]);
args.extend(["-c", &authg_type_param]);
args.extend(["-c", &listen_http_addr_param]);
args.extend(["-c", &listen_pg_addr_param]);
args.extend(["-c", &broker_endpoints_param]);
args.extend(["-c", &id]);
let broker_etcd_prefix_param = self
.env
.etcd_broker
.broker_etcd_prefix
.as_ref()
.map(|prefix| format!("broker_etcd_prefix='{prefix}'"));
if let Some(broker_etcd_prefix_param) = broker_etcd_prefix_param.as_deref() {
args.extend(["-c", broker_etcd_prefix_param]);
}
for config_override in config_overrides {
args.extend(["-c", config_override]);
let mut init_config_overrides = config_overrides.to_vec();
init_config_overrides.push(&id);
init_config_overrides.push(&pg_distrib_dir_param);
init_config_overrides.push(&authg_type_param);
init_config_overrides.push(&listen_http_addr_param);
init_config_overrides.push(&listen_pg_addr_param);
init_config_overrides.push(&broker_endpoints_param);
if let Some(broker_etcd_prefix_param) = broker_etcd_prefix_param.as_deref() {
init_config_overrides.push(broker_etcd_prefix_param);
}
if self.env.pageserver.auth_type != AuthType::Trust {
args.extend([
"-c",
"auth_validation_public_key_path='auth_public_key.pem'",
]);
init_config_overrides.push("auth_validation_public_key_path='auth_public_key.pem'");
}
let create_tenant = create_tenant.map(|id| id.to_string());
if let Some(tenant_id) = create_tenant.as_deref() {
args.extend(["--create-tenant", tenant_id])
self.start_node(&init_config_overrides, &self.env.base_data_dir, true)?;
let init_result = self
.try_init_timeline(create_tenant, initial_timeline_id)
.context("Failed to create initial tenant and timeline for pageserver");
match &init_result {
Ok(initial_timeline_id) => {
println!("Successfully initialized timeline {initial_timeline_id}")
}
Err(e) => eprintln!("{e:#}"),
}
self.stop(false)?;
init_result
}
let initial_timeline_id = initial_timeline_id.unwrap_or_else(ZTimelineId::generate);
let initial_timeline_id_string = initial_timeline_id.to_string();
args.extend(["--initial-timeline-id", &initial_timeline_id_string]);
let cmd_with_args = cmd.args(args);
let init_output = fill_rust_env_vars(cmd_with_args)
.output()
.with_context(|| {
format!("failed to init pageserver with command {:?}", cmd_with_args)
})?;
if !init_output.status.success() {
bail!(
"init invocation failed, {}\nStdout: {}\nStderr: {}",
init_output.status,
String::from_utf8_lossy(&init_output.stdout),
String::from_utf8_lossy(&init_output.stderr)
);
}
// echo the captured output of the init command
println!("{}", String::from_utf8_lossy(&init_output.stdout));
Ok(initial_timeline_id)
fn try_init_timeline(
&self,
new_tenant_id: Option<ZTenantId>,
new_timeline_id: Option<ZTimelineId>,
) -> anyhow::Result<ZTimelineId> {
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())?;
let initial_timeline_info =
self.timeline_create(initial_tenant_id, new_timeline_id, None, None)?;
Ok(initial_timeline_info.timeline_id)
}
pub fn repo_path(&self) -> PathBuf {
@@ -210,15 +191,35 @@ impl PageServerNode {
}
pub fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
print!(
self.start_node(config_overrides, &self.repo_path(), false)
}
fn start_node(
&self,
config_overrides: &[&str],
datadir: &Path,
update_config: bool,
) -> anyhow::Result<()> {
println!(
"Starting pageserver at '{}' in '{}'",
connection_address(&self.pg_connection_config),
self.repo_path().display()
datadir.display()
);
io::stdout().flush().unwrap();
io::stdout().flush()?;
let repo_path = self.repo_path();
let mut args = vec!["-D", repo_path.to_str().unwrap()];
let mut args = vec![
"-D",
datadir.to_str().with_context(|| {
format!(
"Datadir path '{}' cannot be represented as a unicode string",
datadir.display()
)
})?,
];
if update_config {
args.push("--update-config");
}
for config_override in config_overrides {
args.extend(["-c", config_override]);
@@ -230,8 +231,8 @@ impl PageServerNode {
if !filled_cmd.status()?.success() {
bail!(
"Pageserver failed to start. See '{}' for details.",
self.repo_path().join("pageserver.log").display()
"Pageserver failed to start. See console output and '{}' for details.",
datadir.join("pageserver.log").display()
);
}
@@ -240,7 +241,7 @@ impl PageServerNode {
const RETRIES: i8 = 15;
for retries in 1..RETRIES {
match self.check_status() {
Ok(_) => {
Ok(()) => {
println!("\nPageserver started");
return Ok(());
}
@@ -254,21 +255,18 @@ impl PageServerNode {
if retries == 5 {
println!() // put a line break after dots for second message
}
println!(
"Pageserver not responding yet, err {} retrying ({})...",
err, retries
);
println!("Pageserver not responding yet, err {err} retrying ({retries})...");
}
}
PageserverHttpError::Response(msg) => {
bail!("pageserver failed to start: {} ", msg)
bail!("pageserver failed to start: {msg} ")
}
}
thread::sleep(Duration::from_secs(1));
}
}
}
bail!("pageserver failed to start in {} seconds", RETRIES);
bail!("pageserver failed to start in {RETRIES} seconds");
}
///
@@ -298,15 +296,11 @@ impl PageServerNode {
match kill(pid, sig) {
Ok(_) => (),
Err(Errno::ESRCH) => {
println!(
"Pageserver with pid {} does not exist, but a PID file was found",
pid
);
println!("Pageserver with pid {pid} does not exist, but a PID file was found");
return Ok(());
}
Err(err) => bail!(
"Failed to send signal to pageserver with pid {}: {}",
pid,
"Failed to send signal to pageserver with pid {pid}: {}",
err.desc()
),
}
@@ -335,13 +329,13 @@ impl PageServerNode {
thread::sleep(Duration::from_millis(100));
}
bail!("Failed to stop pageserver with pid {}", pid);
bail!("Failed to stop pageserver with pid {pid}");
}
pub fn page_server_psql(&self, sql: &str) -> Vec<postgres::SimpleQueryMessage> {
let mut client = self.pg_connection_config.connect(NoTls).unwrap();
println!("Pageserver query: '{}'", sql);
println!("Pageserver query: '{sql}'");
client.simple_query(sql).unwrap()
}
@@ -376,9 +370,8 @@ impl PageServerNode {
&self,
new_tenant_id: Option<ZTenantId>,
settings: HashMap<&str, &str>,
) -> anyhow::Result<Option<ZTenantId>> {
let tenant_id_string = self
.http_request(Method::POST, format!("{}/tenant", self.http_base_url))
) -> anyhow::Result<ZTenantId> {
self.http_request(Method::POST, format!("{}/tenant", self.http_base_url))
.json(&TenantCreateRequest {
new_tenant_id,
checkpoint_distance: settings
@@ -417,18 +410,16 @@ impl PageServerNode {
})
.send()?
.error_from_body()?
.json::<Option<String>>()?;
tenant_id_string
.map(|id| {
id.parse().with_context(|| {
format!(
"Failed to parse tennat creation response as tenant id: {}",
id
)
.json::<Option<String>>()
.with_context(|| {
format!("Failed to parse tenant creation response for tenant id: {new_tenant_id:?}")
})?
.context("No tenant id was found in the tenant creation response")
.and_then(|tenant_id_string| {
tenant_id_string.parse().with_context(|| {
format!("Failed to parse response string as tenant id: '{tenant_id_string}'")
})
})
.transpose()
}
pub fn tenant_config(&self, tenant_id: ZTenantId, settings: HashMap<&str, &str>) -> Result<()> {
@@ -499,22 +490,27 @@ impl PageServerNode {
new_timeline_id: Option<ZTimelineId>,
ancestor_start_lsn: Option<Lsn>,
ancestor_timeline_id: Option<ZTimelineId>,
) -> anyhow::Result<Option<TimelineInfo>> {
let timeline_info_response = self
.http_request(
Method::POST,
format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
) -> anyhow::Result<TimelineInfo> {
self.http_request(
Method::POST,
format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
)
.json(&TimelineCreateRequest {
new_timeline_id,
ancestor_start_lsn,
ancestor_timeline_id,
})
.send()?
.error_from_body()?
.json::<Option<TimelineInfo>>()
.with_context(|| {
format!("Failed to parse timeline creation response for tenant id: {tenant_id}")
})?
.with_context(|| {
format!(
"No timeline id was found in the timeline creation response for tenant {tenant_id}"
)
.json(&TimelineCreateRequest {
new_timeline_id,
ancestor_start_lsn,
ancestor_timeline_id,
})
.send()?
.error_from_body()?
.json::<Option<TimelineInfo>>()?;
Ok(timeline_info_response)
})
}
/// Import a basebackup prepared using either:


@@ -1,24 +0,0 @@
#!/bin/sh
set -eux
pageserver_id_param="${NODE_ID:-10}"
broker_endpoints_param="${BROKER_ENDPOINT:-absent}"
if [ "$broker_endpoints_param" != "absent" ]; then
broker_endpoints_param="-c broker_endpoints=['$broker_endpoints_param']"
else
broker_endpoints_param=''
fi
remote_storage_param="${REMOTE_STORAGE:-}"
if [ "$1" = 'pageserver' ]; then
if [ ! -d "/data/tenants" ]; then
echo "Initializing pageserver data directory"
pageserver --init -D /data -c "pg_distrib_dir='/usr/local'" -c "id=${pageserver_id_param}" $broker_endpoints_param $remote_storage_param
fi
echo "Staring pageserver at 0.0.0.0:6400"
pageserver -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" $broker_endpoints_param -D /data
else
"$@"
fi


@@ -0,0 +1,13 @@
[package]
name = "integration_tests"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
utils = { path = "../libs/utils" }
pg_bin = { path = "../libs/pg_bin" }
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
tokio = { version = "1.17", features = ["macros", "rt", "rt-multi-thread"] }
anyhow = "1.0.62"


@@ -0,0 +1,36 @@
#[cfg(test)]
mod tests {
use pg_bin::PgDatadir;
use std::path::PathBuf;
use tokio_postgres::NoTls;
#[tokio::test]
async fn test_postgres_select_1() -> anyhow::Result<()> {
// Test setup
let output = PathBuf::from("/home/bojan/tmp/");
let pg_prefix = PathBuf::from("/home/bojan/src/neondatabase/neon/tmp_install/bin/");
// Init datadir
let pg_datadir_path = PathBuf::from("/home/bojan/tmp/t1/");
let pg_datadir = PgDatadir::new_initdb(pg_datadir_path, &pg_prefix, &output, true);
// Get a postgres
let postgres = pg_datadir.spawn_postgres(pg_prefix, output);
let conn_info = postgres.admin_conn_info();
// Get client, run connection
let (client, connection) = conn_info.connect(NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
// Run "select 1"
let rows = client.query("SELECT 'hello';", &[]).await?;
let value: &str = rows[0].get(0);
assert_eq!(value, "hello");
Ok(())
}
}


@@ -0,0 +1 @@
mod basic;

libs/pg_bin/Cargo.toml (new file, 10 lines)

@@ -0,0 +1,10 @@
[package]
name = "pg_bin"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
utils = { path = "../utils" }

libs/pg_bin/src/lib.rs (new file, 106 lines)

@@ -0,0 +1,106 @@
//! Utils for running postgres binaries as subprocesses.
use std::{fs::{File, remove_dir_all}, path::PathBuf, process::{Child, Command}, time::Duration};
use std::io::Write;
use utils::command_extensions::NeonCommandExtensions;
pub struct PgDatadir {
path: PathBuf
}
impl PgDatadir {
pub fn new_initdb(
path: PathBuf,
pg_prefix: &PathBuf,
command_output_dir: &PathBuf,
remove_if_exists: bool
) -> Self {
if remove_if_exists {
remove_dir_all(path.clone()).ok();
}
let status = Command::new(pg_prefix.join("initdb"))
.arg("-D")
.arg(path.clone())
.capture_to_files(command_output_dir.clone(), "initdb")
.status()
.expect("failed to get status");
assert!(status.success());
Self {
path
}
}
pub fn load_existing(path: PathBuf) -> Self{
Self {
path
}
}
pub fn path(&self) -> PathBuf {
self.path.clone()
}
pub fn spawn_postgres(self, pg_prefix: PathBuf, command_output_dir: PathBuf) -> LocalPostgres {
let port = 54729;
// Write conf
// TODO don't override existing conf
// - instead infer port from conf
let mut conf = File::create(self.path().join("postgresql.conf")).expect("failed to create file");
writeln!(&mut conf, "port = {}", port).expect("failed to write conf");
let process = Command::new(pg_prefix.join("postgres"))
.env("PGDATA", self.path())
.capture_to_files(command_output_dir, "pg")
.spawn()
.expect("postgres failed to spawn");
// Wait until ready. TODO improve this
std::thread::sleep(Duration::from_millis(300));
LocalPostgres {
datadir: self,
port: 54729,
process,
}
}
}
pub struct LocalPostgres {
datadir: PgDatadir,
port: u16,
process: Child,
}
impl LocalPostgres {
pub fn admin_conn_info(&self) -> tokio_postgres::Config {
// I don't like this, but idk what else to do
let whoami = Command::new("whoami").output().unwrap().stdout;
let user = String::from_utf8_lossy(&whoami);
let user = user.trim();
let mut config = tokio_postgres::Config::new();
config
.host("127.0.0.1")
.port(self.port)
.dbname("postgres")
.user(&user);
config
}
pub fn stop(mut self) -> PgDatadir {
self.process.kill().expect("failed to kill child");
PgDatadir {
path: self.datadir.path.clone()
}
}
}
impl Drop for LocalPostgres {
fn drop(&mut self) {
self.process.kill().expect("failed to kill child");
}
}


@@ -23,7 +23,7 @@
//! information. You can use PostgreSQL's pg_controldata utility to view its
//! contents.
//!
use crate::{ControlFileData, PG_CONTROL_FILE_SIZE};
use super::bindings::{ControlFileData, PG_CONTROL_FILE_SIZE};
use anyhow::{bail, Result};
use bytes::{Bytes, BytesMut};


@@ -7,21 +7,62 @@
// https://github.com/rust-lang/rust-bindgen/issues/1651
#![allow(deref_nullptr)]
use serde::{Deserialize, Serialize};
use utils::lsn::Lsn;
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
macro_rules! postgres_ffi {
($version:ident) => {
#[path = "."]
pub mod $version {
// fixme: does this have to be 'pub'?
pub mod bindings {
// bindgen generates bindings for a lot of stuff we don't need
#![allow(dead_code)]
pub mod controlfile_utils;
pub mod nonrelfile_utils;
pub mod pg_constants;
pub mod relfile_utils;
pub mod waldecoder;
pub mod xlog_utils;
use serde::{Deserialize, Serialize};
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
}
pub mod controlfile_utils;
pub mod nonrelfile_utils;
pub mod pg_constants;
pub mod relfile_utils;
pub mod waldecoder;
pub mod xlog_utils;
// Re-export some symbols from bindings
pub use bindings::DBState_DB_SHUTDOWNED;
pub use bindings::{CheckPoint, ControlFileData, XLogRecord};
}
};
}
postgres_ffi!(v14);
// Export some widely used datatypes that are unlikely to change across Postgres versions
pub use v14::bindings::{uint32, uint64, Oid};
pub use v14::bindings::{BlockNumber, OffsetNumber};
pub use v14::bindings::{MultiXactId, TransactionId};
// Likewise for these, although the assumption that these don't change is a little more iffy.
pub use v14::bindings::{MultiXactOffset, MultiXactStatus};
// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
// --with-segsize=SEGSIZE, but assume the defaults for now.
pub const BLCKSZ: u16 = 8192;
pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32);
pub const XLOG_BLCKSZ: usize = 8192;
// PG timeline is always 1, changing it doesn't have any useful meaning in Neon.
//
// NOTE: this is not to be confused with Neon timelines; different concept!
//
// It's a shaky assumption, that it's always 1. We might import a
// PostgreSQL data directory that has gone through timeline bumps,
// for example. FIXME later.
pub const PG_TLI: u32 = 1;
// See TransactionIdIsNormal in transam.h
pub const fn transaction_id_is_normal(id: TransactionId) -> bool {
id > pg_constants::FIRST_NORMAL_TRANSACTION_ID
id > v14::pg_constants::FIRST_NORMAL_TRANSACTION_ID
}
// See TransactionIdPrecedes in transam.c
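To make the BLCKSZ/RELSEG_SIZE constants above concrete (plain arithmetic, not from the source): with the default block size of 8192 bytes, RELSEG_SIZE = 1024 * 1024 * 1024 / 8192 = 131072, so each 1 GiB relation segment holds 131,072 pages of 8 KiB.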


@@ -1,11 +1,12 @@
//!
//! Common utilities for dealing with PostgreSQL non-relation files.
//!
use crate::{pg_constants, transaction_id_precedes};
use crate::transaction_id_precedes;
use super::pg_constants;
use bytes::BytesMut;
use log::*;
use crate::MultiXactId;
use super::bindings::MultiXactId;
pub fn transaction_id_set_status(xid: u32, status: u8, page: &mut BytesMut) {
trace!(


@@ -7,7 +7,8 @@
//! comments on them.
//!
use crate::PageHeaderData;
use super::bindings::PageHeaderData;
use crate::BLCKSZ;
//
// From pg_tablespace_d.h
@@ -31,11 +32,6 @@ pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
pub const SMGR_TRUNCATE_VM: u32 = 0x0002;
pub const SMGR_TRUNCATE_FSM: u32 = 0x0004;
// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
// --with-segsize=SEGSIZE, but assume the defaults for now.
pub const BLCKSZ: u16 = 8192;
pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32);
//
// From bufpage.h
//
@@ -213,7 +209,6 @@ pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384;
/* FIXME: pageserver should request wal_seg_size from compute node */
pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024;
pub const XLOG_BLCKSZ: usize = 8192;
pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00;
pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10;
pub const XLP_LONG_HEADER: u16 = 0x0002;


@@ -1,7 +1,7 @@
//!
//! Common utilities for dealing with PostgreSQL relation files.
//!
use crate::pg_constants;
use super::pg_constants;
use once_cell::sync::OnceCell;
use regex::Regex;


@@ -10,10 +10,7 @@
//!
use super::pg_constants;
use super::xlog_utils::*;
use super::XLogLongPageHeaderData;
use super::XLogPageHeaderData;
use super::XLogRecord;
use super::XLOG_PAGE_MAGIC;
use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord, XLOG_PAGE_MAGIC};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use crc32c::*;
use log::*;


@@ -7,22 +7,24 @@
// have been named the same as the corresponding PostgreSQL functions instead.
//
use crate::pg_constants;
use crate::CheckPoint;
use crate::FullTransactionId;
use crate::XLogLongPageHeaderData;
use crate::XLogPageHeaderData;
use crate::XLogRecord;
use crate::XLOG_PAGE_MAGIC;
use crc32c::crc32c_append;
use crate::pg_constants::WAL_SEGMENT_SIZE;
use crate::waldecoder::WalStreamDecoder;
use super::bindings::{
CheckPoint, FullTransactionId, XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord,
XLOG_PAGE_MAGIC,
};
use super::pg_constants;
use super::pg_constants::WAL_SEGMENT_SIZE;
use crate::v14::waldecoder::WalStreamDecoder;
use crate::PG_TLI;
use crate::{uint32, uint64, Oid};
use bytes::BytesMut;
use bytes::{Buf, Bytes};
use log::*;
use serde::Serialize;
use std::fs::File;
use std::io::prelude::*;
use std::io::ErrorKind;
@@ -47,9 +49,6 @@ pub const XLOG_SIZE_OF_XLOG_RECORD: usize = std::mem::size_of::<XLogRecord>();
#[allow(clippy::identity_op)]
pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
// PG timeline is always 1, changing it doesn't have useful meaning in Zenith.
pub const PG_TLI: u32 = 1;
pub type XLogRecPtr = u64;
pub type TimeLineID = u32;
pub type TimestampTz = i64;
@@ -346,6 +345,85 @@ pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result<Bytes, Seriali
Ok(seg_buf.freeze())
}
#[repr(C)]
#[derive(Serialize)]
struct XlLogicalMessage {
db_id: Oid,
transactional: uint32, // bool, takes 4 bytes due to alignment in C structures
prefix_size: uint64,
message_size: uint64,
}
impl XlLogicalMessage {
pub fn encode(&self) -> Bytes {
use utils::bin_ser::LeSer;
self.ser().unwrap().into()
}
}
/// Create new WAL record for non-transactional logical message.
/// Used for creating artificial WAL for tests, as LogicalMessage
/// record is basically no-op.
///
/// NOTE: This leaves the xl_prev field zero. The safekeeper and
/// pageserver tolerate that, but PostgreSQL does not.
pub fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> {
let mut prefix_bytes: Vec<u8> = Vec::with_capacity(prefix.len() + 1);
prefix_bytes.write_all(prefix.as_bytes()).unwrap();
prefix_bytes.push(0);
let message_bytes = message.as_bytes();
let logical_message = XlLogicalMessage {
db_id: 0,
transactional: 0,
prefix_size: prefix_bytes.len() as u64,
message_size: message_bytes.len() as u64,
};
let mainrdata = logical_message.encode();
let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len();
// only short mainrdata is supported for now
assert!(mainrdata_len <= 255);
let mainrdata_len = mainrdata_len as u8;
let mut data: Vec<u8> = vec![pg_constants::XLR_BLOCK_ID_DATA_SHORT, mainrdata_len];
data.extend_from_slice(&mainrdata);
data.extend_from_slice(&prefix_bytes);
data.extend_from_slice(message_bytes);
let total_len = XLOG_SIZE_OF_XLOG_RECORD + data.len();
let mut header = XLogRecord {
xl_tot_len: total_len as u32,
xl_xid: 0,
xl_prev: 0,
xl_info: 0,
xl_rmid: 21,
__bindgen_padding_0: [0u8; 2usize],
xl_crc: 0, // crc will be calculated later
};
let header_bytes = header.encode().expect("failed to encode header");
let crc = crc32c_append(0, &data);
let crc = crc32c_append(crc, &header_bytes[0..XLOG_RECORD_CRC_OFFS]);
header.xl_crc = crc;
let mut wal: Vec<u8> = Vec::new();
wal.extend_from_slice(&header.encode().expect("failed to encode header"));
wal.extend_from_slice(&data);
// WAL start position must be aligned at 8 bytes,
// this will add padding for the next WAL record.
const PADDING: usize = 8;
let padding_rem = wal.len() % PADDING;
if padding_rem != 0 {
wal.resize(wal.len() + PADDING - padding_rem, 0);
}
wal
}
#[cfg(test)]
mod tests {
use super::*;
@@ -547,4 +625,15 @@ mod tests {
checkpoint.update_next_xid(1024);
assert_eq!(checkpoint.nextXid.value, 2048);
}
#[test]
pub fn test_encode_logical_message() {
let expected = [
64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255,
38, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114,
101, 102, 105, 120, 0, 109, 101, 115, 115, 97, 103, 101,
];
let actual = encode_logical_message("prefix", "message");
assert_eq!(expected, actual[..]);
}
}


@@ -4,8 +4,8 @@ use log::*;
use once_cell::sync::Lazy;
use postgres::types::PgLsn;
use postgres::Client;
use postgres_ffi::pg_constants::WAL_SEGMENT_SIZE;
use postgres_ffi::xlog_utils::{
use postgres_ffi::v14::pg_constants::WAL_SEGMENT_SIZE;
use postgres_ffi::v14::xlog_utils::{
XLOG_BLCKSZ, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
};
use std::cmp::Ordering;


@@ -0,0 +1,21 @@
use std::path::PathBuf;
use std::{os::unix::prelude::CommandExt, process::Command};
use std::fs::File;
pub trait NeonCommandExtensions: CommandExt {
fn capture_to_files(&mut self, path: PathBuf, name: &str) -> &mut Command;
}
impl NeonCommandExtensions for Command {
fn capture_to_files(&mut self, path: PathBuf, name: &str) -> &mut Command {
let out_file = File::create(path.join(format!("{}.out", name)))
.expect("can't make file");
let err_file = File::create(path.join(format!("{}.err", name)))
.expect("can't make file");
// TODO touch files?
self.stdout(out_file).stderr(err_file)
}
}
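A hedged usage sketch of this helper, mirroring how the new pg_bin crate in this comparison calls it (paths and binary name are illustrative):

use std::path::PathBuf;
use std::process::Command;
use utils::command_extensions::NeonCommandExtensions;

fn run_initdb(datadir: &PathBuf, log_dir: PathBuf) {
    // stdout/stderr are captured to files named after "initdb" in log_dir.
    let status = Command::new("initdb")
        .arg("-D")
        .arg(datadir)
        .capture_to_files(log_dir, "initdb")
        .status()
        .expect("failed to run initdb");
    assert!(status.success());
}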


@@ -54,6 +54,9 @@ pub mod nonblock;
// Default signal handling
pub mod signals;
// Helpers for running commands
pub mod command_extensions;
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:


@@ -501,10 +501,10 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
// default_tenantid was generated by the `env.init()` call above
let initial_tenant_id = env.default_tenant_id.unwrap();
// Call 'pageserver init'.
// Initialize pageserver, create initial tenant and timeline.
let pageserver = PageServerNode::from_env(&env);
let initial_timeline_id = pageserver
.init(
.initialize(
Some(initial_tenant_id),
initial_timeline_id_arg,
&pageserver_config_overrides(init_match),
@@ -551,25 +551,15 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
.values_of("config")
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
.unwrap_or_default();
let new_tenant_id = pageserver
.tenant_create(initial_tenant_id, tenant_conf)?
.ok_or_else(|| {
anyhow!("Tenant with id {:?} was already created", initial_tenant_id)
})?;
println!(
"tenant {} successfully created on the pageserver",
new_tenant_id
);
let new_tenant_id = pageserver.tenant_create(initial_tenant_id, tenant_conf)?;
println!("tenant {new_tenant_id} successfully created on the pageserver");
// Create an initial timeline for the new tenant
let new_timeline_id = parse_timeline_id(create_match)?;
let timeline = pageserver
.timeline_create(new_tenant_id, new_timeline_id, None, None)?
.context(format!(
"Failed to create initial timeline for tenant {new_tenant_id}"
))?;
let new_timeline_id = timeline.timeline_id;
let last_record_lsn = timeline
let timeline_info =
pageserver.timeline_create(new_tenant_id, new_timeline_id, None, None)?;
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info
.local
.context(format!("Failed to get last record LSN: no local timeline info for timeline {new_timeline_id}"))?
.last_record_lsn;
@@ -616,20 +606,18 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let new_branch_name = create_match
.value_of("branch-name")
.ok_or_else(|| anyhow!("No branch name provided"))?;
let timeline = pageserver
.timeline_create(tenant_id, None, None, None)?
.ok_or_else(|| anyhow!("Failed to create new timeline for tenant {}", tenant_id))?;
let new_timeline_id = timeline.timeline_id;
let timeline_info = pageserver.timeline_create(tenant_id, None, None, None)?;
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline
let last_record_lsn = timeline_info
.local
.expect("no local timeline info")
.last_record_lsn;
env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
println!(
"Created timeline '{}' at Lsn {} for tenant: {}",
timeline.timeline_id, last_record_lsn, tenant_id,
"Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
timeline_info.timeline_id
);
}
Some(("import", import_match)) => {
@@ -680,10 +668,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let ancestor_timeline_id = env
.get_branch_timeline_id(ancestor_branch_name, tenant_id)
.ok_or_else(|| {
anyhow!(
"Found no timeline id for branch name '{}'",
ancestor_branch_name
)
anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'")
})?;
let start_lsn = branch_match
@@ -691,12 +676,15 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
.map(Lsn::from_str)
.transpose()
.context("Failed to parse ancestor start Lsn from the request")?;
let timeline = pageserver
.timeline_create(tenant_id, None, start_lsn, Some(ancestor_timeline_id))?
.ok_or_else(|| anyhow!("Failed to create new timeline for tenant {}", tenant_id))?;
let new_timeline_id = timeline.timeline_id;
let timeline_info = pageserver.timeline_create(
tenant_id,
None,
start_lsn,
Some(ancestor_timeline_id),
)?;
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline
let last_record_lsn = timeline_info
.local
.expect("no local timeline info")
.last_record_lsn;
@@ -704,11 +692,11 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
println!(
"Created timeline '{}' at Lsn {} for tenant: {}. Ancestor timeline: '{}'",
timeline.timeline_id, last_record_lsn, tenant_id, ancestor_branch_name,
"Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}. Ancestor timeline: '{ancestor_branch_name}'",
timeline_info.timeline_id
);
}
Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name),
Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{sub_name}'"),
None => bail!("no tenant subcommand provided"),
}


@@ -24,8 +24,13 @@ use tracing::*;
use crate::reltag::{RelTag, SlruKind};
use crate::DatadirTimeline;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::*;
use postgres_ffi::v14::pg_constants;
use postgres_ffi::v14::xlog_utils::{generate_wal_segment, normalize_lsn, XLogFileName};
use postgres_ffi::v14::{CheckPoint, ControlFileData};
use postgres_ffi::TransactionId;
use postgres_ffi::PG_TLI;
use postgres_ffi::{BLCKSZ, RELSEG_SIZE};
use utils::lsn::Lsn;
/// This is short-living object only for the time of tarball creation,
@@ -200,7 +205,7 @@ where
}
// Add a file for each chunk of blocks (aka segment)
let chunks = (0..nblocks).chunks(pg_constants::RELSEG_SIZE as usize);
let chunks = (0..nblocks).chunks(RELSEG_SIZE as usize);
for (seg, blocks) in chunks.into_iter().enumerate() {
let mut segment_data: Vec<u8> = vec![];
for blknum in blocks {
@@ -220,23 +225,19 @@ where
fn add_slru_segment(&mut self, slru: SlruKind, segno: u32) -> anyhow::Result<()> {
let nblocks = self.timeline.get_slru_segment_size(slru, segno, self.lsn)?;
let mut slru_buf: Vec<u8> =
Vec::with_capacity(nblocks as usize * pg_constants::BLCKSZ as usize);
let mut slru_buf: Vec<u8> = Vec::with_capacity(nblocks as usize * BLCKSZ as usize);
for blknum in 0..nblocks {
let img = self
.timeline
.get_slru_page_at_lsn(slru, segno, blknum, self.lsn)?;
if slru == SlruKind::Clog {
ensure!(
img.len() == pg_constants::BLCKSZ as usize
|| img.len() == pg_constants::BLCKSZ as usize + 8
);
ensure!(img.len() == BLCKSZ as usize || img.len() == BLCKSZ as usize + 8);
} else {
ensure!(img.len() == pg_constants::BLCKSZ as usize);
ensure!(img.len() == BLCKSZ as usize);
}
slru_buf.extend_from_slice(&img[..pg_constants::BLCKSZ as usize]);
slru_buf.extend_from_slice(&img[..BLCKSZ as usize]);
}
let segname = format!("{}/{:>04X}", slru.to_str(), segno);


@@ -1,6 +1,6 @@
//! Main entry point for the Page Server executable.
use std::{env, path::Path, str::FromStr};
use std::{env, ops::ControlFlow, path::Path, str::FromStr};
use tracing::*;
use anyhow::{bail, Context, Result};
@@ -13,7 +13,7 @@ use pageserver::{
config::{defaults::*, PageServerConf},
http, page_cache, page_service, profiling, tenant_mgr, thread_mgr,
thread_mgr::ThreadKind,
timelines, virtual_file, LOG_FILE_NAME,
virtual_file, LOG_FILE_NAME,
};
use utils::{
auth::JwtAuth,
@@ -24,7 +24,6 @@ use utils::{
shutdown::exit_now,
signals::{self, Signal},
tcp_listener,
zid::{ZTenantId, ZTimelineId},
};
project_git_version!(GIT_VERSION);
@@ -42,6 +41,7 @@ fn main() -> anyhow::Result<()> {
.about("Materializes WAL stream to pages and serves them to the postgres")
.version(&*version())
.arg(
Arg::new("daemonize")
.short('d')
.long("daemonize")
@@ -52,7 +52,7 @@ fn main() -> anyhow::Result<()> {
Arg::new("init")
.long("init")
.takes_value(false)
.help("Initialize pageserver service: creates an initial config, tenant and timeline, if specified"),
.help("Initialize pageserver with all given config overrides"),
)
.arg(
Arg::new("workdir")
@@ -61,20 +61,6 @@ fn main() -> anyhow::Result<()> {
.takes_value(true)
.help("Working directory for the pageserver"),
)
.arg(
Arg::new("create-tenant")
.long("create-tenant")
.takes_value(true)
.help("Create tenant during init")
.requires("init"),
)
.arg(
Arg::new("initial-timeline-id")
.long("initial-timeline-id")
.takes_value(true)
.help("Use a specific timeline id during init and tenant creation")
.requires("create-tenant"),
)
// See `settings.md` for more details on the extra configuration parameters the pageserver can process
.arg(
Arg::new("config-override")
@@ -85,6 +71,9 @@ fn main() -> anyhow::Result<()> {
.help("Additional configuration overrides of the ones from the toml config file (or new ones to add there).
Any option has to be a valid toml document, example: `-c=\"foo='hey'\"` `-c=\"foo={value=1}\"`"),
)
.arg(Arg::new("update-config").long("update-config").takes_value(false).help(
"Update the config file when started",
))
.arg(
Arg::new("enabled-features")
.long("enabled-features")
@@ -110,18 +99,6 @@ fn main() -> anyhow::Result<()> {
.with_context(|| format!("Error opening workdir '{}'", workdir.display()))?;
let cfg_file_path = workdir.join("pageserver.toml");
let init = arg_matches.is_present("init");
let create_tenant = arg_matches
.value_of("create-tenant")
.map(ZTenantId::from_str)
.transpose()
.context("Failed to parse tenant id from the arguments")?;
let initial_timeline_id = arg_matches
.value_of("initial-timeline-id")
.map(ZTimelineId::from_str)
.transpose()
.context("Failed to parse timeline id from the arguments")?;
// Set CWD to workdir for non-daemon modes
env::set_current_dir(&workdir).with_context(|| {
format!(
@@ -131,30 +108,86 @@ fn main() -> anyhow::Result<()> {
})?;
let daemonize = arg_matches.is_present("daemonize");
if init && daemonize {
bail!("--daemonize cannot be used with --init")
}
let mut toml = if init {
// We're initializing the repo, so there's no config file yet
DEFAULT_CONFIG_FILE
.parse::<toml_edit::Document>()
.context("could not parse built-in config file")?
} else {
// Supplement the CLI arguments with the config file
let cfg_file_contents = std::fs::read_to_string(&cfg_file_path)
.with_context(|| format!("No pageserver config at '{}'", cfg_file_path.display()))?;
cfg_file_contents
.parse::<toml_edit::Document>()
.with_context(|| {
format!(
"Failed to read '{}' as pageserver config",
cfg_file_path.display()
)
})?
let conf = match initialize_config(&cfg_file_path, arg_matches, &workdir)? {
ControlFlow::Continue(conf) => conf,
ControlFlow::Break(()) => {
info!("Pageserver config init successful");
return Ok(());
}
};
let tenants_path = conf.tenants_path();
if !tenants_path.exists() {
utils::crashsafe_dir::create_dir_all(conf.tenants_path()).with_context(|| {
format!(
"Failed to create tenants root dir at '{}'",
tenants_path.display()
)
})?;
}
// Initialize failpoints support
let scenario = FailScenario::setup();
// Basic initialization of things that don't change after startup
virtual_file::init(conf.max_file_descriptors);
page_cache::init(conf.page_cache_size);
start_pageserver(conf, daemonize).context("Failed to start pageserver")?;
scenario.teardown();
Ok(())
}
fn initialize_config(
cfg_file_path: &Path,
arg_matches: clap::ArgMatches,
workdir: &Path,
) -> anyhow::Result<ControlFlow<(), &'static PageServerConf>> {
let init = arg_matches.is_present("init");
let update_config = init || arg_matches.is_present("update-config");
let (mut toml, config_file_exists) = if cfg_file_path.is_file() {
if init {
anyhow::bail!(
"Config file '{}' already exists, cannot init it, use --update-config to update it",
cfg_file_path.display()
);
}
// Supplement the CLI arguments with the config file
let cfg_file_contents = std::fs::read_to_string(&cfg_file_path).with_context(|| {
format!(
"Failed to read pageserver config at '{}'",
cfg_file_path.display()
)
})?;
(
cfg_file_contents
.parse::<toml_edit::Document>()
.with_context(|| {
format!(
"Failed to parse '{}' as pageserver config",
cfg_file_path.display()
)
})?,
true,
)
} else if cfg_file_path.exists() {
anyhow::bail!(
"Config file '{}' exists but is not a regular file",
cfg_file_path.display()
);
} else {
// We're initializing the repo, so there's no config file yet
(
DEFAULT_CONFIG_FILE
.parse::<toml_edit::Document>()
.context("could not parse built-in config file")?,
false,
)
};
// Process any extra options given with -c
if let Some(values) = arg_matches.values_of("config-override") {
for option_line in values {
let doc = toml_edit::Document::from_str(option_line).with_context(|| {
@@ -165,49 +198,38 @@ fn main() -> anyhow::Result<()> {
})?;
for (key, item) in doc.iter() {
if key == "id" {
anyhow::ensure!(
init,
"node id can only be set during pageserver init and cannot be overridden"
);
if config_file_exists && update_config && key == "id" && toml.contains_key(key) {
anyhow::bail!("Pageserver config file exists at '{}' and has node id already, it cannot be overridden", cfg_file_path.display());
}
toml.insert(key, item.clone());
}
}
}
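The merge semantics here come from `toml_edit`: each `-c` value is parsed as its own document and its top-level keys are inserted into the accumulated config, so the last occurrence of a key wins. A self-contained sketch with hypothetical keys (assuming the `toml_edit` and `anyhow` crates):

use std::str::FromStr;

fn main() -> anyhow::Result<()> {
    let mut toml = toml_edit::Document::from_str("page_cache_size = 100")?;
    for option_line in ["page_cache_size = 200", "max_file_descriptors = 512"] {
        let doc = toml_edit::Document::from_str(option_line)?;
        for (key, item) in doc.iter() {
            // Last write wins, mirroring the override loop above.
            toml.insert(key, item.clone());
        }
    }
    assert_eq!(toml["page_cache_size"].as_integer(), Some(200));
    Ok(())
}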
trace!("Resulting toml: {}", toml);
let conf = PageServerConf::parse_and_validate(&toml, &workdir)
debug!("Resulting toml: {toml}");
let conf = PageServerConf::parse_and_validate(&toml, workdir)
.context("Failed to parse pageserver configuration")?;
// The configuration is all set up now. Turn it into a 'static
// that can be freely stored in structs and passed across threads
// as a ref.
let conf: &'static PageServerConf = Box::leak(Box::new(conf));
if update_config {
info!("Writing pageserver config to '{}'", cfg_file_path.display());
// Initialize failpoints support
let scenario = FailScenario::setup();
// Basic initialization of things that don't change after startup
virtual_file::init(conf.max_file_descriptors);
page_cache::init(conf.page_cache_size);
// Create repo and exit if init was requested
if init {
timelines::init_pageserver(conf, create_tenant, initial_timeline_id)
.context("Failed to init pageserver")?;
// write the config file
std::fs::write(&cfg_file_path, toml.to_string()).with_context(|| {
format!(
"Failed to initialize pageserver config at '{}'",
"Failed to write pageserver config to '{}'",
cfg_file_path.display()
)
})?;
} else {
start_pageserver(conf, daemonize).context("Failed to start pageserver")?;
info!(
"Config successfully written to '{}'",
cfg_file_path.display()
)
}
scenario.teardown();
Ok(())
Ok(if init {
ControlFlow::Break(())
} else {
ControlFlow::Continue(Box::leak(Box::new(conf)))
})
}
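The reworked entry point leans on `std::ops::ControlFlow` to separate an init-only run (write the config, then exit) from a normal startup, and it leaks the parsed config to get the `&'static` lifetime the rest of the server expects. A stripped-down sketch of the pattern, with a hypothetical `Conf` type:

use std::ops::ControlFlow;

struct Conf { page_cache_size: usize }

fn initialize(init_only: bool) -> ControlFlow<(), &'static Conf> {
    // Leak the config so it can be stored in structs and shared across threads.
    let conf: &'static Conf = Box::leak(Box::new(Conf { page_cache_size: 8192 }));
    if init_only {
        ControlFlow::Break(()) // config written to disk; nothing more to do
    } else {
        ControlFlow::Continue(conf)
    }
}

fn main() {
    match initialize(false) {
        ControlFlow::Continue(conf) => println!("starting, page_cache_size={}", conf.page_cache_size),
        ControlFlow::Break(()) => println!("init successful"),
    }
}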
fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()> {


@@ -11,14 +11,13 @@ use super::models::{
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
TimelineCreateRequest,
};
use crate::layered_repository::metadata::TimelineMetadata;
use crate::layered_repository::{metadata::TimelineMetadata, LayeredTimeline};
use crate::pgdatadir_mapping::DatadirTimeline;
use crate::repository::{LocalTimelineState, RepositoryTimeline};
use crate::repository::{Repository, Timeline};
use crate::storage_sync;
use crate::storage_sync::index::{RemoteIndex, RemoteTimeline};
use crate::tenant_config::TenantConfOpt;
use crate::TimelineImpl;
use crate::{config::PageServerConf, tenant_mgr, timelines};
use utils::{
auth::JwtAuth,
@@ -86,7 +85,7 @@ fn get_config(request: &Request<Body>) -> &'static PageServerConf {
// Helper functions to construct a LocalTimelineInfo struct for a timeline
fn local_timeline_info_from_loaded_timeline(
timeline: &TimelineImpl,
timeline: &LayeredTimeline,
include_non_incremental_logical_size: bool,
include_non_incremental_physical_size: bool,
) -> anyhow::Result<LocalTimelineInfo> {
@@ -161,7 +160,7 @@ fn local_timeline_info_from_unloaded_timeline(metadata: &TimelineMetadata) -> Lo
}
fn local_timeline_info_from_repo_timeline(
repo_timeline: &RepositoryTimeline<TimelineImpl>,
repo_timeline: &RepositoryTimeline<LayeredTimeline>,
include_non_incremental_logical_size: bool,
include_non_incremental_physical_size: bool,
) -> anyhow::Result<LocalTimelineInfo> {


@@ -15,13 +15,24 @@ use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind};
use crate::walingest::WalIngest;
use crate::walrecord::DecodedWALRecord;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::waldecoder::*;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::v14::relfile_utils::*;
use postgres_ffi::v14::waldecoder::*;
use postgres_ffi::v14::xlog_utils::*;
use postgres_ffi::v14::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED};
use postgres_ffi::Oid;
use postgres_ffi::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED};
use postgres_ffi::BLCKSZ;
use utils::lsn::Lsn;
// Returns checkpoint LSN from controlfile
pub fn get_lsn_from_controlfile(path: &Path) -> Result<Lsn> {
// Read control file to extract the LSN
let controlfile_path = path.join("global").join("pg_control");
let controlfile = ControlFileData::decode(&std::fs::read(controlfile_path)?)?;
let lsn = controlfile.checkPoint;
Ok(Lsn(lsn))
}
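Moving this helper into `import_datadir` and making it `pub` turns it into part of the import API; a usage sketch with a hypothetical path, matching the bootstrap call site further below:

// Read the checkpoint LSN out of a freshly initdb'd data directory.
let lsn = get_lsn_from_controlfile(Path::new("/tmp/initdb"))?.align();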
///
/// Import all relation data pages from local disk into the repository.
///
@@ -110,8 +121,8 @@ fn import_rel<T: DatadirTimeline, Reader: Read>(
let mut buf: [u8; 8192] = [0u8; 8192];
ensure!(len % pg_constants::BLCKSZ as usize == 0);
let nblocks = len / pg_constants::BLCKSZ as usize;
ensure!(len % BLCKSZ as usize == 0);
let nblocks = len / BLCKSZ as usize;
let rel = RelTag {
spcnode: spcoid,
@@ -120,7 +131,7 @@ fn import_rel<T: DatadirTimeline, Reader: Read>(
forknum,
};
let mut blknum: u32 = segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32);
let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
// Call put_rel_creation for every segment of the relation,
// because there is no guarantee about the order in which we are processing segments.
@@ -144,8 +155,7 @@ fn import_rel<T: DatadirTimeline, Reader: Read>(
Err(err) => match err.kind() {
std::io::ErrorKind::UnexpectedEof => {
// reached EOF. That's expected.
let relative_blknum =
blknum - segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32);
let relative_blknum = blknum - segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
ensure!(relative_blknum == nblocks as u32, "unexpected EOF");
break;
}
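The recurring `1024 * 1024 * 1024 / BLCKSZ` expression is the number of 8 KiB blocks in one 1 GiB relation segment, i.e. exactly the `RELSEG_SIZE` used elsewhere in this diff; a quick sanity check:

// One relation segment holds 1 GiB of 8 KiB blocks.
assert_eq!(1024 * 1024 * 1024 / 8192, 131072); // == RELSEG_SIZE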
@@ -184,8 +194,8 @@ fn import_slru<T: DatadirTimeline, Reader: Read>(
.to_string_lossy();
let segno = u32::from_str_radix(filename, 16)?;
ensure!(len % pg_constants::BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ
let nblocks = len / pg_constants::BLCKSZ as usize;
ensure!(len % BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ
let nblocks = len / BLCKSZ as usize;
ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);


@@ -1,5 +1,5 @@
use crate::repository::{key_range_size, singleton_range, Key};
use postgres_ffi::pg_constants;
use postgres_ffi::BLCKSZ;
use std::ops::Range;
///
@@ -19,7 +19,7 @@ impl KeySpace {
///
pub fn partition(&self, target_size: u64) -> KeyPartitioning {
// Assume that each value is 8k in size.
let target_nblocks = (target_size / pg_constants::BLCKSZ as u64) as usize;
let target_nblocks = (target_size / BLCKSZ as u64) as usize;
let mut parts = Vec::new();
let mut current_part = Vec::new();
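As a worked example of that conversion, with an assumed target size: a 128 MiB partition target comes out to 16384 blocks at the 8 KiB `BLCKSZ`.

let target_size: u64 = 128 * 1024 * 1024; // hypothetical target
let target_nblocks = (target_size / 8192) as usize;
assert_eq!(target_nblocks, 16384);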


@@ -59,7 +59,9 @@ mod storage_layer;
mod timeline;
use storage_layer::Layer;
use timeline::{LayeredTimeline, LayeredTimelineEntry};
use timeline::LayeredTimelineEntry;
pub use timeline::LayeredTimeline;
// re-export this function so that page_cache.rs can use it.
pub use crate::layered_repository::ephemeral_file::writeback as writeback_ephemeral_file;


@@ -45,7 +45,7 @@ use crate::reltag::RelTag;
use crate::tenant_config::TenantConfOpt;
use crate::DatadirTimeline;
use postgres_ffi::xlog_utils::to_pg_timestamp;
use postgres_ffi::v14::xlog_utils::to_pg_timestamp;
use utils::{
lsn::{AtomicLsn, Lsn, RecordLsn},
seqwait::SeqWait,


@@ -28,7 +28,6 @@ use tracing::info;
use crate::thread_mgr::ThreadKind;
use metrics::{register_int_gauge_vec, IntGaugeVec};
use layered_repository::LayeredRepository;
use pgdatadir_mapping::DatadirTimeline;
/// Current storage format version
@@ -62,9 +61,6 @@ pub enum CheckpointConfig {
Forced,
}
pub type RepositoryImpl = LayeredRepository;
pub type TimelineImpl = <LayeredRepository as repository::Repository>::Timeline;
pub fn shutdown_pageserver(exit_code: i32) {
// Shut down the libpq endpoint thread. This prevents new connections from
// being accepted.


@@ -83,7 +83,7 @@ pub fn get() -> &'static PageCache {
}
}
pub const PAGE_SZ: usize = postgres_ffi::pg_constants::BLCKSZ as usize;
pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
const MAX_USAGE_COUNT: u8 = 5;
///


@@ -40,9 +40,10 @@ use crate::thread_mgr;
use crate::thread_mgr::ThreadKind;
use crate::CheckpointConfig;
use metrics::{register_histogram_vec, HistogramVec};
use postgres_ffi::xlog_utils::to_pg_timestamp;
use postgres_ffi::v14::xlog_utils::to_pg_timestamp;
use postgres_ffi::pg_constants;
use postgres_ffi::v14::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
// Wrapped in libpq CopyData
enum PagestreamFeMessage {
@@ -725,10 +726,9 @@ impl PageServerHandler {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
let total_blocks =
timeline.get_db_size(pg_constants::DEFAULTTABLESPACE_OID, req.dbnode, lsn)?;
let total_blocks = timeline.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, lsn)?;
let db_size = total_blocks as i64 * pg_constants::BLCKSZ as i64;
let db_size = total_blocks as i64 * BLCKSZ as i64;
Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
db_size,


@@ -13,8 +13,10 @@ use crate::repository::*;
use crate::walrecord::ZenithWalRecord;
use anyhow::{bail, ensure, Result};
use bytes::{Buf, Bytes};
use postgres_ffi::xlog_utils::TimestampTz;
use postgres_ffi::{pg_constants, Oid, TransactionId};
use postgres_ffi::v14::pg_constants;
use postgres_ffi::v14::xlog_utils::TimestampTz;
use postgres_ffi::BLCKSZ;
use postgres_ffi::{Oid, TransactionId};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::ops::Range;
@@ -297,9 +299,9 @@ pub trait DatadirTimeline: Timeline {
let clog_page =
self.get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn)?;
if clog_page.len() == pg_constants::BLCKSZ as usize + 8 {
if clog_page.len() == BLCKSZ as usize + 8 {
let mut timestamp_bytes = [0u8; 8];
timestamp_bytes.copy_from_slice(&clog_page[pg_constants::BLCKSZ as usize..]);
timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
if timestamp >= search_timestamp {
@@ -382,7 +384,7 @@ pub trait DatadirTimeline: Timeline {
total_size += relsize as usize;
}
}
Ok(total_size * pg_constants::BLCKSZ as usize)
Ok(total_size * BLCKSZ as usize)
}
///
@@ -912,7 +914,7 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
result?;
if pending_nblocks != 0 {
writer.update_current_logical_size(pending_nblocks * pg_constants::BLCKSZ as isize);
writer.update_current_logical_size(pending_nblocks * BLCKSZ as isize);
self.pending_nblocks = 0;
}
@@ -940,7 +942,7 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
writer.finish_write(lsn);
if pending_nblocks != 0 {
writer.update_current_logical_size(pending_nblocks * pg_constants::BLCKSZ as isize);
writer.update_current_logical_size(pending_nblocks * BLCKSZ as isize);
}
Ok(())
@@ -1014,7 +1016,7 @@ struct SlruSegmentDirectory {
segments: HashSet<u32>,
}
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; pg_constants::BLCKSZ as usize]);
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
// Layout of the Key address space
//


@@ -2,8 +2,9 @@ use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::fmt;
use postgres_ffi::relfile_utils::forknumber_to_name;
use postgres_ffi::{pg_constants, Oid};
use postgres_ffi::v14::pg_constants;
use postgres_ffi::v14::relfile_utils::forknumber_to_name;
use postgres_ffi::Oid;
///
/// Relation data file segment id throughout the Postgres cluster.


@@ -412,7 +412,6 @@ pub mod repo_harness {
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::{fs, path::PathBuf};
use crate::RepositoryImpl;
use crate::{
config::PageServerConf,
layered_repository::LayeredRepository,
@@ -508,11 +507,11 @@ pub mod repo_harness {
})
}
pub fn load(&self) -> RepositoryImpl {
pub fn load(&self) -> LayeredRepository {
self.try_load().expect("failed to load test repo")
}
pub fn try_load(&self) -> Result<RepositoryImpl> {
pub fn try_load(&self) -> Result<LayeredRepository> {
let walredo_mgr = Arc::new(TestRedoManager);
let repo = LayeredRepository::new(


@@ -3,16 +3,14 @@
use crate::config::PageServerConf;
use crate::http::models::TenantInfo;
use crate::layered_repository::{load_metadata, LayeredRepository};
use crate::layered_repository::{load_metadata, LayeredRepository, LayeredTimeline};
use crate::repository::Repository;
use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex};
use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
use crate::tenant_config::TenantConfOpt;
use crate::thread_mgr::ThreadKind;
use crate::timelines::CreateRepo;
use crate::walredo::PostgresRedoManager;
use crate::{thread_mgr, timelines, walreceiver};
use crate::{RepositoryImpl, TimelineImpl};
use anyhow::Context;
use serde::{Deserialize, Serialize};
use std::collections::hash_map::Entry;
@@ -96,13 +94,13 @@ mod tenants_state {
struct Tenant {
state: TenantState,
/// Contains in-memory state, including timelines that might not yet be flushed to disk or loaded from disk.
repo: Arc<RepositoryImpl>,
repo: Arc<LayeredRepository>,
/// Timelines, located locally in the pageserver's datadir.
/// Timelines can be removed entirely by the `detach` operation only.
///
/// Local timelines carry extra metadata that is loaded into memory;
/// it lives in the `repo.timelines` field, [`crate::layered_repository::LayeredTimelineEntry`].
local_timelines: HashMap<ZTimelineId, Arc<<RepositoryImpl as Repository>::Timeline>>,
local_timelines: HashMap<ZTimelineId, Arc<LayeredTimeline>>,
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
@@ -179,7 +177,7 @@ pub enum LocalTimelineUpdate {
},
Attach {
id: ZTenantTimelineId,
datadir: Arc<<RepositoryImpl as Repository>::Timeline>,
datadir: Arc<LayeredTimeline>,
},
}
@@ -285,10 +283,8 @@ pub fn create_tenant_repository(
conf,
tenant_conf,
tenant_id,
CreateRepo::Real {
wal_redo_manager,
remote_index,
},
wal_redo_manager,
remote_index,
)?;
v.insert(Tenant {
state: TenantState::Idle,
@@ -369,7 +365,7 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow:
Ok(())
}
pub fn get_repository_for_tenant(tenant_id: ZTenantId) -> anyhow::Result<Arc<RepositoryImpl>> {
pub fn get_repository_for_tenant(tenant_id: ZTenantId) -> anyhow::Result<Arc<LayeredRepository>> {
let m = tenants_state::read_tenants();
let tenant = m
.get(&tenant_id)
@@ -383,7 +379,7 @@ pub fn get_repository_for_tenant(tenant_id: ZTenantId) -> anyhow::Result<Arc<Rep
pub fn get_local_timeline_with_load(
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
) -> anyhow::Result<Arc<TimelineImpl>> {
) -> anyhow::Result<Arc<LayeredTimeline>> {
let mut m = tenants_state::write_tenants();
let tenant = m
.get_mut(&tenant_id)
@@ -488,9 +484,9 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any
}
fn load_local_timeline(
repo: &RepositoryImpl,
repo: &LayeredRepository,
timeline_id: ZTimelineId,
) -> anyhow::Result<Arc<TimelineImpl>> {
) -> anyhow::Result<Arc<LayeredTimeline>> {
let inmem_timeline = repo.get_timeline_load(timeline_id).with_context(|| {
format!("Inmem timeline {timeline_id} not found in tenant's repository")
})?;
@@ -634,7 +630,7 @@ fn load_local_repo(
conf: &'static PageServerConf,
tenant_id: ZTenantId,
remote_index: &RemoteIndex,
) -> anyhow::Result<Arc<RepositoryImpl>> {
) -> anyhow::Result<Arc<LayeredRepository>> {
let mut m = tenants_state::write_tenants();
let tenant = m.entry(tenant_id).or_insert_with(|| {
// Set up a WAL redo manager, for applying WAL records.


@@ -3,7 +3,7 @@
//
use anyhow::{bail, ensure, Context, Result};
use postgres_ffi::ControlFileData;
use std::{
fs,
path::Path,
@@ -13,18 +13,21 @@ use std::{
use tracing::*;
use utils::{
crashsafe_dir, logging,
crashsafe_dir,
lsn::Lsn,
zid::{ZTenantId, ZTimelineId},
};
use crate::import_datadir;
use crate::tenant_mgr;
use crate::{
config::PageServerConf, repository::Repository, storage_sync::index::RemoteIndex,
tenant_config::TenantConfOpt, RepositoryImpl, TimelineImpl,
tenant_config::TenantConfOpt,
};
use crate::{
layered_repository::{LayeredRepository, LayeredTimeline},
walredo::WalRedoManager,
};
use crate::{import_datadir, LOG_FILE_NAME};
use crate::{layered_repository::LayeredRepository, walredo::WalRedoManager};
use crate::{repository::Timeline, CheckpointConfig};
#[derive(Debug, Clone, Copy)]
@@ -33,69 +36,13 @@ pub struct PointInTime {
pub lsn: Lsn,
}
pub fn init_pageserver(
conf: &'static PageServerConf,
create_tenant: Option<ZTenantId>,
initial_timeline_id: Option<ZTimelineId>,
) -> anyhow::Result<()> {
// Initialize logger
// use true as the daemonize parameter, because otherwise we pollute the zenith CLI output with several pages of info messages
let _log_file = logging::init(LOG_FILE_NAME, true)?;
crashsafe_dir::create_dir_all(conf.tenants_path())?;
if let Some(tenant_id) = create_tenant {
println!("initializing tenantid {}", tenant_id);
let repo = create_repo(conf, TenantConfOpt::default(), tenant_id, CreateRepo::Dummy)
.context("failed to create repo")?;
let new_timeline_id = initial_timeline_id.unwrap_or_else(ZTimelineId::generate);
bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())
.context("failed to create initial timeline")?;
println!("initial timeline {} created", new_timeline_id)
} else if initial_timeline_id.is_some() {
println!("Ignoring initial timeline parameter, due to no tenant id to create given");
}
println!("pageserver init succeeded");
Ok(())
}
pub enum CreateRepo {
Real {
wal_redo_manager: Arc<dyn WalRedoManager + Send + Sync>,
remote_index: RemoteIndex,
},
Dummy,
}
pub fn create_repo(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: ZTenantId,
create_repo: CreateRepo,
) -> Result<Arc<RepositoryImpl>> {
let (wal_redo_manager, remote_index) = match create_repo {
CreateRepo::Real {
wal_redo_manager,
remote_index,
} => (wal_redo_manager, remote_index),
CreateRepo::Dummy => {
// We don't use the real WAL redo manager, because we don't want to spawn the WAL redo
// process during repository initialization.
//
// FIXME: That caused trouble, because the WAL redo manager spawned a thread that launched
// initdb in the background, and it kept running even after the "zenith init" had exited.
// In tests, we started the page server immediately after that, so that initdb was still
// running in the background, and we failed to run initdb again in the same directory. This
// has been solved for the rapid init+start case now, but the general race condition remains
// if you restart the server quickly. The WAL redo manager doesn't use a separate thread
// anymore, but I think that could still happen.
let wal_redo_manager = Arc::new(crate::walredo::DummyRedoManager {});
(wal_redo_manager as _, RemoteIndex::default())
}
};
wal_redo_manager: Arc<dyn WalRedoManager + Send + Sync>,
remote_index: RemoteIndex,
) -> Result<Arc<LayeredRepository>> {
let repo_dir = conf.tenant_path(&tenant_id);
ensure!(
!repo_dir.exists(),
@@ -122,16 +69,6 @@ pub fn create_repo(
)))
}
// Returns checkpoint LSN from controlfile
fn get_lsn_from_controlfile(path: &Path) -> Result<Lsn> {
// Read control file to extract the LSN
let controlfile_path = path.join("global").join("pg_control");
let controlfile = ControlFileData::decode(&fs::read(controlfile_path)?)?;
let lsn = controlfile.checkPoint;
Ok(Lsn(lsn))
}
// Create the cluster temporarily in 'initdbpath' directory inside the repository
// to get bootstrap data for timeline initialization.
//
@@ -181,7 +118,7 @@ fn bootstrap_timeline<R: Repository>(
run_initdb(conf, &initdb_path)?;
let pgdata_path = initdb_path;
let lsn = get_lsn_from_controlfile(&pgdata_path)?.align();
let lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align();
// Import the contents of the data directory at the initial checkpoint
// LSN, and any WAL after that.
@@ -223,7 +160,7 @@ pub(crate) fn create_timeline(
new_timeline_id: Option<ZTimelineId>,
ancestor_timeline_id: Option<ZTimelineId>,
mut ancestor_start_lsn: Option<Lsn>,
) -> Result<Option<(ZTimelineId, Arc<TimelineImpl>)>> {
) -> Result<Option<(ZTimelineId, Arc<LayeredTimeline>)>> {
let new_timeline_id = new_timeline_id.unwrap_or_else(ZTimelineId::generate);
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;


@@ -22,8 +22,8 @@
//! bespoke Rust code.
use anyhow::Context;
use postgres_ffi::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::nonrelfile_utils::slru_may_delete_clogsegment;
use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
use postgres_ffi::{page_is_new, page_set_lsn};
use anyhow::Result;
@@ -33,10 +33,12 @@ use tracing::*;
use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind};
use crate::walrecord::*;
use postgres_ffi::nonrelfile_utils::mx_offset_to_member_segment;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
use postgres_ffi::v14::pg_constants;
use postgres_ffi::v14::xlog_utils::*;
use postgres_ffi::v14::CheckPoint;
use postgres_ffi::TransactionId;
use postgres_ffi::{pg_constants, CheckPoint};
use postgres_ffi::BLCKSZ;
use utils::lsn::Lsn;
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
@@ -293,7 +295,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
// Extract page image from FPI record
let img_len = blk.bimg_len as usize;
let img_offs = blk.bimg_offset as usize;
let mut image = BytesMut::with_capacity(pg_constants::BLCKSZ as usize);
let mut image = BytesMut::with_capacity(BLCKSZ as usize);
image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
if blk.hole_length != 0 {
@@ -309,7 +311,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
if !page_is_new(&image) {
page_set_lsn(&mut image, lsn)
}
assert_eq!(image.len(), pg_constants::BLCKSZ as usize);
assert_eq!(image.len(), BLCKSZ as usize);
self.put_rel_page_image(modification, rel, blk.blkno, image.freeze())?;
} else {
let rec = ZenithWalRecord::Postgres {
@@ -1033,7 +1035,8 @@ mod tests {
use crate::pgdatadir_mapping::create_test_timeline;
use crate::repository::repo_harness::*;
use crate::repository::Timeline;
use postgres_ffi::pg_constants;
use postgres_ffi::v14::xlog_utils::SIZEOF_CHECKPOINT;
use postgres_ffi::RELSEG_SIZE;
/// Arbitrary relation tag, for testing.
const TESTREL_A: RelTag = RelTag {
@@ -1322,7 +1325,7 @@ mod tests {
let mut walingest = init_walingest_test(&*tline)?;
let mut lsn = 0x10;
for blknum in 0..pg_constants::RELSEG_SIZE + 1 {
for blknum in 0..RELSEG_SIZE + 1 {
lsn += 0x10;
let mut m = tline.begin_modification(Lsn(lsn));
let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
@@ -1332,31 +1335,22 @@ mod tests {
assert_current_logical_size(&*tline, Lsn(lsn));
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
pg_constants::RELSEG_SIZE + 1
);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(lsn))?, RELSEG_SIZE + 1);
// Truncate one block
lsn += 0x10;
let mut m = tline.begin_modification(Lsn(lsn));
walingest.put_rel_truncation(&mut m, TESTREL_A, pg_constants::RELSEG_SIZE)?;
walingest.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE)?;
m.commit()?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
pg_constants::RELSEG_SIZE
);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(lsn))?, RELSEG_SIZE);
assert_current_logical_size(&*tline, Lsn(lsn));
// Truncate another block
lsn += 0x10;
let mut m = tline.begin_modification(Lsn(lsn));
walingest.put_rel_truncation(&mut m, TESTREL_A, pg_constants::RELSEG_SIZE - 1)?;
walingest.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1)?;
m.commit()?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
pg_constants::RELSEG_SIZE - 1
);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(lsn))?, RELSEG_SIZE - 1);
assert_current_logical_size(&*tline, Lsn(lsn));
// Truncate to 1500, and then truncate all the way down to 0, one block at a time


@@ -16,6 +16,7 @@ use std::{
time::Duration,
};
use crate::{layered_repository::LayeredTimeline, repository::Timeline};
use anyhow::Context;
use chrono::{NaiveDateTime, Utc};
use etcd_broker::{
@@ -25,12 +26,7 @@ use etcd_broker::{
use tokio::select;
use tracing::*;
use crate::{
exponential_backoff,
repository::{Repository, Timeline},
DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
};
use crate::{RepositoryImpl, TimelineImpl};
use crate::{exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};
use utils::{
lsn::Lsn,
zid::{NodeId, ZTenantTimelineId},
@@ -43,7 +39,7 @@ pub(super) fn spawn_connection_manager_task(
id: ZTenantTimelineId,
broker_loop_prefix: String,
mut client: Client,
local_timeline: Arc<TimelineImpl>,
local_timeline: Arc<LayeredTimeline>,
wal_connect_timeout: Duration,
lagging_wal_timeout: Duration,
max_lsn_wal_lag: NonZeroU64,
@@ -100,6 +96,8 @@ async fn connection_manager_loop_step(
info!("Subscribed for etcd timeline changes, waiting for new etcd data");
loop {
let time_until_next_retry = walreceiver_state.time_until_next_retry();
select! {
broker_connection_result = &mut broker_subscription.watcher_handle => {
cleanup_broker_connection(broker_connection_result, walreceiver_state);
@@ -114,27 +112,23 @@ async fn connection_manager_loop_step(
} => {
let wal_connection = walreceiver_state.wal_connection.as_mut().expect("Should have a connection, as checked by the corresponding select! guard");
match wal_connection_update {
TaskEvent::Started => {
*walreceiver_state.wal_connection_attempts.entry(wal_connection.sk_id).or_insert(0) += 1;
},
TaskEvent::Started => {},
TaskEvent::NewEvent(status) => {
if status.has_received_wal {
// Reset connection attempts here only, we know that safekeeper is healthy
// because it can send us a WAL update.
walreceiver_state.wal_connection_attempts.remove(&wal_connection.sk_id);
if status.has_processed_wal {
// We have advanced last_record_lsn by processing the WAL received
// from this safekeeper. This is good enough to clear the unsuccessful
// retry history and allow reconnecting to this safekeeper without
// sleeping for a long time.
walreceiver_state.wal_connection_retries.remove(&wal_connection.sk_id);
}
wal_connection.status = status;
},
TaskEvent::End(end_result) => {
match end_result {
Ok(()) => debug!("WAL receiving task finished"),
Err(e) => {
warn!("WAL receiving task failed: {e}");
// If the task failed, set the connection attempts to at least 1, to try other safekeepers.
let _ = *walreceiver_state.wal_connection_attempts.entry(wal_connection.sk_id).or_insert(1);
}
Err(e) => warn!("WAL receiving task failed: {e}"),
};
walreceiver_state.wal_connection = None;
walreceiver_state.drop_old_connection(false).await;
},
}
},
@@ -158,6 +152,8 @@ async fn connection_manager_loop_step(
}
}
},
_ = async { tokio::time::sleep(time_until_next_retry.unwrap()).await }, if time_until_next_retry.is_some() => {}
}
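The new `select!` arm above sleeps until the earliest scheduled retry, but only when one exists. The `unwrap` is safe because it sits inside the `async` block: tokio disables a branch whose `if` guard is false and never polls it, so the block body (and the `unwrap`) never runs without a value. A minimal standalone sketch of the same construct:

use std::time::Duration;

#[tokio::main]
async fn main() {
    let time_until_next_retry: Option<Duration> = Some(Duration::from_millis(100));
    tokio::select! {
        // Disabled (never polled) when no retry is scheduled.
        _ = async { tokio::time::sleep(time_until_next_retry.unwrap()).await },
            if time_until_next_retry.is_some() =>
        {
            println!("retry cooldown elapsed, re-evaluating candidates");
        }
        _ = tokio::time::sleep(Duration::from_secs(5)) => {
            println!("no retry fired within 5s");
        }
    }
}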
// Fetch more etcd timeline updates, but limit ourselves since they may arrive quickly.
@@ -238,11 +234,15 @@ async fn subscribe_for_timeline_updates(
}
}
const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
/// All data needed to run the endless broker loop and keep the WAL streaming connection alive, if possible.
struct WalreceiverState {
id: ZTenantTimelineId,
/// Use pageserver data about the timeline to filter out some of the safekeepers.
local_timeline: Arc<TimelineImpl>,
local_timeline: Arc<LayeredTimeline>,
/// The timeout on the connection to safekeeper for WAL streaming.
wal_connect_timeout: Duration,
/// The timeout used to determine when the current connection is "stale" and it is time to reconnect to another safekeeper.
@@ -251,7 +251,8 @@ struct WalreceiverState {
max_lsn_wal_lag: NonZeroU64,
/// Current connection to safekeeper for WAL streaming.
wal_connection: Option<WalConnection>,
wal_connection_attempts: HashMap<NodeId, u32>,
/// Info about retries and unsuccessful attempts to connect to safekeepers.
wal_connection_retries: HashMap<NodeId, RetryInfo>,
/// Data about all timelines, available for connection, fetched from etcd, grouped by their corresponding safekeeper node id.
wal_stream_candidates: HashMap<NodeId, EtcdSkTimeline>,
}
@@ -259,6 +260,8 @@ struct WalreceiverState {
/// Current connection data.
#[derive(Debug)]
struct WalConnection {
/// Time when the connection was initiated.
started_at: NaiveDateTime,
/// Current safekeeper pageserver is connected to for WAL streaming.
sk_id: NodeId,
/// Status of the connection.
@@ -278,6 +281,12 @@ struct NewCommittedWAL {
discovered_at: NaiveDateTime,
}
#[derive(Debug)]
struct RetryInfo {
next_retry_at: Option<NaiveDateTime>,
retry_duration_seconds: f64,
}
/// Data about the timeline to connect to, received from etcd.
#[derive(Debug)]
struct EtcdSkTimeline {
@@ -291,7 +300,7 @@ struct EtcdSkTimeline {
impl WalreceiverState {
fn new(
id: ZTenantTimelineId,
local_timeline: Arc<<RepositoryImpl as Repository>::Timeline>,
local_timeline: Arc<LayeredTimeline>,
wal_connect_timeout: Duration,
lagging_wal_timeout: Duration,
max_lsn_wal_lag: NonZeroU64,
@@ -304,31 +313,18 @@ impl WalreceiverState {
max_lsn_wal_lag,
wal_connection: None,
wal_stream_candidates: HashMap::new(),
wal_connection_attempts: HashMap::new(),
wal_connection_retries: HashMap::new(),
}
}
/// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
async fn change_connection(&mut self, new_sk_id: NodeId, new_wal_source_connstr: String) {
if let Some(old_connection) = self.wal_connection.take() {
old_connection.connection_task.shutdown().await
}
self.drop_old_connection(true).await;
let id = self.id;
let connect_timeout = self.wal_connect_timeout;
let connection_attempt = self
.wal_connection_attempts
.get(&new_sk_id)
.copied()
.unwrap_or(0);
let connection_handle = TaskHandle::spawn(move |events_sender, cancellation| {
async move {
exponential_backoff(
connection_attempt,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
)
.await;
super::walreceiver_connection::handle_walreceiver_connection(
id,
&new_wal_source_connstr,
@@ -344,10 +340,11 @@ impl WalreceiverState {
let now = Utc::now().naive_utc();
self.wal_connection = Some(WalConnection {
started_at: now,
sk_id: new_sk_id,
status: WalConnectionStatus {
is_connected: false,
has_received_wal: false,
has_processed_wal: false,
latest_connection_update: now,
latest_wal_update: now,
streaming_lsn: None,
@@ -358,6 +355,71 @@ impl WalreceiverState {
});
}
/// Drops the current connection (if any) and updates retry timeout for the next
/// connection attempt to the same safekeeper.
async fn drop_old_connection(&mut self, needs_shutdown: bool) {
let wal_connection = match self.wal_connection.take() {
Some(wal_connection) => wal_connection,
None => return,
};
if needs_shutdown {
wal_connection.connection_task.shutdown().await;
}
let retry = self
.wal_connection_retries
.entry(wal_connection.sk_id)
.or_insert(RetryInfo {
next_retry_at: None,
retry_duration_seconds: WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS,
});
let now = Utc::now().naive_utc();
// Schedule the next retry attempt. We want to have exponential backoff for connection attempts,
// and we add backoff to the time when we started the connection attempt. If the connection
// was active for a long time, then next_retry_at will be in the past.
retry.next_retry_at =
wal_connection
.started_at
.checked_add_signed(chrono::Duration::milliseconds(
(retry.retry_duration_seconds * 1000.0) as i64,
));
if let Some(next) = &retry.next_retry_at {
if next > &now {
info!(
"Next connection retry to {:?} is at {}",
wal_connection.sk_id, next
);
}
}
let next_retry_duration =
retry.retry_duration_seconds * WALCONNECTION_RETRY_BACKOFF_MULTIPLIER;
// Clamp the next retry duration to the maximum allowed.
let next_retry_duration = next_retry_duration.min(WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS);
// Clamp the next retry duration to the minimum allowed.
let next_retry_duration = next_retry_duration.max(WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS);
retry.retry_duration_seconds = next_retry_duration;
}
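Under the constants above (0.1 s minimum, a 1.5x multiplier, 15 s cap), the cooldown grows geometrically and saturates after roughly a dozen failures; a tiny worked sketch:

fn main() {
    let mut cooldown = 0.1_f64; // WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS
    for failure in 1..=14 {
        println!("after failure {failure}: wait {cooldown:.3}s");
        // Multiply, then clamp into [0.1, 15.0], as drop_old_connection does.
        cooldown = (cooldown * 1.5).min(15.0).max(0.1);
    }
    // Prints 0.100, 0.150, 0.225, 0.338, ... and hits the 15 s cap
    // around the 14th failure.
}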
/// Returns the time to wait before a new candidate for WAL streaming becomes available.
fn time_until_next_retry(&self) -> Option<Duration> {
let now = Utc::now().naive_utc();
let next_retry_at = self
.wal_connection_retries
.values()
.filter_map(|retry| retry.next_retry_at)
.filter(|next_retry_at| next_retry_at > &now)
.min();
next_retry_at.and_then(|next_retry_at| (next_retry_at - now).to_std().ok())
}
/// Adds another etcd timeline into the state, if it's more recent than the one already stored for the same key.
fn register_timeline_update(&mut self, timeline_update: BrokerUpdate<SkTimelineInfo>) {
match self
@@ -551,52 +613,37 @@ impl WalreceiverState {
/// Optionally, omits the given node, to support gracefully switching from a healthy safekeeper to another.
///
/// The candidate that is chosen:
/// * has fewest connection attempts from pageserver to safekeeper node (reset every time we receive a WAL message from the node)
/// * has greatest data Lsn among the ones that are left
///
/// NOTE:
/// We evict timeline data received from etcd, together with its connection-attempt counts, based on the time passed since registration;
/// otherwise, a successful connection to that node is needed to reset the attempts.
/// That won't happen until all nodes with fewer connection attempts have been tried first, which might leave the safekeeper node with the most advanced state ignored.
/// * has no pending retry cooldown
/// * has greatest commit_lsn among the ones that are left
fn select_connection_candidate(
&self,
node_to_omit: Option<NodeId>,
) -> Option<(NodeId, &SkTimelineInfo, String)> {
let all_candidates = self
.applicable_connection_candidates()
self.applicable_connection_candidates()
.filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
.collect::<Vec<_>>();
let smallest_attempts_allowed = all_candidates
.iter()
.map(|(sk_id, _, _)| {
self.wal_connection_attempts
.get(sk_id)
.copied()
.unwrap_or(0)
})
.min()?;
all_candidates
.into_iter()
.filter(|(sk_id, _, _)| {
smallest_attempts_allowed
>= self
.wal_connection_attempts
.get(sk_id)
.copied()
.unwrap_or(0)
})
.max_by_key(|(_, info, _)| info.commit_lsn)
}
/// Returns a list of safekeepers that have valid info and are ready for connection.
/// Safekeepers still within their retry cooldown are filtered out.
fn applicable_connection_candidates(
&self,
) -> impl Iterator<Item = (NodeId, &SkTimelineInfo, String)> {
let now = Utc::now().naive_utc();
self.wal_stream_candidates
.iter()
.filter(|(_, info)| info.timeline.commit_lsn.is_some())
.filter(move |(sk_id, _)| {
let next_retry_at = self
.wal_connection_retries
.get(sk_id)
.and_then(|retry_info| {
retry_info.next_retry_at
});
next_retry_at.is_none() || next_retry_at.unwrap() <= now
})
.filter_map(|(sk_id, etcd_info)| {
let info = &etcd_info.timeline;
match wal_stream_connection_string(
@@ -631,7 +678,7 @@ impl WalreceiverState {
});
for node_id in node_ids_to_remove {
self.wal_connection_attempts.remove(&node_id);
self.wal_connection_retries.remove(&node_id);
}
}
}
@@ -688,7 +735,6 @@ fn wal_stream_connection_string(
#[cfg(test)]
mod tests {
use crate::repository::{
repo_harness::{RepoHarness, TIMELINE_ID},
Repository,
@@ -793,7 +839,7 @@ mod tests {
let connection_status = WalConnectionStatus {
is_connected: true,
has_received_wal: true,
has_processed_wal: true,
latest_connection_update: now,
latest_wal_update: now,
commit_lsn: Some(Lsn(current_lsn)),
@@ -802,6 +848,7 @@ mod tests {
state.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
state.wal_connection = Some(WalConnection {
started_at: now,
sk_id: connected_sk_id,
status: connection_status.clone(),
connection_task: TaskHandle::spawn(move |sender, _| async move {
@@ -1021,7 +1068,13 @@ mod tests {
},
),
]);
state.wal_connection_attempts = HashMap::from([(NodeId(0), 1), (NodeId(1), 0)]);
state.wal_connection_retries = HashMap::from([(
NodeId(0),
RetryInfo {
next_retry_at: now.checked_add_signed(chrono::Duration::hours(1)),
retry_duration_seconds: WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS,
},
)]);
let candidate_with_less_errors = state
.next_connection_candidate()
@@ -1029,7 +1082,7 @@ mod tests {
assert_eq!(
candidate_with_less_errors.safekeeper_id,
NodeId(1),
"Should select the node with less connection errors"
"Should select the node with no pending retry cooldown"
);
Ok(())
@@ -1047,7 +1100,7 @@ mod tests {
let connection_status = WalConnectionStatus {
is_connected: true,
has_received_wal: true,
has_processed_wal: true,
latest_connection_update: now,
latest_wal_update: now,
commit_lsn: Some(current_lsn),
@@ -1055,6 +1108,7 @@ mod tests {
};
state.wal_connection = Some(WalConnection {
started_at: now,
sk_id: connected_sk_id,
status: connection_status.clone(),
connection_task: TaskHandle::spawn(move |sender, _| async move {
@@ -1134,7 +1188,7 @@ mod tests {
let connection_status = WalConnectionStatus {
is_connected: true,
has_received_wal: true,
has_processed_wal: true,
latest_connection_update: time_over_threshold,
latest_wal_update: time_over_threshold,
commit_lsn: Some(current_lsn),
@@ -1142,6 +1196,7 @@ mod tests {
};
state.wal_connection = Some(WalConnection {
started_at: now,
sk_id: NodeId(1),
status: connection_status.clone(),
connection_task: TaskHandle::spawn(move |sender, _| async move {
@@ -1206,7 +1261,7 @@ mod tests {
let connection_status = WalConnectionStatus {
is_connected: true,
has_received_wal: true,
has_processed_wal: true,
latest_connection_update: now,
latest_wal_update: time_over_threshold,
commit_lsn: Some(current_lsn),
@@ -1214,6 +1269,7 @@ mod tests {
};
state.wal_connection = Some(WalConnection {
started_at: now,
sk_id: NodeId(1),
status: connection_status,
connection_task: TaskHandle::spawn(move |_, _| async move { Ok(()) }),
@@ -1285,7 +1341,7 @@ mod tests {
max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
wal_connection: None,
wal_stream_candidates: HashMap::new(),
wal_connection_attempts: HashMap::new(),
wal_connection_retries: HashMap::new(),
}
}
}


@@ -27,7 +27,7 @@ use crate::{
walingest::WalIngest,
walrecord::DecodedWALRecord,
};
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::v14::waldecoder::WalStreamDecoder;
use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId};
/// Status of the connection.
@@ -35,8 +35,9 @@ use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId};
pub struct WalConnectionStatus {
/// If we were able to initiate a postgres connection, this means that the safekeeper process is at least running.
pub is_connected: bool,
/// Defines a healthy connection as one on which we have received at least some WAL bytes.
pub has_received_wal: bool,
/// Defines a healthy connection as one on which the pageserver has received WAL from the safekeeper
/// and is able to process it in walingest without errors.
pub has_processed_wal: bool,
/// Connection establishment time, or the timestamp of the latest connection message received.
pub latest_connection_update: NaiveDateTime,
/// Time of the latest WAL message received.
@@ -71,7 +72,7 @@ pub async fn handle_walreceiver_connection(
info!("connected!");
let mut connection_status = WalConnectionStatus {
is_connected: true,
has_received_wal: false,
has_processed_wal: false,
latest_connection_update: Utc::now().naive_utc(),
latest_wal_update: Utc::now().naive_utc(),
streaming_lsn: None,
@@ -117,13 +118,6 @@ pub async fn handle_walreceiver_connection(
let identify = identify_system(&mut replication_client).await?;
info!("{identify:?}");
connection_status.latest_connection_update = Utc::now().naive_utc();
if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
warn!("Wal connection event listener dropped after IDENTIFY_SYSTEM, aborting the connection: {e}");
return Ok(());
}
// NB: this is a flush_lsn, not a commit_lsn.
let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
let mut caught_up = false;
let ZTenantTimelineId {
@@ -131,6 +125,14 @@ pub async fn handle_walreceiver_connection(
timeline_id,
} = id;
connection_status.latest_connection_update = Utc::now().naive_utc();
connection_status.latest_wal_update = Utc::now().naive_utc();
connection_status.commit_lsn = Some(end_of_wal);
if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
warn!("Wal connection event listener dropped after IDENTIFY_SYSTEM, aborting the connection: {e}");
return Ok(());
}
let (repo, timeline) = tokio::task::spawn_blocking(move || {
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)
.with_context(|| format!("no repository found for tenant {tenant_id}"))?;
@@ -181,6 +183,7 @@ pub async fn handle_walreceiver_connection(
} {
let replication_message = replication_message?;
let now = Utc::now().naive_utc();
let last_rec_lsn_before_msg = last_rec_lsn;
// Update the connection status before processing the message. If the message processing
// fails (e.g. in walingest), we still want to know the latest LSNs from the safekeeper.
@@ -193,7 +196,6 @@ pub async fn handle_walreceiver_connection(
));
if !xlog_data.data().is_empty() {
connection_status.latest_wal_update = now;
connection_status.has_received_wal = true;
}
}
ReplicationMessage::PrimaryKeepAlive(keepalive) => {
@@ -265,6 +267,15 @@ pub async fn handle_walreceiver_connection(
_ => None,
};
if !connection_status.has_processed_wal && last_rec_lsn > last_rec_lsn_before_msg {
// We have successfully processed at least one WAL record.
connection_status.has_processed_wal = true;
if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
warn!("Wal connection event listener dropped, aborting the connection: {e}");
return Ok(());
}
}
let timeline_to_check = Arc::clone(&timeline);
tokio::task::spawn_blocking(move || timeline_to_check.check_checkpoint_distance())
.await


@@ -3,9 +3,10 @@
//!
use anyhow::Result;
use bytes::{Buf, Bytes};
use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils::{TimestampTz, XLOG_SIZE_OF_XLOG_RECORD};
use postgres_ffi::XLogRecord;
use postgres_ffi::v14::pg_constants;
use postgres_ffi::v14::xlog_utils::{TimestampTz, XLOG_SIZE_OF_XLOG_RECORD};
use postgres_ffi::v14::XLogRecord;
use postgres_ffi::BLCKSZ;
use postgres_ffi::{BlockNumber, OffsetNumber};
use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId};
use serde::{Deserialize, Serialize};
@@ -618,7 +619,7 @@ pub fn decode_wal_record(
blk.hole_length = 0;
}
} else {
blk.hole_length = pg_constants::BLCKSZ - blk.bimg_len;
blk.hole_length = BLCKSZ - blk.bimg_len;
}
datatotal += blk.bimg_len as u32;
blocks_total_len += blk.bimg_len as u32;
@@ -628,9 +629,7 @@ pub fn decode_wal_record(
* bimg_len < BLCKSZ if the HAS_HOLE flag is set.
*/
if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0
&& (blk.hole_offset == 0
|| blk.hole_length == 0
|| blk.bimg_len == pg_constants::BLCKSZ)
&& (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ)
{
// TODO
/*
@@ -667,7 +666,7 @@ pub fn decode_wal_record(
* flag is set.
*/
if (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED == 0)
&& blk.bimg_len == pg_constants::BLCKSZ
&& blk.bimg_len == BLCKSZ
{
// TODO
/*
@@ -685,7 +684,7 @@ pub fn decode_wal_record(
*/
if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
&& blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED == 0
&& blk.bimg_len != pg_constants::BLCKSZ
&& blk.bimg_len != BLCKSZ
{
// TODO
/*


@@ -44,11 +44,12 @@ use crate::reltag::{RelTag, SlruKind};
use crate::repository::Key;
use crate::walrecord::ZenithWalRecord;
use metrics::{register_histogram, register_int_counter, Histogram, IntCounter};
use postgres_ffi::nonrelfile_utils::mx_offset_to_flags_bitshift;
use postgres_ffi::nonrelfile_utils::mx_offset_to_flags_offset;
use postgres_ffi::nonrelfile_utils::mx_offset_to_member_offset;
use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
use postgres_ffi::pg_constants;
use postgres_ffi::v14::nonrelfile_utils::{
mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
transaction_id_set_status,
};
use postgres_ffi::v14::pg_constants;
use postgres_ffi::BLCKSZ;
///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
@@ -82,24 +83,6 @@ pub trait WalRedoManager: Send + Sync {
) -> Result<Bytes, WalRedoError>;
}
///
/// A dummy WAL Redo Manager implementation that doesn't allow replaying
/// anything. Currently used during bootstrapping (zenith init), to create
/// a Repository object without launching the real WAL redo process.
///
pub struct DummyRedoManager {}
impl crate::walredo::WalRedoManager for DummyRedoManager {
fn request_redo(
&self,
_key: Key,
_lsn: Lsn,
_base_img: Option<Bytes>,
_records: Vec<(Lsn, ZenithWalRecord)>,
) -> Result<Bytes, WalRedoError> {
Err(WalRedoError::InvalidState)
}
}
// Metrics collected on WAL redo operations
//
// We collect the time spent in actual WAL redo ('redo'), and time waiting
@@ -435,10 +418,10 @@ impl PostgresRedoManager {
}
// Append the timestamp
if page.len() == pg_constants::BLCKSZ as usize + 8 {
page.truncate(pg_constants::BLCKSZ as usize);
if page.len() == BLCKSZ as usize + 8 {
page.truncate(BLCKSZ as usize);
}
if page.len() == pg_constants::BLCKSZ as usize {
if page.len() == BLCKSZ as usize {
page.extend_from_slice(&timestamp.to_be_bytes());
} else {
warn!(
@@ -759,7 +742,7 @@ impl PostgresRedoProcess {
// We expect the WAL redo process to respond with an 8k page image. We read it
// into this buffer.
let mut resultbuf = vec![0; pg_constants::BLCKSZ.into()];
let mut resultbuf = vec![0; BLCKSZ.into()];
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
// Prepare for calling poll()
@@ -772,7 +755,7 @@ impl PostgresRedoProcess {
// We do three things simultaneously: send the old base image and WAL records to
// the child process's stdin, read the result from child's stdout, and forward any logging
// information that the child writes to its stderr to the page server's log.
while nresult < pg_constants::BLCKSZ.into() {
while nresult < BLCKSZ.into() {
// If we have more data to write, wake up if 'stdin' becomes writeable or
// we have data to read. Otherwise only wake up if there's data to read.
let nfds = if nwrite < writebuf.len() { 3 } else { 2 };


@@ -27,6 +27,7 @@ prometheus-client = "^0.14.1"
pytest-timeout = "^2.1.0"
Werkzeug = "2.1.2"
pytest-order = "^1.0.1"
neon-dev-utils = {path = "./bindings/python/neon-dev-utils"}
[tool.poetry.dev-dependencies]
yapf = "==0.31.0"


@@ -9,7 +9,7 @@ use crate::timeline::{Timeline, TimelineTools};
use crate::SafeKeeperConf;
use anyhow::{bail, Context, Result};
use postgres_ffi::xlog_utils::PG_TLI;
use postgres_ffi::PG_TLI;
use regex::Regex;
use std::str::FromStr;
use std::sync::Arc;
@@ -90,7 +90,10 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: &str) -> Result<()> {
let cmd = parse_cmd(query_string)?;
info!("got query {:?}", query_string);
info!(
"got query {:?} in timeline {:?}",
query_string, self.ztimelineid
);
let create = !(matches!(cmd, SafekeeperPostgresCommand::StartReplication { .. })
|| matches!(cmd, SafekeeperPostgresCommand::IdentifySystem));
@@ -106,23 +109,17 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
}
match cmd {
SafekeeperPostgresCommand::StartWalPush => {
ReceiveWalConn::new(pgb)
.run(self)
.context("failed to run ReceiveWalConn")?;
}
SafekeeperPostgresCommand::StartReplication { start_lsn } => {
ReplicationConn::new(pgb)
.run(self, pgb, start_lsn)
.context("failed to run ReplicationConn")?;
}
SafekeeperPostgresCommand::IdentifySystem => {
self.handle_identify_system(pgb)?;
}
SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
handle_json_ctrl(self, pgb, cmd)?;
}
SafekeeperPostgresCommand::StartWalPush => ReceiveWalConn::new(pgb)
.run(self)
.context("failed to run ReceiveWalConn"),
SafekeeperPostgresCommand::StartReplication { start_lsn } => ReplicationConn::new(pgb)
.run(self, pgb, start_lsn)
.context("failed to run ReplicationConn"),
SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb),
SafekeeperPostgresCommand::JSONCtrl { ref cmd } => handle_json_ctrl(self, pgb, cmd),
}
.context(format!("timeline {timelineid}"))?;
Ok(())
}
}
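The refactored dispatch evaluates the whole `match` as a single `Result` and attaches the timeline context once, instead of repeating `.context(...)` in every arm. The idiom in isolation, with hypothetical names (assuming the `anyhow` crate):

use anyhow::{anyhow, Context, Result};

fn process_query(cmd: &str, timelineid: u32) -> Result<()> {
    match cmd {
        "START_WAL_PUSH" => Err(anyhow!("failed to run ReceiveWalConn")),
        _ => Ok(()),
    }
    .context(format!("timeline {timelineid}"))?; // shared context for every arm
    Ok(())
}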
@@ -153,8 +150,15 @@ impl SafekeeperPostgresHandler {
/// Handle IDENTIFY_SYSTEM replication command
///
fn handle_identify_system(&mut self, pgb: &mut PostgresBackend) -> Result<()> {
let start_pos = self.timeline.get().get_end_of_wal();
let lsn = start_pos.to_string();
let lsn = if self.is_walproposer_recovery() {
// walproposer should get all local WAL until flush_lsn
self.timeline.get().get_end_of_wal()
} else {
// other clients shouldn't get any uncommitted WAL
self.timeline.get().get_state().0.commit_lsn
}
.to_string();
let sysid = self
.timeline
.get()
@@ -203,4 +207,11 @@ impl SafekeeperPostgresHandler {
.write_message(&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"))?;
Ok(())
}
/// Returns true if the current connection is a replication connection originating
/// from a walproposer recovery function. This connection gets special handling:
/// the safekeeper must stream all local WAL up to flush_lsn, whether committed or not.
pub fn is_walproposer_recovery(&self) -> bool {
self.appname == Some("wal_proposer_recovery".to_string())
}
}
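Summarizing the IDENTIFY_SYSTEM change: the reported position now depends on who is asking. A simplified sketch of the decision, with hypothetical types:

struct TimelineState { flush_lsn: u64, commit_lsn: u64 }

fn identify_system_lsn(state: &TimelineState, is_walproposer_recovery: bool) -> u64 {
    if is_walproposer_recovery {
        // walproposer recovery needs all local WAL, committed or not.
        state.flush_lsn
    } else {
        // pageserver and other clients must only ever see committed WAL.
        state.commit_lsn
    }
}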


@@ -7,8 +7,7 @@
//!
use anyhow::Result;
use bytes::{BufMut, Bytes, BytesMut};
use crc32c::crc32c_append;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use tracing::*;
@@ -19,9 +18,8 @@ use crate::safekeeper::{
};
use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
use crate::timeline::TimelineTools;
use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils;
use postgres_ffi::{uint32, uint64, Oid, XLogRecord};
use postgres_ffi::v14::pg_constants;
use postgres_ffi::v14::xlog_utils;
use utils::{
lsn::Lsn,
postgres_backend::PostgresBackend,
@@ -144,7 +142,7 @@ fn append_logical_message(
spg: &mut SafekeeperPostgresHandler,
msg: &AppendLogicalMessage,
) -> Result<InsertedWAL> {
let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
let wal_data = xlog_utils::encode_logical_message(&msg.lm_prefix, &msg.lm_message);
let sk_state = spg.timeline.get().get_state().1;
let begin_lsn = msg.begin_lsn;
@@ -182,90 +180,3 @@ fn append_logical_message(
append_response,
})
}
#[repr(C)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
struct XlLogicalMessage {
db_id: Oid,
transactional: uint32, // bool, takes 4 bytes due to alignment in C structures
prefix_size: uint64,
message_size: uint64,
}
impl XlLogicalMessage {
pub fn encode(&self) -> Bytes {
use utils::bin_ser::LeSer;
self.ser().unwrap().into()
}
}
/// Create a new WAL record for a non-transactional logical message.
/// Used for creating artificial WAL for tests, as a LogicalMessage
/// record is basically a no-op.
fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> {
let mut prefix_bytes = BytesMut::with_capacity(prefix.len() + 1);
prefix_bytes.put(prefix.as_bytes());
prefix_bytes.put_u8(0);
let message_bytes = message.as_bytes();
let logical_message = XlLogicalMessage {
db_id: 0,
transactional: 0,
prefix_size: prefix_bytes.len() as u64,
message_size: message_bytes.len() as u64,
};
let mainrdata = logical_message.encode();
let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len();
// only short mainrdata is supported for now
assert!(mainrdata_len <= 255);
let mainrdata_len = mainrdata_len as u8;
let mut data: Vec<u8> = vec![pg_constants::XLR_BLOCK_ID_DATA_SHORT, mainrdata_len];
data.extend_from_slice(&mainrdata);
data.extend_from_slice(&prefix_bytes);
data.extend_from_slice(message_bytes);
let total_len = xlog_utils::XLOG_SIZE_OF_XLOG_RECORD + data.len();
let mut header = XLogRecord {
xl_tot_len: total_len as u32,
xl_xid: 0,
xl_prev: 0,
xl_info: 0,
xl_rmid: 21, // RM_LOGICALMSG_ID, PostgreSQL's logical-message resource manager
__bindgen_padding_0: [0u8; 2usize],
xl_crc: 0, // crc will be calculated later
};
let header_bytes = header.encode().expect("failed to encode header");
let crc = crc32c_append(0, &data);
let crc = crc32c_append(crc, &header_bytes[0..xlog_utils::XLOG_RECORD_CRC_OFFS]);
header.xl_crc = crc;
let mut wal: Vec<u8> = Vec::new();
wal.extend_from_slice(&header.encode().expect("failed to encode header"));
wal.extend_from_slice(&data);
// A WAL record's start position must be aligned to 8 bytes, so pad the
// end of this record to keep the next record aligned.
const PADDING: usize = 8;
let padding_rem = wal.len() % PADDING;
if padding_rem != 0 {
wal.resize(wal.len() + PADDING - padding_rem, 0);
}
wal
}
#[test]
fn test_encode_logical_message() {
let expected = [
64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255, 38,
0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114, 101, 102,
105, 120, 0, 109, 101, 115, 115, 97, 103, 101,
];
let actual = encode_logical_message("prefix", "message");
assert_eq!(expected, actual[..]);
}
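As a sanity check on the expected bytes in the deleted test, the lengths work out from the struct sizes; a small standalone check, assuming the usual 24-byte XLogRecord header:

fn main() {
    // XlLogicalMessage: Oid (4) + uint32 (4) + two uint64s (8 + 8).
    let struct_size = 4 + 4 + 8 + 8;
    // "prefix" plus a NUL terminator is 7 bytes; "message" is 7 bytes.
    let mainrdata_len = struct_size + 7 + 7;
    assert_eq!(mainrdata_len, 38); // the 38 early in the expected array
    // 2-byte short-data header + main data + 24-byte XLogRecord header.
    let total = 2 + mainrdata_len + 24;
    assert_eq!(total, 64); // the 64 in the expected array's first byte
}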

View File

@@ -7,7 +7,7 @@ use metrics::{
proto::MetricFamily,
Gauge, IntGaugeVec,
};
use postgres_ffi::xlog_utils::XLogSegNo;
use postgres_ffi::v14::xlog_utils::XLogSegNo;
use utils::{lsn::Lsn, zid::ZTenantTimelineId};
use crate::{

View File

@@ -5,9 +5,7 @@ use byteorder::{LittleEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use etcd_broker::subscription_value::SkTimelineInfo;
use postgres_ffi::xlog_utils::TimeLineID;
use postgres_ffi::xlog_utils::XLogSegNo;
use postgres_ffi::v14::xlog_utils::{TimeLineID, XLogSegNo, MAX_SEND_SIZE};
use serde::{Deserialize, Serialize};
use std::cmp::max;
use std::cmp::min;
@@ -19,7 +17,6 @@ use crate::control_file;
use crate::send_wal::HotStandbyFeedback;
use crate::wal_storage;
use postgres_ffi::xlog_utils::MAX_SEND_SIZE;
use utils::{
bin_ser::LeSer,
lsn::Lsn,

View File

@@ -6,7 +6,7 @@ use crate::timeline::{ReplicaState, Timeline, TimelineTools};
use crate::wal_storage::WalReader;
use anyhow::{bail, Context, Result};
use postgres_ffi::xlog_utils::{get_current_timestamp, TimestampTz, MAX_SEND_SIZE};
use postgres_ffi::v14::xlog_utils::{get_current_timestamp, TimestampTz, MAX_SEND_SIZE};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
@@ -170,6 +170,7 @@ impl ReplicationConn {
// spawn the background thread which receives HotStandbyFeedback messages.
let bg_timeline = Arc::clone(spg.timeline.get());
let bg_stream_in = self.stream_in.take().unwrap();
let bg_timeline_id = spg.ztimelineid.unwrap();
let state = ReplicaState::new();
// This replica_id is used below to check if it's time to stop replication.
@@ -188,6 +189,8 @@ impl ReplicationConn {
let _ = thread::Builder::new()
.name("HotStandbyFeedback thread".into())
.spawn(move || {
let _enter =
info_span!("HotStandbyFeedback thread", timeline = %bg_timeline_id).entered();
if let Err(err) = Self::background_thread(bg_stream_in, bg_replica_guard) {
error!("Replication background thread failed: {}", err);
}
@@ -198,13 +201,12 @@ impl ReplicationConn {
.build()?;
runtime.block_on(async move {
let (_, persisted_state) = spg.timeline.get().get_state();
let (inmem_state, persisted_state) = spg.timeline.get().get_state();
// TODO: add a persisted_state.timeline_start_lsn == Lsn(0) check
if persisted_state.server.wal_seg_size == 0 {
bail!("Cannot start replication before connecting to walproposer");
}
let wal_end = spg.timeline.get().get_end_of_wal();
// Walproposer gets special handling: safekeeper must give proposer all
// local WAL till the end, whether committed or not (walproposer will
// hang otherwise). That's because walproposer runs the consensus and
@@ -214,8 +216,8 @@ impl ReplicationConn {
// another compute rises which collects majority and starts fixing log
// on this safekeeper itself. That's ok as (old) proposer will never be
// able to commit such WAL.
let stop_pos: Option<Lsn> = if spg.appname == Some("wal_proposer_recovery".to_string())
{
let stop_pos: Option<Lsn> = if spg.is_walproposer_recovery() {
let wal_end = spg.timeline.get().get_end_of_wal();
Some(wal_end)
} else {
None
@@ -226,7 +228,7 @@ impl ReplicationConn {
// switch to copy
pgb.write_message(&BeMessage::CopyBothResponse)?;
let mut end_pos = Lsn(0);
let mut end_pos = stop_pos.unwrap_or(inmem_state.commit_lsn);
let mut wal_reader = WalReader::new(
spg.conf.timeline_dir(&spg.timeline.get().zttid),
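These last hunks change where replication starts and stops: a walproposer in recovery streams all local WAL and stops at the local end, while any other client now starts at commit_lsn rather than the old `Lsn(0)` initialization and streams indefinitely. A sketch of the resulting bounds logic, with plain integers standing in for Lsn:

fn replication_bounds(
    is_walproposer_recovery: bool,
    local_end_of_wal: u64,
    commit_lsn: u64,
) -> (u64, Option<u64>) {
    // (start position, optional stop position)
    let stop_pos = if is_walproposer_recovery {
        Some(local_end_of_wal)
    } else {
        None
    };
    (stop_pos.unwrap_or(commit_lsn), stop_pos)
}

fn main() {
    assert_eq!(replication_bounds(true, 300, 100), (300, Some(300)));
    assert_eq!(replication_bounds(false, 300, 100), (100, None));
}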

View File

@@ -4,8 +4,9 @@
use anyhow::{bail, Context, Result};
use etcd_broker::subscription_value::SkTimelineInfo;
use once_cell::sync::Lazy;
use postgres_ffi::xlog_utils::XLogSegNo;
use postgres_ffi::v14::xlog_utils::XLogSegNo;
use serde::Serialize;
use tokio::sync::watch;

View File

@@ -11,7 +11,8 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use postgres_ffi::xlog_utils::{XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, PG_TLI};
use postgres_ffi::v14::xlog_utils::{XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr};
use postgres_ffi::PG_TLI;
use remote_storage::{GenericRemoteStorage, RemoteStorage};
use tokio::fs::File;
use tokio::runtime::Builder;

View File

@@ -13,9 +13,10 @@ use std::pin::Pin;
use tokio::io::AsyncRead;
use once_cell::sync::Lazy;
use postgres_ffi::xlog_utils::{
find_end_of_wal, IsPartialXLogFileName, IsXLogFileName, XLogFromFileName, XLogSegNo, PG_TLI,
use postgres_ffi::v14::xlog_utils::{
find_end_of_wal, IsPartialXLogFileName, IsXLogFileName, XLogFromFileName, XLogSegNo,
};
use postgres_ffi::PG_TLI;
use std::cmp::min;
use std::fs::{self, remove_file, File, OpenOptions};
@@ -30,9 +31,10 @@ use crate::safekeeper::SafeKeeperState;
use crate::wal_backup::read_object;
use crate::SafeKeeperConf;
use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ};
use postgres_ffi::v14::xlog_utils::XLogFileName;
use postgres_ffi::XLOG_BLCKSZ;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::v14::waldecoder::WalStreamDecoder;
use metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS};

View File

@@ -0,0 +1,5 @@
from neon_dev_utils import sum_as_string
def test_neon_dev_utils():
assert sum_as_string(2, 3) == "5"
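A test like this is typically backed by the standard pyo3/maturin template; a minimal sketch of what the `neon_dev_utils` module plausibly looks like on the Rust side (this is the template code, not necessarily the prototype's exact contents):

use pyo3::prelude::*;

/// Formats the sum of two numbers as a string.
#[pyfunction]
fn sum_as_string(a: usize, b: usize) -> PyResult<String> {
    Ok((a + b).to_string())
}

/// A Python module implemented in Rust; the function name must match
/// the module name the test imports.
#[pymodule]
fn neon_dev_utils(_py: Python, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(sum_as_string, m)?)?;
    Ok(())
}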

View File

@@ -120,7 +120,10 @@ def test_import_from_pageserver_small(pg_bin: PgBin, neon_env_builder: NeonEnvBu
@pytest.mark.timeout(1800)
@pytest.mark.skipif(os.environ.get('BUILD_TYPE') == "debug", reason="only run with release build")
# TODO: `test_import_from_pageserver_multisegment` is temporarily disabled;
# re-enable it once the failure cause is found.
# @pytest.mark.skipif(os.environ.get('BUILD_TYPE') == "debug", reason="only run with release build")
@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/2255")
def test_import_from_pageserver_multisegment(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 1
neon_env_builder.enable_local_fs_remote_storage()

View File

@@ -1,7 +1,11 @@
from typing import Optional
from uuid import uuid4, UUID
import pytest
import pathlib
import os
import subprocess
from fixtures.utils import lsn_from_hex
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
DEFAULT_BRANCH_NAME,
NeonEnv,
@@ -9,16 +13,43 @@ from fixtures.neon_fixtures import (
NeonPageserverHttpClient,
NeonPageserverApiException,
wait_until,
neon_binpath,
pg_distrib_dir,
)
# test that we cannot override node id
def test_pageserver_init_node_id(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init()
with pytest.raises(
Exception,
match="node id can only be set during pageserver init and cannot be overridden"):
env.pageserver.start(overrides=['--pageserver-config-override=id=10'])
# test that we cannot override node id after init
def test_pageserver_init_node_id(neon_simple_env: NeonEnv):
repo_dir = neon_simple_env.repo_dir
pageserver_config = repo_dir / 'pageserver.toml'
pageserver_bin = pathlib.Path(neon_binpath) / 'pageserver'
run_pageserver = lambda args: subprocess.run([str(pageserver_bin), '-D', str(repo_dir), *args],
check=False,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
# remove initial config
pageserver_config.unlink()
bad_init = run_pageserver(['--init', '-c', f'pg_distrib_dir="{pg_distrib_dir}"'])
assert bad_init.returncode == 1, 'pageserver should not be able to init new config without the node id'
assert "missing id" in bad_init.stderr
assert not pageserver_config.exists(), 'config file should not be created after init error'
completed_init = run_pageserver(
['--init', '-c', 'id = 12345', '-c', f'pg_distrib_dir="{pg_distrib_dir}"'])
assert completed_init.returncode == 0, 'pageserver should be able to create a new config with the node id given'
assert pageserver_config.exists(), 'config file should be created successfully'
bad_reinit = run_pageserver(
['--init', '-c', 'id = 12345', '-c', f'pg_distrib_dir="{pg_distrib_dir}"'])
assert bad_reinit.returncode == 1, 'pageserver should not be able to re-init an already existing config'
assert "already exists, cannot init it" in bad_reinit.stderr
bad_update = run_pageserver(['--update-config', '-c', 'id = 3'])
assert bad_update.returncode == 1, 'pageserver should not allow updating node id'
assert "has node id already, it cannot be overridden" in bad_update.stderr
def check_client(client: NeonPageserverHttpClient, initial_tenant: UUID):

View File

@@ -44,30 +44,22 @@ def new_pageserver_helper(new_pageserver_dir: pathlib.Path,
we cannot use NeonPageserver yet because it depends on the neon CLI,
which currently lacks support for multiple pageservers
"""
cmd = [
str(pageserver_bin),
'--init',
'--workdir',
str(new_pageserver_dir),
f"-c listen_pg_addr='localhost:{pg_port}'",
f"-c listen_http_addr='localhost:{http_port}'",
f"-c pg_distrib_dir='{pg_distrib_dir}'",
f"-c id=2",
f"-c remote_storage={{local_path='{remote_storage_mock_path}'}}",
]
if broker is not None:
cmd.append(f"-c broker_endpoints=['{broker.client_url()}']", )
subprocess.check_output(cmd, text=True)
# actually run new pageserver
cmd = [
str(pageserver_bin),
'--workdir',
str(new_pageserver_dir),
'--daemonize',
'--update-config',
f"-c listen_pg_addr='localhost:{pg_port}'",
f"-c listen_http_addr='localhost:{http_port}'",
f"-c pg_distrib_dir='{pg_distrib_dir}'",
f"-c id=2",
f"-c remote_storage={{local_path='{remote_storage_mock_path}'}}",
]
if broker is not None:
cmd.append(f"-c broker_endpoints=['{broker.client_url()}']", )
log.info("starting new pageserver %s", cmd)
out = subprocess.check_output(cmd, text=True)
log.info("started new pageserver %s", out)