mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-04 08:50:38 +00:00
Compare commits
20 Commits
snapfile
...
safe_flush
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a8a2f62bc3 | ||
|
|
26060dd68e | ||
|
|
73d823e53c | ||
|
|
112909c5e4 | ||
|
|
07adc9dbda | ||
|
|
c05cedc626 | ||
|
|
815528e0ce | ||
|
|
a2e135b404 | ||
|
|
72de70a8cc | ||
|
|
4051c5d4ff | ||
|
|
f86bf26466 | ||
|
|
3ca4b638ac | ||
|
|
d61699b0f8 | ||
|
|
ead94feb05 | ||
|
|
39bb6fb19c | ||
|
|
82725725fd | ||
|
|
1c3d51ed92 | ||
|
|
04a309f562 | ||
|
|
20e6cd7724 | ||
|
|
9fed5c8fb7 |
@@ -237,6 +237,23 @@ jobs:
|
||||
- store_test_results:
|
||||
path: /tmp/test_output
|
||||
|
||||
# Build zenithdb/zenith:latest image and push it to Docker hub
|
||||
docker-image:
|
||||
docker:
|
||||
- image: cimg/base:2021.04
|
||||
steps:
|
||||
- checkout
|
||||
- setup_remote_docker:
|
||||
docker_layer_caching: true
|
||||
- run:
|
||||
name: Init postgres submodule
|
||||
command: git submodule update --init --depth 1
|
||||
- run:
|
||||
name: Build and push Docker image
|
||||
command: |
|
||||
echo $DOCKER_PWD | docker login -u $DOCKER_LOGIN --password-stdin
|
||||
docker build -t zenithdb/zenith:latest . && docker push zenithdb/zenith:latest
|
||||
|
||||
workflows:
|
||||
build_and_test:
|
||||
jobs:
|
||||
@@ -265,3 +282,14 @@ workflows:
|
||||
test_selection: batch_others
|
||||
requires:
|
||||
- build-zenith-<< matrix.build_type >>
|
||||
- docker-image:
|
||||
# Context gives an ability to login
|
||||
context: Docker Hub
|
||||
# Build image only for commits to main
|
||||
filters:
|
||||
branches:
|
||||
only:
|
||||
- main
|
||||
requires:
|
||||
- pg_regress tests release
|
||||
- other tests release
|
||||
|
||||
83
Dockerfile
83
Dockerfile
@@ -1,94 +1,77 @@
|
||||
#
|
||||
# Docker image for console integration testing.
|
||||
#
|
||||
# We may also reuse it in CI to unify installation process and as a general binaries building
|
||||
# tool for production servers.
|
||||
#
|
||||
# Dynamic linking is used for librocksdb and libstdc++ bacause librocksdb-sys calls
|
||||
# bindgen with "dynamic" feature flag. This also prevents usage of dockerhub alpine-rust
|
||||
# images which are statically linked and have guards against any dlopen. I would rather
|
||||
# prefer all static binaries so we may change the way librocksdb-sys builds or wait until
|
||||
# we will have our own storage and drop rockdb dependency.
|
||||
#
|
||||
# Cargo-chef is used to separate dependencies building from main binaries building. This
|
||||
# way `docker build` will download and install dependencies only of there are changes to
|
||||
# out Cargo.toml files.
|
||||
#
|
||||
|
||||
|
||||
#
|
||||
# build postgres separately -- this layer will be rebuilt only if one of
|
||||
# mentioned paths will get any changes
|
||||
# Build Postgres separately --- this layer will be rebuilt only if one of
|
||||
# mentioned paths will get any changes.
|
||||
#
|
||||
FROM alpine:3.13 as pg-build
|
||||
RUN apk add --update clang llvm compiler-rt compiler-rt-static lld musl-dev binutils \
|
||||
make bison flex readline-dev zlib-dev perl linux-headers libseccomp-dev
|
||||
WORKDIR zenith
|
||||
FROM zenithdb/build:buster AS pg-build
|
||||
WORKDIR /zenith
|
||||
COPY ./vendor/postgres vendor/postgres
|
||||
COPY ./Makefile Makefile
|
||||
# Build using clang and lld
|
||||
RUN CC='clang' LD='lld' CFLAGS='-fuse-ld=lld --rtlib=compiler-rt' make postgres -j4
|
||||
RUN make -j $(getconf _NPROCESSORS_ONLN) -s postgres
|
||||
|
||||
#
|
||||
# Calculate cargo dependencies.
|
||||
# This will always run, but only generate recipe.json with list of dependencies without
|
||||
# installing them.
|
||||
#
|
||||
FROM alpine:20210212 as cargo-deps-inspect
|
||||
RUN apk add --update rust cargo
|
||||
RUN cargo install cargo-chef
|
||||
WORKDIR zenith
|
||||
FROM zenithdb/build:buster AS cargo-deps-inspect
|
||||
WORKDIR /zenith
|
||||
COPY . .
|
||||
RUN cargo chef prepare --recipe-path recipe.json
|
||||
RUN cargo chef prepare --recipe-path /zenith/recipe.json
|
||||
|
||||
#
|
||||
# Build cargo dependencies.
|
||||
# This temp cantainner would be build only if recipe.json was changed.
|
||||
# This temp cantainner should be rebuilt only if recipe.json was changed.
|
||||
#
|
||||
FROM alpine:20210212 as deps-build
|
||||
RUN apk add --update rust cargo openssl-dev clang build-base
|
||||
# rust-rocksdb can be built against system-wide rocksdb -- that saves about
|
||||
# 10 minutes during build. Rocksdb apk package is in testing now, but use it
|
||||
# anyway. In case of any troubles we can download and build rocksdb here manually
|
||||
# (to cache it as a docker layer).
|
||||
RUN apk --no-cache --update --repository https://dl-cdn.alpinelinux.org/alpine/edge/testing add rocksdb-dev
|
||||
WORKDIR zenith
|
||||
FROM zenithdb/build:buster AS deps-build
|
||||
WORKDIR /zenith
|
||||
COPY --from=pg-build /zenith/tmp_install/include/postgresql/server tmp_install/include/postgresql/server
|
||||
COPY --from=cargo-deps-inspect /root/.cargo/bin/cargo-chef /root/.cargo/bin/
|
||||
COPY --from=cargo-deps-inspect /usr/local/cargo/bin/cargo-chef /usr/local/cargo/bin/
|
||||
COPY --from=cargo-deps-inspect /zenith/recipe.json recipe.json
|
||||
RUN ROCKSDB_LIB_DIR=/usr/lib/ cargo chef cook --release --recipe-path recipe.json
|
||||
|
||||
#
|
||||
# Build zenith binaries
|
||||
#
|
||||
FROM alpine:20210212 as build
|
||||
RUN apk add --update rust cargo openssl-dev clang build-base
|
||||
RUN apk --no-cache --update --repository https://dl-cdn.alpinelinux.org/alpine/edge/testing add rocksdb-dev
|
||||
WORKDIR zenith
|
||||
FROM zenithdb/build:buster AS build
|
||||
WORKDIR /zenith
|
||||
COPY . .
|
||||
# Copy cached dependencies
|
||||
COPY --from=pg-build /zenith/tmp_install/include/postgresql/server tmp_install/include/postgresql/server
|
||||
COPY --from=deps-build /zenith/target target
|
||||
COPY --from=deps-build /root/.cargo /root/.cargo
|
||||
COPY --from=deps-build /usr/local/cargo/ /usr/local/cargo/
|
||||
RUN cargo build --release
|
||||
|
||||
#
|
||||
# Copy binaries to resulting image.
|
||||
# build-base hare to provide libstdc++ (it will also bring gcc, but leave it this way until we figure
|
||||
# out how to statically link rocksdb or avoid it at all).
|
||||
#
|
||||
FROM alpine:3.13
|
||||
RUN apk add --update openssl build-base libseccomp-dev
|
||||
RUN apk --no-cache --update --repository https://dl-cdn.alpinelinux.org/alpine/edge/testing add rocksdb
|
||||
FROM debian:buster-slim
|
||||
WORKDIR /data
|
||||
|
||||
RUN apt-get update && apt-get -yq install librocksdb-dev libseccomp-dev openssl && \
|
||||
mkdir zenith_install
|
||||
|
||||
COPY --from=build /zenith/target/release/pageserver /usr/local/bin
|
||||
COPY --from=build /zenith/target/release/wal_acceptor /usr/local/bin
|
||||
COPY --from=build /zenith/target/release/proxy /usr/local/bin
|
||||
COPY --from=pg-build /zenith/tmp_install /usr/local
|
||||
COPY --from=pg-build /zenith/tmp_install postgres_install
|
||||
COPY docker-entrypoint.sh /docker-entrypoint.sh
|
||||
|
||||
RUN addgroup zenith && adduser -h /data -D -G zenith zenith
|
||||
# Remove build artifacts (~ 500 MB)
|
||||
RUN rm -rf postgres_install/build && \
|
||||
# 'Install' Postgres binaries locally
|
||||
cp -r postgres_install/* /usr/local/ && \
|
||||
# Prepare an archive of Postgres binaries (should be around 11 MB)
|
||||
# and keep it inside container for an ease of deploy pipeline.
|
||||
cd postgres_install && tar -czf /data/postgres_install.tar.gz . && cd .. && \
|
||||
rm -rf postgres_install
|
||||
|
||||
RUN useradd -m -d /data zenith
|
||||
|
||||
VOLUME ["/data"]
|
||||
WORKDIR /data
|
||||
USER zenith
|
||||
EXPOSE 6400
|
||||
ENTRYPOINT ["/docker-entrypoint.sh"]
|
||||
|
||||
95
Dockerfile.alpine
Normal file
95
Dockerfile.alpine
Normal file
@@ -0,0 +1,95 @@
|
||||
#
|
||||
# Docker image for console integration testing.
|
||||
#
|
||||
# We may also reuse it in CI to unify installation process and as a general binaries building
|
||||
# tool for production servers.
|
||||
#
|
||||
# Dynamic linking is used for librocksdb and libstdc++ bacause librocksdb-sys calls
|
||||
# bindgen with "dynamic" feature flag. This also prevents usage of dockerhub alpine-rust
|
||||
# images which are statically linked and have guards against any dlopen. I would rather
|
||||
# prefer all static binaries so we may change the way librocksdb-sys builds or wait until
|
||||
# we will have our own storage and drop rockdb dependency.
|
||||
#
|
||||
# Cargo-chef is used to separate dependencies building from main binaries building. This
|
||||
# way `docker build` will download and install dependencies only of there are changes to
|
||||
# out Cargo.toml files.
|
||||
#
|
||||
|
||||
|
||||
#
|
||||
# build postgres separately -- this layer will be rebuilt only if one of
|
||||
# mentioned paths will get any changes
|
||||
#
|
||||
FROM alpine:3.13 as pg-build
|
||||
RUN apk add --update clang llvm compiler-rt compiler-rt-static lld musl-dev binutils \
|
||||
make bison flex readline-dev zlib-dev perl linux-headers libseccomp-dev
|
||||
WORKDIR zenith
|
||||
COPY ./vendor/postgres vendor/postgres
|
||||
COPY ./Makefile Makefile
|
||||
# Build using clang and lld
|
||||
RUN CC='clang' LD='lld' CFLAGS='-fuse-ld=lld --rtlib=compiler-rt' make postgres -j4
|
||||
|
||||
#
|
||||
# Calculate cargo dependencies.
|
||||
# This will always run, but only generate recipe.json with list of dependencies without
|
||||
# installing them.
|
||||
#
|
||||
FROM alpine:20210212 as cargo-deps-inspect
|
||||
RUN apk add --update rust cargo
|
||||
RUN cargo install cargo-chef
|
||||
WORKDIR zenith
|
||||
COPY . .
|
||||
RUN cargo chef prepare --recipe-path recipe.json
|
||||
|
||||
#
|
||||
# Build cargo dependencies.
|
||||
# This temp cantainner would be build only if recipe.json was changed.
|
||||
#
|
||||
FROM alpine:20210212 as deps-build
|
||||
RUN apk add --update rust cargo openssl-dev clang build-base
|
||||
# rust-rocksdb can be built against system-wide rocksdb -- that saves about
|
||||
# 10 minutes during build. Rocksdb apk package is in testing now, but use it
|
||||
# anyway. In case of any troubles we can download and build rocksdb here manually
|
||||
# (to cache it as a docker layer).
|
||||
RUN apk --no-cache --update --repository https://dl-cdn.alpinelinux.org/alpine/edge/testing add rocksdb-dev
|
||||
WORKDIR zenith
|
||||
COPY --from=pg-build /zenith/tmp_install/include/postgresql/server tmp_install/include/postgresql/server
|
||||
COPY --from=cargo-deps-inspect /root/.cargo/bin/cargo-chef /root/.cargo/bin/
|
||||
COPY --from=cargo-deps-inspect /zenith/recipe.json recipe.json
|
||||
RUN ROCKSDB_LIB_DIR=/usr/lib/ cargo chef cook --release --recipe-path recipe.json
|
||||
|
||||
#
|
||||
# Build zenith binaries
|
||||
#
|
||||
FROM alpine:20210212 as build
|
||||
RUN apk add --update rust cargo openssl-dev clang build-base
|
||||
RUN apk --no-cache --update --repository https://dl-cdn.alpinelinux.org/alpine/edge/testing add rocksdb-dev
|
||||
WORKDIR zenith
|
||||
COPY . .
|
||||
# Copy cached dependencies
|
||||
COPY --from=pg-build /zenith/tmp_install/include/postgresql/server tmp_install/include/postgresql/server
|
||||
COPY --from=deps-build /zenith/target target
|
||||
COPY --from=deps-build /root/.cargo /root/.cargo
|
||||
RUN cargo build --release
|
||||
|
||||
#
|
||||
# Copy binaries to resulting image.
|
||||
# build-base hare to provide libstdc++ (it will also bring gcc, but leave it this way until we figure
|
||||
# out how to statically link rocksdb or avoid it at all).
|
||||
#
|
||||
FROM alpine:3.13
|
||||
RUN apk add --update openssl build-base libseccomp-dev
|
||||
RUN apk --no-cache --update --repository https://dl-cdn.alpinelinux.org/alpine/edge/testing add rocksdb
|
||||
COPY --from=build /zenith/target/release/pageserver /usr/local/bin
|
||||
COPY --from=build /zenith/target/release/wal_acceptor /usr/local/bin
|
||||
COPY --from=build /zenith/target/release/proxy /usr/local/bin
|
||||
COPY --from=pg-build /zenith/tmp_install /usr/local
|
||||
COPY docker-entrypoint.sh /docker-entrypoint.sh
|
||||
|
||||
RUN addgroup zenith && adduser -h /data -D -G zenith zenith
|
||||
VOLUME ["/data"]
|
||||
WORKDIR /data
|
||||
USER zenith
|
||||
EXPOSE 6400
|
||||
ENTRYPOINT ["/docker-entrypoint.sh"]
|
||||
CMD ["pageserver"]
|
||||
15
Dockerfile.build
Normal file
15
Dockerfile.build
Normal file
@@ -0,0 +1,15 @@
|
||||
#
|
||||
# Image with all the required dependencies to build https://github.com/zenithdb/zenith
|
||||
# and Postgres from https://github.com/zenithdb/postgres
|
||||
# Also includes some rust development and build tools.
|
||||
#
|
||||
FROM rust:slim-buster
|
||||
WORKDIR /zenith
|
||||
|
||||
# Install postgres and zenith build dependencies
|
||||
# clang is for rocksdb
|
||||
RUN apt-get update && apt-get -yq install automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \
|
||||
libseccomp-dev pkg-config libssl-dev librocksdb-dev clang
|
||||
|
||||
# Install rust tools
|
||||
RUN rustup component add clippy && cargo install cargo-chef cargo-audit
|
||||
12
README.md
12
README.md
@@ -12,15 +12,12 @@ apt install build-essential libtool libreadline-dev zlib1g-dev flex bison libsec
|
||||
libssl-dev clang
|
||||
```
|
||||
|
||||
[Rust] 1.48 or later is also required.
|
||||
[Rust] 1.52 or later is also required.
|
||||
|
||||
To run the `psql` client, install the `postgresql-client` package or modify `PATH` and `LD_LIBRARY_PATH` to include `tmp_install/bin` and `tmp_install/lib`, respectively.
|
||||
|
||||
To run the integration tests (not required to use the code), install
|
||||
Python (3.6 or higher), and install python3 packages with `pip` (called `pip3` on some systems):
|
||||
```
|
||||
pip install pytest psycopg2
|
||||
```
|
||||
Python (3.6 or higher), and install python3 packages with `pipenv` using `pipenv install` in the project directory.
|
||||
|
||||
2. Build zenith and patched postgres
|
||||
```sh
|
||||
@@ -106,10 +103,9 @@ pytest
|
||||
|
||||
## Documentation
|
||||
|
||||
Now we use README files to cover design ideas and overall architecture for each module.
|
||||
And rustdoc style documentation comments.
|
||||
Now we use README files to cover design ideas and overall architecture for each module and `rustdoc` style documentation comments. See also [/docs/](/docs/) a top-level overview of all available markdown documentation.
|
||||
|
||||
To view your documentation in a browser, try running `cargo doc --no-deps --open`
|
||||
To view your `rustdoc` documentation in a browser, try running `cargo doc --no-deps --open`
|
||||
|
||||
## Source tree layout
|
||||
|
||||
|
||||
@@ -320,7 +320,7 @@ impl PostgresNode {
|
||||
|
||||
// Never clean up old WAL. TODO: We should use a replication
|
||||
// slot or something proper, to prevent the compute node
|
||||
// from removing WAL that hasn't been streamed to the safekeepr or
|
||||
// from removing WAL that hasn't been streamed to the safekeeper or
|
||||
// page server yet. (gh issue #349)
|
||||
self.append_conf("postgresql.conf", "wal_keep_size='10TB'\n")?;
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
if [ "$1" = 'pageserver' ]; then
|
||||
if [ ! -d "/data/timelines" ]; then
|
||||
if [ ! -d "/data/tenants" ]; then
|
||||
echo "Initializing pageserver data directory"
|
||||
pageserver --init -D /data --postgres-distrib /usr/local
|
||||
fi
|
||||
|
||||
11
docs/README.md
Normal file
11
docs/README.md
Normal file
@@ -0,0 +1,11 @@
|
||||
# Zenith documentation
|
||||
|
||||
## Table of contents
|
||||
|
||||
- [authentication.md](authentication.md) — pageserver JWT authentication.
|
||||
- [docker.md](docker.md) — Docker images and building pipeline.
|
||||
- [multitenancy.md](multitenancy.md) — how multitenancy is organized in the pageserver and Zenith CLI.
|
||||
- [pageserver/README](/pageserver/README) — pageserver overview.
|
||||
- [postgres_ffi/README](/postgres_ffi/README) — Postgres FFI overview.
|
||||
- [test_runner/README.md](/test_runner/README.md) — tests infrastructure overview.
|
||||
- [walkeeper/README](/walkeeper/README.md) — WAL service overview.
|
||||
38
docs/docker.md
Normal file
38
docs/docker.md
Normal file
@@ -0,0 +1,38 @@
|
||||
# Docker images of Zenith
|
||||
|
||||
## Images
|
||||
|
||||
Currently we build two main images:
|
||||
|
||||
- [zenithdb/zenith](https://hub.docker.com/repository/docker/zenithdb/zenith) — image with pre-built `pageserver`, `wal_acceptor` and `proxy` binaries and all the required runtime dependencies. Built from [/Dockerfile](/Dockerfile).
|
||||
- [zenithdb/compute-node](https://hub.docker.com/repository/docker/zenithdb/compute-node) — compute node image with pre-built Postgres binaries from [zenithdb/postgres](https://github.com/zenithdb/postgres).
|
||||
|
||||
And two intermediate images used either to reduce build time or to deliver some additional binary tools from other repos:
|
||||
|
||||
- [zenithdb/build](https://hub.docker.com/repository/docker/zenithdb/build) — image with all the dependencies required to build Zenith and compute node images. This image is based on `rust:slim-buster`, so it also has a proper `rust` environment. Built from [/Dockerfile.build](/Dockerfile.build).
|
||||
- [zenithdb/compute-tools](https://hub.docker.com/repository/docker/zenithdb/compute-tools) — compute node configuration management tools.
|
||||
|
||||
## Building pipeline
|
||||
|
||||
1. Image `zenithdb/compute-tools` is re-built automatically.
|
||||
|
||||
2. Image `zenithdb/build` is built manually. If you want to introduce any new compile time dependencies to Zenith or compute node you have to update this image as well, build it and push to Docker Hub.
|
||||
|
||||
Build:
|
||||
```sh
|
||||
docker build -t zenithdb/build:buster -f Dockerfile.build .
|
||||
```
|
||||
|
||||
Login:
|
||||
```sh
|
||||
docker login
|
||||
```
|
||||
|
||||
Push to Docker Hub:
|
||||
```sh
|
||||
docker push zenithdb/build:buster
|
||||
```
|
||||
|
||||
3. Image `zenithdb/compute-node` is built independently in the [zenithdb/postgres](https://github.com/zenithdb/postgres) repo.
|
||||
|
||||
4. Image `zenithdb/zenith` is built in this repo after a successful `release` tests run and pushed to Docker Hub automatically.
|
||||
@@ -842,6 +842,34 @@ impl Timeline for LayeredTimeline {
|
||||
fn get_prev_record_lsn(&self) -> Lsn {
|
||||
self.prev_record_lsn.load()
|
||||
}
|
||||
|
||||
///
|
||||
/// Wait until WAL has been received up to the given LSN.
|
||||
///
|
||||
fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result<Lsn> {
|
||||
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
|
||||
// This is necessary for bootstrap.
|
||||
if lsn == Lsn(0) {
|
||||
let last_valid_lsn = self.last_valid_lsn.load();
|
||||
trace!(
|
||||
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
|
||||
last_valid_lsn,
|
||||
lsn
|
||||
);
|
||||
lsn = last_valid_lsn;
|
||||
}
|
||||
|
||||
self.last_valid_lsn
|
||||
.wait_for_timeout(lsn, TIMEOUT)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Timed out while waiting for WAL record at LSN {} to arrive",
|
||||
lsn
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(lsn)
|
||||
}
|
||||
}
|
||||
|
||||
impl LayeredTimeline {
|
||||
@@ -1055,34 +1083,6 @@ impl LayeredTimeline {
|
||||
Ok(layer_rc)
|
||||
}
|
||||
|
||||
///
|
||||
/// Wait until WAL has been received up to the given LSN.
|
||||
///
|
||||
fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result<Lsn> {
|
||||
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
|
||||
// This is necessary for bootstrap.
|
||||
if lsn == Lsn(0) {
|
||||
let last_valid_lsn = self.last_valid_lsn.load();
|
||||
trace!(
|
||||
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
|
||||
last_valid_lsn,
|
||||
lsn
|
||||
);
|
||||
lsn = last_valid_lsn;
|
||||
}
|
||||
|
||||
self.last_valid_lsn
|
||||
.wait_for_timeout(lsn, TIMEOUT)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Timed out while waiting for WAL record at LSN {} to arrive",
|
||||
lsn
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(lsn)
|
||||
}
|
||||
|
||||
///
|
||||
/// Flush to disk all data that was written with the put_* functions
|
||||
///
|
||||
|
||||
@@ -663,7 +663,7 @@ impl Timeline for ObjectTimeline {
|
||||
assert!(old <= lsn);
|
||||
|
||||
// Use old value of last_record_lsn as prev_record_lsn
|
||||
self.prev_record_lsn.fetch_max(Lsn((old.0 + 7) & !7));
|
||||
self.prev_record_lsn.fetch_max(old.align());
|
||||
|
||||
// Also advance last_valid_lsn
|
||||
let old = self.last_valid_lsn.advance(lsn);
|
||||
@@ -712,6 +712,41 @@ impl Timeline for ObjectTimeline {
|
||||
let iter = self.obj_store.objects(self.timelineid, lsn)?;
|
||||
Ok(Box::new(ObjectHistory { lsn, iter }))
|
||||
}
|
||||
|
||||
//
|
||||
// Wait until WAL has been received up to the given LSN.
|
||||
//
|
||||
fn wait_lsn(&self, req_lsn: Lsn) -> Result<Lsn> {
|
||||
let mut lsn = req_lsn;
|
||||
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
|
||||
// This is necessary for bootstrap.
|
||||
if lsn == Lsn(0) {
|
||||
let last_valid_lsn = self.last_valid_lsn.load();
|
||||
trace!(
|
||||
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
|
||||
last_valid_lsn,
|
||||
lsn
|
||||
);
|
||||
lsn = last_valid_lsn;
|
||||
}
|
||||
trace!(
|
||||
"Start waiting for LSN {}, valid LSN is {}",
|
||||
lsn,
|
||||
self.last_valid_lsn.load()
|
||||
);
|
||||
self.last_valid_lsn
|
||||
.wait_for_timeout(lsn, TIMEOUT)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Timed out while waiting for WAL record at LSN {} to arrive. valid LSN in {}",
|
||||
lsn,
|
||||
self.last_valid_lsn.load(),
|
||||
)
|
||||
})?;
|
||||
//trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
|
||||
|
||||
Ok(lsn)
|
||||
}
|
||||
}
|
||||
|
||||
impl ObjectTimeline {
|
||||
@@ -820,40 +855,6 @@ impl ObjectTimeline {
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Wait until WAL has been received up to the given LSN.
|
||||
//
|
||||
fn wait_lsn(&self, mut lsn: Lsn) -> Result<Lsn> {
|
||||
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
|
||||
// This is necessary for bootstrap.
|
||||
if lsn == Lsn(0) {
|
||||
let last_valid_lsn = self.last_valid_lsn.load();
|
||||
trace!(
|
||||
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
|
||||
last_valid_lsn,
|
||||
lsn
|
||||
);
|
||||
lsn = last_valid_lsn;
|
||||
}
|
||||
trace!(
|
||||
"Start waiting for LSN {}, valid LSN is {}",
|
||||
lsn,
|
||||
self.last_valid_lsn.load()
|
||||
);
|
||||
self.last_valid_lsn
|
||||
.wait_for_timeout(lsn, TIMEOUT)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Timed out while waiting for WAL record at LSN {} to arrive. valid LSN in {}",
|
||||
lsn,
|
||||
self.last_valid_lsn.load(),
|
||||
)
|
||||
})?;
|
||||
//trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
|
||||
|
||||
Ok(lsn)
|
||||
}
|
||||
|
||||
///
|
||||
/// Iterate through object versions with given key, in reverse LSN order.
|
||||
///
|
||||
|
||||
@@ -357,8 +357,13 @@ impl PageServerHandler {
|
||||
|
||||
/* Send a tarball of the latest snapshot on the timeline */
|
||||
|
||||
let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn());
|
||||
|
||||
let req_lsn = match lsn {
|
||||
Some(lsn) => {
|
||||
timeline.wait_lsn(lsn)?;
|
||||
lsn
|
||||
}
|
||||
None => timeline.get_last_record_lsn(),
|
||||
};
|
||||
{
|
||||
let mut writer = CopyDataSink { pgb };
|
||||
let mut basebackup = basebackup::Basebackup::new(
|
||||
@@ -469,7 +474,7 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
let (_, params_raw) = query_string.split_at("basebackup ".len());
|
||||
let params = params_raw.split(" ").collect::<Vec<_>>();
|
||||
ensure!(
|
||||
params.len() == 2,
|
||||
params.len() >= 2,
|
||||
"invalid param number for basebackup command"
|
||||
);
|
||||
|
||||
@@ -479,7 +484,7 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
self.check_permission(Some(tenantid))?;
|
||||
|
||||
// TODO are there any tests with lsn option?
|
||||
let lsn = if params.len() == 3 {
|
||||
let lsn = if params.len() == 3 && params[2].len() != 0 {
|
||||
Some(Lsn::from_str(params[2])?)
|
||||
} else {
|
||||
None
|
||||
@@ -575,6 +580,10 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
timeline.advance_last_valid_lsn(last_lsn);
|
||||
break;
|
||||
}
|
||||
FeMessage::CopyFailed => {
|
||||
info!("Copy failed");
|
||||
break;
|
||||
}
|
||||
FeMessage::Sync => {}
|
||||
_ => bail!("unexpected message {:?}", msg),
|
||||
}
|
||||
|
||||
@@ -203,6 +203,11 @@ pub trait Timeline: Send + Sync {
|
||||
/// Relation size is increased implicitly and decreased with Truncate updates.
|
||||
// TODO ordering guarantee?
|
||||
fn history<'a>(&'a self) -> Result<Box<dyn History + 'a>>;
|
||||
|
||||
//
|
||||
// Wait until WAL has been received up to the given LSN.
|
||||
//
|
||||
fn wait_lsn(&self, lsn: Lsn) -> Result<Lsn>;
|
||||
}
|
||||
|
||||
pub trait History: Iterator<Item = Result<Modification>> {
|
||||
|
||||
@@ -264,7 +264,7 @@ fn import_slru_file(timeline: &dyn Timeline, lsn: Lsn, slru: SlruKind, path: &Pa
|
||||
/// Scan PostgreSQL WAL files in given directory
|
||||
/// and load all records >= 'startpoint' into the repository.
|
||||
pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: Lsn) -> Result<()> {
|
||||
let mut waldecoder = WalStreamDecoder::new(startpoint);
|
||||
let mut waldecoder = WalStreamDecoder::new(startpoint, true);
|
||||
|
||||
let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE);
|
||||
let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
|
||||
@@ -425,9 +425,11 @@ pub fn save_decoded_record(
|
||||
let parsed_xact = XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
|
||||
save_xact_record(timeline, lsn, &parsed_xact, decoded)?;
|
||||
// Remove twophase file. see RemoveTwoPhaseFile() in postgres code
|
||||
info!(
|
||||
trace!(
|
||||
"unlink twophaseFile for xid {} parsed_xact.xid {} here at {}",
|
||||
decoded.xl_xid, parsed_xact.xid, lsn
|
||||
decoded.xl_xid,
|
||||
parsed_xact.xid,
|
||||
lsn
|
||||
);
|
||||
timeline.put_unlink(
|
||||
RelishTag::TwoPhase {
|
||||
|
||||
@@ -25,13 +25,13 @@ pub type MultiXactStatus = u32;
|
||||
pub struct WalStreamDecoder {
|
||||
lsn: Lsn,
|
||||
|
||||
startlsn: Lsn, // LSN where this record starts
|
||||
contlen: u32,
|
||||
padlen: u32,
|
||||
|
||||
inputbuf: BytesMut,
|
||||
|
||||
recordbuf: BytesMut,
|
||||
|
||||
crc_check: bool,
|
||||
}
|
||||
|
||||
#[derive(Error, Debug, Clone)]
|
||||
@@ -46,19 +46,24 @@ pub struct WalDecodeError {
|
||||
// FIXME: This isn't a proper rust stream
|
||||
//
|
||||
impl WalStreamDecoder {
|
||||
pub fn new(lsn: Lsn) -> WalStreamDecoder {
|
||||
pub fn new(lsn: Lsn, crc_check: bool) -> WalStreamDecoder {
|
||||
WalStreamDecoder {
|
||||
lsn,
|
||||
|
||||
startlsn: Lsn(0),
|
||||
contlen: 0,
|
||||
padlen: 0,
|
||||
|
||||
inputbuf: BytesMut::new(),
|
||||
recordbuf: BytesMut::new(),
|
||||
|
||||
crc_check,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn available(&self) -> Lsn {
|
||||
self.lsn + self.inputbuf.remaining() as u64
|
||||
}
|
||||
|
||||
pub fn feed_bytes(&mut self, buf: &[u8]) {
|
||||
self.inputbuf.extend_from_slice(buf);
|
||||
}
|
||||
@@ -92,7 +97,9 @@ impl WalStreamDecoder {
|
||||
// TODO: verify the remaining fields in the header
|
||||
|
||||
self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
|
||||
continue;
|
||||
if !self.crc_check && self.contlen != hdr.std.xlp_rem_len {
|
||||
self.contlen = hdr.std.xlp_rem_len; // skip continuation record
|
||||
}
|
||||
} else if self.lsn.block_offset() == 0 {
|
||||
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
|
||||
return Ok(None);
|
||||
@@ -102,14 +109,19 @@ impl WalStreamDecoder {
|
||||
|
||||
if hdr.xlp_pageaddr != self.lsn.0 {
|
||||
return Err(WalDecodeError {
|
||||
msg: "invalid xlog page header".into(),
|
||||
msg: format!(
|
||||
"invalid xlog page header: xlp_pageaddr={} vs. lsn={}",
|
||||
hdr.xlp_pageaddr, self.lsn
|
||||
),
|
||||
lsn: self.lsn,
|
||||
});
|
||||
}
|
||||
// TODO: verify the remaining fields in the header
|
||||
|
||||
self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
|
||||
continue;
|
||||
if !self.crc_check && self.contlen != hdr.xlp_rem_len {
|
||||
self.contlen = hdr.xlp_rem_len; // skip continuation record
|
||||
}
|
||||
} else if self.padlen > 0 {
|
||||
if self.inputbuf.remaining() < self.padlen as usize {
|
||||
return Ok(None);
|
||||
@@ -127,7 +139,6 @@ impl WalStreamDecoder {
|
||||
}
|
||||
|
||||
// read xl_tot_len FIXME: assumes little-endian
|
||||
self.startlsn = self.lsn;
|
||||
let xl_tot_len = self.inputbuf.get_u32_le();
|
||||
if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
|
||||
return Err(WalDecodeError {
|
||||
@@ -142,7 +153,6 @@ impl WalStreamDecoder {
|
||||
self.recordbuf.put_u32_le(xl_tot_len);
|
||||
|
||||
self.contlen = xl_tot_len - 4;
|
||||
continue;
|
||||
} else {
|
||||
// we're continuing a record, possibly from previous page.
|
||||
let pageleft = self.lsn.remaining_in_block() as u32;
|
||||
@@ -164,17 +174,10 @@ impl WalStreamDecoder {
|
||||
let recordbuf = recordbuf.freeze();
|
||||
let mut buf = recordbuf.clone();
|
||||
|
||||
let xlogrec = XLogRecord::from_bytes(&mut buf);
|
||||
|
||||
// XLOG_SWITCH records are special. If we see one, we need to skip
|
||||
// to the next WAL segment.
|
||||
let xlogrec = XLogRecord::from_bytes(&mut buf);
|
||||
let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
|
||||
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
|
||||
if crc != xlogrec.xl_crc {
|
||||
return Err(WalDecodeError {
|
||||
msg: "WAL record crc mismatch".into(),
|
||||
lsn: self.lsn,
|
||||
});
|
||||
}
|
||||
if xlogrec.is_xlog_switch_record() {
|
||||
trace!("saw xlog switch record at {}", self.lsn);
|
||||
self.padlen =
|
||||
@@ -184,10 +187,29 @@ impl WalStreamDecoder {
|
||||
self.padlen = self.lsn.calc_padding(8u32) as u32;
|
||||
}
|
||||
|
||||
let result = (self.lsn, recordbuf);
|
||||
// Check record CRC
|
||||
if self.crc_check {
|
||||
let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
|
||||
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
|
||||
if crc != xlogrec.xl_crc {
|
||||
info!("WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}, recordbuf={:?}",
|
||||
n, recordbuf.len(), self.lsn, xlogrec, recordbuf);
|
||||
return Err(WalDecodeError {
|
||||
msg: format!(
|
||||
"WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}",
|
||||
n,
|
||||
buf.len(),
|
||||
self.lsn,
|
||||
xlogrec
|
||||
),
|
||||
lsn: self.lsn,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let result = (self.lsn.align(), recordbuf);
|
||||
return Ok(Some(result));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
// check record boundaries
|
||||
|
||||
@@ -22,8 +22,6 @@ use postgres_types::PgLsn;
|
||||
use std::cmp::{max, min};
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::fs::{File, OpenOptions};
|
||||
use std::io::{Seek, SeekFrom, Write};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Mutex;
|
||||
use std::thread;
|
||||
@@ -178,7 +176,7 @@ fn walreceiver_main(
|
||||
let copy_stream = rclient.copy_both_simple(&query)?;
|
||||
let mut physical_stream = ReplicationIter::new(copy_stream);
|
||||
|
||||
let mut waldecoder = WalStreamDecoder::new(startpoint);
|
||||
let mut waldecoder = WalStreamDecoder::new(startpoint, true);
|
||||
|
||||
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(RelishTag::Checkpoint, 0, startpoint)?;
|
||||
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
|
||||
@@ -194,45 +192,51 @@ fn walreceiver_main(
|
||||
let endlsn = startlsn + data.len() as u64;
|
||||
let prev_last_rec_lsn = last_rec_lsn;
|
||||
|
||||
write_wal_file(
|
||||
conf,
|
||||
startlsn,
|
||||
&timelineid,
|
||||
pg_constants::WAL_SEGMENT_SIZE,
|
||||
data,
|
||||
tenantid,
|
||||
)?;
|
||||
|
||||
trace!("received XLogData between {} and {}", startlsn, endlsn);
|
||||
|
||||
waldecoder.feed_bytes(data);
|
||||
|
||||
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
// Save old checkpoint value to compare with it after decoding WAL record
|
||||
let old_checkpoint_bytes = checkpoint.encode();
|
||||
let decoded = decode_wal_record(recdata.clone());
|
||||
restore_local_repo::save_decoded_record(
|
||||
&mut checkpoint,
|
||||
&*timeline,
|
||||
&decoded,
|
||||
recdata,
|
||||
lsn,
|
||||
)?;
|
||||
last_rec_lsn = lsn;
|
||||
loop {
|
||||
match waldecoder.poll_decode() {
|
||||
Ok(Some((lsn, recdata))) => {
|
||||
// Save old checkpoint value to compare with it after decoding WAL record
|
||||
let old_checkpoint_bytes = checkpoint.encode();
|
||||
let decoded = decode_wal_record(recdata.clone());
|
||||
restore_local_repo::save_decoded_record(
|
||||
&mut checkpoint,
|
||||
&*timeline,
|
||||
&decoded,
|
||||
recdata,
|
||||
lsn,
|
||||
)?;
|
||||
last_rec_lsn = lsn;
|
||||
|
||||
let new_checkpoint_bytes = checkpoint.encode();
|
||||
// Check if checkpoint data was updated by save_decoded_record
|
||||
if new_checkpoint_bytes != old_checkpoint_bytes {
|
||||
timeline.put_page_image(
|
||||
RelishTag::Checkpoint,
|
||||
0,
|
||||
lsn,
|
||||
new_checkpoint_bytes,
|
||||
false,
|
||||
)?;
|
||||
let new_checkpoint_bytes = checkpoint.encode();
|
||||
// Check if checkpoint data was updated by save_decoded_record
|
||||
if new_checkpoint_bytes != old_checkpoint_bytes {
|
||||
timeline.put_page_image(
|
||||
RelishTag::Checkpoint,
|
||||
0,
|
||||
lsn,
|
||||
new_checkpoint_bytes,
|
||||
false,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
trace!(
|
||||
"End of replication stream {}..{} at {}",
|
||||
startlsn,
|
||||
endlsn,
|
||||
last_rec_lsn
|
||||
);
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
info!("Decode error {}", e);
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update the last_valid LSN value in the page cache one more time. We updated
|
||||
// it in the loop above, between each WAL record, but we might have received
|
||||
// a partial record after the last completed record. Our page cache's value
|
||||
@@ -407,98 +411,3 @@ pub fn identify_system(client: &mut Client) -> Result<IdentifySystem, Error> {
|
||||
Err(IdentifyError.into())
|
||||
}
|
||||
}
|
||||
|
||||
fn write_wal_file(
|
||||
conf: &PageServerConf,
|
||||
startpos: Lsn,
|
||||
timelineid: &ZTimelineId,
|
||||
wal_seg_size: usize,
|
||||
buf: &[u8],
|
||||
tenantid: &ZTenantId,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut bytes_left: usize = buf.len();
|
||||
let mut bytes_written: usize = 0;
|
||||
let mut partial;
|
||||
let mut start_pos = startpos;
|
||||
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
|
||||
|
||||
let wal_dir = conf.wal_dir_path(timelineid, tenantid);
|
||||
|
||||
/* Extract WAL location for this block */
|
||||
let mut xlogoff = start_pos.segment_offset(wal_seg_size);
|
||||
|
||||
while bytes_left != 0 {
|
||||
let bytes_to_write;
|
||||
|
||||
/*
|
||||
* If crossing a WAL boundary, only write up until we reach wal
|
||||
* segment size.
|
||||
*/
|
||||
if xlogoff + bytes_left > wal_seg_size {
|
||||
bytes_to_write = wal_seg_size - xlogoff;
|
||||
} else {
|
||||
bytes_to_write = bytes_left;
|
||||
}
|
||||
|
||||
/* Open file */
|
||||
let segno = start_pos.segment_number(wal_seg_size);
|
||||
let wal_file_name = XLogFileName(
|
||||
1, // FIXME: always use Postgres timeline 1
|
||||
segno,
|
||||
wal_seg_size,
|
||||
);
|
||||
let wal_file_path = wal_dir.join(wal_file_name.clone());
|
||||
let wal_file_partial_path = wal_dir.join(wal_file_name.clone() + ".partial");
|
||||
|
||||
{
|
||||
let mut wal_file: File;
|
||||
/* Try to open already completed segment */
|
||||
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
|
||||
wal_file = file;
|
||||
partial = false;
|
||||
} else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) {
|
||||
/* Try to open existed partial file */
|
||||
wal_file = file;
|
||||
partial = true;
|
||||
} else {
|
||||
/* Create and fill new partial file */
|
||||
partial = true;
|
||||
match OpenOptions::new()
|
||||
.create(true)
|
||||
.write(true)
|
||||
.open(&wal_file_partial_path)
|
||||
{
|
||||
Ok(mut file) => {
|
||||
for _ in 0..(wal_seg_size / XLOG_BLCKSZ) {
|
||||
file.write_all(&ZERO_BLOCK)?;
|
||||
}
|
||||
wal_file = file;
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to open log file {:?}: {}", &wal_file_path, e);
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
wal_file.seek(SeekFrom::Start(xlogoff as u64))?;
|
||||
wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?;
|
||||
|
||||
// FIXME: Flush the file
|
||||
//wal_file.sync_all()?;
|
||||
}
|
||||
/* Write was successful, advance our position */
|
||||
bytes_written += bytes_to_write;
|
||||
bytes_left -= bytes_to_write;
|
||||
start_pos += bytes_to_write as u64;
|
||||
xlogoff += bytes_to_write;
|
||||
|
||||
/* Did we reach the end of a WAL segment? */
|
||||
if start_pos.segment_offset(wal_seg_size) == 0 {
|
||||
xlogoff = 0;
|
||||
if partial {
|
||||
fs::rename(&wal_file_partial_path, &wal_file_path)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -108,17 +108,23 @@ fn find_end_of_wal_segment(
|
||||
segno: XLogSegNo,
|
||||
tli: TimeLineID,
|
||||
wal_seg_size: usize,
|
||||
is_partial: bool,
|
||||
rec_offs: &mut usize,
|
||||
rec_hdr: &mut [u8; XLOG_SIZE_OF_XLOG_RECORD],
|
||||
crc: &mut u32,
|
||||
check_contrec: bool,
|
||||
) -> u32 {
|
||||
let mut offs: usize = 0;
|
||||
let mut contlen: usize = 0;
|
||||
let mut wal_crc: u32 = 0;
|
||||
let mut crc: u32 = 0;
|
||||
let mut rec_offs: usize = 0;
|
||||
let mut buf = [0u8; XLOG_BLCKSZ];
|
||||
let file_name = XLogFileName(tli, segno, wal_seg_size);
|
||||
let mut last_valid_rec_pos: usize = 0;
|
||||
let mut file = File::open(data_dir.join(file_name.clone() + ".partial")).unwrap();
|
||||
let mut rec_hdr = [0u8; XLOG_RECORD_CRC_OFFS];
|
||||
let file_path = data_dir.join(if is_partial {
|
||||
file_name.clone() + ".partial"
|
||||
} else {
|
||||
file_name
|
||||
});
|
||||
let mut file = File::open(&file_path).unwrap();
|
||||
|
||||
while offs < wal_seg_size {
|
||||
if offs % XLOG_BLCKSZ == 0 {
|
||||
@@ -133,13 +139,33 @@ fn find_end_of_wal_segment(
|
||||
let xlp_info = LittleEndian::read_u16(&buf[2..4]);
|
||||
let xlp_rem_len = LittleEndian::read_u32(&buf[XLP_REM_LEN_OFFS..XLP_REM_LEN_OFFS + 4]);
|
||||
if xlp_magic != XLOG_PAGE_MAGIC as u16 {
|
||||
info!("Invalid WAL file {}.partial magic {}", file_name, xlp_magic);
|
||||
info!("Invalid WAL file {:?} magic {}", &file_path, xlp_magic);
|
||||
break;
|
||||
}
|
||||
if offs == 0 {
|
||||
offs = XLOG_SIZE_OF_XLOG_LONG_PHD;
|
||||
if (xlp_info & XLP_FIRST_IS_CONTRECORD) != 0 {
|
||||
offs += ((xlp_rem_len + 7) & !7) as usize;
|
||||
if check_contrec {
|
||||
let xl_tot_len = LittleEndian::read_u32(&rec_hdr[0..4]) as usize;
|
||||
contlen = xlp_rem_len as usize;
|
||||
if *rec_offs + contlen < xl_tot_len
|
||||
|| (*rec_offs + contlen != xl_tot_len
|
||||
&& contlen != XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_LONG_PHD)
|
||||
{
|
||||
info!(
|
||||
"Corrupted continuation record: offs={}, contlen={}, xl_tot_len={}",
|
||||
*rec_offs, contlen, xl_tot_len
|
||||
);
|
||||
return 0;
|
||||
}
|
||||
} else {
|
||||
offs += ((xlp_rem_len + 7) & !7) as usize;
|
||||
}
|
||||
} else if *rec_offs != 0 {
|
||||
// There is incompleted page at previous segment but no cont record:
|
||||
// it means that current segment is not valid and we have to return back.
|
||||
info!("CONTRECORD flag is missed in page header");
|
||||
return 0;
|
||||
}
|
||||
} else {
|
||||
offs += XLOG_SIZE_OF_XLOG_SHORT_PHD;
|
||||
@@ -150,9 +176,8 @@ fn find_end_of_wal_segment(
|
||||
if xl_tot_len == 0 {
|
||||
break;
|
||||
}
|
||||
last_valid_rec_pos = offs;
|
||||
offs += 4;
|
||||
rec_offs = 4;
|
||||
*rec_offs = 4;
|
||||
contlen = xl_tot_len - 4;
|
||||
rec_hdr[0..4].copy_from_slice(&buf[page_offs..page_offs + 4]);
|
||||
} else {
|
||||
@@ -162,34 +187,33 @@ fn find_end_of_wal_segment(
|
||||
|
||||
// read the rest of the record, or as much as fits on this page.
|
||||
let n = min(contlen, pageleft);
|
||||
if rec_offs < XLOG_RECORD_CRC_OFFS {
|
||||
let len = min(XLOG_RECORD_CRC_OFFS - rec_offs, n);
|
||||
rec_hdr[rec_offs..rec_offs + len].copy_from_slice(&buf[page_offs..page_offs + len]);
|
||||
let mut hdr_len: usize = 0;
|
||||
if *rec_offs < XLOG_SIZE_OF_XLOG_RECORD {
|
||||
// copy header
|
||||
hdr_len = min(XLOG_SIZE_OF_XLOG_RECORD - *rec_offs, n);
|
||||
rec_hdr[*rec_offs..*rec_offs + hdr_len]
|
||||
.copy_from_slice(&buf[page_offs..page_offs + hdr_len]);
|
||||
}
|
||||
if rec_offs <= XLOG_RECORD_CRC_OFFS && rec_offs + n >= XLOG_SIZE_OF_XLOG_RECORD {
|
||||
let crc_offs = page_offs - rec_offs + XLOG_RECORD_CRC_OFFS;
|
||||
wal_crc = LittleEndian::read_u32(&buf[crc_offs..crc_offs + 4]);
|
||||
crc = crc32c_append(0, &buf[crc_offs + 4..page_offs + n]);
|
||||
crc = !crc;
|
||||
} else {
|
||||
crc ^= 0xFFFFFFFFu32;
|
||||
crc = crc32c_append(crc, &buf[page_offs..page_offs + n]);
|
||||
crc = !crc;
|
||||
}
|
||||
rec_offs += n;
|
||||
*crc = crc32c_append(*crc, &buf[page_offs + hdr_len..page_offs + n]);
|
||||
*rec_offs += n;
|
||||
offs += n;
|
||||
contlen -= n;
|
||||
|
||||
if contlen == 0 {
|
||||
crc = !crc;
|
||||
crc = crc32c_append(crc, &rec_hdr);
|
||||
*crc = crc32c_append(*crc, &rec_hdr[0..XLOG_RECORD_CRC_OFFS]);
|
||||
offs = (offs + 7) & !7; // pad on 8 bytes boundary */
|
||||
if crc == wal_crc {
|
||||
let wal_crc = LittleEndian::read_u32(
|
||||
&rec_hdr[XLOG_RECORD_CRC_OFFS..XLOG_RECORD_CRC_OFFS + 4],
|
||||
);
|
||||
if *crc == wal_crc {
|
||||
last_valid_rec_pos = offs;
|
||||
// Reset rec_offs and crc for start of new record
|
||||
*rec_offs = 0;
|
||||
*crc = 0;
|
||||
} else {
|
||||
info!(
|
||||
"CRC mismatch {} vs {} at {}",
|
||||
crc, wal_crc, last_valid_rec_pos
|
||||
"CRC mismatch {} vs {} at offset {} lsn {}",
|
||||
*crc, wal_crc, offs, last_valid_rec_pos
|
||||
);
|
||||
break;
|
||||
}
|
||||
@@ -240,20 +264,142 @@ pub fn find_end_of_wal(
|
||||
}
|
||||
if high_segno > 0 {
|
||||
let mut high_offs = 0;
|
||||
/*
|
||||
* Move the starting pointer to the start of the next segment, if the
|
||||
* highest one we saw was completed.
|
||||
*/
|
||||
if !high_ispartial {
|
||||
high_segno += 1;
|
||||
} else if precise {
|
||||
/* otherwise locate last record in last partial segment */
|
||||
high_offs = find_end_of_wal_segment(data_dir, high_segno, high_tli, wal_seg_size);
|
||||
if precise {
|
||||
let mut crc: u32 = 0;
|
||||
let mut rec_offs: usize = 0;
|
||||
let mut rec_hdr = [0u8; XLOG_SIZE_OF_XLOG_RECORD];
|
||||
let wal_dir = data_dir.join("pg_wal");
|
||||
|
||||
/*
|
||||
* To be able to calculate CRC of records crossing segment boundary,
|
||||
* we need to parse previous segments.
|
||||
* So first traverse segments in backward direction to locate record start
|
||||
* and then traverse forward, accumulating CRC.
|
||||
*/
|
||||
let mut prev_segno = high_segno - 1;
|
||||
let mut prev_offs: u32 = 0;
|
||||
while prev_segno > 1 {
|
||||
// TOFO: first segment constains dummy checkpoint record at the beginning
|
||||
prev_offs = find_end_of_wal_segment(
|
||||
data_dir,
|
||||
prev_segno,
|
||||
high_tli,
|
||||
wal_seg_size,
|
||||
false,
|
||||
&mut rec_offs,
|
||||
&mut rec_hdr,
|
||||
&mut crc,
|
||||
false,
|
||||
);
|
||||
if prev_offs != 0 {
|
||||
break;
|
||||
}
|
||||
prev_segno -= 1;
|
||||
}
|
||||
if prev_offs != 0 {
|
||||
// found start of WAL record
|
||||
let first_segno = prev_segno;
|
||||
let first_offs = prev_offs;
|
||||
while prev_segno + 1 < high_segno {
|
||||
// now traverse record in forward direction, accumulating CRC
|
||||
prev_segno += 1;
|
||||
prev_offs = find_end_of_wal_segment(
|
||||
data_dir,
|
||||
prev_segno,
|
||||
high_tli,
|
||||
wal_seg_size,
|
||||
false,
|
||||
&mut rec_offs,
|
||||
&mut rec_hdr,
|
||||
&mut crc,
|
||||
true,
|
||||
);
|
||||
if prev_offs == 0 {
|
||||
info!("Segment {} is corrupted", prev_segno,);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if prev_offs != 0 {
|
||||
high_offs = find_end_of_wal_segment(
|
||||
data_dir,
|
||||
high_segno,
|
||||
high_tli,
|
||||
wal_seg_size,
|
||||
high_ispartial,
|
||||
&mut rec_offs,
|
||||
&mut rec_hdr,
|
||||
&mut crc,
|
||||
true,
|
||||
);
|
||||
}
|
||||
if high_offs == 0 {
|
||||
// If last segment contais no valid records, then return back
|
||||
info!("Last WAL segment {} contains no valid record, truncate WAL till {} segment",
|
||||
high_segno, first_segno);
|
||||
// Remove last segments containing corrupted WAL record
|
||||
for segno in first_segno + 1..high_segno {
|
||||
let file_name = XLogFileName(high_tli, segno, wal_seg_size);
|
||||
let file_path = wal_dir.join(file_name);
|
||||
if let Err(e) = fs::remove_file(&file_path) {
|
||||
info!("Failed to remove file {:?}: {}", &file_path, e);
|
||||
}
|
||||
}
|
||||
let file_name = XLogFileName(high_tli, high_segno, wal_seg_size);
|
||||
let file_path = if high_ispartial {
|
||||
wal_dir.join(file_name.clone() + ".partial")
|
||||
} else {
|
||||
wal_dir.join(file_name.clone())
|
||||
};
|
||||
if let Err(e) = fs::remove_file(&file_path) {
|
||||
info!("Failed to remove file {:?}: {}", &file_path, e);
|
||||
}
|
||||
high_ispartial = false; // previous segment should not be partial
|
||||
high_segno = first_segno;
|
||||
high_offs = first_offs;
|
||||
}
|
||||
} else {
|
||||
// failed to locate previous segment
|
||||
assert!(prev_segno <= 1);
|
||||
high_offs = find_end_of_wal_segment(
|
||||
data_dir,
|
||||
high_segno,
|
||||
high_tli,
|
||||
wal_seg_size,
|
||||
high_ispartial,
|
||||
&mut rec_offs,
|
||||
&mut rec_hdr,
|
||||
&mut crc,
|
||||
false,
|
||||
);
|
||||
}
|
||||
|
||||
// If last segment is not marked as partial, it means that next segment
|
||||
// was not written. Let's make this segment partial once again.
|
||||
if !high_ispartial {
|
||||
let file_name = XLogFileName(high_tli, high_segno, wal_seg_size);
|
||||
if let Err(e) = fs::rename(
|
||||
wal_dir.join(file_name.clone()),
|
||||
wal_dir.join(file_name.clone() + ".partial"),
|
||||
) {
|
||||
info!(
|
||||
"Failed to rename {} to {}.partial: {}",
|
||||
&file_name, &file_name, e
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
/*
|
||||
* Move the starting pointer to the start of the next segment, if the
|
||||
* highest one we saw was completed.
|
||||
*/
|
||||
if !high_ispartial {
|
||||
high_segno += 1;
|
||||
}
|
||||
}
|
||||
let high_ptr = XLogSegNoOffsetToRecPtr(high_segno, high_offs, wal_seg_size);
|
||||
return (high_ptr, high_tli);
|
||||
}
|
||||
(0, 0)
|
||||
(0, 1) // First timeline is 1
|
||||
}
|
||||
|
||||
pub fn main() {
|
||||
@@ -469,7 +615,7 @@ mod tests {
|
||||
let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true);
|
||||
let wal_end = Lsn(wal_end);
|
||||
println!("wal_end={}, tli={}", wal_end, tli);
|
||||
assert_eq!(wal_end, "0/2000000".parse::<Lsn>().unwrap());
|
||||
assert_eq!(wal_end, "0/1699D10".parse::<Lsn>().unwrap());
|
||||
|
||||
// 4. Get the actual end of WAL by pg_waldump
|
||||
let waldump_path = top_path.join("tmp_install/bin/pg_waldump");
|
||||
|
||||
66
test_runner/batch_others/test_pageserver_restart.py
Normal file
66
test_runner/batch_others/test_pageserver_restart.py
Normal file
@@ -0,0 +1,66 @@
|
||||
import pytest
|
||||
import random
|
||||
import time
|
||||
|
||||
from contextlib import closing
|
||||
from multiprocessing import Process, Value
|
||||
from fixtures.zenith_fixtures import WalAcceptorFactory, ZenithPageserver, PostgresFactory
|
||||
|
||||
pytest_plugins = ("fixtures.zenith_fixtures")
|
||||
|
||||
# Check that dead minority doesn't prevent the commits: execute insert n_inserts
|
||||
# times, with fault_probability chance of getting a wal acceptor down or up
|
||||
# along the way. 2 of 3 are always alive, so the work keeps going.
|
||||
def test_pageserver_restart(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, wa_factory: WalAcceptorFactory):
|
||||
|
||||
# One safekeeper is enough for this test.
|
||||
wa_factory.start_n_new(1)
|
||||
|
||||
zenith_cli.run(["branch", "test_pageserver_restart", "empty"])
|
||||
pg = postgres.create_start('test_pageserver_restart',
|
||||
wal_acceptors=wa_factory.get_connstrs())
|
||||
|
||||
pg_conn = pg.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
# Create table, and insert some rows. Make it big enough that it doesn't fit in
|
||||
# shared_buffers, otherwise the SELECT after restart will just return answer
|
||||
# from shared_buffers without hitting the page server, which defeats the point
|
||||
# of this test.
|
||||
cur.execute('CREATE TABLE foo (t text)')
|
||||
cur.execute('''
|
||||
INSERT INTO foo
|
||||
SELECT 'long string to consume some space' || g
|
||||
FROM generate_series(1, 100000) g
|
||||
''')
|
||||
|
||||
# Verify that the table is larger than shared_buffers
|
||||
cur.execute('''
|
||||
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_ize
|
||||
from pg_settings where name = 'shared_buffers'
|
||||
''')
|
||||
row = cur.fetchone()
|
||||
print("shared_buffers is {}, table size {}", row[0], row[1]);
|
||||
assert int(row[0]) < int(row[1])
|
||||
|
||||
# Stop and restart pageserver. This is a more or less graceful shutdown, although
|
||||
# the page server doesn't currently have a shutdown routine so there's no difference
|
||||
# between stopping and crashing.
|
||||
pageserver.stop();
|
||||
pageserver.start();
|
||||
|
||||
# Stopping the pageserver breaks the connection from the postgres backend to
|
||||
# the page server, and causes the next query on the connection to fail. Start a new
|
||||
# postgres connection too, to avoid that error. (Ideally, the compute node would
|
||||
# handle that and retry internally, without propagating the error to the user, but
|
||||
# currently it doesn't...)
|
||||
pg_conn = pg.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
cur.execute("SELECT count(*) FROM foo")
|
||||
assert cur.fetchone() == (100000, )
|
||||
|
||||
# Stop the page server by force, and restart it
|
||||
pageserver.stop();
|
||||
pageserver.start();
|
||||
|
||||
@@ -9,7 +9,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
|
||||
#
|
||||
# Test restarting and recreating a postgres instance
|
||||
#
|
||||
@pytest.mark.parametrize('with_wal_acceptors', [False, True])
|
||||
@pytest.mark.parametrize('with_wal_acceptors', [True, False])
|
||||
def test_restart_compute(
|
||||
zenith_cli,
|
||||
pageserver: ZenithPageserver,
|
||||
@@ -31,31 +31,56 @@ def test_restart_compute(
|
||||
|
||||
with closing(pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# Create table, and insert a row
|
||||
cur.execute('CREATE TABLE foo (t text)')
|
||||
cur.execute("INSERT INTO foo VALUES ('bar')")
|
||||
cur.execute('CREATE TABLE t(key int primary key, value text)')
|
||||
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
|
||||
cur.execute('SELECT sum(key) FROM t')
|
||||
r = cur.fetchone()
|
||||
assert r == (5000050000, )
|
||||
print("res = ", r)
|
||||
|
||||
# Stop and restart the Postgres instance
|
||||
# Remove data directory and restart
|
||||
pg.stop_and_destroy().create_start('test_restart_compute',
|
||||
wal_acceptors=wal_acceptor_connstrs)
|
||||
|
||||
|
||||
with closing(pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# We can still see the row
|
||||
cur.execute('SELECT count(*) FROM foo')
|
||||
assert cur.fetchone() == (1, )
|
||||
cur.execute('SELECT sum(key) FROM t')
|
||||
r = cur.fetchone()
|
||||
assert r == (5000050000, )
|
||||
print("res = ", r)
|
||||
|
||||
# Insert another row
|
||||
cur.execute("INSERT INTO foo VALUES ('bar2')")
|
||||
cur.execute('SELECT count(*) FROM foo')
|
||||
assert cur.fetchone() == (2, )
|
||||
cur.execute("INSERT INTO t VALUES (100001, 'payload2')")
|
||||
cur.execute('SELECT count(*) FROM t')
|
||||
|
||||
# Stop, and destroy the Postgres instance. Then recreate and restart it.
|
||||
r = cur.fetchone()
|
||||
assert r == (100001, )
|
||||
print("res = ", r)
|
||||
|
||||
# Again remove data directory and restart
|
||||
pg.stop_and_destroy().create_start('test_restart_compute',
|
||||
wal_acceptors=wal_acceptor_connstrs)
|
||||
|
||||
with closing(pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# We can still see the rows
|
||||
cur.execute('SELECT count(*) FROM foo')
|
||||
assert cur.fetchone() == (2, )
|
||||
cur.execute('SELECT count(*) FROM t')
|
||||
|
||||
r = cur.fetchone()
|
||||
assert r == (100001, )
|
||||
print("res = ", r)
|
||||
|
||||
# And again remove data directory and restart
|
||||
pg.stop_and_destroy().create_start('test_restart_compute',
|
||||
wal_acceptors=wal_acceptor_connstrs)
|
||||
|
||||
with closing(pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# We can still see the rows
|
||||
cur.execute('SELECT count(*) FROM t')
|
||||
|
||||
r = cur.fetchone()
|
||||
assert r == (100001, )
|
||||
print("res = ", r)
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import os
|
||||
|
||||
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
|
||||
|
||||
|
||||
@@ -28,24 +30,59 @@ def test_twophase(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFa
|
||||
cur.execute("INSERT INTO foo VALUES ('two')")
|
||||
cur.execute("PREPARE TRANSACTION 'insert_two'")
|
||||
|
||||
# Prepare a transaction that will insert a row
|
||||
cur.execute('BEGIN')
|
||||
cur.execute("INSERT INTO foo VALUES ('three')")
|
||||
cur.execute("PREPARE TRANSACTION 'insert_three'")
|
||||
|
||||
# Prepare another transaction that will insert a row
|
||||
cur.execute('BEGIN')
|
||||
cur.execute("INSERT INTO foo VALUES ('four')")
|
||||
cur.execute("PREPARE TRANSACTION 'insert_four'")
|
||||
|
||||
# On checkpoint state data copied to files in
|
||||
# pg_twophase directory and fsynced
|
||||
cur.execute('CHECKPOINT')
|
||||
|
||||
twophase_files = os.listdir(pg.pg_twophase_dir_path())
|
||||
print(twophase_files)
|
||||
assert len(twophase_files) == 4
|
||||
|
||||
cur.execute("COMMIT PREPARED 'insert_three'")
|
||||
cur.execute("ROLLBACK PREPARED 'insert_four'")
|
||||
cur.execute('CHECKPOINT')
|
||||
|
||||
twophase_files = os.listdir(pg.pg_twophase_dir_path())
|
||||
print(twophase_files)
|
||||
assert len(twophase_files) == 2
|
||||
|
||||
# Create a branch with the transaction in prepared state
|
||||
zenith_cli.run(["branch", "test_twophase_prepared", "test_twophase"])
|
||||
|
||||
pg2 = postgres.create_start(
|
||||
# Create compute node, but don't start.
|
||||
# We want to observe pgdata before postgres starts
|
||||
pg2 = postgres.create(
|
||||
'test_twophase_prepared',
|
||||
config_lines=['max_prepared_transactions=5'],
|
||||
)
|
||||
|
||||
# Check that we restored only needed twophase files
|
||||
twophase_files2 = os.listdir(pg2.pg_twophase_dir_path())
|
||||
print(twophase_files2)
|
||||
assert twophase_files2.sort() == twophase_files.sort()
|
||||
|
||||
pg2 = pg2.start()
|
||||
conn2 = pg2.connect()
|
||||
cur2 = conn2.cursor()
|
||||
|
||||
# On the new branch, commit one of the prepared transactions, abort the other one.
|
||||
# On the new branch, commit one of the prepared transactions,
|
||||
# abort the other one.
|
||||
cur2.execute("COMMIT PREPARED 'insert_one'")
|
||||
cur2.execute("ROLLBACK PREPARED 'insert_two'")
|
||||
|
||||
cur2.execute('SELECT * FROM foo')
|
||||
assert cur2.fetchall() == [('one', )]
|
||||
assert cur2.fetchall() == [('one',), ('three',)]
|
||||
|
||||
# Neither insert is visible on the original branch, the transactions are still
|
||||
# in prepared state there.
|
||||
# Only one committed insert is visible on the original branch
|
||||
cur.execute('SELECT * FROM foo')
|
||||
assert cur.fetchall() == []
|
||||
assert cur.fetchall() == [('three',)]
|
||||
|
||||
@@ -249,10 +249,68 @@ def pageserver(zenith_cli: ZenithCli) -> Iterator[ZenithPageserver]:
|
||||
print('Starting pageserver cleanup')
|
||||
ps.stop()
|
||||
|
||||
class PgBin:
|
||||
""" A helper class for executing postgres binaries """
|
||||
def __init__(self, log_dir: str, pg_distrib_dir: str):
|
||||
self.log_dir = log_dir
|
||||
self.pg_install_path = pg_distrib_dir
|
||||
self.pg_bin_path = os.path.join(self.pg_install_path, 'bin')
|
||||
self.env = os.environ.copy()
|
||||
self.env['LD_LIBRARY_PATH'] = os.path.join(self.pg_install_path, 'lib')
|
||||
|
||||
def _fixpath(self, command: List[str]) -> None:
|
||||
if '/' not in command[0]:
|
||||
command[0] = os.path.join(self.pg_bin_path, command[0])
|
||||
|
||||
def _build_env(self, env_add: Optional[Env]) -> Env:
|
||||
if env_add is None:
|
||||
return self.env
|
||||
env = self.env.copy()
|
||||
env.update(env_add)
|
||||
return env
|
||||
|
||||
def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None) -> None:
|
||||
"""
|
||||
Run one of the postgres binaries.
|
||||
|
||||
The command should be in list form, e.g. ['pgbench', '-p', '55432']
|
||||
|
||||
All the necessary environment variables will be set.
|
||||
|
||||
If the first argument (the command name) doesn't include a path (no '/'
|
||||
characters present), then it will be edited to include the correct path.
|
||||
|
||||
If you want stdout/stderr captured to files, use `run_capture` instead.
|
||||
"""
|
||||
|
||||
self._fixpath(command)
|
||||
print('Running command "{}"'.format(' '.join(command)))
|
||||
env = self._build_env(env)
|
||||
subprocess.run(command, env=env, cwd=cwd, check=True)
|
||||
|
||||
def run_capture(self,
|
||||
command: List[str],
|
||||
env: Optional[Env] = None,
|
||||
cwd: Optional[str] = None) -> None:
|
||||
"""
|
||||
Run one of the postgres binaries, with stderr and stdout redirected to a file.
|
||||
|
||||
This is just like `run`, but for chatty programs.
|
||||
"""
|
||||
|
||||
self._fixpath(command)
|
||||
print('Running command "{}"'.format(' '.join(command)))
|
||||
env = self._build_env(env)
|
||||
subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True)
|
||||
|
||||
|
||||
@zenfixture
|
||||
def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin:
|
||||
return PgBin(test_output_dir, pg_distrib_dir)
|
||||
|
||||
class Postgres(PgProtocol):
|
||||
""" An object representing a running postgres daemon. """
|
||||
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, tenant_id: str, port: int):
|
||||
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, tenant_id: str, port: int):
|
||||
super().__init__(host='localhost', port=port)
|
||||
|
||||
self.zenith_cli = zenith_cli
|
||||
@@ -260,6 +318,7 @@ class Postgres(PgProtocol):
|
||||
self.repo_dir = repo_dir
|
||||
self.branch: Optional[str] = None # dubious, see asserts below
|
||||
self.tenant_id = tenant_id
|
||||
self.pg_bin = pg_bin
|
||||
# path to conf is <repo_dir>/pgdatadirs/tenants/<tenant_id>/<branch_name>/postgresql.conf
|
||||
|
||||
def create(
|
||||
@@ -299,20 +358,32 @@ class Postgres(PgProtocol):
|
||||
"""
|
||||
|
||||
assert self.branch is not None
|
||||
|
||||
print(f"Starting postgres on brach {self.branch}")
|
||||
|
||||
self.zenith_cli.run(['pg', 'start', self.branch, f'--tenantid={self.tenant_id}'])
|
||||
self.running = True
|
||||
|
||||
self.pg_bin.run(['pg_controldata', self.pg_data_dir_path()])
|
||||
|
||||
return self
|
||||
|
||||
def pg_data_dir_path(self) -> str:
|
||||
""" Path to data directory """
|
||||
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch
|
||||
return os.path.join(self.repo_dir, path)
|
||||
|
||||
def pg_xact_dir_path(self) -> str:
|
||||
""" Path to pg_xact dir """
|
||||
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'pg_xact'
|
||||
return os.path.join(self.repo_dir, path)
|
||||
return os.path.join(self.pg_data_dir_path(), 'pg_xact')
|
||||
|
||||
def pg_twophase_dir_path(self) -> str:
|
||||
""" Path to pg_twophase dir """
|
||||
return os.path.join(self.pg_data_dir_path(), 'pg_twophase')
|
||||
|
||||
def config_file_path(self) -> str:
|
||||
""" Path to postgresql.conf """
|
||||
filename = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'postgresql.conf'
|
||||
return os.path.join(self.repo_dir, filename)
|
||||
return os.path.join(self.pg_data_dir_path(), 'postgresql.conf')
|
||||
|
||||
def adjust_for_wal_acceptors(self, wal_acceptors: str) -> 'Postgres':
|
||||
"""
|
||||
@@ -404,13 +475,14 @@ class Postgres(PgProtocol):
|
||||
|
||||
class PostgresFactory:
|
||||
""" An object representing multiple running postgres daemons. """
|
||||
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, initial_tenant: str, base_port: int = 55431):
|
||||
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, initial_tenant: str, base_port: int = 55431):
|
||||
self.zenith_cli = zenith_cli
|
||||
self.repo_dir = repo_dir
|
||||
self.num_instances = 0
|
||||
self.instances: List[Postgres] = []
|
||||
self.initial_tenant: str = initial_tenant
|
||||
self.base_port = base_port
|
||||
self.pg_bin = pg_bin
|
||||
|
||||
def create_start(
|
||||
self,
|
||||
@@ -423,6 +495,7 @@ class PostgresFactory:
|
||||
pg = Postgres(
|
||||
zenith_cli=self.zenith_cli,
|
||||
repo_dir=self.repo_dir,
|
||||
pg_bin=self.pg_bin,
|
||||
tenant_id=tenant_id or self.initial_tenant,
|
||||
port=self.base_port + self.num_instances + 1,
|
||||
)
|
||||
@@ -496,8 +569,8 @@ def initial_tenant(pageserver: ZenithPageserver):
|
||||
|
||||
|
||||
@zenfixture
|
||||
def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str) -> Iterator[PostgresFactory]:
|
||||
pgfactory = PostgresFactory(zenith_cli, repo_dir, initial_tenant=initial_tenant)
|
||||
def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str, pg_bin: PgBin) -> Iterator[PostgresFactory]:
|
||||
pgfactory = PostgresFactory(zenith_cli, repo_dir, pg_bin, initial_tenant=initial_tenant)
|
||||
|
||||
yield pgfactory
|
||||
|
||||
@@ -505,67 +578,6 @@ def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str) -> Itera
|
||||
print('Starting postgres cleanup')
|
||||
pgfactory.stop_all()
|
||||
|
||||
|
||||
class PgBin:
|
||||
""" A helper class for executing postgres binaries """
|
||||
def __init__(self, log_dir: str, pg_distrib_dir: str):
|
||||
self.log_dir = log_dir
|
||||
self.pg_install_path = pg_distrib_dir
|
||||
self.pg_bin_path = os.path.join(self.pg_install_path, 'bin')
|
||||
self.env = os.environ.copy()
|
||||
self.env['LD_LIBRARY_PATH'] = os.path.join(self.pg_install_path, 'lib')
|
||||
|
||||
def _fixpath(self, command: List[str]) -> None:
|
||||
if '/' not in command[0]:
|
||||
command[0] = os.path.join(self.pg_bin_path, command[0])
|
||||
|
||||
def _build_env(self, env_add: Optional[Env]) -> Env:
|
||||
if env_add is None:
|
||||
return self.env
|
||||
env = self.env.copy()
|
||||
env.update(env_add)
|
||||
return env
|
||||
|
||||
def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None) -> None:
|
||||
"""
|
||||
Run one of the postgres binaries.
|
||||
|
||||
The command should be in list form, e.g. ['pgbench', '-p', '55432']
|
||||
|
||||
All the necessary environment variables will be set.
|
||||
|
||||
If the first argument (the command name) doesn't include a path (no '/'
|
||||
characters present), then it will be edited to include the correct path.
|
||||
|
||||
If you want stdout/stderr captured to files, use `run_capture` instead.
|
||||
"""
|
||||
|
||||
self._fixpath(command)
|
||||
print('Running command "{}"'.format(' '.join(command)))
|
||||
env = self._build_env(env)
|
||||
subprocess.run(command, env=env, cwd=cwd, check=True)
|
||||
|
||||
def run_capture(self,
|
||||
command: List[str],
|
||||
env: Optional[Env] = None,
|
||||
cwd: Optional[str] = None) -> None:
|
||||
"""
|
||||
Run one of the postgres binaries, with stderr and stdout redirected to a file.
|
||||
|
||||
This is just like `run`, but for chatty programs.
|
||||
"""
|
||||
|
||||
self._fixpath(command)
|
||||
print('Running command "{}"'.format(' '.join(command)))
|
||||
env = self._build_env(env)
|
||||
subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True)
|
||||
|
||||
|
||||
@zenfixture
|
||||
def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin:
|
||||
return PgBin(test_output_dir, pg_distrib_dir)
|
||||
|
||||
|
||||
def read_pid(path: Path):
|
||||
""" Read content of file into number """
|
||||
return int(path.read_text())
|
||||
|
||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: e3175fe60a...9932d259be
@@ -2,7 +2,7 @@
|
||||
//!
|
||||
//! FIXME: better description needed here
|
||||
|
||||
use anyhow::{bail, Result};
|
||||
use anyhow::{bail, Context, Result};
|
||||
use bincode::config::Options;
|
||||
use bytes::{Buf, Bytes};
|
||||
use log::*;
|
||||
@@ -27,6 +27,7 @@ use crate::replication::HotStandbyFeedback;
|
||||
use crate::send_wal::SendWalHandler;
|
||||
use crate::timeline::{Timeline, TimelineTools};
|
||||
use crate::WalAcceptorConf;
|
||||
use pageserver::waldecoder::WalStreamDecoder;
|
||||
use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ};
|
||||
|
||||
pub const SK_MAGIC: u32 = 0xcafeceefu32;
|
||||
@@ -236,7 +237,9 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
.write_message(&BeMessage::CopyBothResponse)?;
|
||||
|
||||
// Receive information about server
|
||||
let server_info = self.read_msg::<ServerInfo>()?;
|
||||
let server_info = self
|
||||
.read_msg::<ServerInfo>()
|
||||
.context("Failed to receive server info")?;
|
||||
info!(
|
||||
"Start handshake with wal_proposer {} sysid {} timeline {} tenant {}",
|
||||
self.peer_addr, server_info.system_id, server_info.timeline_id, server_info.tenant_id,
|
||||
@@ -284,7 +287,9 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
self.write_msg(&my_info)?;
|
||||
|
||||
/* Wait for vote request */
|
||||
let prop = self.read_msg::<RequestVote>()?;
|
||||
let prop = self
|
||||
.read_msg::<RequestVote>()
|
||||
.context("Failed to read vote request")?;
|
||||
/* This is Paxos check which should ensure that only one master can perform commits */
|
||||
if prop.node_id < my_info.server.node_id {
|
||||
/* Send my node-id to inform proposer that it's candidate was rejected */
|
||||
@@ -296,8 +301,8 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
);
|
||||
}
|
||||
my_info.server.node_id = prop.node_id;
|
||||
this_timeline.get().set_info(&my_info);
|
||||
/* Need to persist our vote first */
|
||||
this_timeline.get().set_info(&my_info);
|
||||
this_timeline.get().save_control_file(true)?;
|
||||
|
||||
let mut flushed_restart_lsn = Lsn(0);
|
||||
@@ -318,9 +323,11 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
}
|
||||
|
||||
info!(
|
||||
"Start streaming from timeline {} tenant {} address {:?} flush_lsn={}",
|
||||
server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn
|
||||
"Start accepting WAL for timeline {} tenant {} address {:?} flush_lsn={}",
|
||||
server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn,
|
||||
);
|
||||
let mut last_rec_lsn = Lsn(0);
|
||||
let mut decoder = WalStreamDecoder::new(last_rec_lsn, false);
|
||||
|
||||
// Main loop
|
||||
loop {
|
||||
@@ -330,7 +337,8 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
let msg_bytes = self.read_msg_bytes()?;
|
||||
let mut msg_reader = msg_bytes.reader();
|
||||
|
||||
let req = SafeKeeperRequest::des_from(&mut msg_reader)?;
|
||||
let req = SafeKeeperRequest::des_from(&mut msg_reader)
|
||||
.context("Failed to get WAL message header")?;
|
||||
if req.sender_id != my_info.server.node_id {
|
||||
bail!("Sender NodeId is changed");
|
||||
}
|
||||
@@ -342,27 +350,52 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
let end_pos = req.end_lsn;
|
||||
let rec_size = end_pos.checked_sub(start_pos).unwrap().0 as usize;
|
||||
assert!(rec_size <= MAX_SEND_SIZE);
|
||||
if rec_size != 0 {
|
||||
debug!(
|
||||
"received for {} bytes between {} and {}",
|
||||
rec_size, start_pos, end_pos,
|
||||
);
|
||||
|
||||
debug!(
|
||||
"received for {} bytes between {} and {}",
|
||||
rec_size, start_pos, end_pos,
|
||||
);
|
||||
/* Receive message body (from the rest of the message) */
|
||||
let mut buf = Vec::with_capacity(rec_size);
|
||||
msg_reader.read_to_end(&mut buf)?;
|
||||
assert_eq!(buf.len(), rec_size);
|
||||
|
||||
/* Receive message body (from the rest of the message) */
|
||||
let mut buf = Vec::with_capacity(rec_size);
|
||||
msg_reader.read_to_end(&mut buf)?;
|
||||
assert_eq!(buf.len(), rec_size);
|
||||
|
||||
/* Save message in file */
|
||||
Self::write_wal_file(
|
||||
swh,
|
||||
start_pos,
|
||||
timeline_id,
|
||||
this_timeline.get(),
|
||||
wal_seg_size,
|
||||
&buf,
|
||||
)?;
|
||||
if decoder.available() != start_pos {
|
||||
info!(
|
||||
"Restart decoder from {} to {}",
|
||||
decoder.available(),
|
||||
start_pos
|
||||
);
|
||||
decoder = WalStreamDecoder::new(start_pos, false);
|
||||
}
|
||||
decoder.feed_bytes(&buf);
|
||||
loop {
|
||||
match decoder.poll_decode() {
|
||||
Err(e) => info!("Decode error {}", e),
|
||||
Ok(None) => {},
|
||||
Ok(Some((lsn, _rec))) => {
|
||||
last_rec_lsn = lsn;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
info!(
|
||||
"Receive WAL {}..{} last_rec_lsn={}",
|
||||
start_pos, end_pos, last_rec_lsn
|
||||
);
|
||||
|
||||
/* Save message in file */
|
||||
Self::write_wal_file(
|
||||
swh,
|
||||
start_pos,
|
||||
timeline_id,
|
||||
this_timeline.get(),
|
||||
wal_seg_size,
|
||||
&buf,
|
||||
)?;
|
||||
}
|
||||
my_info.restart_lsn = req.restart_lsn;
|
||||
my_info.commit_lsn = req.commit_lsn;
|
||||
|
||||
@@ -372,13 +405,13 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
* maximum (vcl) determined by WAL proposer during handshake.
|
||||
* Switching epoch means that node completes recovery and start writing in the WAL new data.
|
||||
*/
|
||||
if my_info.epoch < prop.epoch && end_pos > max(my_info.flush_lsn, prop.vcl) {
|
||||
if my_info.epoch < prop.epoch && end_pos >= max(my_info.flush_lsn, prop.vcl) {
|
||||
info!("Switch to new epoch {}", prop.epoch);
|
||||
my_info.epoch = prop.epoch; /* bump epoch */
|
||||
sync_control_file = true;
|
||||
}
|
||||
if end_pos > my_info.flush_lsn {
|
||||
my_info.flush_lsn = end_pos;
|
||||
if last_rec_lsn > my_info.flush_lsn {
|
||||
my_info.flush_lsn = last_rec_lsn;
|
||||
}
|
||||
/*
|
||||
* Update restart LSN in control file.
|
||||
@@ -386,6 +419,7 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
* when restart_lsn delta exceeds WAL segment size.
|
||||
*/
|
||||
sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn;
|
||||
this_timeline.get().set_info(&my_info);
|
||||
this_timeline.get().save_control_file(sync_control_file)?;
|
||||
|
||||
if sync_control_file {
|
||||
@@ -396,7 +430,7 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
//info!("Confirm LSN: {:X}/{:>08X}", (end_pos>>32) as u32, end_pos as u32);
|
||||
let resp = SafeKeeperResponse {
|
||||
epoch: my_info.epoch,
|
||||
flush_lsn: end_pos,
|
||||
flush_lsn: my_info.flush_lsn,
|
||||
hs_feedback: this_timeline.get().get_hs_feedback(),
|
||||
};
|
||||
self.write_msg(&resp)?;
|
||||
@@ -405,9 +439,15 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
* Ping wal sender that new data is available.
|
||||
* FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper.
|
||||
*/
|
||||
trace!(
|
||||
"Notify WAL senders min({}, {})={}",
|
||||
req.commit_lsn,
|
||||
my_info.flush_lsn,
|
||||
min(req.commit_lsn, my_info.flush_lsn)
|
||||
);
|
||||
this_timeline
|
||||
.get()
|
||||
.notify_wal_senders(min(req.commit_lsn, end_pos));
|
||||
.notify_wal_senders(min(req.commit_lsn, my_info.flush_lsn));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -76,8 +76,10 @@ impl ReplicationConn {
|
||||
let feedback = HotStandbyFeedback::des(&m)?;
|
||||
subscriber.add_hs_feedback(feedback);
|
||||
}
|
||||
FeMessage::Sync => {}
|
||||
FeMessage::CopyFailed => return Err(anyhow!("Copy failed")),
|
||||
_ => {
|
||||
// We only handle `CopyData` messages. Anything else is ignored.
|
||||
// We only handle `CopyData`, 'Sync', 'CopyFailed' messages. Anything else is ignored.
|
||||
info!("unexpected message {:?}", msg);
|
||||
}
|
||||
}
|
||||
@@ -215,9 +217,14 @@ impl ReplicationConn {
|
||||
data: &file_buf,
|
||||
}))?;
|
||||
|
||||
start_pos += send_size as u64;
|
||||
debug!(
|
||||
"Sent WAL to page server {}..{}, end_pos={}",
|
||||
start_pos,
|
||||
start_pos + send_size as u64,
|
||||
end_pos
|
||||
);
|
||||
|
||||
debug!("Sent WAL to page server up to {}", end_pos);
|
||||
start_pos += send_size as u64;
|
||||
|
||||
// Decide whether to reuse this file. If we don't set wal_file here
|
||||
// a new file will be opened next time.
|
||||
|
||||
@@ -175,7 +175,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
fn _stop_wal_senders(&self) {
|
||||
pub fn stop_wal_senders(&self) {
|
||||
self.notify_wal_senders(END_REPLICATION_MARKER);
|
||||
}
|
||||
|
||||
|
||||
@@ -24,6 +24,11 @@ impl Lsn {
|
||||
/// Maximum possible value for an LSN
|
||||
pub const MAX: Lsn = Lsn(u64::MAX);
|
||||
|
||||
/// Align LSN on 8-byte boundary (alignment of WAL records).
|
||||
pub fn align(&self) -> Lsn {
|
||||
Lsn((self.0 + 7) & !7)
|
||||
}
|
||||
|
||||
/// Subtract a number, returning None on overflow.
|
||||
pub fn checked_sub<T: Into<u64>>(self, other: T) -> Option<Lsn> {
|
||||
let other: u64 = other.into();
|
||||
|
||||
@@ -301,8 +301,9 @@ impl PostgresBackend {
|
||||
FeMessage::Query(m) => {
|
||||
trace!("got query {:?}", m.body);
|
||||
// xxx distinguish fatal and recoverable errors?
|
||||
if let Err(e) = handler.process_query(self, m.body) {
|
||||
if let Err(e) = handler.process_query(self, m.body.clone()) {
|
||||
let errmsg = format!("{}", e);
|
||||
warn!("query handler for {:?} failed: {}", m.body, errmsg);
|
||||
self.write_message_noflush(&BeMessage::ErrorResponse(errmsg))?;
|
||||
}
|
||||
self.write_message(&BeMessage::ReadyForQuery)?;
|
||||
@@ -340,7 +341,7 @@ impl PostgresBackend {
|
||||
|
||||
// We prefer explicit pattern matching to wildcards, because
|
||||
// this helps us spot the places where new variants are missing
|
||||
FeMessage::CopyData(_) | FeMessage::CopyDone => {
|
||||
FeMessage::CopyData(_) | FeMessage::CopyDone | FeMessage::CopyFailed => {
|
||||
bail!("unexpected message type: {:?}", msg);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ pub enum FeMessage {
|
||||
Terminate,
|
||||
CopyData(Bytes),
|
||||
CopyDone,
|
||||
CopyFailed,
|
||||
PasswordMessage(Bytes),
|
||||
}
|
||||
|
||||
@@ -138,6 +139,7 @@ impl FeMessage {
|
||||
b'X' => Ok(Some(FeMessage::Terminate)),
|
||||
b'd' => Ok(Some(FeMessage::CopyData(body))),
|
||||
b'c' => Ok(Some(FeMessage::CopyDone)),
|
||||
b'f' => Ok(Some(FeMessage::CopyFailed)),
|
||||
b'p' => Ok(Some(FeMessage::PasswordMessage(body))),
|
||||
tag => Err(anyhow!("unknown message tag: {},'{:?}'", tag, body)),
|
||||
}
|
||||
@@ -338,6 +340,7 @@ pub enum BeMessage<'a> {
|
||||
ControlFile,
|
||||
CopyData(&'a [u8]),
|
||||
CopyDone,
|
||||
CopyFailed,
|
||||
CopyInResponse,
|
||||
CopyOutResponse,
|
||||
CopyBothResponse,
|
||||
@@ -546,6 +549,11 @@ impl<'a> BeMessage<'a> {
|
||||
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
|
||||
}
|
||||
|
||||
BeMessage::CopyFailed => {
|
||||
buf.put_u8(b'f');
|
||||
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
|
||||
}
|
||||
|
||||
BeMessage::CopyInResponse => {
|
||||
buf.put_u8(b'G');
|
||||
write_body(buf, |buf| {
|
||||
|
||||
Reference in New Issue
Block a user