mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-19 19:32:55 +00:00
Compare commits
7 Commits
safe_flush
...
snapfile
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5b79e033bd | ||
|
|
93b2e49939 | ||
|
|
c3833ef0f4 | ||
|
|
acfc5c5d21 | ||
|
|
0a0d12368e | ||
|
|
8d2b517359 | ||
|
|
26bcd72619 |
@@ -237,23 +237,6 @@ jobs:
|
||||
- store_test_results:
|
||||
path: /tmp/test_output
|
||||
|
||||
# Build zenithdb/zenith:latest image and push it to Docker hub
|
||||
docker-image:
|
||||
docker:
|
||||
- image: cimg/base:2021.04
|
||||
steps:
|
||||
- checkout
|
||||
- setup_remote_docker:
|
||||
docker_layer_caching: true
|
||||
- run:
|
||||
name: Init postgres submodule
|
||||
command: git submodule update --init --depth 1
|
||||
- run:
|
||||
name: Build and push Docker image
|
||||
command: |
|
||||
echo $DOCKER_PWD | docker login -u $DOCKER_LOGIN --password-stdin
|
||||
docker build -t zenithdb/zenith:latest . && docker push zenithdb/zenith:latest
|
||||
|
||||
workflows:
|
||||
build_and_test:
|
||||
jobs:
|
||||
@@ -282,14 +265,3 @@ workflows:
|
||||
test_selection: batch_others
|
||||
requires:
|
||||
- build-zenith-<< matrix.build_type >>
|
||||
- docker-image:
|
||||
# Context gives an ability to login
|
||||
context: Docker Hub
|
||||
# Build image only for commits to main
|
||||
filters:
|
||||
branches:
|
||||
only:
|
||||
- main
|
||||
requires:
|
||||
- pg_regress tests release
|
||||
- other tests release
|
||||
|
||||
78
Cargo.lock
generated
78
Cargo.lock
generated
@@ -703,6 +703,15 @@ version = "0.11.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
|
||||
|
||||
[[package]]
|
||||
name = "heck"
|
||||
version = "0.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
|
||||
dependencies = [
|
||||
"unicode-segmentation",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hermit-abi"
|
||||
version = "0.1.19"
|
||||
@@ -1403,6 +1412,30 @@ version = "0.2.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-error"
|
||||
version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
|
||||
dependencies = [
|
||||
"proc-macro-error-attr",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-error-attr"
|
||||
version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-hack"
|
||||
version = "0.5.19"
|
||||
@@ -1932,6 +1965,21 @@ version = "1.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
|
||||
|
||||
[[package]]
|
||||
name = "snapfile"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"aversion",
|
||||
"bookfile",
|
||||
"hex",
|
||||
"rand",
|
||||
"serde",
|
||||
"structopt",
|
||||
"tempfile",
|
||||
"zenith_utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "socket2"
|
||||
version = "0.4.0"
|
||||
@@ -1964,6 +2012,30 @@ version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
|
||||
|
||||
[[package]]
|
||||
name = "structopt"
|
||||
version = "0.3.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "69b041cdcb67226aca307e6e7be44c8806423d83e018bd662360a93dabce4d71"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"lazy_static",
|
||||
"structopt-derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "structopt-derive"
|
||||
version = "0.4.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7813934aecf5f51a54775e00068c237de98489463968231a51746bbbc03f9c10"
|
||||
dependencies = [
|
||||
"heck",
|
||||
"proc-macro-error",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "subtle"
|
||||
version = "2.4.1"
|
||||
@@ -2272,6 +2344,12 @@ dependencies = [
|
||||
"tinyvec",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "unicode-segmentation"
|
||||
version = "1.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b"
|
||||
|
||||
[[package]]
|
||||
name = "unicode-width"
|
||||
version = "0.1.8"
|
||||
|
||||
@@ -9,6 +9,7 @@ members = [
|
||||
"zenith",
|
||||
"zenith_metrics",
|
||||
"zenith_utils",
|
||||
"snapfile",
|
||||
]
|
||||
|
||||
[profile.release]
|
||||
|
||||
83
Dockerfile
83
Dockerfile
@@ -1,77 +1,94 @@
|
||||
#
|
||||
# Docker image for console integration testing.
|
||||
#
|
||||
# We may also reuse it in CI to unify installation process and as a general binaries building
|
||||
# tool for production servers.
|
||||
#
|
||||
# Dynamic linking is used for librocksdb and libstdc++ bacause librocksdb-sys calls
|
||||
# bindgen with "dynamic" feature flag. This also prevents usage of dockerhub alpine-rust
|
||||
# images which are statically linked and have guards against any dlopen. I would rather
|
||||
# prefer all static binaries so we may change the way librocksdb-sys builds or wait until
|
||||
# we will have our own storage and drop rockdb dependency.
|
||||
#
|
||||
# Cargo-chef is used to separate dependencies building from main binaries building. This
|
||||
# way `docker build` will download and install dependencies only of there are changes to
|
||||
# out Cargo.toml files.
|
||||
#
|
||||
|
||||
|
||||
#
|
||||
# Build Postgres separately --- this layer will be rebuilt only if one of
|
||||
# mentioned paths will get any changes.
|
||||
# build postgres separately -- this layer will be rebuilt only if one of
|
||||
# mentioned paths will get any changes
|
||||
#
|
||||
FROM zenithdb/build:buster AS pg-build
|
||||
WORKDIR /zenith
|
||||
FROM alpine:3.13 as pg-build
|
||||
RUN apk add --update clang llvm compiler-rt compiler-rt-static lld musl-dev binutils \
|
||||
make bison flex readline-dev zlib-dev perl linux-headers libseccomp-dev
|
||||
WORKDIR zenith
|
||||
COPY ./vendor/postgres vendor/postgres
|
||||
COPY ./Makefile Makefile
|
||||
RUN make -j $(getconf _NPROCESSORS_ONLN) -s postgres
|
||||
# Build using clang and lld
|
||||
RUN CC='clang' LD='lld' CFLAGS='-fuse-ld=lld --rtlib=compiler-rt' make postgres -j4
|
||||
|
||||
#
|
||||
# Calculate cargo dependencies.
|
||||
# This will always run, but only generate recipe.json with list of dependencies without
|
||||
# installing them.
|
||||
#
|
||||
FROM zenithdb/build:buster AS cargo-deps-inspect
|
||||
WORKDIR /zenith
|
||||
FROM alpine:20210212 as cargo-deps-inspect
|
||||
RUN apk add --update rust cargo
|
||||
RUN cargo install cargo-chef
|
||||
WORKDIR zenith
|
||||
COPY . .
|
||||
RUN cargo chef prepare --recipe-path /zenith/recipe.json
|
||||
RUN cargo chef prepare --recipe-path recipe.json
|
||||
|
||||
#
|
||||
# Build cargo dependencies.
|
||||
# This temp cantainner should be rebuilt only if recipe.json was changed.
|
||||
# This temp cantainner would be build only if recipe.json was changed.
|
||||
#
|
||||
FROM zenithdb/build:buster AS deps-build
|
||||
WORKDIR /zenith
|
||||
FROM alpine:20210212 as deps-build
|
||||
RUN apk add --update rust cargo openssl-dev clang build-base
|
||||
# rust-rocksdb can be built against system-wide rocksdb -- that saves about
|
||||
# 10 minutes during build. Rocksdb apk package is in testing now, but use it
|
||||
# anyway. In case of any troubles we can download and build rocksdb here manually
|
||||
# (to cache it as a docker layer).
|
||||
RUN apk --no-cache --update --repository https://dl-cdn.alpinelinux.org/alpine/edge/testing add rocksdb-dev
|
||||
WORKDIR zenith
|
||||
COPY --from=pg-build /zenith/tmp_install/include/postgresql/server tmp_install/include/postgresql/server
|
||||
COPY --from=cargo-deps-inspect /usr/local/cargo/bin/cargo-chef /usr/local/cargo/bin/
|
||||
COPY --from=cargo-deps-inspect /root/.cargo/bin/cargo-chef /root/.cargo/bin/
|
||||
COPY --from=cargo-deps-inspect /zenith/recipe.json recipe.json
|
||||
RUN ROCKSDB_LIB_DIR=/usr/lib/ cargo chef cook --release --recipe-path recipe.json
|
||||
|
||||
#
|
||||
# Build zenith binaries
|
||||
#
|
||||
FROM zenithdb/build:buster AS build
|
||||
WORKDIR /zenith
|
||||
FROM alpine:20210212 as build
|
||||
RUN apk add --update rust cargo openssl-dev clang build-base
|
||||
RUN apk --no-cache --update --repository https://dl-cdn.alpinelinux.org/alpine/edge/testing add rocksdb-dev
|
||||
WORKDIR zenith
|
||||
COPY . .
|
||||
# Copy cached dependencies
|
||||
COPY --from=pg-build /zenith/tmp_install/include/postgresql/server tmp_install/include/postgresql/server
|
||||
COPY --from=deps-build /zenith/target target
|
||||
COPY --from=deps-build /usr/local/cargo/ /usr/local/cargo/
|
||||
COPY --from=deps-build /root/.cargo /root/.cargo
|
||||
RUN cargo build --release
|
||||
|
||||
#
|
||||
# Copy binaries to resulting image.
|
||||
# build-base hare to provide libstdc++ (it will also bring gcc, but leave it this way until we figure
|
||||
# out how to statically link rocksdb or avoid it at all).
|
||||
#
|
||||
FROM debian:buster-slim
|
||||
WORKDIR /data
|
||||
|
||||
RUN apt-get update && apt-get -yq install librocksdb-dev libseccomp-dev openssl && \
|
||||
mkdir zenith_install
|
||||
|
||||
FROM alpine:3.13
|
||||
RUN apk add --update openssl build-base libseccomp-dev
|
||||
RUN apk --no-cache --update --repository https://dl-cdn.alpinelinux.org/alpine/edge/testing add rocksdb
|
||||
COPY --from=build /zenith/target/release/pageserver /usr/local/bin
|
||||
COPY --from=build /zenith/target/release/wal_acceptor /usr/local/bin
|
||||
COPY --from=build /zenith/target/release/proxy /usr/local/bin
|
||||
COPY --from=pg-build /zenith/tmp_install postgres_install
|
||||
COPY --from=pg-build /zenith/tmp_install /usr/local
|
||||
COPY docker-entrypoint.sh /docker-entrypoint.sh
|
||||
|
||||
# Remove build artifacts (~ 500 MB)
|
||||
RUN rm -rf postgres_install/build && \
|
||||
# 'Install' Postgres binaries locally
|
||||
cp -r postgres_install/* /usr/local/ && \
|
||||
# Prepare an archive of Postgres binaries (should be around 11 MB)
|
||||
# and keep it inside container for an ease of deploy pipeline.
|
||||
cd postgres_install && tar -czf /data/postgres_install.tar.gz . && cd .. && \
|
||||
rm -rf postgres_install
|
||||
|
||||
RUN useradd -m -d /data zenith
|
||||
|
||||
RUN addgroup zenith && adduser -h /data -D -G zenith zenith
|
||||
VOLUME ["/data"]
|
||||
WORKDIR /data
|
||||
USER zenith
|
||||
EXPOSE 6400
|
||||
ENTRYPOINT ["/docker-entrypoint.sh"]
|
||||
|
||||
@@ -1,95 +0,0 @@
|
||||
#
|
||||
# Docker image for console integration testing.
|
||||
#
|
||||
# We may also reuse it in CI to unify installation process and as a general binaries building
|
||||
# tool for production servers.
|
||||
#
|
||||
# Dynamic linking is used for librocksdb and libstdc++ bacause librocksdb-sys calls
|
||||
# bindgen with "dynamic" feature flag. This also prevents usage of dockerhub alpine-rust
|
||||
# images which are statically linked and have guards against any dlopen. I would rather
|
||||
# prefer all static binaries so we may change the way librocksdb-sys builds or wait until
|
||||
# we will have our own storage and drop rockdb dependency.
|
||||
#
|
||||
# Cargo-chef is used to separate dependencies building from main binaries building. This
|
||||
# way `docker build` will download and install dependencies only of there are changes to
|
||||
# out Cargo.toml files.
|
||||
#
|
||||
|
||||
|
||||
#
|
||||
# build postgres separately -- this layer will be rebuilt only if one of
|
||||
# mentioned paths will get any changes
|
||||
#
|
||||
FROM alpine:3.13 as pg-build
|
||||
RUN apk add --update clang llvm compiler-rt compiler-rt-static lld musl-dev binutils \
|
||||
make bison flex readline-dev zlib-dev perl linux-headers libseccomp-dev
|
||||
WORKDIR zenith
|
||||
COPY ./vendor/postgres vendor/postgres
|
||||
COPY ./Makefile Makefile
|
||||
# Build using clang and lld
|
||||
RUN CC='clang' LD='lld' CFLAGS='-fuse-ld=lld --rtlib=compiler-rt' make postgres -j4
|
||||
|
||||
#
|
||||
# Calculate cargo dependencies.
|
||||
# This will always run, but only generate recipe.json with list of dependencies without
|
||||
# installing them.
|
||||
#
|
||||
FROM alpine:20210212 as cargo-deps-inspect
|
||||
RUN apk add --update rust cargo
|
||||
RUN cargo install cargo-chef
|
||||
WORKDIR zenith
|
||||
COPY . .
|
||||
RUN cargo chef prepare --recipe-path recipe.json
|
||||
|
||||
#
|
||||
# Build cargo dependencies.
|
||||
# This temp cantainner would be build only if recipe.json was changed.
|
||||
#
|
||||
FROM alpine:20210212 as deps-build
|
||||
RUN apk add --update rust cargo openssl-dev clang build-base
|
||||
# rust-rocksdb can be built against system-wide rocksdb -- that saves about
|
||||
# 10 minutes during build. Rocksdb apk package is in testing now, but use it
|
||||
# anyway. In case of any troubles we can download and build rocksdb here manually
|
||||
# (to cache it as a docker layer).
|
||||
RUN apk --no-cache --update --repository https://dl-cdn.alpinelinux.org/alpine/edge/testing add rocksdb-dev
|
||||
WORKDIR zenith
|
||||
COPY --from=pg-build /zenith/tmp_install/include/postgresql/server tmp_install/include/postgresql/server
|
||||
COPY --from=cargo-deps-inspect /root/.cargo/bin/cargo-chef /root/.cargo/bin/
|
||||
COPY --from=cargo-deps-inspect /zenith/recipe.json recipe.json
|
||||
RUN ROCKSDB_LIB_DIR=/usr/lib/ cargo chef cook --release --recipe-path recipe.json
|
||||
|
||||
#
|
||||
# Build zenith binaries
|
||||
#
|
||||
FROM alpine:20210212 as build
|
||||
RUN apk add --update rust cargo openssl-dev clang build-base
|
||||
RUN apk --no-cache --update --repository https://dl-cdn.alpinelinux.org/alpine/edge/testing add rocksdb-dev
|
||||
WORKDIR zenith
|
||||
COPY . .
|
||||
# Copy cached dependencies
|
||||
COPY --from=pg-build /zenith/tmp_install/include/postgresql/server tmp_install/include/postgresql/server
|
||||
COPY --from=deps-build /zenith/target target
|
||||
COPY --from=deps-build /root/.cargo /root/.cargo
|
||||
RUN cargo build --release
|
||||
|
||||
#
|
||||
# Copy binaries to resulting image.
|
||||
# build-base hare to provide libstdc++ (it will also bring gcc, but leave it this way until we figure
|
||||
# out how to statically link rocksdb or avoid it at all).
|
||||
#
|
||||
FROM alpine:3.13
|
||||
RUN apk add --update openssl build-base libseccomp-dev
|
||||
RUN apk --no-cache --update --repository https://dl-cdn.alpinelinux.org/alpine/edge/testing add rocksdb
|
||||
COPY --from=build /zenith/target/release/pageserver /usr/local/bin
|
||||
COPY --from=build /zenith/target/release/wal_acceptor /usr/local/bin
|
||||
COPY --from=build /zenith/target/release/proxy /usr/local/bin
|
||||
COPY --from=pg-build /zenith/tmp_install /usr/local
|
||||
COPY docker-entrypoint.sh /docker-entrypoint.sh
|
||||
|
||||
RUN addgroup zenith && adduser -h /data -D -G zenith zenith
|
||||
VOLUME ["/data"]
|
||||
WORKDIR /data
|
||||
USER zenith
|
||||
EXPOSE 6400
|
||||
ENTRYPOINT ["/docker-entrypoint.sh"]
|
||||
CMD ["pageserver"]
|
||||
@@ -1,15 +0,0 @@
|
||||
#
|
||||
# Image with all the required dependencies to build https://github.com/zenithdb/zenith
|
||||
# and Postgres from https://github.com/zenithdb/postgres
|
||||
# Also includes some rust development and build tools.
|
||||
#
|
||||
FROM rust:slim-buster
|
||||
WORKDIR /zenith
|
||||
|
||||
# Install postgres and zenith build dependencies
|
||||
# clang is for rocksdb
|
||||
RUN apt-get update && apt-get -yq install automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \
|
||||
libseccomp-dev pkg-config libssl-dev librocksdb-dev clang
|
||||
|
||||
# Install rust tools
|
||||
RUN rustup component add clippy && cargo install cargo-chef cargo-audit
|
||||
12
README.md
12
README.md
@@ -12,12 +12,15 @@ apt install build-essential libtool libreadline-dev zlib1g-dev flex bison libsec
|
||||
libssl-dev clang
|
||||
```
|
||||
|
||||
[Rust] 1.52 or later is also required.
|
||||
[Rust] 1.48 or later is also required.
|
||||
|
||||
To run the `psql` client, install the `postgresql-client` package or modify `PATH` and `LD_LIBRARY_PATH` to include `tmp_install/bin` and `tmp_install/lib`, respectively.
|
||||
|
||||
To run the integration tests (not required to use the code), install
|
||||
Python (3.6 or higher), and install python3 packages with `pipenv` using `pipenv install` in the project directory.
|
||||
Python (3.6 or higher), and install python3 packages with `pip` (called `pip3` on some systems):
|
||||
```
|
||||
pip install pytest psycopg2
|
||||
```
|
||||
|
||||
2. Build zenith and patched postgres
|
||||
```sh
|
||||
@@ -103,9 +106,10 @@ pytest
|
||||
|
||||
## Documentation
|
||||
|
||||
Now we use README files to cover design ideas and overall architecture for each module and `rustdoc` style documentation comments. See also [/docs/](/docs/) a top-level overview of all available markdown documentation.
|
||||
Now we use README files to cover design ideas and overall architecture for each module.
|
||||
And rustdoc style documentation comments.
|
||||
|
||||
To view your `rustdoc` documentation in a browser, try running `cargo doc --no-deps --open`
|
||||
To view your documentation in a browser, try running `cargo doc --no-deps --open`
|
||||
|
||||
## Source tree layout
|
||||
|
||||
|
||||
@@ -320,7 +320,7 @@ impl PostgresNode {
|
||||
|
||||
// Never clean up old WAL. TODO: We should use a replication
|
||||
// slot or something proper, to prevent the compute node
|
||||
// from removing WAL that hasn't been streamed to the safekeeper or
|
||||
// from removing WAL that hasn't been streamed to the safekeepr or
|
||||
// page server yet. (gh issue #349)
|
||||
self.append_conf("postgresql.conf", "wal_keep_size='10TB'\n")?;
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
if [ "$1" = 'pageserver' ]; then
|
||||
if [ ! -d "/data/tenants" ]; then
|
||||
if [ ! -d "/data/timelines" ]; then
|
||||
echo "Initializing pageserver data directory"
|
||||
pageserver --init -D /data --postgres-distrib /usr/local
|
||||
fi
|
||||
|
||||
@@ -1,11 +0,0 @@
|
||||
# Zenith documentation
|
||||
|
||||
## Table of contents
|
||||
|
||||
- [authentication.md](authentication.md) — pageserver JWT authentication.
|
||||
- [docker.md](docker.md) — Docker images and building pipeline.
|
||||
- [multitenancy.md](multitenancy.md) — how multitenancy is organized in the pageserver and Zenith CLI.
|
||||
- [pageserver/README](/pageserver/README) — pageserver overview.
|
||||
- [postgres_ffi/README](/postgres_ffi/README) — Postgres FFI overview.
|
||||
- [test_runner/README.md](/test_runner/README.md) — tests infrastructure overview.
|
||||
- [walkeeper/README](/walkeeper/README.md) — WAL service overview.
|
||||
@@ -1,38 +0,0 @@
|
||||
# Docker images of Zenith
|
||||
|
||||
## Images
|
||||
|
||||
Currently we build two main images:
|
||||
|
||||
- [zenithdb/zenith](https://hub.docker.com/repository/docker/zenithdb/zenith) — image with pre-built `pageserver`, `wal_acceptor` and `proxy` binaries and all the required runtime dependencies. Built from [/Dockerfile](/Dockerfile).
|
||||
- [zenithdb/compute-node](https://hub.docker.com/repository/docker/zenithdb/compute-node) — compute node image with pre-built Postgres binaries from [zenithdb/postgres](https://github.com/zenithdb/postgres).
|
||||
|
||||
And two intermediate images used either to reduce build time or to deliver some additional binary tools from other repos:
|
||||
|
||||
- [zenithdb/build](https://hub.docker.com/repository/docker/zenithdb/build) — image with all the dependencies required to build Zenith and compute node images. This image is based on `rust:slim-buster`, so it also has a proper `rust` environment. Built from [/Dockerfile.build](/Dockerfile.build).
|
||||
- [zenithdb/compute-tools](https://hub.docker.com/repository/docker/zenithdb/compute-tools) — compute node configuration management tools.
|
||||
|
||||
## Building pipeline
|
||||
|
||||
1. Image `zenithdb/compute-tools` is re-built automatically.
|
||||
|
||||
2. Image `zenithdb/build` is built manually. If you want to introduce any new compile time dependencies to Zenith or compute node you have to update this image as well, build it and push to Docker Hub.
|
||||
|
||||
Build:
|
||||
```sh
|
||||
docker build -t zenithdb/build:buster -f Dockerfile.build .
|
||||
```
|
||||
|
||||
Login:
|
||||
```sh
|
||||
docker login
|
||||
```
|
||||
|
||||
Push to Docker Hub:
|
||||
```sh
|
||||
docker push zenithdb/build:buster
|
||||
```
|
||||
|
||||
3. Image `zenithdb/compute-node` is built independently in the [zenithdb/postgres](https://github.com/zenithdb/postgres) repo.
|
||||
|
||||
4. Image `zenithdb/zenith` is built in this repo after a successful `release` tests run and pushed to Docker Hub automatically.
|
||||
@@ -41,6 +41,7 @@ use zenith_utils::seqwait::SeqWait;
|
||||
|
||||
mod inmemory_layer;
|
||||
mod layer_map;
|
||||
mod page_history;
|
||||
mod snapshot_layer;
|
||||
mod storage_layer;
|
||||
|
||||
@@ -842,34 +843,6 @@ impl Timeline for LayeredTimeline {
|
||||
fn get_prev_record_lsn(&self) -> Lsn {
|
||||
self.prev_record_lsn.load()
|
||||
}
|
||||
|
||||
///
|
||||
/// Wait until WAL has been received up to the given LSN.
|
||||
///
|
||||
fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result<Lsn> {
|
||||
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
|
||||
// This is necessary for bootstrap.
|
||||
if lsn == Lsn(0) {
|
||||
let last_valid_lsn = self.last_valid_lsn.load();
|
||||
trace!(
|
||||
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
|
||||
last_valid_lsn,
|
||||
lsn
|
||||
);
|
||||
lsn = last_valid_lsn;
|
||||
}
|
||||
|
||||
self.last_valid_lsn
|
||||
.wait_for_timeout(lsn, TIMEOUT)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Timed out while waiting for WAL record at LSN {} to arrive",
|
||||
lsn
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(lsn)
|
||||
}
|
||||
}
|
||||
|
||||
impl LayeredTimeline {
|
||||
@@ -1083,6 +1056,34 @@ impl LayeredTimeline {
|
||||
Ok(layer_rc)
|
||||
}
|
||||
|
||||
///
|
||||
/// Wait until WAL has been received up to the given LSN.
|
||||
///
|
||||
fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result<Lsn> {
|
||||
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
|
||||
// This is necessary for bootstrap.
|
||||
if lsn == Lsn(0) {
|
||||
let last_valid_lsn = self.last_valid_lsn.load();
|
||||
trace!(
|
||||
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
|
||||
last_valid_lsn,
|
||||
lsn
|
||||
);
|
||||
lsn = last_valid_lsn;
|
||||
}
|
||||
|
||||
self.last_valid_lsn
|
||||
.wait_for_timeout(lsn, TIMEOUT)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Timed out while waiting for WAL record at LSN {} to arrive",
|
||||
lsn
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(lsn)
|
||||
}
|
||||
|
||||
///
|
||||
/// Flush to disk all data that was written with the put_* functions
|
||||
///
|
||||
|
||||
@@ -2,15 +2,15 @@
|
||||
//! An in-memory layer stores recently received page versions in memory. The page versions
|
||||
//! are held in a BTreeMap, and there's another BTreeMap to track the size of the relation.
|
||||
//!
|
||||
use crate::layered_repository::page_history::PageHistory;
|
||||
use crate::layered_repository::storage_layer::{
|
||||
Layer, PageReconstructData, PageVersion, SegmentTag, RELISH_SEG_SIZE,
|
||||
};
|
||||
use crate::layered_repository::LayeredTimeline;
|
||||
use crate::layered_repository::SnapshotLayer;
|
||||
use crate::layered_repository::{LayeredTimeline, SnapshotLayer};
|
||||
use crate::repository::WALRecord;
|
||||
use crate::PageServerConf;
|
||||
use crate::{ZTenantId, ZTimelineId};
|
||||
use anyhow::{bail, Result};
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use bytes::Bytes;
|
||||
use log::*;
|
||||
use std::collections::BTreeMap;
|
||||
@@ -43,9 +43,9 @@ pub struct InMemoryLayerInner {
|
||||
|
||||
///
|
||||
/// All versions of all pages in the layer are are kept here.
|
||||
/// Indexed by block number and LSN.
|
||||
/// Indexed by block number.
|
||||
///
|
||||
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
|
||||
pages: BTreeMap<u32, PageHistory>,
|
||||
|
||||
///
|
||||
/// `segsizes` tracks the size of the segment at different points in time.
|
||||
@@ -90,29 +90,32 @@ impl Layer for InMemoryLayer {
|
||||
) -> Result<Option<Lsn>> {
|
||||
// Scan the BTreeMap backwards, starting from reconstruct_data.lsn.
|
||||
let mut need_base_image_lsn: Option<Lsn> = Some(lsn);
|
||||
|
||||
assert!(self.seg.blknum_in_seg(blknum));
|
||||
|
||||
{
|
||||
let inner = self.inner.lock().unwrap();
|
||||
let minkey = (blknum, Lsn(0));
|
||||
let maxkey = (blknum, lsn);
|
||||
let mut iter = inner
|
||||
.page_versions
|
||||
.range((Included(&minkey), Included(&maxkey)));
|
||||
while let Some(((_blknum, entry_lsn), entry)) = iter.next_back() {
|
||||
let pages = &inner.pages;
|
||||
|
||||
// FIXME: this assumes the latest page version is always the right answer.
|
||||
// How should this work if the requested lsn is in the past? in the future?
|
||||
|
||||
let latest_version = pages
|
||||
.get(&blknum)
|
||||
.and_then(PageHistory::latest)
|
||||
.ok_or_else(|| anyhow!("page not found"))?;
|
||||
|
||||
let (entry_lsn, entry) = latest_version;
|
||||
if true {
|
||||
if let Some(img) = &entry.page_image {
|
||||
reconstruct_data.page_img = Some(img.clone());
|
||||
need_base_image_lsn = None;
|
||||
break;
|
||||
} else if let Some(rec) = &entry.record {
|
||||
reconstruct_data.records.push(rec.clone());
|
||||
if rec.will_init {
|
||||
// This WAL record initializes the page, so no need to go further back
|
||||
need_base_image_lsn = None;
|
||||
break;
|
||||
} else {
|
||||
need_base_image_lsn = Some(*entry_lsn);
|
||||
need_base_image_lsn = Some(entry_lsn);
|
||||
}
|
||||
} else {
|
||||
// No base image, and no WAL record. Huh?
|
||||
@@ -120,7 +123,7 @@ impl Layer for InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
// release lock on 'page_versions'
|
||||
// release lock on self.pages
|
||||
}
|
||||
|
||||
Ok(need_base_image_lsn)
|
||||
@@ -184,7 +187,7 @@ impl InMemoryLayer {
|
||||
start_lsn,
|
||||
inner: Mutex::new(InMemoryLayerInner {
|
||||
drop_lsn: None,
|
||||
page_versions: BTreeMap::new(),
|
||||
pages: BTreeMap::new(),
|
||||
segsizes: BTreeMap::new(),
|
||||
}),
|
||||
})
|
||||
@@ -230,15 +233,11 @@ impl InMemoryLayer {
|
||||
);
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
|
||||
let old = inner.page_versions.insert((blknum, lsn), pv);
|
||||
|
||||
if old.is_some() {
|
||||
// We already had an entry for this LSN. That's odd..
|
||||
warn!(
|
||||
"Page version of rel {} blk {} at {} already exists",
|
||||
self.seg.rel, blknum, lsn
|
||||
);
|
||||
}
|
||||
let page_history = inner
|
||||
.pages
|
||||
.entry(blknum)
|
||||
.or_insert_with(PageHistory::default);
|
||||
page_history.push(lsn, pv);
|
||||
|
||||
// Also update the relation size, if this extended the relation.
|
||||
if self.seg.rel.is_blocky() {
|
||||
@@ -311,7 +310,7 @@ impl InMemoryLayer {
|
||||
timelineid,
|
||||
lsn
|
||||
);
|
||||
let mut page_versions = BTreeMap::new();
|
||||
let mut pages = BTreeMap::new();
|
||||
let mut segsizes = BTreeMap::new();
|
||||
|
||||
let seg = src.get_seg_tag();
|
||||
@@ -333,7 +332,8 @@ impl InMemoryLayer {
|
||||
page_image: Some(img),
|
||||
record: None,
|
||||
};
|
||||
page_versions.insert((blknum, lsn), pv);
|
||||
let page_history = PageHistory::from_image(lsn, pv);
|
||||
pages.insert(blknum, page_history);
|
||||
}
|
||||
|
||||
Ok(InMemoryLayer {
|
||||
@@ -344,8 +344,8 @@ impl InMemoryLayer {
|
||||
start_lsn: lsn,
|
||||
inner: Mutex::new(InMemoryLayerInner {
|
||||
drop_lsn: None,
|
||||
page_versions: page_versions,
|
||||
segsizes: segsizes,
|
||||
pages,
|
||||
segsizes,
|
||||
}),
|
||||
})
|
||||
}
|
||||
@@ -388,10 +388,11 @@ impl InMemoryLayer {
|
||||
};
|
||||
|
||||
// Divide all the page versions into old and new at the 'end_lsn' cutoff point.
|
||||
let mut before_page_versions;
|
||||
let mut before_pages = BTreeMap::new();
|
||||
let mut before_segsizes;
|
||||
let mut after_page_versions;
|
||||
let mut after_segsizes;
|
||||
let mut after_pages = BTreeMap::new();
|
||||
|
||||
if !dropped {
|
||||
before_segsizes = BTreeMap::new();
|
||||
after_segsizes = BTreeMap::new();
|
||||
@@ -403,20 +404,16 @@ impl InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
before_page_versions = BTreeMap::new();
|
||||
after_page_versions = BTreeMap::new();
|
||||
for ((blknum, lsn), pv) in inner.page_versions.iter() {
|
||||
if *lsn > end_lsn {
|
||||
after_page_versions.insert((*blknum, *lsn), pv.clone());
|
||||
} else {
|
||||
before_page_versions.insert((*blknum, *lsn), pv.clone());
|
||||
}
|
||||
for (blknum, page_history) in inner.pages.iter() {
|
||||
let (old, new) = page_history.clone().split_at(end_lsn);
|
||||
before_pages.insert(*blknum, old);
|
||||
after_pages.insert(*blknum, new);
|
||||
}
|
||||
} else {
|
||||
before_page_versions = inner.page_versions.clone();
|
||||
before_pages = inner.pages.clone();
|
||||
before_segsizes = inner.segsizes.clone();
|
||||
after_segsizes = BTreeMap::new();
|
||||
after_page_versions = BTreeMap::new();
|
||||
after_pages = BTreeMap::new();
|
||||
}
|
||||
|
||||
// we can release the lock now.
|
||||
@@ -431,13 +428,13 @@ impl InMemoryLayer {
|
||||
self.start_lsn,
|
||||
end_lsn,
|
||||
dropped,
|
||||
before_page_versions,
|
||||
before_pages,
|
||||
before_segsizes,
|
||||
)?;
|
||||
|
||||
// If there were any "new" page versions, initialize a new in-memory layer to hold
|
||||
// them
|
||||
let new_open = if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
|
||||
let new_open = if !after_segsizes.is_empty() || !after_pages.is_empty() {
|
||||
info!("created new in-mem layer for {} {}-", self.seg, end_lsn);
|
||||
|
||||
let new_open = Self::copy_snapshot(
|
||||
@@ -449,7 +446,7 @@ impl InMemoryLayer {
|
||||
end_lsn,
|
||||
)?;
|
||||
let mut new_inner = new_open.inner.lock().unwrap();
|
||||
new_inner.page_versions.append(&mut after_page_versions);
|
||||
new_inner.pages.append(&mut after_pages);
|
||||
new_inner.segsizes.append(&mut after_segsizes);
|
||||
drop(new_inner);
|
||||
|
||||
@@ -476,14 +473,16 @@ impl InMemoryLayer {
|
||||
for (k, v) in inner.segsizes.iter() {
|
||||
result += &format!("{}: {}\n", k, v);
|
||||
}
|
||||
for (k, v) in inner.page_versions.iter() {
|
||||
result += &format!(
|
||||
"blk {} at {}: {}/{}\n",
|
||||
k.0,
|
||||
k.1,
|
||||
v.page_image.is_some(),
|
||||
v.record.is_some()
|
||||
);
|
||||
for (page_num, page_history) in inner.pages.iter() {
|
||||
for (lsn, image) in page_history.iter() {
|
||||
result += &format!(
|
||||
"blk {} at {}: {}/{}\n",
|
||||
page_num,
|
||||
lsn,
|
||||
image.page_image.is_some(),
|
||||
image.record.is_some()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
result
|
||||
|
||||
94
pageserver/src/layered_repository/page_history.rs
Normal file
94
pageserver/src/layered_repository/page_history.rs
Normal file
@@ -0,0 +1,94 @@
|
||||
use super::storage_layer::PageVersion;
|
||||
use std::collections::VecDeque;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
/// A data structure that holds one or more versions of a particular page number.
|
||||
//
|
||||
#[derive(Default, Clone)]
|
||||
pub struct PageHistory {
|
||||
/// Pages stored in order, from oldest to newest.
|
||||
pages: VecDeque<(Lsn, PageVersion)>,
|
||||
}
|
||||
|
||||
impl PageHistory {
|
||||
/// Create a new PageHistory containing a single image.
|
||||
pub fn from_image(lsn: Lsn, image: PageVersion) -> Self {
|
||||
let mut pages = VecDeque::new();
|
||||
pages.push_back((lsn, image));
|
||||
PageHistory { pages }
|
||||
}
|
||||
|
||||
/// Push a newer page image.
|
||||
pub fn push(&mut self, lsn: Lsn, page: PageVersion) {
|
||||
if let Some((back_lsn, _)) = self.pages.back() {
|
||||
debug_assert_ne!(
|
||||
back_lsn, &lsn,
|
||||
"push page at lsn {:?} but one already exists",
|
||||
lsn
|
||||
);
|
||||
debug_assert!(back_lsn < &lsn, "pushed page is older than latest lsn");
|
||||
}
|
||||
self.pages.push_back((lsn, page));
|
||||
}
|
||||
|
||||
pub fn latest(&self) -> Option<(Lsn, &PageVersion)> {
|
||||
self.pages.back().map(|(lsn, page)| (*lsn, page))
|
||||
}
|
||||
|
||||
/// Split a page history at a particular LSN.
|
||||
///
|
||||
/// This consumes this PageHistory and returns two new ones.
|
||||
/// Any changes exactly matching the split LSN will be in the
|
||||
/// "old" history.
|
||||
//
|
||||
// FIXME: Is this necessary? There is some debate whether "splitting"
|
||||
// layers is the best design.
|
||||
//
|
||||
pub fn split_at(self, split_lsn: Lsn) -> (PageHistory, PageHistory) {
|
||||
let mut old = PageHistory::default();
|
||||
let mut new = PageHistory::default();
|
||||
for (lsn, page) in self.pages {
|
||||
if lsn > split_lsn {
|
||||
new.push(lsn, page)
|
||||
} else {
|
||||
old.push(lsn, page);
|
||||
}
|
||||
}
|
||||
(old, new)
|
||||
}
|
||||
|
||||
pub fn iter(&self) -> impl Iterator<Item = &(Lsn, PageVersion)> {
|
||||
self.pages.iter()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn page_history() {
|
||||
fn make_page(b: u8) -> PageVersion {
|
||||
let image = vec![b; 8192].into();
|
||||
PageVersion {
|
||||
page_image: Some(image),
|
||||
record: None,
|
||||
}
|
||||
}
|
||||
|
||||
let mut ph = PageHistory::default();
|
||||
ph.push(10.into(), make_page(1));
|
||||
ph.push(20.into(), make_page(2));
|
||||
ph.push(30.into(), make_page(3));
|
||||
|
||||
let (latest_lsn, latest_image) = ph.latest().unwrap();
|
||||
assert_eq!(latest_lsn, 30.into());
|
||||
assert!(matches!(latest_image, PageVersion { page_image: Some(im), .. } if im[0] == 3));
|
||||
|
||||
let mut it = ph.iter();
|
||||
assert_eq!(it.next().unwrap().0, 10.into());
|
||||
assert_eq!(it.next().unwrap().0, 20.into());
|
||||
assert_eq!(it.next().unwrap().0, 30.into());
|
||||
assert!(it.next().is_none());
|
||||
}
|
||||
}
|
||||
@@ -37,6 +37,7 @@
|
||||
//! A snapshot file is constructed using the 'bookfile' crate. Each file consists of two
|
||||
//! parts: the page versions and the relation sizes. They are stored as separate chapters.
|
||||
//!
|
||||
use crate::layered_repository::page_history::PageHistory;
|
||||
use crate::layered_repository::storage_layer::{
|
||||
Layer, PageReconstructData, PageVersion, SegmentTag,
|
||||
};
|
||||
@@ -236,7 +237,7 @@ pub struct SnapshotLayerInner {
|
||||
|
||||
/// All versions of all pages in the file are are kept here.
|
||||
/// Indexed by block number and LSN.
|
||||
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
|
||||
pages: BTreeMap<u32, PageHistory>,
|
||||
|
||||
/// `relsizes` tracks the size of the relation at different points in time.
|
||||
relsizes: BTreeMap<Lsn, u32>,
|
||||
@@ -270,6 +271,7 @@ impl Layer for SnapshotLayer {
|
||||
lsn: Lsn,
|
||||
reconstruct_data: &mut PageReconstructData,
|
||||
) -> Result<Option<Lsn>> {
|
||||
/*
|
||||
// Scan the BTreeMap backwards, starting from the given entry.
|
||||
let mut need_base_image_lsn: Option<Lsn> = Some(lsn);
|
||||
{
|
||||
@@ -303,6 +305,9 @@ impl Layer for SnapshotLayer {
|
||||
}
|
||||
|
||||
Ok(need_base_image_lsn)
|
||||
|
||||
*/
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// Get size of the relation at given LSN
|
||||
@@ -380,7 +385,7 @@ impl SnapshotLayer {
|
||||
start_lsn: Lsn,
|
||||
end_lsn: Lsn,
|
||||
dropped: bool,
|
||||
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
|
||||
pages: BTreeMap<u32, PageHistory>,
|
||||
relsizes: BTreeMap<Lsn, u32>,
|
||||
) -> Result<SnapshotLayer> {
|
||||
let snapfile = SnapshotLayer {
|
||||
@@ -393,10 +398,12 @@ impl SnapshotLayer {
|
||||
dropped,
|
||||
inner: Mutex::new(SnapshotLayerInner {
|
||||
loaded: true,
|
||||
page_versions: page_versions,
|
||||
relsizes: relsizes,
|
||||
pages,
|
||||
relsizes,
|
||||
}),
|
||||
};
|
||||
|
||||
/*
|
||||
let inner = snapfile.inner.lock().unwrap();
|
||||
|
||||
// Write the in-memory btreemaps into a file
|
||||
@@ -426,12 +433,16 @@ impl SnapshotLayer {
|
||||
drop(inner);
|
||||
|
||||
Ok(snapfile)
|
||||
*/
|
||||
|
||||
todo!()
|
||||
}
|
||||
|
||||
///
|
||||
/// Load the contents of the file into memory
|
||||
///
|
||||
fn load(&self) -> Result<MutexGuard<SnapshotLayerInner>> {
|
||||
/*
|
||||
// quick exit if already loaded
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
|
||||
@@ -469,6 +480,9 @@ impl SnapshotLayer {
|
||||
};
|
||||
|
||||
Ok(inner)
|
||||
*/
|
||||
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// Create SnapshotLayers representing all files on disk
|
||||
@@ -479,6 +493,7 @@ impl SnapshotLayer {
|
||||
timelineid: ZTimelineId,
|
||||
tenantid: ZTenantId,
|
||||
) -> Result<Vec<Arc<SnapshotLayer>>> {
|
||||
/*
|
||||
let path = conf.timeline_path(&timelineid, &tenantid);
|
||||
|
||||
let mut snapfiles: Vec<Arc<SnapshotLayer>> = Vec::new();
|
||||
@@ -506,6 +521,8 @@ impl SnapshotLayer {
|
||||
}
|
||||
}
|
||||
return Ok(snapfiles);
|
||||
*/
|
||||
todo!()
|
||||
}
|
||||
|
||||
pub fn delete(&self) -> Result<()> {
|
||||
@@ -519,11 +536,14 @@ impl SnapshotLayer {
|
||||
/// it will need to be loaded back.
|
||||
///
|
||||
pub fn unload(&self) -> Result<()> {
|
||||
/*
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
inner.page_versions = BTreeMap::new();
|
||||
inner.relsizes = BTreeMap::new();
|
||||
inner.loaded = false;
|
||||
Ok(())
|
||||
*/
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// debugging function to print out the contents of the layer
|
||||
|
||||
@@ -663,7 +663,7 @@ impl Timeline for ObjectTimeline {
|
||||
assert!(old <= lsn);
|
||||
|
||||
// Use old value of last_record_lsn as prev_record_lsn
|
||||
self.prev_record_lsn.fetch_max(old.align());
|
||||
self.prev_record_lsn.fetch_max(Lsn((old.0 + 7) & !7));
|
||||
|
||||
// Also advance last_valid_lsn
|
||||
let old = self.last_valid_lsn.advance(lsn);
|
||||
@@ -712,41 +712,6 @@ impl Timeline for ObjectTimeline {
|
||||
let iter = self.obj_store.objects(self.timelineid, lsn)?;
|
||||
Ok(Box::new(ObjectHistory { lsn, iter }))
|
||||
}
|
||||
|
||||
//
|
||||
// Wait until WAL has been received up to the given LSN.
|
||||
//
|
||||
fn wait_lsn(&self, req_lsn: Lsn) -> Result<Lsn> {
|
||||
let mut lsn = req_lsn;
|
||||
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
|
||||
// This is necessary for bootstrap.
|
||||
if lsn == Lsn(0) {
|
||||
let last_valid_lsn = self.last_valid_lsn.load();
|
||||
trace!(
|
||||
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
|
||||
last_valid_lsn,
|
||||
lsn
|
||||
);
|
||||
lsn = last_valid_lsn;
|
||||
}
|
||||
trace!(
|
||||
"Start waiting for LSN {}, valid LSN is {}",
|
||||
lsn,
|
||||
self.last_valid_lsn.load()
|
||||
);
|
||||
self.last_valid_lsn
|
||||
.wait_for_timeout(lsn, TIMEOUT)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Timed out while waiting for WAL record at LSN {} to arrive. valid LSN in {}",
|
||||
lsn,
|
||||
self.last_valid_lsn.load(),
|
||||
)
|
||||
})?;
|
||||
//trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
|
||||
|
||||
Ok(lsn)
|
||||
}
|
||||
}
|
||||
|
||||
impl ObjectTimeline {
|
||||
@@ -855,6 +820,40 @@ impl ObjectTimeline {
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Wait until WAL has been received up to the given LSN.
|
||||
//
|
||||
fn wait_lsn(&self, mut lsn: Lsn) -> Result<Lsn> {
|
||||
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
|
||||
// This is necessary for bootstrap.
|
||||
if lsn == Lsn(0) {
|
||||
let last_valid_lsn = self.last_valid_lsn.load();
|
||||
trace!(
|
||||
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
|
||||
last_valid_lsn,
|
||||
lsn
|
||||
);
|
||||
lsn = last_valid_lsn;
|
||||
}
|
||||
trace!(
|
||||
"Start waiting for LSN {}, valid LSN is {}",
|
||||
lsn,
|
||||
self.last_valid_lsn.load()
|
||||
);
|
||||
self.last_valid_lsn
|
||||
.wait_for_timeout(lsn, TIMEOUT)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Timed out while waiting for WAL record at LSN {} to arrive. valid LSN in {}",
|
||||
lsn,
|
||||
self.last_valid_lsn.load(),
|
||||
)
|
||||
})?;
|
||||
//trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
|
||||
|
||||
Ok(lsn)
|
||||
}
|
||||
|
||||
///
|
||||
/// Iterate through object versions with given key, in reverse LSN order.
|
||||
///
|
||||
|
||||
@@ -357,13 +357,8 @@ impl PageServerHandler {
|
||||
|
||||
/* Send a tarball of the latest snapshot on the timeline */
|
||||
|
||||
let req_lsn = match lsn {
|
||||
Some(lsn) => {
|
||||
timeline.wait_lsn(lsn)?;
|
||||
lsn
|
||||
}
|
||||
None => timeline.get_last_record_lsn(),
|
||||
};
|
||||
let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn());
|
||||
|
||||
{
|
||||
let mut writer = CopyDataSink { pgb };
|
||||
let mut basebackup = basebackup::Basebackup::new(
|
||||
@@ -474,7 +469,7 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
let (_, params_raw) = query_string.split_at("basebackup ".len());
|
||||
let params = params_raw.split(" ").collect::<Vec<_>>();
|
||||
ensure!(
|
||||
params.len() >= 2,
|
||||
params.len() == 2,
|
||||
"invalid param number for basebackup command"
|
||||
);
|
||||
|
||||
@@ -484,7 +479,7 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
self.check_permission(Some(tenantid))?;
|
||||
|
||||
// TODO are there any tests with lsn option?
|
||||
let lsn = if params.len() == 3 && params[2].len() != 0 {
|
||||
let lsn = if params.len() == 3 {
|
||||
Some(Lsn::from_str(params[2])?)
|
||||
} else {
|
||||
None
|
||||
@@ -580,10 +575,6 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
timeline.advance_last_valid_lsn(last_lsn);
|
||||
break;
|
||||
}
|
||||
FeMessage::CopyFailed => {
|
||||
info!("Copy failed");
|
||||
break;
|
||||
}
|
||||
FeMessage::Sync => {}
|
||||
_ => bail!("unexpected message {:?}", msg),
|
||||
}
|
||||
|
||||
@@ -203,11 +203,6 @@ pub trait Timeline: Send + Sync {
|
||||
/// Relation size is increased implicitly and decreased with Truncate updates.
|
||||
// TODO ordering guarantee?
|
||||
fn history<'a>(&'a self) -> Result<Box<dyn History + 'a>>;
|
||||
|
||||
//
|
||||
// Wait until WAL has been received up to the given LSN.
|
||||
//
|
||||
fn wait_lsn(&self, lsn: Lsn) -> Result<Lsn>;
|
||||
}
|
||||
|
||||
pub trait History: Iterator<Item = Result<Modification>> {
|
||||
|
||||
@@ -264,7 +264,7 @@ fn import_slru_file(timeline: &dyn Timeline, lsn: Lsn, slru: SlruKind, path: &Pa
|
||||
/// Scan PostgreSQL WAL files in given directory
|
||||
/// and load all records >= 'startpoint' into the repository.
|
||||
pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: Lsn) -> Result<()> {
|
||||
let mut waldecoder = WalStreamDecoder::new(startpoint, true);
|
||||
let mut waldecoder = WalStreamDecoder::new(startpoint);
|
||||
|
||||
let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE);
|
||||
let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
|
||||
@@ -425,11 +425,9 @@ pub fn save_decoded_record(
|
||||
let parsed_xact = XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
|
||||
save_xact_record(timeline, lsn, &parsed_xact, decoded)?;
|
||||
// Remove twophase file. see RemoveTwoPhaseFile() in postgres code
|
||||
trace!(
|
||||
info!(
|
||||
"unlink twophaseFile for xid {} parsed_xact.xid {} here at {}",
|
||||
decoded.xl_xid,
|
||||
parsed_xact.xid,
|
||||
lsn
|
||||
decoded.xl_xid, parsed_xact.xid, lsn
|
||||
);
|
||||
timeline.put_unlink(
|
||||
RelishTag::TwoPhase {
|
||||
|
||||
@@ -25,13 +25,13 @@ pub type MultiXactStatus = u32;
|
||||
pub struct WalStreamDecoder {
|
||||
lsn: Lsn,
|
||||
|
||||
startlsn: Lsn, // LSN where this record starts
|
||||
contlen: u32,
|
||||
padlen: u32,
|
||||
|
||||
inputbuf: BytesMut,
|
||||
recordbuf: BytesMut,
|
||||
|
||||
crc_check: bool,
|
||||
recordbuf: BytesMut,
|
||||
}
|
||||
|
||||
#[derive(Error, Debug, Clone)]
|
||||
@@ -46,24 +46,19 @@ pub struct WalDecodeError {
|
||||
// FIXME: This isn't a proper rust stream
|
||||
//
|
||||
impl WalStreamDecoder {
|
||||
pub fn new(lsn: Lsn, crc_check: bool) -> WalStreamDecoder {
|
||||
pub fn new(lsn: Lsn) -> WalStreamDecoder {
|
||||
WalStreamDecoder {
|
||||
lsn,
|
||||
|
||||
startlsn: Lsn(0),
|
||||
contlen: 0,
|
||||
padlen: 0,
|
||||
|
||||
inputbuf: BytesMut::new(),
|
||||
recordbuf: BytesMut::new(),
|
||||
|
||||
crc_check,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn available(&self) -> Lsn {
|
||||
self.lsn + self.inputbuf.remaining() as u64
|
||||
}
|
||||
|
||||
pub fn feed_bytes(&mut self, buf: &[u8]) {
|
||||
self.inputbuf.extend_from_slice(buf);
|
||||
}
|
||||
@@ -97,9 +92,7 @@ impl WalStreamDecoder {
|
||||
// TODO: verify the remaining fields in the header
|
||||
|
||||
self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
|
||||
if !self.crc_check && self.contlen != hdr.std.xlp_rem_len {
|
||||
self.contlen = hdr.std.xlp_rem_len; // skip continuation record
|
||||
}
|
||||
continue;
|
||||
} else if self.lsn.block_offset() == 0 {
|
||||
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
|
||||
return Ok(None);
|
||||
@@ -109,19 +102,14 @@ impl WalStreamDecoder {
|
||||
|
||||
if hdr.xlp_pageaddr != self.lsn.0 {
|
||||
return Err(WalDecodeError {
|
||||
msg: format!(
|
||||
"invalid xlog page header: xlp_pageaddr={} vs. lsn={}",
|
||||
hdr.xlp_pageaddr, self.lsn
|
||||
),
|
||||
msg: "invalid xlog page header".into(),
|
||||
lsn: self.lsn,
|
||||
});
|
||||
}
|
||||
// TODO: verify the remaining fields in the header
|
||||
|
||||
self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
|
||||
if !self.crc_check && self.contlen != hdr.xlp_rem_len {
|
||||
self.contlen = hdr.xlp_rem_len; // skip continuation record
|
||||
}
|
||||
continue;
|
||||
} else if self.padlen > 0 {
|
||||
if self.inputbuf.remaining() < self.padlen as usize {
|
||||
return Ok(None);
|
||||
@@ -139,6 +127,7 @@ impl WalStreamDecoder {
|
||||
}
|
||||
|
||||
// read xl_tot_len FIXME: assumes little-endian
|
||||
self.startlsn = self.lsn;
|
||||
let xl_tot_len = self.inputbuf.get_u32_le();
|
||||
if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
|
||||
return Err(WalDecodeError {
|
||||
@@ -153,6 +142,7 @@ impl WalStreamDecoder {
|
||||
self.recordbuf.put_u32_le(xl_tot_len);
|
||||
|
||||
self.contlen = xl_tot_len - 4;
|
||||
continue;
|
||||
} else {
|
||||
// we're continuing a record, possibly from previous page.
|
||||
let pageleft = self.lsn.remaining_in_block() as u32;
|
||||
@@ -174,10 +164,17 @@ impl WalStreamDecoder {
|
||||
let recordbuf = recordbuf.freeze();
|
||||
let mut buf = recordbuf.clone();
|
||||
|
||||
let xlogrec = XLogRecord::from_bytes(&mut buf);
|
||||
|
||||
// XLOG_SWITCH records are special. If we see one, we need to skip
|
||||
// to the next WAL segment.
|
||||
let xlogrec = XLogRecord::from_bytes(&mut buf);
|
||||
let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
|
||||
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
|
||||
if crc != xlogrec.xl_crc {
|
||||
return Err(WalDecodeError {
|
||||
msg: "WAL record crc mismatch".into(),
|
||||
lsn: self.lsn,
|
||||
});
|
||||
}
|
||||
if xlogrec.is_xlog_switch_record() {
|
||||
trace!("saw xlog switch record at {}", self.lsn);
|
||||
self.padlen =
|
||||
@@ -187,29 +184,10 @@ impl WalStreamDecoder {
|
||||
self.padlen = self.lsn.calc_padding(8u32) as u32;
|
||||
}
|
||||
|
||||
// Check record CRC
|
||||
if self.crc_check {
|
||||
let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
|
||||
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
|
||||
if crc != xlogrec.xl_crc {
|
||||
info!("WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}, recordbuf={:?}",
|
||||
n, recordbuf.len(), self.lsn, xlogrec, recordbuf);
|
||||
return Err(WalDecodeError {
|
||||
msg: format!(
|
||||
"WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}",
|
||||
n,
|
||||
buf.len(),
|
||||
self.lsn,
|
||||
xlogrec
|
||||
),
|
||||
lsn: self.lsn,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let result = (self.lsn.align(), recordbuf);
|
||||
let result = (self.lsn, recordbuf);
|
||||
return Ok(Some(result));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
// check record boundaries
|
||||
|
||||
@@ -22,6 +22,8 @@ use postgres_types::PgLsn;
|
||||
use std::cmp::{max, min};
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::fs::{File, OpenOptions};
|
||||
use std::io::{Seek, SeekFrom, Write};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Mutex;
|
||||
use std::thread;
|
||||
@@ -176,7 +178,7 @@ fn walreceiver_main(
|
||||
let copy_stream = rclient.copy_both_simple(&query)?;
|
||||
let mut physical_stream = ReplicationIter::new(copy_stream);
|
||||
|
||||
let mut waldecoder = WalStreamDecoder::new(startpoint, true);
|
||||
let mut waldecoder = WalStreamDecoder::new(startpoint);
|
||||
|
||||
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(RelishTag::Checkpoint, 0, startpoint)?;
|
||||
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
|
||||
@@ -192,51 +194,45 @@ fn walreceiver_main(
|
||||
let endlsn = startlsn + data.len() as u64;
|
||||
let prev_last_rec_lsn = last_rec_lsn;
|
||||
|
||||
write_wal_file(
|
||||
conf,
|
||||
startlsn,
|
||||
&timelineid,
|
||||
pg_constants::WAL_SEGMENT_SIZE,
|
||||
data,
|
||||
tenantid,
|
||||
)?;
|
||||
|
||||
trace!("received XLogData between {} and {}", startlsn, endlsn);
|
||||
|
||||
waldecoder.feed_bytes(data);
|
||||
|
||||
loop {
|
||||
match waldecoder.poll_decode() {
|
||||
Ok(Some((lsn, recdata))) => {
|
||||
// Save old checkpoint value to compare with it after decoding WAL record
|
||||
let old_checkpoint_bytes = checkpoint.encode();
|
||||
let decoded = decode_wal_record(recdata.clone());
|
||||
restore_local_repo::save_decoded_record(
|
||||
&mut checkpoint,
|
||||
&*timeline,
|
||||
&decoded,
|
||||
recdata,
|
||||
lsn,
|
||||
)?;
|
||||
last_rec_lsn = lsn;
|
||||
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
// Save old checkpoint value to compare with it after decoding WAL record
|
||||
let old_checkpoint_bytes = checkpoint.encode();
|
||||
let decoded = decode_wal_record(recdata.clone());
|
||||
restore_local_repo::save_decoded_record(
|
||||
&mut checkpoint,
|
||||
&*timeline,
|
||||
&decoded,
|
||||
recdata,
|
||||
lsn,
|
||||
)?;
|
||||
last_rec_lsn = lsn;
|
||||
|
||||
let new_checkpoint_bytes = checkpoint.encode();
|
||||
// Check if checkpoint data was updated by save_decoded_record
|
||||
if new_checkpoint_bytes != old_checkpoint_bytes {
|
||||
timeline.put_page_image(
|
||||
RelishTag::Checkpoint,
|
||||
0,
|
||||
lsn,
|
||||
new_checkpoint_bytes,
|
||||
false,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
trace!(
|
||||
"End of replication stream {}..{} at {}",
|
||||
startlsn,
|
||||
endlsn,
|
||||
last_rec_lsn
|
||||
);
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
info!("Decode error {}", e);
|
||||
return Err(e.into());
|
||||
}
|
||||
let new_checkpoint_bytes = checkpoint.encode();
|
||||
// Check if checkpoint data was updated by save_decoded_record
|
||||
if new_checkpoint_bytes != old_checkpoint_bytes {
|
||||
timeline.put_page_image(
|
||||
RelishTag::Checkpoint,
|
||||
0,
|
||||
lsn,
|
||||
new_checkpoint_bytes,
|
||||
false,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
|
||||
// Update the last_valid LSN value in the page cache one more time. We updated
|
||||
// it in the loop above, between each WAL record, but we might have received
|
||||
// a partial record after the last completed record. Our page cache's value
|
||||
@@ -411,3 +407,98 @@ pub fn identify_system(client: &mut Client) -> Result<IdentifySystem, Error> {
|
||||
Err(IdentifyError.into())
|
||||
}
|
||||
}
|
||||
|
||||
fn write_wal_file(
|
||||
conf: &PageServerConf,
|
||||
startpos: Lsn,
|
||||
timelineid: &ZTimelineId,
|
||||
wal_seg_size: usize,
|
||||
buf: &[u8],
|
||||
tenantid: &ZTenantId,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut bytes_left: usize = buf.len();
|
||||
let mut bytes_written: usize = 0;
|
||||
let mut partial;
|
||||
let mut start_pos = startpos;
|
||||
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
|
||||
|
||||
let wal_dir = conf.wal_dir_path(timelineid, tenantid);
|
||||
|
||||
/* Extract WAL location for this block */
|
||||
let mut xlogoff = start_pos.segment_offset(wal_seg_size);
|
||||
|
||||
while bytes_left != 0 {
|
||||
let bytes_to_write;
|
||||
|
||||
/*
|
||||
* If crossing a WAL boundary, only write up until we reach wal
|
||||
* segment size.
|
||||
*/
|
||||
if xlogoff + bytes_left > wal_seg_size {
|
||||
bytes_to_write = wal_seg_size - xlogoff;
|
||||
} else {
|
||||
bytes_to_write = bytes_left;
|
||||
}
|
||||
|
||||
/* Open file */
|
||||
let segno = start_pos.segment_number(wal_seg_size);
|
||||
let wal_file_name = XLogFileName(
|
||||
1, // FIXME: always use Postgres timeline 1
|
||||
segno,
|
||||
wal_seg_size,
|
||||
);
|
||||
let wal_file_path = wal_dir.join(wal_file_name.clone());
|
||||
let wal_file_partial_path = wal_dir.join(wal_file_name.clone() + ".partial");
|
||||
|
||||
{
|
||||
let mut wal_file: File;
|
||||
/* Try to open already completed segment */
|
||||
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
|
||||
wal_file = file;
|
||||
partial = false;
|
||||
} else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) {
|
||||
/* Try to open existed partial file */
|
||||
wal_file = file;
|
||||
partial = true;
|
||||
} else {
|
||||
/* Create and fill new partial file */
|
||||
partial = true;
|
||||
match OpenOptions::new()
|
||||
.create(true)
|
||||
.write(true)
|
||||
.open(&wal_file_partial_path)
|
||||
{
|
||||
Ok(mut file) => {
|
||||
for _ in 0..(wal_seg_size / XLOG_BLCKSZ) {
|
||||
file.write_all(&ZERO_BLOCK)?;
|
||||
}
|
||||
wal_file = file;
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to open log file {:?}: {}", &wal_file_path, e);
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
wal_file.seek(SeekFrom::Start(xlogoff as u64))?;
|
||||
wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?;
|
||||
|
||||
// FIXME: Flush the file
|
||||
//wal_file.sync_all()?;
|
||||
}
|
||||
/* Write was successful, advance our position */
|
||||
bytes_written += bytes_to_write;
|
||||
bytes_left -= bytes_to_write;
|
||||
start_pos += bytes_to_write as u64;
|
||||
xlogoff += bytes_to_write;
|
||||
|
||||
/* Did we reach the end of a WAL segment? */
|
||||
if start_pos.segment_offset(wal_seg_size) == 0 {
|
||||
xlogoff = 0;
|
||||
if partial {
|
||||
fs::rename(&wal_file_partial_path, &wal_file_path)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -108,23 +108,17 @@ fn find_end_of_wal_segment(
|
||||
segno: XLogSegNo,
|
||||
tli: TimeLineID,
|
||||
wal_seg_size: usize,
|
||||
is_partial: bool,
|
||||
rec_offs: &mut usize,
|
||||
rec_hdr: &mut [u8; XLOG_SIZE_OF_XLOG_RECORD],
|
||||
crc: &mut u32,
|
||||
check_contrec: bool,
|
||||
) -> u32 {
|
||||
let mut offs: usize = 0;
|
||||
let mut contlen: usize = 0;
|
||||
let mut wal_crc: u32 = 0;
|
||||
let mut crc: u32 = 0;
|
||||
let mut rec_offs: usize = 0;
|
||||
let mut buf = [0u8; XLOG_BLCKSZ];
|
||||
let file_name = XLogFileName(tli, segno, wal_seg_size);
|
||||
let mut last_valid_rec_pos: usize = 0;
|
||||
let file_path = data_dir.join(if is_partial {
|
||||
file_name.clone() + ".partial"
|
||||
} else {
|
||||
file_name
|
||||
});
|
||||
let mut file = File::open(&file_path).unwrap();
|
||||
let mut file = File::open(data_dir.join(file_name.clone() + ".partial")).unwrap();
|
||||
let mut rec_hdr = [0u8; XLOG_RECORD_CRC_OFFS];
|
||||
|
||||
while offs < wal_seg_size {
|
||||
if offs % XLOG_BLCKSZ == 0 {
|
||||
@@ -139,33 +133,13 @@ fn find_end_of_wal_segment(
|
||||
let xlp_info = LittleEndian::read_u16(&buf[2..4]);
|
||||
let xlp_rem_len = LittleEndian::read_u32(&buf[XLP_REM_LEN_OFFS..XLP_REM_LEN_OFFS + 4]);
|
||||
if xlp_magic != XLOG_PAGE_MAGIC as u16 {
|
||||
info!("Invalid WAL file {:?} magic {}", &file_path, xlp_magic);
|
||||
info!("Invalid WAL file {}.partial magic {}", file_name, xlp_magic);
|
||||
break;
|
||||
}
|
||||
if offs == 0 {
|
||||
offs = XLOG_SIZE_OF_XLOG_LONG_PHD;
|
||||
if (xlp_info & XLP_FIRST_IS_CONTRECORD) != 0 {
|
||||
if check_contrec {
|
||||
let xl_tot_len = LittleEndian::read_u32(&rec_hdr[0..4]) as usize;
|
||||
contlen = xlp_rem_len as usize;
|
||||
if *rec_offs + contlen < xl_tot_len
|
||||
|| (*rec_offs + contlen != xl_tot_len
|
||||
&& contlen != XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_LONG_PHD)
|
||||
{
|
||||
info!(
|
||||
"Corrupted continuation record: offs={}, contlen={}, xl_tot_len={}",
|
||||
*rec_offs, contlen, xl_tot_len
|
||||
);
|
||||
return 0;
|
||||
}
|
||||
} else {
|
||||
offs += ((xlp_rem_len + 7) & !7) as usize;
|
||||
}
|
||||
} else if *rec_offs != 0 {
|
||||
// There is incompleted page at previous segment but no cont record:
|
||||
// it means that current segment is not valid and we have to return back.
|
||||
info!("CONTRECORD flag is missed in page header");
|
||||
return 0;
|
||||
offs += ((xlp_rem_len + 7) & !7) as usize;
|
||||
}
|
||||
} else {
|
||||
offs += XLOG_SIZE_OF_XLOG_SHORT_PHD;
|
||||
@@ -176,8 +150,9 @@ fn find_end_of_wal_segment(
|
||||
if xl_tot_len == 0 {
|
||||
break;
|
||||
}
|
||||
last_valid_rec_pos = offs;
|
||||
offs += 4;
|
||||
*rec_offs = 4;
|
||||
rec_offs = 4;
|
||||
contlen = xl_tot_len - 4;
|
||||
rec_hdr[0..4].copy_from_slice(&buf[page_offs..page_offs + 4]);
|
||||
} else {
|
||||
@@ -187,33 +162,34 @@ fn find_end_of_wal_segment(
|
||||
|
||||
// read the rest of the record, or as much as fits on this page.
|
||||
let n = min(contlen, pageleft);
|
||||
let mut hdr_len: usize = 0;
|
||||
if *rec_offs < XLOG_SIZE_OF_XLOG_RECORD {
|
||||
// copy header
|
||||
hdr_len = min(XLOG_SIZE_OF_XLOG_RECORD - *rec_offs, n);
|
||||
rec_hdr[*rec_offs..*rec_offs + hdr_len]
|
||||
.copy_from_slice(&buf[page_offs..page_offs + hdr_len]);
|
||||
if rec_offs < XLOG_RECORD_CRC_OFFS {
|
||||
let len = min(XLOG_RECORD_CRC_OFFS - rec_offs, n);
|
||||
rec_hdr[rec_offs..rec_offs + len].copy_from_slice(&buf[page_offs..page_offs + len]);
|
||||
}
|
||||
*crc = crc32c_append(*crc, &buf[page_offs + hdr_len..page_offs + n]);
|
||||
*rec_offs += n;
|
||||
if rec_offs <= XLOG_RECORD_CRC_OFFS && rec_offs + n >= XLOG_SIZE_OF_XLOG_RECORD {
|
||||
let crc_offs = page_offs - rec_offs + XLOG_RECORD_CRC_OFFS;
|
||||
wal_crc = LittleEndian::read_u32(&buf[crc_offs..crc_offs + 4]);
|
||||
crc = crc32c_append(0, &buf[crc_offs + 4..page_offs + n]);
|
||||
crc = !crc;
|
||||
} else {
|
||||
crc ^= 0xFFFFFFFFu32;
|
||||
crc = crc32c_append(crc, &buf[page_offs..page_offs + n]);
|
||||
crc = !crc;
|
||||
}
|
||||
rec_offs += n;
|
||||
offs += n;
|
||||
contlen -= n;
|
||||
|
||||
if contlen == 0 {
|
||||
*crc = crc32c_append(*crc, &rec_hdr[0..XLOG_RECORD_CRC_OFFS]);
|
||||
crc = !crc;
|
||||
crc = crc32c_append(crc, &rec_hdr);
|
||||
offs = (offs + 7) & !7; // pad on 8 bytes boundary */
|
||||
let wal_crc = LittleEndian::read_u32(
|
||||
&rec_hdr[XLOG_RECORD_CRC_OFFS..XLOG_RECORD_CRC_OFFS + 4],
|
||||
);
|
||||
if *crc == wal_crc {
|
||||
if crc == wal_crc {
|
||||
last_valid_rec_pos = offs;
|
||||
// Reset rec_offs and crc for start of new record
|
||||
*rec_offs = 0;
|
||||
*crc = 0;
|
||||
} else {
|
||||
info!(
|
||||
"CRC mismatch {} vs {} at offset {} lsn {}",
|
||||
*crc, wal_crc, offs, last_valid_rec_pos
|
||||
"CRC mismatch {} vs {} at {}",
|
||||
crc, wal_crc, last_valid_rec_pos
|
||||
);
|
||||
break;
|
||||
}
|
||||
@@ -264,142 +240,20 @@ pub fn find_end_of_wal(
|
||||
}
|
||||
if high_segno > 0 {
|
||||
let mut high_offs = 0;
|
||||
if precise {
|
||||
let mut crc: u32 = 0;
|
||||
let mut rec_offs: usize = 0;
|
||||
let mut rec_hdr = [0u8; XLOG_SIZE_OF_XLOG_RECORD];
|
||||
let wal_dir = data_dir.join("pg_wal");
|
||||
|
||||
/*
|
||||
* To be able to calculate CRC of records crossing segment boundary,
|
||||
* we need to parse previous segments.
|
||||
* So first traverse segments in backward direction to locate record start
|
||||
* and then traverse forward, accumulating CRC.
|
||||
*/
|
||||
let mut prev_segno = high_segno - 1;
|
||||
let mut prev_offs: u32 = 0;
|
||||
while prev_segno > 1 {
|
||||
// TOFO: first segment constains dummy checkpoint record at the beginning
|
||||
prev_offs = find_end_of_wal_segment(
|
||||
data_dir,
|
||||
prev_segno,
|
||||
high_tli,
|
||||
wal_seg_size,
|
||||
false,
|
||||
&mut rec_offs,
|
||||
&mut rec_hdr,
|
||||
&mut crc,
|
||||
false,
|
||||
);
|
||||
if prev_offs != 0 {
|
||||
break;
|
||||
}
|
||||
prev_segno -= 1;
|
||||
}
|
||||
if prev_offs != 0 {
|
||||
// found start of WAL record
|
||||
let first_segno = prev_segno;
|
||||
let first_offs = prev_offs;
|
||||
while prev_segno + 1 < high_segno {
|
||||
// now traverse record in forward direction, accumulating CRC
|
||||
prev_segno += 1;
|
||||
prev_offs = find_end_of_wal_segment(
|
||||
data_dir,
|
||||
prev_segno,
|
||||
high_tli,
|
||||
wal_seg_size,
|
||||
false,
|
||||
&mut rec_offs,
|
||||
&mut rec_hdr,
|
||||
&mut crc,
|
||||
true,
|
||||
);
|
||||
if prev_offs == 0 {
|
||||
info!("Segment {} is corrupted", prev_segno,);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if prev_offs != 0 {
|
||||
high_offs = find_end_of_wal_segment(
|
||||
data_dir,
|
||||
high_segno,
|
||||
high_tli,
|
||||
wal_seg_size,
|
||||
high_ispartial,
|
||||
&mut rec_offs,
|
||||
&mut rec_hdr,
|
||||
&mut crc,
|
||||
true,
|
||||
);
|
||||
}
|
||||
if high_offs == 0 {
|
||||
// If last segment contais no valid records, then return back
|
||||
info!("Last WAL segment {} contains no valid record, truncate WAL till {} segment",
|
||||
high_segno, first_segno);
|
||||
// Remove last segments containing corrupted WAL record
|
||||
for segno in first_segno + 1..high_segno {
|
||||
let file_name = XLogFileName(high_tli, segno, wal_seg_size);
|
||||
let file_path = wal_dir.join(file_name);
|
||||
if let Err(e) = fs::remove_file(&file_path) {
|
||||
info!("Failed to remove file {:?}: {}", &file_path, e);
|
||||
}
|
||||
}
|
||||
let file_name = XLogFileName(high_tli, high_segno, wal_seg_size);
|
||||
let file_path = if high_ispartial {
|
||||
wal_dir.join(file_name.clone() + ".partial")
|
||||
} else {
|
||||
wal_dir.join(file_name.clone())
|
||||
};
|
||||
if let Err(e) = fs::remove_file(&file_path) {
|
||||
info!("Failed to remove file {:?}: {}", &file_path, e);
|
||||
}
|
||||
high_ispartial = false; // previous segment should not be partial
|
||||
high_segno = first_segno;
|
||||
high_offs = first_offs;
|
||||
}
|
||||
} else {
|
||||
// failed to locate previous segment
|
||||
assert!(prev_segno <= 1);
|
||||
high_offs = find_end_of_wal_segment(
|
||||
data_dir,
|
||||
high_segno,
|
||||
high_tli,
|
||||
wal_seg_size,
|
||||
high_ispartial,
|
||||
&mut rec_offs,
|
||||
&mut rec_hdr,
|
||||
&mut crc,
|
||||
false,
|
||||
);
|
||||
}
|
||||
|
||||
// If last segment is not marked as partial, it means that next segment
|
||||
// was not written. Let's make this segment partial once again.
|
||||
if !high_ispartial {
|
||||
let file_name = XLogFileName(high_tli, high_segno, wal_seg_size);
|
||||
if let Err(e) = fs::rename(
|
||||
wal_dir.join(file_name.clone()),
|
||||
wal_dir.join(file_name.clone() + ".partial"),
|
||||
) {
|
||||
info!(
|
||||
"Failed to rename {} to {}.partial: {}",
|
||||
&file_name, &file_name, e
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
/*
|
||||
* Move the starting pointer to the start of the next segment, if the
|
||||
* highest one we saw was completed.
|
||||
*/
|
||||
if !high_ispartial {
|
||||
high_segno += 1;
|
||||
}
|
||||
/*
|
||||
* Move the starting pointer to the start of the next segment, if the
|
||||
* highest one we saw was completed.
|
||||
*/
|
||||
if !high_ispartial {
|
||||
high_segno += 1;
|
||||
} else if precise {
|
||||
/* otherwise locate last record in last partial segment */
|
||||
high_offs = find_end_of_wal_segment(data_dir, high_segno, high_tli, wal_seg_size);
|
||||
}
|
||||
let high_ptr = XLogSegNoOffsetToRecPtr(high_segno, high_offs, wal_seg_size);
|
||||
return (high_ptr, high_tli);
|
||||
}
|
||||
(0, 1) // First timeline is 1
|
||||
(0, 0)
|
||||
}
|
||||
|
||||
pub fn main() {
|
||||
@@ -615,7 +469,7 @@ mod tests {
|
||||
let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true);
|
||||
let wal_end = Lsn(wal_end);
|
||||
println!("wal_end={}, tli={}", wal_end, tli);
|
||||
assert_eq!(wal_end, "0/1699D10".parse::<Lsn>().unwrap());
|
||||
assert_eq!(wal_end, "0/2000000".parse::<Lsn>().unwrap());
|
||||
|
||||
// 4. Get the actual end of WAL by pg_waldump
|
||||
let waldump_path = top_path.join("tmp_install/bin/pg_waldump");
|
||||
|
||||
21
snapfile/Cargo.toml
Normal file
21
snapfile/Cargo.toml
Normal file
@@ -0,0 +1,21 @@
|
||||
[package]
|
||||
name = "snapfile"
|
||||
version = "0.1.0"
|
||||
edition = "2018"
|
||||
|
||||
[[bin]]
|
||||
name = "snaptool"
|
||||
path = "snaptool/main.rs"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
aversion = "0.2"
|
||||
bookfile = "0.3"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
rand = "0.8.3"
|
||||
structopt = "0.3"
|
||||
zenith_utils = { path = "../zenith_utils" }
|
||||
hex = "0.4.3"
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3.2"
|
||||
64
snapfile/snaptool/main.rs
Normal file
64
snapfile/snaptool/main.rs
Normal file
@@ -0,0 +1,64 @@
|
||||
use anyhow::{Context, Result};
|
||||
use snapfile::{squash, SnapFile};
|
||||
use std::env::current_dir;
|
||||
use std::path::PathBuf;
|
||||
use structopt::StructOpt;
|
||||
|
||||
#[derive(StructOpt)]
|
||||
#[structopt(about = "A tool for manipulating snapshot files")]
|
||||
enum Params {
|
||||
Squash(Squash),
|
||||
Describe(Describe),
|
||||
}
|
||||
|
||||
#[derive(StructOpt)]
|
||||
struct Squash {
|
||||
older: PathBuf,
|
||||
newer: PathBuf,
|
||||
}
|
||||
|
||||
#[derive(StructOpt)]
|
||||
struct Describe {
|
||||
file: PathBuf,
|
||||
}
|
||||
|
||||
fn print_errors(error: anyhow::Error) {
|
||||
let formatted: Vec<_> = error.chain().map(ToString::to_string).collect();
|
||||
eprintln!("{}", formatted.join(": "));
|
||||
}
|
||||
fn main() {
|
||||
let res = snaptool_main();
|
||||
if let Err(e) = res {
|
||||
print_errors(e);
|
||||
}
|
||||
}
|
||||
|
||||
fn snaptool_main() -> Result<()> {
|
||||
let params = Params::from_args();
|
||||
match ¶ms {
|
||||
Params::Squash(squash_params) => {
|
||||
let out_dir = current_dir()?;
|
||||
squash(&squash_params.older, &squash_params.newer, &out_dir).with_context(|| {
|
||||
format!(
|
||||
"squash {} {}",
|
||||
squash_params.older.to_string_lossy(),
|
||||
squash_params.newer.to_string_lossy()
|
||||
)
|
||||
})?;
|
||||
}
|
||||
Params::Describe(describe_params) => {
|
||||
describe(describe_params)
|
||||
.with_context(|| format!("describe {}", describe_params.file.to_string_lossy()))?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn describe(params: &Describe) -> Result<()> {
|
||||
let mut snap = SnapFile::new(¶ms.file)?;
|
||||
let meta = snap.read_meta()?;
|
||||
|
||||
println!("{:?}: {:#?}", params.file, meta);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
339
snapfile/src/lib.rs
Normal file
339
snapfile/src/lib.rs
Normal file
@@ -0,0 +1,339 @@
|
||||
//! A file format for storage a snapshot of pages.
|
||||
|
||||
#![warn(missing_docs)]
|
||||
#![forbid(unsafe_code)]
|
||||
#![warn(clippy::cast_possible_truncation)]
|
||||
|
||||
mod page;
|
||||
mod squash;
|
||||
mod versioned;
|
||||
|
||||
#[doc(inline)]
|
||||
pub use page::Page;
|
||||
|
||||
#[doc(inline)]
|
||||
pub use squash::squash;
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
use aversion::group::{DataSink, DataSourceExt};
|
||||
use aversion::util::cbor::CborData;
|
||||
use bookfile::{Book, BookWriter, ChapterWriter};
|
||||
use std::ffi::OsString;
|
||||
use std::fs::File;
|
||||
use std::io::Write;
|
||||
use std::ops::AddAssign;
|
||||
use std::path::{Path, PathBuf};
|
||||
pub use versioned::{PageIndex, PageLocation, Predecessor, SnapFileMeta};
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
impl SnapFileMeta {
|
||||
pub fn new(previous: Option<SnapFileMeta>, timeline: [u8; 16], lsn: Lsn) -> Self {
|
||||
// Store the metadata of the predecessor snapshot, if there is one.
|
||||
let predecessor = previous.map(|prev| Predecessor {
|
||||
timeline: prev.timeline,
|
||||
lsn: prev.lsn,
|
||||
});
|
||||
|
||||
SnapFileMeta {
|
||||
timeline,
|
||||
predecessor,
|
||||
lsn: lsn.into(),
|
||||
}
|
||||
}
|
||||
|
||||
fn to_filename(&self) -> OsString {
|
||||
let timeline_string = hex::encode(self.timeline);
|
||||
let pred_lsn = match &self.predecessor {
|
||||
None => 0,
|
||||
Some(pred) => pred.lsn,
|
||||
};
|
||||
format!("{}_{:x}_{:x}.zdb", timeline_string, pred_lsn, self.lsn).into()
|
||||
}
|
||||
}
|
||||
|
||||
impl PageIndex {
|
||||
/// Retrieve the page offset from the index.
|
||||
///
|
||||
/// If the page is not in the index, returns `None`.
|
||||
fn get_page_location(&self, page_num: u64) -> Option<PageLocation> {
|
||||
self.map.get(&page_num).copied()
|
||||
}
|
||||
|
||||
fn page_count(&self) -> usize {
|
||||
self.map.len()
|
||||
}
|
||||
}
|
||||
|
||||
impl PageLocation {
|
||||
fn to_offset(&self) -> u64 {
|
||||
// Counts in units of one page.
|
||||
self.0 * 8192
|
||||
}
|
||||
}
|
||||
|
||||
impl AddAssign<u64> for PageLocation {
|
||||
fn add_assign(&mut self, rhs: u64) {
|
||||
self.0 += rhs;
|
||||
}
|
||||
}
|
||||
|
||||
/// A read-only snapshot file.
|
||||
pub struct SnapFile {
|
||||
book: Book<File>,
|
||||
page_index: PageIndex,
|
||||
}
|
||||
|
||||
impl SnapFile {
|
||||
/// Open a new `SnapFile` for reading.
|
||||
///
|
||||
/// This call will validate some of the file's format and read the file's
|
||||
/// metadata; it may return an error if the file format is invalid.
|
||||
pub fn new(path: &Path) -> Result<Self> {
|
||||
let file =
|
||||
File::open(path).with_context(|| format!("snapfile {}", path.to_string_lossy()))?;
|
||||
let book = Book::new(file)?;
|
||||
if book.magic() != versioned::SNAPFILE_MAGIC {
|
||||
bail!("bad magic number");
|
||||
}
|
||||
|
||||
// Read the page index into memory.
|
||||
let chapter_reader = book
|
||||
.chapter_reader(versioned::CHAPTER_PAGE_INDEX)
|
||||
.context("snapfile missing index chapter")?;
|
||||
let mut source = CborData::new(chapter_reader);
|
||||
let page_index: PageIndex = source.expect_message()?;
|
||||
Ok(SnapFile { book, page_index })
|
||||
}
|
||||
|
||||
/// Read the snapshot metadata.
|
||||
pub fn read_meta(&mut self) -> Result<SnapFileMeta> {
|
||||
let chapter_reader = self
|
||||
.book
|
||||
.chapter_reader(versioned::CHAPTER_SNAP_META)
|
||||
.context("snapfile missing meta")?;
|
||||
let mut source = CborData::new(chapter_reader);
|
||||
let meta: SnapFileMeta = source.expect_message()?;
|
||||
Ok(meta)
|
||||
}
|
||||
|
||||
/// Return the number of pages stored in this snapshot.
|
||||
pub fn page_count(&self) -> usize {
|
||||
self.page_index.page_count()
|
||||
}
|
||||
|
||||
/// Check if a page exists in this snapshot's index.
|
||||
///
|
||||
/// Returns `true` if the given page is stored in this snapshot file,
|
||||
/// `false` if not.
|
||||
pub fn has_page(&self, page_num: u64) -> bool {
|
||||
self.page_index.get_page_location(page_num).is_some()
|
||||
}
|
||||
|
||||
/// Read a page.
|
||||
///
|
||||
/// If this returns Ok(None), that means that this file does not store
|
||||
/// the requested page.
|
||||
/// This should only fail (returning `Err`) if an IO error occurs.
|
||||
pub fn read_page(&self, page_num: u64) -> Result<Option<Page>> {
|
||||
match self.page_index.get_page_location(page_num) {
|
||||
None => Ok(None),
|
||||
Some(page_offset) => Ok(Some(self._read_page(page_offset)?)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Read page data from the file.
|
||||
///
|
||||
/// This does the work for read_page and PageIter.
|
||||
fn _read_page(&self, page_location: PageLocation) -> Result<Page> {
|
||||
// Compute the true byte offset in the file.
|
||||
let page_offset = page_location.to_offset();
|
||||
let chapter_reader = self
|
||||
.book
|
||||
.chapter_reader(versioned::CHAPTER_PAGES)
|
||||
.context("snapfile missing pages chapter")?;
|
||||
|
||||
let mut page_data = Page::default();
|
||||
let bytes_read = chapter_reader.read_at(page_data.as_mut(), page_offset)?;
|
||||
if bytes_read != 8192 {
|
||||
bail!("read truncated page");
|
||||
}
|
||||
Ok(page_data)
|
||||
}
|
||||
|
||||
/// Iterate over pages.
|
||||
///
|
||||
/// This will return an iterator over (usize, )
|
||||
pub fn all_pages(&self) -> PageIter {
|
||||
let inner = (&self.page_index.map).into_iter();
|
||||
PageIter {
|
||||
snapfile: self,
|
||||
inner,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An iterator over all pages in the snapshot file.
|
||||
pub struct PageIter<'a> {
|
||||
snapfile: &'a SnapFile,
|
||||
inner: std::collections::btree_map::Iter<'a, u64, PageLocation>,
|
||||
}
|
||||
|
||||
impl Iterator for PageIter<'_> {
|
||||
type Item = Result<(u64, Page)>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let (page_num, page_offset) = self.inner.next()?;
|
||||
let result = self
|
||||
.snapfile
|
||||
._read_page(*page_offset)
|
||||
.map(|page_data| (*page_num, page_data));
|
||||
Some(result)
|
||||
}
|
||||
}
|
||||
|
||||
/// `SnapWriter` creates a new snapshot file.
|
||||
///
|
||||
/// A SnapWriter is created, has pages written into it, and is then closed.
|
||||
pub struct SnapWriter {
|
||||
writer: ChapterWriter<File>,
|
||||
page_index: PageIndex,
|
||||
meta: SnapFileMeta,
|
||||
current_offset: PageLocation,
|
||||
}
|
||||
|
||||
impl SnapWriter {
|
||||
/// Create a new `SnapWriter`.
|
||||
///
|
||||
pub fn new(dir: &Path, meta: SnapFileMeta) -> Result<Self> {
|
||||
let mut path = PathBuf::from(dir);
|
||||
path.push(meta.to_filename());
|
||||
let file = File::create(path)?;
|
||||
let book = BookWriter::new(file, versioned::SNAPFILE_MAGIC)?;
|
||||
|
||||
// Write a chapter for the snapshot metadata.
|
||||
let writer = book.new_chapter(versioned::CHAPTER_SNAP_META);
|
||||
let mut sink = CborData::new(writer);
|
||||
sink.write_message(&meta)?;
|
||||
let book = sink.into_inner().close()?;
|
||||
|
||||
// Open a new chapter for raw page data.
|
||||
let writer = book.new_chapter(versioned::CHAPTER_PAGES);
|
||||
Ok(SnapWriter {
|
||||
writer,
|
||||
page_index: PageIndex::default(),
|
||||
meta,
|
||||
current_offset: PageLocation::default(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Write a page into the snap file.
|
||||
pub fn write_page<P>(&mut self, page_num: u64, page_data: P) -> Result<()>
|
||||
where
|
||||
P: Into<Page>,
|
||||
{
|
||||
let page_data: Page = page_data.into();
|
||||
self.writer.write_all(page_data.as_ref())?;
|
||||
let prev = self.page_index.map.insert(page_num, self.current_offset);
|
||||
if prev.is_some() {
|
||||
panic!("duplicate index for page {}", page_num);
|
||||
}
|
||||
self.current_offset += 1;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Finish writing pages.
|
||||
///
|
||||
/// This consumes the PagesWriter and completes the snapshot.
|
||||
//
|
||||
pub fn finish(self) -> Result<SnapFileMeta> {
|
||||
let book = self.writer.close()?;
|
||||
|
||||
// Write out a page index and close the book. This will write out any
|
||||
// necessary file metadata.
|
||||
// FIXME: these 3 lines could be combined into a single function
|
||||
// that means "serialize this data structure with this format into this chapter".
|
||||
let writer = book.new_chapter(versioned::CHAPTER_PAGE_INDEX);
|
||||
let mut sink = CborData::new(writer);
|
||||
sink.write_message(&self.page_index)?;
|
||||
|
||||
// Close the chapter, then close the book.
|
||||
sink.into_inner().close()?.close()?;
|
||||
|
||||
// Return the snapshot metadata to the caller.
|
||||
Ok(self.meta)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::path::PathBuf;
|
||||
use tempfile::TempDir;
|
||||
|
||||
const TEST_TIMELINE: [u8; 16] = [99u8; 16];
|
||||
|
||||
#[test]
|
||||
fn snap_two_pages() {
|
||||
// When `dir` goes out of scope the directory will be unlinked.
|
||||
let dir = TempDir::new().unwrap();
|
||||
let snap_meta = {
|
||||
// Write out a new snapshot file with two pages.
|
||||
let meta = SnapFileMeta::new(None, TEST_TIMELINE, Lsn(1234));
|
||||
let mut snap = SnapWriter::new(dir.path(), meta).unwrap();
|
||||
// Write the pages out of order, because why not?
|
||||
let page99 = [99u8; 8192];
|
||||
snap.write_page(99, page99).unwrap();
|
||||
let page33 = [33u8; 8192];
|
||||
snap.write_page(33, page33).unwrap();
|
||||
snap.finish().unwrap()
|
||||
};
|
||||
|
||||
assert_eq!(snap_meta.lsn, 1234);
|
||||
|
||||
{
|
||||
// Read the snapshot file and verify the contents.
|
||||
let mut path = PathBuf::from(dir.path());
|
||||
path.push(snap_meta.to_filename());
|
||||
let mut snap = SnapFile::new(&path).unwrap();
|
||||
|
||||
assert_eq!(snap.page_count(), 2);
|
||||
assert!(!snap.has_page(0));
|
||||
assert!(snap.has_page(33));
|
||||
assert!(!snap.has_page(98));
|
||||
assert!(snap.has_page(99));
|
||||
assert!(snap.read_page(0).unwrap().is_none());
|
||||
let page = snap.read_page(33).unwrap().unwrap();
|
||||
assert_eq!(*page.0, [33u8; 8192]);
|
||||
let page = snap.read_page(99).unwrap().unwrap();
|
||||
assert_eq!(*page.0, [99u8; 8192]);
|
||||
|
||||
// Make sure the deserialized metadata matches what we think we wrote.
|
||||
let meta2 = snap.read_meta().unwrap();
|
||||
assert_eq!(snap_meta, meta2);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn snap_zero_pages() {
|
||||
// When `dir` goes out of scope the directory will be unlinked.
|
||||
let dir = TempDir::new().unwrap();
|
||||
let snap_meta = {
|
||||
// Write out a new snapshot file with no pages.
|
||||
let meta = SnapFileMeta::new(None, TEST_TIMELINE, Lsn(1234));
|
||||
let snap = SnapWriter::new(dir.path(), meta).unwrap();
|
||||
snap.finish().unwrap()
|
||||
};
|
||||
|
||||
{
|
||||
// Read the snapshot file.
|
||||
let mut path = PathBuf::from(dir.path());
|
||||
path.push(snap_meta.to_filename());
|
||||
let snap = SnapFile::new(&path).unwrap();
|
||||
assert_eq!(snap.page_index.page_count(), 0);
|
||||
assert!(!snap.has_page(0));
|
||||
assert!(!snap.has_page(99));
|
||||
assert!(snap.read_page(0).unwrap().is_none());
|
||||
assert!(snap.read_page(99).unwrap().is_none());
|
||||
}
|
||||
}
|
||||
}
|
||||
42
snapfile/src/page.rs
Normal file
42
snapfile/src/page.rs
Normal file
@@ -0,0 +1,42 @@
|
||||
/// A single 8KB page.
|
||||
pub struct Page(pub Box<[u8; 8192]>);
|
||||
|
||||
impl Page {
|
||||
/// Create a page by copying bytes from another slice.
|
||||
///
|
||||
/// This is a copy, not a move. If the caller already has
|
||||
/// an owned array then `From<[u8; 8192]>` can be used instead.
|
||||
pub fn copy_slice(x: &[u8; 8192]) -> Self {
|
||||
Page(Box::new(x.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Page {
|
||||
fn default() -> Self {
|
||||
Page(Box::new([0u8; 8192]))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<[u8; 8192]> for Page {
|
||||
fn from(array: [u8; 8192]) -> Self {
|
||||
Page(Box::new(array))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Box<[u8; 8192]>> for Page {
|
||||
fn from(heap_array: Box<[u8; 8192]>) -> Self {
|
||||
Page(heap_array)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<[u8; 8192]> for Page {
|
||||
fn as_ref(&self) -> &[u8; 8192] {
|
||||
self.0.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl AsMut<[u8; 8192]> for Page {
|
||||
fn as_mut(&mut self) -> &mut [u8; 8192] {
|
||||
self.0.as_mut()
|
||||
}
|
||||
}
|
||||
100
snapfile/src/squash.rs
Normal file
100
snapfile/src/squash.rs
Normal file
@@ -0,0 +1,100 @@
|
||||
use crate::{Page, PageIter, SnapFile, SnapFileMeta, SnapWriter};
|
||||
use anyhow::{bail, Result};
|
||||
use std::cmp::Ordering;
|
||||
use std::path::Path;
|
||||
|
||||
// A helper struct that holds an iterator, along with the last
|
||||
// value taken from the iterator.
|
||||
struct PageStepper<'a> {
|
||||
it: PageIter<'a>,
|
||||
pub cache: Option<(u64, Page)>,
|
||||
}
|
||||
|
||||
impl<'a> PageStepper<'a> {
|
||||
fn new(snapfile: &'a SnapFile) -> Result<Self> {
|
||||
let mut it = snapfile.all_pages();
|
||||
let cache = it.next().transpose()?;
|
||||
Ok(PageStepper { it, cache })
|
||||
}
|
||||
|
||||
/// Read a new page from the iterator, returning the previous page.
|
||||
fn step(&mut self) -> Result<Option<(u64, Page)>> {
|
||||
let mut next = self.it.next().transpose()?;
|
||||
std::mem::swap(&mut self.cache, &mut next);
|
||||
Ok(next)
|
||||
}
|
||||
}
|
||||
|
||||
/// Squash two snapshot files into one.
|
||||
///
|
||||
/// The resulting snapshot will contain all of the pages from both files.
|
||||
/// If the same page number is stored in both, it will keep the page from
|
||||
/// the newer snapshot.
|
||||
///
|
||||
/// The name of the resulting file will be automatically generated from
|
||||
/// the snapshot metadata.
|
||||
pub fn squash(older: &Path, newer: &Path, out_dir: &Path) -> Result<()> {
|
||||
let mut snap1 = SnapFile::new(older)?;
|
||||
let mut snap2 = SnapFile::new(newer)?;
|
||||
|
||||
let meta1 = snap1.read_meta()?;
|
||||
let meta2 = snap2.read_meta()?;
|
||||
|
||||
// Check that snap1 is the predecessor of snap2.
|
||||
match meta2.predecessor {
|
||||
Some(pred) if pred.timeline == meta1.timeline => {}
|
||||
_ => {
|
||||
bail!(
|
||||
"snap file {:?} is not the predecessor of {:?}",
|
||||
&older,
|
||||
&newer,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// The new combined snapshot will have most fields from meta2 (the later
|
||||
// snapshot), but will have the predecessor from meta1.
|
||||
let new_meta = SnapFileMeta {
|
||||
// There is some danger in squashing snapshots across two timelines,
|
||||
// in that it's possible to get confused about what the history
|
||||
// looks like. Ultimately, it should be possible to squash our way
|
||||
// to a "complete" snapshot (that contains all pages), so this must
|
||||
// be possible.
|
||||
timeline: meta2.timeline,
|
||||
predecessor: meta1.predecessor,
|
||||
lsn: meta2.lsn,
|
||||
};
|
||||
let mut snap_writer = SnapWriter::new(&out_dir, new_meta)?;
|
||||
|
||||
let mut iter1 = PageStepper::new(&snap1)?;
|
||||
let mut iter2 = PageStepper::new(&snap2)?;
|
||||
|
||||
loop {
|
||||
let next_page = match (&iter1.cache, &iter2.cache) {
|
||||
(None, None) => break,
|
||||
(Some(_), None) => iter1.step()?,
|
||||
(None, Some(_)) => iter2.step()?,
|
||||
(Some(x), Some(y)) => {
|
||||
// If these are two different page numbers, then advance the iterator
|
||||
// with the numerically lower number.
|
||||
// If they are the same page number, then store the one from the newer
|
||||
// snapshot, and discard the other (advancing both iterators).
|
||||
match x.0.cmp(&y.0) {
|
||||
Ordering::Less => iter1.step()?,
|
||||
Ordering::Greater => iter2.step()?,
|
||||
Ordering::Equal => {
|
||||
let _ = iter1.step()?;
|
||||
iter2.step()?
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
// This can't be None, because we would already checked inside the match
|
||||
// statement.
|
||||
let (page_num, page_data) = next_page.unwrap();
|
||||
snap_writer.write_page(page_num, page_data)?;
|
||||
}
|
||||
|
||||
snap_writer.finish()?;
|
||||
Ok(())
|
||||
}
|
||||
88
snapfile/src/versioned.rs
Normal file
88
snapfile/src/versioned.rs
Normal file
@@ -0,0 +1,88 @@
|
||||
//! Versioned data structures for snapshot files
|
||||
//!
|
||||
//! To ensure that future versions of software can read snapshot files,
|
||||
//! all data structures that are serialized into the snapshot files should
|
||||
//! live in this module.
|
||||
//!
|
||||
//! Once released, versioned data structures should never be modified.
|
||||
//! Instead, new versions should be created and conversion functions should
|
||||
//! be defined using the `FromVersion` trait.
|
||||
|
||||
use aversion::{assign_message_ids, UpgradeLatest, Versioned};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
// A random constant, to identify this file type.
|
||||
pub(crate) const SNAPFILE_MAGIC: u32 = 0x7fb8_38a8;
|
||||
|
||||
// Constant chapter numbers
|
||||
// FIXME: the bookfile crate should use something better to index, e.g. strings.
|
||||
/// Snapshot-specific file metadata
|
||||
pub(crate) const CHAPTER_SNAP_META: u64 = 1;
|
||||
/// A packed set of 8KB pages.
|
||||
pub(crate) const CHAPTER_PAGES: u64 = 2;
|
||||
/// An index of pages.
|
||||
pub(crate) const CHAPTER_PAGE_INDEX: u64 = 3;
|
||||
|
||||
/// Information about the predecessor snapshot.
|
||||
///
|
||||
/// It contains the snap_id of the predecessor snapshot, and the LSN
|
||||
/// of that snapshot.
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub struct Predecessor {
|
||||
/// This is the ID number of the predecessor timeline.
|
||||
///
|
||||
/// This may match the current snapshot's timeline id, but
|
||||
/// it may not (if the precessor was the branch point).
|
||||
pub timeline: [u8; 16],
|
||||
|
||||
/// This is the LSN of the predecessor snapshot.
|
||||
pub lsn: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize, Versioned, UpgradeLatest)]
|
||||
pub struct SnapFileMetaV1 {
|
||||
/// This is a unique ID number for this timeline.
|
||||
///
|
||||
/// This number guarantees that snapshot history is unique.
|
||||
pub timeline: [u8; 16],
|
||||
|
||||
/// Information about the predecessor snapshot.
|
||||
///
|
||||
/// If `None`, this snapshot is the start of a new database.
|
||||
pub predecessor: Option<Predecessor>,
|
||||
|
||||
/// This is the last LSN stored in this snapshot.
|
||||
pub lsn: u64,
|
||||
}
|
||||
|
||||
/// A type alias for the latest version of `SnapFileMeta`.
|
||||
pub type SnapFileMeta = SnapFileMetaV1;
|
||||
|
||||
/// A page location within a file.
|
||||
///
|
||||
/// Note: this is an opaque value that may not be the true byte offset;
|
||||
/// it may be relative to some other location or measured in units other
|
||||
/// than bytes.
|
||||
#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
|
||||
#[serde(transparent)]
|
||||
pub struct PageLocationV1(pub(crate) u64);
|
||||
|
||||
/// A type alias for the latest version of `PageLocation`.
|
||||
pub type PageLocation = PageLocationV1;
|
||||
|
||||
/// An index from page number to offset within the pages chapter.
|
||||
#[derive(Debug, Default, Serialize, Deserialize, Versioned, UpgradeLatest)]
|
||||
pub struct PageIndexV1 {
|
||||
/// A map from page number to file offset.
|
||||
pub(crate) map: BTreeMap<u64, PageLocationV1>,
|
||||
}
|
||||
|
||||
/// A type alias for the latest version of `PageIndex`.
|
||||
pub type PageIndex = PageIndexV1;
|
||||
|
||||
// Each message gets a unique message id, for tracking by the aversion traits.
|
||||
assign_message_ids! {
|
||||
PageIndex: 100,
|
||||
SnapFileMeta: 101,
|
||||
}
|
||||
@@ -1,66 +0,0 @@
|
||||
import pytest
|
||||
import random
|
||||
import time
|
||||
|
||||
from contextlib import closing
|
||||
from multiprocessing import Process, Value
|
||||
from fixtures.zenith_fixtures import WalAcceptorFactory, ZenithPageserver, PostgresFactory
|
||||
|
||||
pytest_plugins = ("fixtures.zenith_fixtures")
|
||||
|
||||
# Check that dead minority doesn't prevent the commits: execute insert n_inserts
|
||||
# times, with fault_probability chance of getting a wal acceptor down or up
|
||||
# along the way. 2 of 3 are always alive, so the work keeps going.
|
||||
def test_pageserver_restart(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, wa_factory: WalAcceptorFactory):
|
||||
|
||||
# One safekeeper is enough for this test.
|
||||
wa_factory.start_n_new(1)
|
||||
|
||||
zenith_cli.run(["branch", "test_pageserver_restart", "empty"])
|
||||
pg = postgres.create_start('test_pageserver_restart',
|
||||
wal_acceptors=wa_factory.get_connstrs())
|
||||
|
||||
pg_conn = pg.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
# Create table, and insert some rows. Make it big enough that it doesn't fit in
|
||||
# shared_buffers, otherwise the SELECT after restart will just return answer
|
||||
# from shared_buffers without hitting the page server, which defeats the point
|
||||
# of this test.
|
||||
cur.execute('CREATE TABLE foo (t text)')
|
||||
cur.execute('''
|
||||
INSERT INTO foo
|
||||
SELECT 'long string to consume some space' || g
|
||||
FROM generate_series(1, 100000) g
|
||||
''')
|
||||
|
||||
# Verify that the table is larger than shared_buffers
|
||||
cur.execute('''
|
||||
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_ize
|
||||
from pg_settings where name = 'shared_buffers'
|
||||
''')
|
||||
row = cur.fetchone()
|
||||
print("shared_buffers is {}, table size {}", row[0], row[1]);
|
||||
assert int(row[0]) < int(row[1])
|
||||
|
||||
# Stop and restart pageserver. This is a more or less graceful shutdown, although
|
||||
# the page server doesn't currently have a shutdown routine so there's no difference
|
||||
# between stopping and crashing.
|
||||
pageserver.stop();
|
||||
pageserver.start();
|
||||
|
||||
# Stopping the pageserver breaks the connection from the postgres backend to
|
||||
# the page server, and causes the next query on the connection to fail. Start a new
|
||||
# postgres connection too, to avoid that error. (Ideally, the compute node would
|
||||
# handle that and retry internally, without propagating the error to the user, but
|
||||
# currently it doesn't...)
|
||||
pg_conn = pg.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
cur.execute("SELECT count(*) FROM foo")
|
||||
assert cur.fetchone() == (100000, )
|
||||
|
||||
# Stop the page server by force, and restart it
|
||||
pageserver.stop();
|
||||
pageserver.start();
|
||||
|
||||
@@ -9,7 +9,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
|
||||
#
|
||||
# Test restarting and recreating a postgres instance
|
||||
#
|
||||
@pytest.mark.parametrize('with_wal_acceptors', [True, False])
|
||||
@pytest.mark.parametrize('with_wal_acceptors', [False, True])
|
||||
def test_restart_compute(
|
||||
zenith_cli,
|
||||
pageserver: ZenithPageserver,
|
||||
@@ -31,56 +31,31 @@ def test_restart_compute(
|
||||
|
||||
with closing(pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute('CREATE TABLE t(key int primary key, value text)')
|
||||
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
|
||||
cur.execute('SELECT sum(key) FROM t')
|
||||
r = cur.fetchone()
|
||||
assert r == (5000050000, )
|
||||
print("res = ", r)
|
||||
# Create table, and insert a row
|
||||
cur.execute('CREATE TABLE foo (t text)')
|
||||
cur.execute("INSERT INTO foo VALUES ('bar')")
|
||||
|
||||
# Remove data directory and restart
|
||||
# Stop and restart the Postgres instance
|
||||
pg.stop_and_destroy().create_start('test_restart_compute',
|
||||
wal_acceptors=wal_acceptor_connstrs)
|
||||
|
||||
|
||||
with closing(pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# We can still see the row
|
||||
cur.execute('SELECT sum(key) FROM t')
|
||||
r = cur.fetchone()
|
||||
assert r == (5000050000, )
|
||||
print("res = ", r)
|
||||
cur.execute('SELECT count(*) FROM foo')
|
||||
assert cur.fetchone() == (1, )
|
||||
|
||||
# Insert another row
|
||||
cur.execute("INSERT INTO t VALUES (100001, 'payload2')")
|
||||
cur.execute('SELECT count(*) FROM t')
|
||||
cur.execute("INSERT INTO foo VALUES ('bar2')")
|
||||
cur.execute('SELECT count(*) FROM foo')
|
||||
assert cur.fetchone() == (2, )
|
||||
|
||||
r = cur.fetchone()
|
||||
assert r == (100001, )
|
||||
print("res = ", r)
|
||||
|
||||
# Again remove data directory and restart
|
||||
# Stop, and destroy the Postgres instance. Then recreate and restart it.
|
||||
pg.stop_and_destroy().create_start('test_restart_compute',
|
||||
wal_acceptors=wal_acceptor_connstrs)
|
||||
|
||||
with closing(pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# We can still see the rows
|
||||
cur.execute('SELECT count(*) FROM t')
|
||||
|
||||
r = cur.fetchone()
|
||||
assert r == (100001, )
|
||||
print("res = ", r)
|
||||
|
||||
# And again remove data directory and restart
|
||||
pg.stop_and_destroy().create_start('test_restart_compute',
|
||||
wal_acceptors=wal_acceptor_connstrs)
|
||||
|
||||
with closing(pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# We can still see the rows
|
||||
cur.execute('SELECT count(*) FROM t')
|
||||
|
||||
r = cur.fetchone()
|
||||
assert r == (100001, )
|
||||
print("res = ", r)
|
||||
cur.execute('SELECT count(*) FROM foo')
|
||||
assert cur.fetchone() == (2, )
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
import os
|
||||
|
||||
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
|
||||
|
||||
|
||||
@@ -30,59 +28,24 @@ def test_twophase(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFa
|
||||
cur.execute("INSERT INTO foo VALUES ('two')")
|
||||
cur.execute("PREPARE TRANSACTION 'insert_two'")
|
||||
|
||||
# Prepare a transaction that will insert a row
|
||||
cur.execute('BEGIN')
|
||||
cur.execute("INSERT INTO foo VALUES ('three')")
|
||||
cur.execute("PREPARE TRANSACTION 'insert_three'")
|
||||
|
||||
# Prepare another transaction that will insert a row
|
||||
cur.execute('BEGIN')
|
||||
cur.execute("INSERT INTO foo VALUES ('four')")
|
||||
cur.execute("PREPARE TRANSACTION 'insert_four'")
|
||||
|
||||
# On checkpoint state data copied to files in
|
||||
# pg_twophase directory and fsynced
|
||||
cur.execute('CHECKPOINT')
|
||||
|
||||
twophase_files = os.listdir(pg.pg_twophase_dir_path())
|
||||
print(twophase_files)
|
||||
assert len(twophase_files) == 4
|
||||
|
||||
cur.execute("COMMIT PREPARED 'insert_three'")
|
||||
cur.execute("ROLLBACK PREPARED 'insert_four'")
|
||||
cur.execute('CHECKPOINT')
|
||||
|
||||
twophase_files = os.listdir(pg.pg_twophase_dir_path())
|
||||
print(twophase_files)
|
||||
assert len(twophase_files) == 2
|
||||
|
||||
# Create a branch with the transaction in prepared state
|
||||
zenith_cli.run(["branch", "test_twophase_prepared", "test_twophase"])
|
||||
|
||||
# Create compute node, but don't start.
|
||||
# We want to observe pgdata before postgres starts
|
||||
pg2 = postgres.create(
|
||||
pg2 = postgres.create_start(
|
||||
'test_twophase_prepared',
|
||||
config_lines=['max_prepared_transactions=5'],
|
||||
)
|
||||
|
||||
# Check that we restored only needed twophase files
|
||||
twophase_files2 = os.listdir(pg2.pg_twophase_dir_path())
|
||||
print(twophase_files2)
|
||||
assert twophase_files2.sort() == twophase_files.sort()
|
||||
|
||||
pg2 = pg2.start()
|
||||
conn2 = pg2.connect()
|
||||
cur2 = conn2.cursor()
|
||||
|
||||
# On the new branch, commit one of the prepared transactions,
|
||||
# abort the other one.
|
||||
# On the new branch, commit one of the prepared transactions, abort the other one.
|
||||
cur2.execute("COMMIT PREPARED 'insert_one'")
|
||||
cur2.execute("ROLLBACK PREPARED 'insert_two'")
|
||||
|
||||
cur2.execute('SELECT * FROM foo')
|
||||
assert cur2.fetchall() == [('one',), ('three',)]
|
||||
assert cur2.fetchall() == [('one', )]
|
||||
|
||||
# Only one committed insert is visible on the original branch
|
||||
# Neither insert is visible on the original branch, the transactions are still
|
||||
# in prepared state there.
|
||||
cur.execute('SELECT * FROM foo')
|
||||
assert cur.fetchall() == [('three',)]
|
||||
assert cur.fetchall() == []
|
||||
|
||||
@@ -249,68 +249,10 @@ def pageserver(zenith_cli: ZenithCli) -> Iterator[ZenithPageserver]:
|
||||
print('Starting pageserver cleanup')
|
||||
ps.stop()
|
||||
|
||||
class PgBin:
|
||||
""" A helper class for executing postgres binaries """
|
||||
def __init__(self, log_dir: str, pg_distrib_dir: str):
|
||||
self.log_dir = log_dir
|
||||
self.pg_install_path = pg_distrib_dir
|
||||
self.pg_bin_path = os.path.join(self.pg_install_path, 'bin')
|
||||
self.env = os.environ.copy()
|
||||
self.env['LD_LIBRARY_PATH'] = os.path.join(self.pg_install_path, 'lib')
|
||||
|
||||
def _fixpath(self, command: List[str]) -> None:
|
||||
if '/' not in command[0]:
|
||||
command[0] = os.path.join(self.pg_bin_path, command[0])
|
||||
|
||||
def _build_env(self, env_add: Optional[Env]) -> Env:
|
||||
if env_add is None:
|
||||
return self.env
|
||||
env = self.env.copy()
|
||||
env.update(env_add)
|
||||
return env
|
||||
|
||||
def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None) -> None:
|
||||
"""
|
||||
Run one of the postgres binaries.
|
||||
|
||||
The command should be in list form, e.g. ['pgbench', '-p', '55432']
|
||||
|
||||
All the necessary environment variables will be set.
|
||||
|
||||
If the first argument (the command name) doesn't include a path (no '/'
|
||||
characters present), then it will be edited to include the correct path.
|
||||
|
||||
If you want stdout/stderr captured to files, use `run_capture` instead.
|
||||
"""
|
||||
|
||||
self._fixpath(command)
|
||||
print('Running command "{}"'.format(' '.join(command)))
|
||||
env = self._build_env(env)
|
||||
subprocess.run(command, env=env, cwd=cwd, check=True)
|
||||
|
||||
def run_capture(self,
|
||||
command: List[str],
|
||||
env: Optional[Env] = None,
|
||||
cwd: Optional[str] = None) -> None:
|
||||
"""
|
||||
Run one of the postgres binaries, with stderr and stdout redirected to a file.
|
||||
|
||||
This is just like `run`, but for chatty programs.
|
||||
"""
|
||||
|
||||
self._fixpath(command)
|
||||
print('Running command "{}"'.format(' '.join(command)))
|
||||
env = self._build_env(env)
|
||||
subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True)
|
||||
|
||||
|
||||
@zenfixture
|
||||
def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin:
|
||||
return PgBin(test_output_dir, pg_distrib_dir)
|
||||
|
||||
class Postgres(PgProtocol):
|
||||
""" An object representing a running postgres daemon. """
|
||||
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, tenant_id: str, port: int):
|
||||
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, tenant_id: str, port: int):
|
||||
super().__init__(host='localhost', port=port)
|
||||
|
||||
self.zenith_cli = zenith_cli
|
||||
@@ -318,7 +260,6 @@ class Postgres(PgProtocol):
|
||||
self.repo_dir = repo_dir
|
||||
self.branch: Optional[str] = None # dubious, see asserts below
|
||||
self.tenant_id = tenant_id
|
||||
self.pg_bin = pg_bin
|
||||
# path to conf is <repo_dir>/pgdatadirs/tenants/<tenant_id>/<branch_name>/postgresql.conf
|
||||
|
||||
def create(
|
||||
@@ -358,32 +299,20 @@ class Postgres(PgProtocol):
|
||||
"""
|
||||
|
||||
assert self.branch is not None
|
||||
|
||||
print(f"Starting postgres on brach {self.branch}")
|
||||
|
||||
self.zenith_cli.run(['pg', 'start', self.branch, f'--tenantid={self.tenant_id}'])
|
||||
self.running = True
|
||||
|
||||
self.pg_bin.run(['pg_controldata', self.pg_data_dir_path()])
|
||||
|
||||
return self
|
||||
|
||||
def pg_data_dir_path(self) -> str:
|
||||
""" Path to data directory """
|
||||
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch
|
||||
return os.path.join(self.repo_dir, path)
|
||||
|
||||
def pg_xact_dir_path(self) -> str:
|
||||
""" Path to pg_xact dir """
|
||||
return os.path.join(self.pg_data_dir_path(), 'pg_xact')
|
||||
|
||||
def pg_twophase_dir_path(self) -> str:
|
||||
""" Path to pg_twophase dir """
|
||||
return os.path.join(self.pg_data_dir_path(), 'pg_twophase')
|
||||
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'pg_xact'
|
||||
return os.path.join(self.repo_dir, path)
|
||||
|
||||
def config_file_path(self) -> str:
|
||||
""" Path to postgresql.conf """
|
||||
return os.path.join(self.pg_data_dir_path(), 'postgresql.conf')
|
||||
filename = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'postgresql.conf'
|
||||
return os.path.join(self.repo_dir, filename)
|
||||
|
||||
def adjust_for_wal_acceptors(self, wal_acceptors: str) -> 'Postgres':
|
||||
"""
|
||||
@@ -475,14 +404,13 @@ class Postgres(PgProtocol):
|
||||
|
||||
class PostgresFactory:
|
||||
""" An object representing multiple running postgres daemons. """
|
||||
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, initial_tenant: str, base_port: int = 55431):
|
||||
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, initial_tenant: str, base_port: int = 55431):
|
||||
self.zenith_cli = zenith_cli
|
||||
self.repo_dir = repo_dir
|
||||
self.num_instances = 0
|
||||
self.instances: List[Postgres] = []
|
||||
self.initial_tenant: str = initial_tenant
|
||||
self.base_port = base_port
|
||||
self.pg_bin = pg_bin
|
||||
|
||||
def create_start(
|
||||
self,
|
||||
@@ -495,7 +423,6 @@ class PostgresFactory:
|
||||
pg = Postgres(
|
||||
zenith_cli=self.zenith_cli,
|
||||
repo_dir=self.repo_dir,
|
||||
pg_bin=self.pg_bin,
|
||||
tenant_id=tenant_id or self.initial_tenant,
|
||||
port=self.base_port + self.num_instances + 1,
|
||||
)
|
||||
@@ -569,8 +496,8 @@ def initial_tenant(pageserver: ZenithPageserver):
|
||||
|
||||
|
||||
@zenfixture
|
||||
def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str, pg_bin: PgBin) -> Iterator[PostgresFactory]:
|
||||
pgfactory = PostgresFactory(zenith_cli, repo_dir, pg_bin, initial_tenant=initial_tenant)
|
||||
def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str) -> Iterator[PostgresFactory]:
|
||||
pgfactory = PostgresFactory(zenith_cli, repo_dir, initial_tenant=initial_tenant)
|
||||
|
||||
yield pgfactory
|
||||
|
||||
@@ -578,6 +505,67 @@ def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str, pg_bin:
|
||||
print('Starting postgres cleanup')
|
||||
pgfactory.stop_all()
|
||||
|
||||
|
||||
class PgBin:
|
||||
""" A helper class for executing postgres binaries """
|
||||
def __init__(self, log_dir: str, pg_distrib_dir: str):
|
||||
self.log_dir = log_dir
|
||||
self.pg_install_path = pg_distrib_dir
|
||||
self.pg_bin_path = os.path.join(self.pg_install_path, 'bin')
|
||||
self.env = os.environ.copy()
|
||||
self.env['LD_LIBRARY_PATH'] = os.path.join(self.pg_install_path, 'lib')
|
||||
|
||||
def _fixpath(self, command: List[str]) -> None:
|
||||
if '/' not in command[0]:
|
||||
command[0] = os.path.join(self.pg_bin_path, command[0])
|
||||
|
||||
def _build_env(self, env_add: Optional[Env]) -> Env:
|
||||
if env_add is None:
|
||||
return self.env
|
||||
env = self.env.copy()
|
||||
env.update(env_add)
|
||||
return env
|
||||
|
||||
def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None) -> None:
|
||||
"""
|
||||
Run one of the postgres binaries.
|
||||
|
||||
The command should be in list form, e.g. ['pgbench', '-p', '55432']
|
||||
|
||||
All the necessary environment variables will be set.
|
||||
|
||||
If the first argument (the command name) doesn't include a path (no '/'
|
||||
characters present), then it will be edited to include the correct path.
|
||||
|
||||
If you want stdout/stderr captured to files, use `run_capture` instead.
|
||||
"""
|
||||
|
||||
self._fixpath(command)
|
||||
print('Running command "{}"'.format(' '.join(command)))
|
||||
env = self._build_env(env)
|
||||
subprocess.run(command, env=env, cwd=cwd, check=True)
|
||||
|
||||
def run_capture(self,
|
||||
command: List[str],
|
||||
env: Optional[Env] = None,
|
||||
cwd: Optional[str] = None) -> None:
|
||||
"""
|
||||
Run one of the postgres binaries, with stderr and stdout redirected to a file.
|
||||
|
||||
This is just like `run`, but for chatty programs.
|
||||
"""
|
||||
|
||||
self._fixpath(command)
|
||||
print('Running command "{}"'.format(' '.join(command)))
|
||||
env = self._build_env(env)
|
||||
subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True)
|
||||
|
||||
|
||||
@zenfixture
|
||||
def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin:
|
||||
return PgBin(test_output_dir, pg_distrib_dir)
|
||||
|
||||
|
||||
def read_pid(path: Path):
|
||||
""" Read content of file into number """
|
||||
return int(path.read_text())
|
||||
|
||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: 9932d259be...e3175fe60a
@@ -2,7 +2,7 @@
|
||||
//!
|
||||
//! FIXME: better description needed here
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
use anyhow::{bail, Result};
|
||||
use bincode::config::Options;
|
||||
use bytes::{Buf, Bytes};
|
||||
use log::*;
|
||||
@@ -27,7 +27,6 @@ use crate::replication::HotStandbyFeedback;
|
||||
use crate::send_wal::SendWalHandler;
|
||||
use crate::timeline::{Timeline, TimelineTools};
|
||||
use crate::WalAcceptorConf;
|
||||
use pageserver::waldecoder::WalStreamDecoder;
|
||||
use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ};
|
||||
|
||||
pub const SK_MAGIC: u32 = 0xcafeceefu32;
|
||||
@@ -237,9 +236,7 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
.write_message(&BeMessage::CopyBothResponse)?;
|
||||
|
||||
// Receive information about server
|
||||
let server_info = self
|
||||
.read_msg::<ServerInfo>()
|
||||
.context("Failed to receive server info")?;
|
||||
let server_info = self.read_msg::<ServerInfo>()?;
|
||||
info!(
|
||||
"Start handshake with wal_proposer {} sysid {} timeline {} tenant {}",
|
||||
self.peer_addr, server_info.system_id, server_info.timeline_id, server_info.tenant_id,
|
||||
@@ -287,9 +284,7 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
self.write_msg(&my_info)?;
|
||||
|
||||
/* Wait for vote request */
|
||||
let prop = self
|
||||
.read_msg::<RequestVote>()
|
||||
.context("Failed to read vote request")?;
|
||||
let prop = self.read_msg::<RequestVote>()?;
|
||||
/* This is Paxos check which should ensure that only one master can perform commits */
|
||||
if prop.node_id < my_info.server.node_id {
|
||||
/* Send my node-id to inform proposer that it's candidate was rejected */
|
||||
@@ -301,8 +296,8 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
);
|
||||
}
|
||||
my_info.server.node_id = prop.node_id;
|
||||
/* Need to persist our vote first */
|
||||
this_timeline.get().set_info(&my_info);
|
||||
/* Need to persist our vote first */
|
||||
this_timeline.get().save_control_file(true)?;
|
||||
|
||||
let mut flushed_restart_lsn = Lsn(0);
|
||||
@@ -323,11 +318,9 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
}
|
||||
|
||||
info!(
|
||||
"Start accepting WAL for timeline {} tenant {} address {:?} flush_lsn={}",
|
||||
server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn,
|
||||
"Start streaming from timeline {} tenant {} address {:?} flush_lsn={}",
|
||||
server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn
|
||||
);
|
||||
let mut last_rec_lsn = Lsn(0);
|
||||
let mut decoder = WalStreamDecoder::new(last_rec_lsn, false);
|
||||
|
||||
// Main loop
|
||||
loop {
|
||||
@@ -337,8 +330,7 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
let msg_bytes = self.read_msg_bytes()?;
|
||||
let mut msg_reader = msg_bytes.reader();
|
||||
|
||||
let req = SafeKeeperRequest::des_from(&mut msg_reader)
|
||||
.context("Failed to get WAL message header")?;
|
||||
let req = SafeKeeperRequest::des_from(&mut msg_reader)?;
|
||||
if req.sender_id != my_info.server.node_id {
|
||||
bail!("Sender NodeId is changed");
|
||||
}
|
||||
@@ -350,52 +342,27 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
let end_pos = req.end_lsn;
|
||||
let rec_size = end_pos.checked_sub(start_pos).unwrap().0 as usize;
|
||||
assert!(rec_size <= MAX_SEND_SIZE);
|
||||
if rec_size != 0 {
|
||||
debug!(
|
||||
"received for {} bytes between {} and {}",
|
||||
rec_size, start_pos, end_pos,
|
||||
);
|
||||
|
||||
/* Receive message body (from the rest of the message) */
|
||||
let mut buf = Vec::with_capacity(rec_size);
|
||||
msg_reader.read_to_end(&mut buf)?;
|
||||
assert_eq!(buf.len(), rec_size);
|
||||
debug!(
|
||||
"received for {} bytes between {} and {}",
|
||||
rec_size, start_pos, end_pos,
|
||||
);
|
||||
|
||||
if decoder.available() != start_pos {
|
||||
info!(
|
||||
"Restart decoder from {} to {}",
|
||||
decoder.available(),
|
||||
start_pos
|
||||
);
|
||||
decoder = WalStreamDecoder::new(start_pos, false);
|
||||
}
|
||||
decoder.feed_bytes(&buf);
|
||||
loop {
|
||||
match decoder.poll_decode() {
|
||||
Err(e) => info!("Decode error {}", e),
|
||||
Ok(None) => {},
|
||||
Ok(Some((lsn, _rec))) => {
|
||||
last_rec_lsn = lsn;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
info!(
|
||||
"Receive WAL {}..{} last_rec_lsn={}",
|
||||
start_pos, end_pos, last_rec_lsn
|
||||
);
|
||||
/* Receive message body (from the rest of the message) */
|
||||
let mut buf = Vec::with_capacity(rec_size);
|
||||
msg_reader.read_to_end(&mut buf)?;
|
||||
assert_eq!(buf.len(), rec_size);
|
||||
|
||||
/* Save message in file */
|
||||
Self::write_wal_file(
|
||||
swh,
|
||||
start_pos,
|
||||
timeline_id,
|
||||
this_timeline.get(),
|
||||
wal_seg_size,
|
||||
&buf,
|
||||
)?;
|
||||
|
||||
/* Save message in file */
|
||||
Self::write_wal_file(
|
||||
swh,
|
||||
start_pos,
|
||||
timeline_id,
|
||||
this_timeline.get(),
|
||||
wal_seg_size,
|
||||
&buf,
|
||||
)?;
|
||||
}
|
||||
my_info.restart_lsn = req.restart_lsn;
|
||||
my_info.commit_lsn = req.commit_lsn;
|
||||
|
||||
@@ -405,13 +372,13 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
* maximum (vcl) determined by WAL proposer during handshake.
|
||||
* Switching epoch means that node completes recovery and start writing in the WAL new data.
|
||||
*/
|
||||
if my_info.epoch < prop.epoch && end_pos >= max(my_info.flush_lsn, prop.vcl) {
|
||||
if my_info.epoch < prop.epoch && end_pos > max(my_info.flush_lsn, prop.vcl) {
|
||||
info!("Switch to new epoch {}", prop.epoch);
|
||||
my_info.epoch = prop.epoch; /* bump epoch */
|
||||
sync_control_file = true;
|
||||
}
|
||||
if last_rec_lsn > my_info.flush_lsn {
|
||||
my_info.flush_lsn = last_rec_lsn;
|
||||
if end_pos > my_info.flush_lsn {
|
||||
my_info.flush_lsn = end_pos;
|
||||
}
|
||||
/*
|
||||
* Update restart LSN in control file.
|
||||
@@ -419,7 +386,6 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
* when restart_lsn delta exceeds WAL segment size.
|
||||
*/
|
||||
sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn;
|
||||
this_timeline.get().set_info(&my_info);
|
||||
this_timeline.get().save_control_file(sync_control_file)?;
|
||||
|
||||
if sync_control_file {
|
||||
@@ -430,7 +396,7 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
//info!("Confirm LSN: {:X}/{:>08X}", (end_pos>>32) as u32, end_pos as u32);
|
||||
let resp = SafeKeeperResponse {
|
||||
epoch: my_info.epoch,
|
||||
flush_lsn: my_info.flush_lsn,
|
||||
flush_lsn: end_pos,
|
||||
hs_feedback: this_timeline.get().get_hs_feedback(),
|
||||
};
|
||||
self.write_msg(&resp)?;
|
||||
@@ -439,15 +405,9 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
* Ping wal sender that new data is available.
|
||||
* FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper.
|
||||
*/
|
||||
trace!(
|
||||
"Notify WAL senders min({}, {})={}",
|
||||
req.commit_lsn,
|
||||
my_info.flush_lsn,
|
||||
min(req.commit_lsn, my_info.flush_lsn)
|
||||
);
|
||||
this_timeline
|
||||
.get()
|
||||
.notify_wal_senders(min(req.commit_lsn, my_info.flush_lsn));
|
||||
.notify_wal_senders(min(req.commit_lsn, end_pos));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -76,10 +76,8 @@ impl ReplicationConn {
|
||||
let feedback = HotStandbyFeedback::des(&m)?;
|
||||
subscriber.add_hs_feedback(feedback);
|
||||
}
|
||||
FeMessage::Sync => {}
|
||||
FeMessage::CopyFailed => return Err(anyhow!("Copy failed")),
|
||||
_ => {
|
||||
// We only handle `CopyData`, 'Sync', 'CopyFailed' messages. Anything else is ignored.
|
||||
// We only handle `CopyData` messages. Anything else is ignored.
|
||||
info!("unexpected message {:?}", msg);
|
||||
}
|
||||
}
|
||||
@@ -217,15 +215,10 @@ impl ReplicationConn {
|
||||
data: &file_buf,
|
||||
}))?;
|
||||
|
||||
debug!(
|
||||
"Sent WAL to page server {}..{}, end_pos={}",
|
||||
start_pos,
|
||||
start_pos + send_size as u64,
|
||||
end_pos
|
||||
);
|
||||
|
||||
start_pos += send_size as u64;
|
||||
|
||||
debug!("Sent WAL to page server up to {}", end_pos);
|
||||
|
||||
// Decide whether to reuse this file. If we don't set wal_file here
|
||||
// a new file will be opened next time.
|
||||
if start_pos.segment_offset(wal_seg_size) != 0 {
|
||||
|
||||
@@ -175,7 +175,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stop_wal_senders(&self) {
|
||||
fn _stop_wal_senders(&self) {
|
||||
self.notify_wal_senders(END_REPLICATION_MARKER);
|
||||
}
|
||||
|
||||
|
||||
@@ -24,11 +24,6 @@ impl Lsn {
|
||||
/// Maximum possible value for an LSN
|
||||
pub const MAX: Lsn = Lsn(u64::MAX);
|
||||
|
||||
/// Align LSN on 8-byte boundary (alignment of WAL records).
|
||||
pub fn align(&self) -> Lsn {
|
||||
Lsn((self.0 + 7) & !7)
|
||||
}
|
||||
|
||||
/// Subtract a number, returning None on overflow.
|
||||
pub fn checked_sub<T: Into<u64>>(self, other: T) -> Option<Lsn> {
|
||||
let other: u64 = other.into();
|
||||
|
||||
@@ -301,9 +301,8 @@ impl PostgresBackend {
|
||||
FeMessage::Query(m) => {
|
||||
trace!("got query {:?}", m.body);
|
||||
// xxx distinguish fatal and recoverable errors?
|
||||
if let Err(e) = handler.process_query(self, m.body.clone()) {
|
||||
if let Err(e) = handler.process_query(self, m.body) {
|
||||
let errmsg = format!("{}", e);
|
||||
warn!("query handler for {:?} failed: {}", m.body, errmsg);
|
||||
self.write_message_noflush(&BeMessage::ErrorResponse(errmsg))?;
|
||||
}
|
||||
self.write_message(&BeMessage::ReadyForQuery)?;
|
||||
@@ -341,7 +340,7 @@ impl PostgresBackend {
|
||||
|
||||
// We prefer explicit pattern matching to wildcards, because
|
||||
// this helps us spot the places where new variants are missing
|
||||
FeMessage::CopyData(_) | FeMessage::CopyDone | FeMessage::CopyFailed => {
|
||||
FeMessage::CopyData(_) | FeMessage::CopyDone => {
|
||||
bail!("unexpected message type: {:?}", msg);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,7 +31,6 @@ pub enum FeMessage {
|
||||
Terminate,
|
||||
CopyData(Bytes),
|
||||
CopyDone,
|
||||
CopyFailed,
|
||||
PasswordMessage(Bytes),
|
||||
}
|
||||
|
||||
@@ -139,7 +138,6 @@ impl FeMessage {
|
||||
b'X' => Ok(Some(FeMessage::Terminate)),
|
||||
b'd' => Ok(Some(FeMessage::CopyData(body))),
|
||||
b'c' => Ok(Some(FeMessage::CopyDone)),
|
||||
b'f' => Ok(Some(FeMessage::CopyFailed)),
|
||||
b'p' => Ok(Some(FeMessage::PasswordMessage(body))),
|
||||
tag => Err(anyhow!("unknown message tag: {},'{:?}'", tag, body)),
|
||||
}
|
||||
@@ -340,7 +338,6 @@ pub enum BeMessage<'a> {
|
||||
ControlFile,
|
||||
CopyData(&'a [u8]),
|
||||
CopyDone,
|
||||
CopyFailed,
|
||||
CopyInResponse,
|
||||
CopyOutResponse,
|
||||
CopyBothResponse,
|
||||
@@ -549,11 +546,6 @@ impl<'a> BeMessage<'a> {
|
||||
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
|
||||
}
|
||||
|
||||
BeMessage::CopyFailed => {
|
||||
buf.put_u8(b'f');
|
||||
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
|
||||
}
|
||||
|
||||
BeMessage::CopyInResponse => {
|
||||
buf.put_u8(b'G');
|
||||
write_body(buf, |buf| {
|
||||
|
||||
Reference in New Issue
Block a user