Compare commits

..

7 Commits

Author SHA1 Message Date
Eric Seppanen
5b79e033bd wip: adapt layered_repository to snapfile 2021-08-19 20:23:11 -07:00
Eric Seppanen
93b2e49939 remove snapshot id; replace with timeline id
The snapshot id doesn't make sense when two snapshots are squashed.
Better to use the timeline id anyway.

It's still a little strange to squash two timelines together, but it
must be possible to flatten history even if there have been many branch
events, so this should be a control plane / snapshot management
problem.
2021-08-18 12:11:22 -07:00
Eric Seppanen
c3833ef0f4 add snapshot squashing
Add logic to squash snapshot files.
Add snaptool (a binary for inspecting and manipulating snapshot files).
Use bookfile 0.3, which allows concurrent reads.
2021-08-18 12:11:22 -07:00
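Based on the StructOpt definitions in snapfile/snaptool/main.rs further down in this diff, snaptool exposes `squash <older> <newer>` and `describe <file>` subcommands. The core idea behind squashing can be sketched roughly as follows; this is a simplified illustration with stand-in types, not the crate's actual squash implementation (which is not shown in this diff): pages from the newer snapshot win, and the older snapshot only supplies pages the newer one lacks.

```rust
use std::collections::BTreeMap;

/// Stand-in for a stored page image (the real crate has a dedicated Page type).
type Page = Vec<u8>;

/// Merge two snapshots' page maps: keep every page from the newer snapshot,
/// and fall back to the older snapshot for pages the newer one doesn't carry.
fn squash_pages(older: BTreeMap<u64, Page>, newer: BTreeMap<u64, Page>) -> BTreeMap<u64, Page> {
    let mut merged = older;
    // Later inserts overwrite earlier ones, so the newer snapshot's pages win.
    merged.extend(newer);
    merged
}

fn main() {
    let mut older = BTreeMap::new();
    older.insert(1, vec![0xAA; 8192]);
    older.insert(2, vec![0xBB; 8192]);

    let mut newer = BTreeMap::new();
    newer.insert(2, vec![0xCC; 8192]); // supersedes the older copy of page 2
    newer.insert(3, vec![0xDD; 8192]);

    let squashed = squash_pages(older, newer);
    assert_eq!(squashed.len(), 3);
    assert_eq!(squashed[&2][0], 0xCC);
}
```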
Eric Seppanen
acfc5c5d21 add another Page constructor, which copies bytes 2021-08-18 12:11:22 -07:00
Eric Seppanen
0a0d12368e snapfile: add snapshot metadata
Add some basic metadata to the snapshot file, including an id number,
predecessor snapshot, and lsn.
2021-08-18 12:11:22 -07:00
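For orientation, by the end of this series the metadata is shaped roughly like the sketch below (compare SnapFileMeta and Predecessor as used in snapfile/src/lib.rs further down). This is a hand-written approximation, not the exact serialized definitions, which live in versioned.rs and are versioned through the aversion crate.

```rust
use serde::{Deserialize, Serialize};

/// Identifies the snapshot this one was built on top of, if any.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Predecessor {
    pub timeline: [u8; 16],
    pub lsn: u64,
}

/// Per-snapshot metadata, stored in its own bookfile chapter.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapFileMeta {
    /// Timeline this snapshot belongs to.
    pub timeline: [u8; 16],
    /// Metadata of the predecessor snapshot, if there is one.
    pub predecessor: Option<Predecessor>,
    /// WAL position this snapshot covers up to.
    pub lsn: u64,
}
```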
Eric Seppanen
8d2b517359 snapfile: split apart code into multiple files
versioned.rs: for things that get serialized and must be versioned to
avoid breaking backwards compatibility.

page.rs: for the Page struct.
2021-08-18 12:11:22 -07:00
Eric Seppanen
26bcd72619 add snapfile
The snapfile crate implements a snapshot file format. The format relies
heavily on the bookfile crate for the structured container, and on the
aversion crate for versioned data structures.

The overall structure of the file looks like this:
- first 4KB: bookfile header
- next 8KB * N: raw page data for N pages
- page index map (from page identifier to data offset)
- bookfile chapter index

When a SnapFile is opened for reading, the page index map is read into
memory; any page can be read directly from the file from that point.
2021-08-18 12:11:22 -07:00
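As a rough illustration of that layout (a sketch only; the actual reader in snapfile/src/lib.rs further down goes through bookfile chapter readers rather than computing raw file offsets), locating a page is an index lookup followed by a fixed-size read at a computed offset:

```rust
use std::collections::BTreeMap;

const HEADER_SIZE: u64 = 4096; // bookfile header occupies the first 4 KB
const PAGE_SIZE: u64 = 8192;   // each stored page is 8 KB

/// Map from page identifier to the page's slot in the raw data region.
type PageIndexMap = BTreeMap<u64, u64>;

/// Compute the absolute byte offset of a page's data, if the page is present.
fn page_byte_offset(index: &PageIndexMap, page_num: u64) -> Option<u64> {
    index.get(&page_num).map(|slot| HEADER_SIZE + slot * PAGE_SIZE)
}

fn main() {
    let mut index = PageIndexMap::new();
    index.insert(7, 0);  // page 7 lives in the first 8 KB slot
    index.insert(42, 1); // page 42 lives in the second slot

    assert_eq!(page_byte_offset(&index, 7), Some(4096));
    assert_eq!(page_byte_offset(&index, 42), Some(4096 + 8192));
    assert_eq!(page_byte_offset(&index, 99), None);
}
```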
40 changed files with 1349 additions and 963 deletions


@@ -237,23 +237,6 @@ jobs:
- store_test_results:
path: /tmp/test_output
# Build zenithdb/zenith:latest image and push it to Docker hub
docker-image:
docker:
- image: cimg/base:2021.04
steps:
- checkout
- setup_remote_docker:
docker_layer_caching: true
- run:
name: Init postgres submodule
command: git submodule update --init --depth 1
- run:
name: Build and push Docker image
command: |
echo $DOCKER_PWD | docker login -u $DOCKER_LOGIN --password-stdin
docker build -t zenithdb/zenith:latest . && docker push zenithdb/zenith:latest
workflows:
build_and_test:
jobs:
@@ -282,14 +265,3 @@ workflows:
test_selection: batch_others
requires:
- build-zenith-<< matrix.build_type >>
- docker-image:
# Context gives an ability to login
context: Docker Hub
# Build image only for commits to main
filters:
branches:
only:
- main
requires:
- pg_regress tests release
- other tests release

Cargo.lock (generated)

@@ -703,6 +703,15 @@ version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
[[package]]
name = "heck"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "hermit-abi"
version = "0.1.19"
@@ -1403,6 +1412,30 @@ version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"version_check",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
@@ -1932,6 +1965,21 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
[[package]]
name = "snapfile"
version = "0.1.0"
dependencies = [
"anyhow",
"aversion",
"bookfile",
"hex",
"rand",
"serde",
"structopt",
"tempfile",
"zenith_utils",
]
[[package]]
name = "socket2"
version = "0.4.0"
@@ -1964,6 +2012,30 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]]
name = "structopt"
version = "0.3.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69b041cdcb67226aca307e6e7be44c8806423d83e018bd662360a93dabce4d71"
dependencies = [
"clap",
"lazy_static",
"structopt-derive",
]
[[package]]
name = "structopt-derive"
version = "0.4.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7813934aecf5f51a54775e00068c237de98489463968231a51746bbbc03f9c10"
dependencies = [
"heck",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "subtle"
version = "2.4.1"
@@ -2272,6 +2344,12 @@ dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-segmentation"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b"
[[package]]
name = "unicode-width"
version = "0.1.8"


@@ -9,6 +9,7 @@ members = [
"zenith",
"zenith_metrics",
"zenith_utils",
"snapfile",
]
[profile.release]


@@ -1,77 +1,94 @@
#
# Docker image for console integration testing.
#
# We may also reuse it in CI to unify installation process and as a general binaries building
# tool for production servers.
#
# Dynamic linking is used for librocksdb and libstdc++ bacause librocksdb-sys calls
# bindgen with "dynamic" feature flag. This also prevents usage of dockerhub alpine-rust
# images which are statically linked and have guards against any dlopen. I would rather
# prefer all static binaries so we may change the way librocksdb-sys builds or wait until
# we will have our own storage and drop rockdb dependency.
#
# Cargo-chef is used to separate dependencies building from main binaries building. This
# way `docker build` will download and install dependencies only of there are changes to
# out Cargo.toml files.
#
#
# Build Postgres separately --- this layer will be rebuilt only if one of
# mentioned paths will get any changes.
# build postgres separately -- this layer will be rebuilt only if one of
# mentioned paths will get any changes
#
FROM zenithdb/build:buster AS pg-build
WORKDIR /zenith
FROM alpine:3.13 as pg-build
RUN apk add --update clang llvm compiler-rt compiler-rt-static lld musl-dev binutils \
make bison flex readline-dev zlib-dev perl linux-headers libseccomp-dev
WORKDIR zenith
COPY ./vendor/postgres vendor/postgres
COPY ./Makefile Makefile
RUN make -j $(getconf _NPROCESSORS_ONLN) -s postgres
# Build using clang and lld
RUN CC='clang' LD='lld' CFLAGS='-fuse-ld=lld --rtlib=compiler-rt' make postgres -j4
#
# Calculate cargo dependencies.
# This will always run, but only generate recipe.json with list of dependencies without
# installing them.
#
FROM zenithdb/build:buster AS cargo-deps-inspect
WORKDIR /zenith
FROM alpine:20210212 as cargo-deps-inspect
RUN apk add --update rust cargo
RUN cargo install cargo-chef
WORKDIR zenith
COPY . .
RUN cargo chef prepare --recipe-path /zenith/recipe.json
RUN cargo chef prepare --recipe-path recipe.json
#
# Build cargo dependencies.
# This temp cantainner should be rebuilt only if recipe.json was changed.
# This temp cantainner would be build only if recipe.json was changed.
#
FROM zenithdb/build:buster AS deps-build
WORKDIR /zenith
FROM alpine:20210212 as deps-build
RUN apk add --update rust cargo openssl-dev clang build-base
# rust-rocksdb can be built against system-wide rocksdb -- that saves about
# 10 minutes during build. Rocksdb apk package is in testing now, but use it
# anyway. In case of any troubles we can download and build rocksdb here manually
# (to cache it as a docker layer).
RUN apk --no-cache --update --repository https://dl-cdn.alpinelinux.org/alpine/edge/testing add rocksdb-dev
WORKDIR zenith
COPY --from=pg-build /zenith/tmp_install/include/postgresql/server tmp_install/include/postgresql/server
COPY --from=cargo-deps-inspect /usr/local/cargo/bin/cargo-chef /usr/local/cargo/bin/
COPY --from=cargo-deps-inspect /root/.cargo/bin/cargo-chef /root/.cargo/bin/
COPY --from=cargo-deps-inspect /zenith/recipe.json recipe.json
RUN ROCKSDB_LIB_DIR=/usr/lib/ cargo chef cook --release --recipe-path recipe.json
#
# Build zenith binaries
#
FROM zenithdb/build:buster AS build
WORKDIR /zenith
FROM alpine:20210212 as build
RUN apk add --update rust cargo openssl-dev clang build-base
RUN apk --no-cache --update --repository https://dl-cdn.alpinelinux.org/alpine/edge/testing add rocksdb-dev
WORKDIR zenith
COPY . .
# Copy cached dependencies
COPY --from=pg-build /zenith/tmp_install/include/postgresql/server tmp_install/include/postgresql/server
COPY --from=deps-build /zenith/target target
COPY --from=deps-build /usr/local/cargo/ /usr/local/cargo/
COPY --from=deps-build /root/.cargo /root/.cargo
RUN cargo build --release
#
# Copy binaries to resulting image.
# build-base hare to provide libstdc++ (it will also bring gcc, but leave it this way until we figure
# out how to statically link rocksdb or avoid it at all).
#
FROM debian:buster-slim
WORKDIR /data
RUN apt-get update && apt-get -yq install librocksdb-dev libseccomp-dev openssl && \
mkdir zenith_install
FROM alpine:3.13
RUN apk add --update openssl build-base libseccomp-dev
RUN apk --no-cache --update --repository https://dl-cdn.alpinelinux.org/alpine/edge/testing add rocksdb
COPY --from=build /zenith/target/release/pageserver /usr/local/bin
COPY --from=build /zenith/target/release/wal_acceptor /usr/local/bin
COPY --from=build /zenith/target/release/proxy /usr/local/bin
COPY --from=pg-build /zenith/tmp_install postgres_install
COPY --from=pg-build /zenith/tmp_install /usr/local
COPY docker-entrypoint.sh /docker-entrypoint.sh
# Remove build artifacts (~ 500 MB)
RUN rm -rf postgres_install/build && \
# 'Install' Postgres binaries locally
cp -r postgres_install/* /usr/local/ && \
# Prepare an archive of Postgres binaries (should be around 11 MB)
# and keep it inside container for an ease of deploy pipeline.
cd postgres_install && tar -czf /data/postgres_install.tar.gz . && cd .. && \
rm -rf postgres_install
RUN useradd -m -d /data zenith
RUN addgroup zenith && adduser -h /data -D -G zenith zenith
VOLUME ["/data"]
WORKDIR /data
USER zenith
EXPOSE 6400
ENTRYPOINT ["/docker-entrypoint.sh"]


@@ -1,95 +0,0 @@
#
# Docker image for console integration testing.
#
# We may also reuse it in CI to unify installation process and as a general binaries building
# tool for production servers.
#
# Dynamic linking is used for librocksdb and libstdc++ bacause librocksdb-sys calls
# bindgen with "dynamic" feature flag. This also prevents usage of dockerhub alpine-rust
# images which are statically linked and have guards against any dlopen. I would rather
# prefer all static binaries so we may change the way librocksdb-sys builds or wait until
# we will have our own storage and drop rockdb dependency.
#
# Cargo-chef is used to separate dependencies building from main binaries building. This
# way `docker build` will download and install dependencies only of there are changes to
# out Cargo.toml files.
#
#
# build postgres separately -- this layer will be rebuilt only if one of
# mentioned paths will get any changes
#
FROM alpine:3.13 as pg-build
RUN apk add --update clang llvm compiler-rt compiler-rt-static lld musl-dev binutils \
make bison flex readline-dev zlib-dev perl linux-headers libseccomp-dev
WORKDIR zenith
COPY ./vendor/postgres vendor/postgres
COPY ./Makefile Makefile
# Build using clang and lld
RUN CC='clang' LD='lld' CFLAGS='-fuse-ld=lld --rtlib=compiler-rt' make postgres -j4
#
# Calculate cargo dependencies.
# This will always run, but only generate recipe.json with list of dependencies without
# installing them.
#
FROM alpine:20210212 as cargo-deps-inspect
RUN apk add --update rust cargo
RUN cargo install cargo-chef
WORKDIR zenith
COPY . .
RUN cargo chef prepare --recipe-path recipe.json
#
# Build cargo dependencies.
# This temp cantainner would be build only if recipe.json was changed.
#
FROM alpine:20210212 as deps-build
RUN apk add --update rust cargo openssl-dev clang build-base
# rust-rocksdb can be built against system-wide rocksdb -- that saves about
# 10 minutes during build. Rocksdb apk package is in testing now, but use it
# anyway. In case of any troubles we can download and build rocksdb here manually
# (to cache it as a docker layer).
RUN apk --no-cache --update --repository https://dl-cdn.alpinelinux.org/alpine/edge/testing add rocksdb-dev
WORKDIR zenith
COPY --from=pg-build /zenith/tmp_install/include/postgresql/server tmp_install/include/postgresql/server
COPY --from=cargo-deps-inspect /root/.cargo/bin/cargo-chef /root/.cargo/bin/
COPY --from=cargo-deps-inspect /zenith/recipe.json recipe.json
RUN ROCKSDB_LIB_DIR=/usr/lib/ cargo chef cook --release --recipe-path recipe.json
#
# Build zenith binaries
#
FROM alpine:20210212 as build
RUN apk add --update rust cargo openssl-dev clang build-base
RUN apk --no-cache --update --repository https://dl-cdn.alpinelinux.org/alpine/edge/testing add rocksdb-dev
WORKDIR zenith
COPY . .
# Copy cached dependencies
COPY --from=pg-build /zenith/tmp_install/include/postgresql/server tmp_install/include/postgresql/server
COPY --from=deps-build /zenith/target target
COPY --from=deps-build /root/.cargo /root/.cargo
RUN cargo build --release
#
# Copy binaries to resulting image.
# build-base hare to provide libstdc++ (it will also bring gcc, but leave it this way until we figure
# out how to statically link rocksdb or avoid it at all).
#
FROM alpine:3.13
RUN apk add --update openssl build-base libseccomp-dev
RUN apk --no-cache --update --repository https://dl-cdn.alpinelinux.org/alpine/edge/testing add rocksdb
COPY --from=build /zenith/target/release/pageserver /usr/local/bin
COPY --from=build /zenith/target/release/wal_acceptor /usr/local/bin
COPY --from=build /zenith/target/release/proxy /usr/local/bin
COPY --from=pg-build /zenith/tmp_install /usr/local
COPY docker-entrypoint.sh /docker-entrypoint.sh
RUN addgroup zenith && adduser -h /data -D -G zenith zenith
VOLUME ["/data"]
WORKDIR /data
USER zenith
EXPOSE 6400
ENTRYPOINT ["/docker-entrypoint.sh"]
CMD ["pageserver"]


@@ -1,15 +0,0 @@
#
# Image with all the required dependencies to build https://github.com/zenithdb/zenith
# and Postgres from https://github.com/zenithdb/postgres
# Also includes some rust development and build tools.
#
FROM rust:slim-buster
WORKDIR /zenith
# Install postgres and zenith build dependencies
# clang is for rocksdb
RUN apt-get update && apt-get -yq install automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \
libseccomp-dev pkg-config libssl-dev librocksdb-dev clang
# Install rust tools
RUN rustup component add clippy && cargo install cargo-chef cargo-audit


@@ -12,12 +12,15 @@ apt install build-essential libtool libreadline-dev zlib1g-dev flex bison libsec
libssl-dev clang
```
[Rust] 1.52 or later is also required.
[Rust] 1.48 or later is also required.
To run the `psql` client, install the `postgresql-client` package or modify `PATH` and `LD_LIBRARY_PATH` to include `tmp_install/bin` and `tmp_install/lib`, respectively.
To run the integration tests (not required to use the code), install
Python (3.6 or higher), and install python3 packages with `pipenv` using `pipenv install` in the project directory.
Python (3.6 or higher), and install python3 packages with `pip` (called `pip3` on some systems):
```
pip install pytest psycopg2
```
2. Build zenith and patched postgres
```sh
@@ -103,9 +106,10 @@ pytest
## Documentation
Now we use README files to cover design ideas and overall architecture for each module and `rustdoc` style documentation comments. See also [/docs/](/docs/) a top-level overview of all available markdown documentation.
Now we use README files to cover design ideas and overall architecture for each module.
And rustdoc style documentation comments.
To view your `rustdoc` documentation in a browser, try running `cargo doc --no-deps --open`
To view your documentation in a browser, try running `cargo doc --no-deps --open`
## Source tree layout


@@ -320,7 +320,7 @@ impl PostgresNode {
// Never clean up old WAL. TODO: We should use a replication
// slot or something proper, to prevent the compute node
// from removing WAL that hasn't been streamed to the safekeeper or
// from removing WAL that hasn't been streamed to the safekeepr or
// page server yet. (gh issue #349)
self.append_conf("postgresql.conf", "wal_keep_size='10TB'\n")?;


@@ -1,6 +1,6 @@
#!/bin/sh
if [ "$1" = 'pageserver' ]; then
if [ ! -d "/data/tenants" ]; then
if [ ! -d "/data/timelines" ]; then
echo "Initializing pageserver data directory"
pageserver --init -D /data --postgres-distrib /usr/local
fi


@@ -1,11 +0,0 @@
# Zenith documentation
## Table of contents
- [authentication.md](authentication.md) — pageserver JWT authentication.
- [docker.md](docker.md) — Docker images and building pipeline.
- [multitenancy.md](multitenancy.md) — how multitenancy is organized in the pageserver and Zenith CLI.
- [pageserver/README](/pageserver/README) — pageserver overview.
- [postgres_ffi/README](/postgres_ffi/README) — Postgres FFI overview.
- [test_runner/README.md](/test_runner/README.md) — tests infrastructure overview.
- [walkeeper/README](/walkeeper/README.md) — WAL service overview.


@@ -1,38 +0,0 @@
# Docker images of Zenith
## Images
Currently we build two main images:
- [zenithdb/zenith](https://hub.docker.com/repository/docker/zenithdb/zenith) — image with pre-built `pageserver`, `wal_acceptor` and `proxy` binaries and all the required runtime dependencies. Built from [/Dockerfile](/Dockerfile).
- [zenithdb/compute-node](https://hub.docker.com/repository/docker/zenithdb/compute-node) — compute node image with pre-built Postgres binaries from [zenithdb/postgres](https://github.com/zenithdb/postgres).
And two intermediate images used either to reduce build time or to deliver some additional binary tools from other repos:
- [zenithdb/build](https://hub.docker.com/repository/docker/zenithdb/build) — image with all the dependencies required to build Zenith and compute node images. This image is based on `rust:slim-buster`, so it also has a proper `rust` environment. Built from [/Dockerfile.build](/Dockerfile.build).
- [zenithdb/compute-tools](https://hub.docker.com/repository/docker/zenithdb/compute-tools) — compute node configuration management tools.
## Building pipeline
1. Image `zenithdb/compute-tools` is re-built automatically.
2. Image `zenithdb/build` is built manually. If you want to introduce any new compile time dependencies to Zenith or compute node you have to update this image as well, build it and push to Docker Hub.
Build:
```sh
docker build -t zenithdb/build:buster -f Dockerfile.build .
```
Login:
```sh
docker login
```
Push to Docker Hub:
```sh
docker push zenithdb/build:buster
```
3. Image `zenithdb/compute-node` is built independently in the [zenithdb/postgres](https://github.com/zenithdb/postgres) repo.
4. Image `zenithdb/zenith` is built in this repo after a successful `release` tests run and pushed to Docker Hub automatically.


@@ -41,6 +41,7 @@ use zenith_utils::seqwait::SeqWait;
mod inmemory_layer;
mod layer_map;
mod page_history;
mod snapshot_layer;
mod storage_layer;
@@ -842,34 +843,6 @@ impl Timeline for LayeredTimeline {
fn get_prev_record_lsn(&self) -> Lsn {
self.prev_record_lsn.load()
}
///
/// Wait until WAL has been received up to the given LSN.
///
fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result<Lsn> {
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
// This is necessary for bootstrap.
if lsn == Lsn(0) {
let last_valid_lsn = self.last_valid_lsn.load();
trace!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
last_valid_lsn,
lsn
);
lsn = last_valid_lsn;
}
self.last_valid_lsn
.wait_for_timeout(lsn, TIMEOUT)
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {} to arrive",
lsn
)
})?;
Ok(lsn)
}
}
impl LayeredTimeline {
@@ -1083,6 +1056,34 @@ impl LayeredTimeline {
Ok(layer_rc)
}
///
/// Wait until WAL has been received up to the given LSN.
///
fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result<Lsn> {
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
// This is necessary for bootstrap.
if lsn == Lsn(0) {
let last_valid_lsn = self.last_valid_lsn.load();
trace!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
last_valid_lsn,
lsn
);
lsn = last_valid_lsn;
}
self.last_valid_lsn
.wait_for_timeout(lsn, TIMEOUT)
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {} to arrive",
lsn
)
})?;
Ok(lsn)
}
///
/// Flush to disk all data that was written with the put_* functions
///


@@ -2,15 +2,15 @@
//! An in-memory layer stores recently received page versions in memory. The page versions
//! are held in a BTreeMap, and there's another BTreeMap to track the size of the relation.
//!
use crate::layered_repository::page_history::PageHistory;
use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageVersion, SegmentTag, RELISH_SEG_SIZE,
};
use crate::layered_repository::LayeredTimeline;
use crate::layered_repository::SnapshotLayer;
use crate::layered_repository::{LayeredTimeline, SnapshotLayer};
use crate::repository::WALRecord;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, Result};
use anyhow::{anyhow, bail, Result};
use bytes::Bytes;
use log::*;
use std::collections::BTreeMap;
@@ -43,9 +43,9 @@ pub struct InMemoryLayerInner {
///
/// All versions of all pages in the layer are are kept here.
/// Indexed by block number and LSN.
/// Indexed by block number.
///
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
pages: BTreeMap<u32, PageHistory>,
///
/// `segsizes` tracks the size of the segment at different points in time.
@@ -90,29 +90,32 @@ impl Layer for InMemoryLayer {
) -> Result<Option<Lsn>> {
// Scan the BTreeMap backwards, starting from reconstruct_data.lsn.
let mut need_base_image_lsn: Option<Lsn> = Some(lsn);
assert!(self.seg.blknum_in_seg(blknum));
{
let inner = self.inner.lock().unwrap();
let minkey = (blknum, Lsn(0));
let maxkey = (blknum, lsn);
let mut iter = inner
.page_versions
.range((Included(&minkey), Included(&maxkey)));
while let Some(((_blknum, entry_lsn), entry)) = iter.next_back() {
let pages = &inner.pages;
// FIXME: this assumes the latest page version is always the right answer.
// How should this work if the requested lsn is in the past? in the future?
let latest_version = pages
.get(&blknum)
.and_then(PageHistory::latest)
.ok_or_else(|| anyhow!("page not found"))?;
let (entry_lsn, entry) = latest_version;
if true {
if let Some(img) = &entry.page_image {
reconstruct_data.page_img = Some(img.clone());
need_base_image_lsn = None;
break;
} else if let Some(rec) = &entry.record {
reconstruct_data.records.push(rec.clone());
if rec.will_init {
// This WAL record initializes the page, so no need to go further back
need_base_image_lsn = None;
break;
} else {
need_base_image_lsn = Some(*entry_lsn);
need_base_image_lsn = Some(entry_lsn);
}
} else {
// No base image, and no WAL record. Huh?
@@ -120,7 +123,7 @@ impl Layer for InMemoryLayer {
}
}
// release lock on 'page_versions'
// release lock on self.pages
}
Ok(need_base_image_lsn)
@@ -184,7 +187,7 @@ impl InMemoryLayer {
start_lsn,
inner: Mutex::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: BTreeMap::new(),
pages: BTreeMap::new(),
segsizes: BTreeMap::new(),
}),
})
@@ -230,15 +233,11 @@ impl InMemoryLayer {
);
let mut inner = self.inner.lock().unwrap();
let old = inner.page_versions.insert((blknum, lsn), pv);
if old.is_some() {
// We already had an entry for this LSN. That's odd..
warn!(
"Page version of rel {} blk {} at {} already exists",
self.seg.rel, blknum, lsn
);
}
let page_history = inner
.pages
.entry(blknum)
.or_insert_with(PageHistory::default);
page_history.push(lsn, pv);
// Also update the relation size, if this extended the relation.
if self.seg.rel.is_blocky() {
@@ -311,7 +310,7 @@ impl InMemoryLayer {
timelineid,
lsn
);
let mut page_versions = BTreeMap::new();
let mut pages = BTreeMap::new();
let mut segsizes = BTreeMap::new();
let seg = src.get_seg_tag();
@@ -333,7 +332,8 @@ impl InMemoryLayer {
page_image: Some(img),
record: None,
};
page_versions.insert((blknum, lsn), pv);
let page_history = PageHistory::from_image(lsn, pv);
pages.insert(blknum, page_history);
}
Ok(InMemoryLayer {
@@ -344,8 +344,8 @@ impl InMemoryLayer {
start_lsn: lsn,
inner: Mutex::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: page_versions,
segsizes: segsizes,
pages,
segsizes,
}),
})
}
@@ -388,10 +388,11 @@ impl InMemoryLayer {
};
// Divide all the page versions into old and new at the 'end_lsn' cutoff point.
let mut before_page_versions;
let mut before_pages = BTreeMap::new();
let mut before_segsizes;
let mut after_page_versions;
let mut after_segsizes;
let mut after_pages = BTreeMap::new();
if !dropped {
before_segsizes = BTreeMap::new();
after_segsizes = BTreeMap::new();
@@ -403,20 +404,16 @@ impl InMemoryLayer {
}
}
before_page_versions = BTreeMap::new();
after_page_versions = BTreeMap::new();
for ((blknum, lsn), pv) in inner.page_versions.iter() {
if *lsn > end_lsn {
after_page_versions.insert((*blknum, *lsn), pv.clone());
} else {
before_page_versions.insert((*blknum, *lsn), pv.clone());
}
for (blknum, page_history) in inner.pages.iter() {
let (old, new) = page_history.clone().split_at(end_lsn);
before_pages.insert(*blknum, old);
after_pages.insert(*blknum, new);
}
} else {
before_page_versions = inner.page_versions.clone();
before_pages = inner.pages.clone();
before_segsizes = inner.segsizes.clone();
after_segsizes = BTreeMap::new();
after_page_versions = BTreeMap::new();
after_pages = BTreeMap::new();
}
// we can release the lock now.
@@ -431,13 +428,13 @@ impl InMemoryLayer {
self.start_lsn,
end_lsn,
dropped,
before_page_versions,
before_pages,
before_segsizes,
)?;
// If there were any "new" page versions, initialize a new in-memory layer to hold
// them
let new_open = if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
let new_open = if !after_segsizes.is_empty() || !after_pages.is_empty() {
info!("created new in-mem layer for {} {}-", self.seg, end_lsn);
let new_open = Self::copy_snapshot(
@@ -449,7 +446,7 @@ impl InMemoryLayer {
end_lsn,
)?;
let mut new_inner = new_open.inner.lock().unwrap();
new_inner.page_versions.append(&mut after_page_versions);
new_inner.pages.append(&mut after_pages);
new_inner.segsizes.append(&mut after_segsizes);
drop(new_inner);
@@ -476,14 +473,16 @@ impl InMemoryLayer {
for (k, v) in inner.segsizes.iter() {
result += &format!("{}: {}\n", k, v);
}
for (k, v) in inner.page_versions.iter() {
result += &format!(
"blk {} at {}: {}/{}\n",
k.0,
k.1,
v.page_image.is_some(),
v.record.is_some()
);
for (page_num, page_history) in inner.pages.iter() {
for (lsn, image) in page_history.iter() {
result += &format!(
"blk {} at {}: {}/{}\n",
page_num,
lsn,
image.page_image.is_some(),
image.record.is_some()
);
}
}
result


@@ -0,0 +1,94 @@
use super::storage_layer::PageVersion;
use std::collections::VecDeque;
use zenith_utils::lsn::Lsn;
/// A data structure that holds one or more versions of a particular page number.
//
#[derive(Default, Clone)]
pub struct PageHistory {
/// Pages stored in order, from oldest to newest.
pages: VecDeque<(Lsn, PageVersion)>,
}
impl PageHistory {
/// Create a new PageHistory containing a single image.
pub fn from_image(lsn: Lsn, image: PageVersion) -> Self {
let mut pages = VecDeque::new();
pages.push_back((lsn, image));
PageHistory { pages }
}
/// Push a newer page image.
pub fn push(&mut self, lsn: Lsn, page: PageVersion) {
if let Some((back_lsn, _)) = self.pages.back() {
debug_assert_ne!(
back_lsn, &lsn,
"push page at lsn {:?} but one already exists",
lsn
);
debug_assert!(back_lsn < &lsn, "pushed page is older than latest lsn");
}
self.pages.push_back((lsn, page));
}
pub fn latest(&self) -> Option<(Lsn, &PageVersion)> {
self.pages.back().map(|(lsn, page)| (*lsn, page))
}
/// Split a page history at a particular LSN.
///
/// This consumes this PageHistory and returns two new ones.
/// Any changes exactly matching the split LSN will be in the
/// "old" history.
//
// FIXME: Is this necessary? There is some debate whether "splitting"
// layers is the best design.
//
pub fn split_at(self, split_lsn: Lsn) -> (PageHistory, PageHistory) {
let mut old = PageHistory::default();
let mut new = PageHistory::default();
for (lsn, page) in self.pages {
if lsn > split_lsn {
new.push(lsn, page)
} else {
old.push(lsn, page);
}
}
(old, new)
}
pub fn iter(&self) -> impl Iterator<Item = &(Lsn, PageVersion)> {
self.pages.iter()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn page_history() {
fn make_page(b: u8) -> PageVersion {
let image = vec![b; 8192].into();
PageVersion {
page_image: Some(image),
record: None,
}
}
let mut ph = PageHistory::default();
ph.push(10.into(), make_page(1));
ph.push(20.into(), make_page(2));
ph.push(30.into(), make_page(3));
let (latest_lsn, latest_image) = ph.latest().unwrap();
assert_eq!(latest_lsn, 30.into());
assert!(matches!(latest_image, PageVersion { page_image: Some(im), .. } if im[0] == 3));
let mut it = ph.iter();
assert_eq!(it.next().unwrap().0, 10.into());
assert_eq!(it.next().unwrap().0, 20.into());
assert_eq!(it.next().unwrap().0, 30.into());
assert!(it.next().is_none());
}
}


@@ -37,6 +37,7 @@
//! A snapshot file is constructed using the 'bookfile' crate. Each file consists of two
//! parts: the page versions and the relation sizes. They are stored as separate chapters.
//!
use crate::layered_repository::page_history::PageHistory;
use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageVersion, SegmentTag,
};
@@ -236,7 +237,7 @@ pub struct SnapshotLayerInner {
/// All versions of all pages in the file are are kept here.
/// Indexed by block number and LSN.
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
pages: BTreeMap<u32, PageHistory>,
/// `relsizes` tracks the size of the relation at different points in time.
relsizes: BTreeMap<Lsn, u32>,
@@ -270,6 +271,7 @@ impl Layer for SnapshotLayer {
lsn: Lsn,
reconstruct_data: &mut PageReconstructData,
) -> Result<Option<Lsn>> {
/*
// Scan the BTreeMap backwards, starting from the given entry.
let mut need_base_image_lsn: Option<Lsn> = Some(lsn);
{
@@ -303,6 +305,9 @@ impl Layer for SnapshotLayer {
}
Ok(need_base_image_lsn)
*/
todo!()
}
/// Get size of the relation at given LSN
@@ -380,7 +385,7 @@ impl SnapshotLayer {
start_lsn: Lsn,
end_lsn: Lsn,
dropped: bool,
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
pages: BTreeMap<u32, PageHistory>,
relsizes: BTreeMap<Lsn, u32>,
) -> Result<SnapshotLayer> {
let snapfile = SnapshotLayer {
@@ -393,10 +398,12 @@ impl SnapshotLayer {
dropped,
inner: Mutex::new(SnapshotLayerInner {
loaded: true,
page_versions: page_versions,
relsizes: relsizes,
pages,
relsizes,
}),
};
/*
let inner = snapfile.inner.lock().unwrap();
// Write the in-memory btreemaps into a file
@@ -426,12 +433,16 @@ impl SnapshotLayer {
drop(inner);
Ok(snapfile)
*/
todo!()
}
///
/// Load the contents of the file into memory
///
fn load(&self) -> Result<MutexGuard<SnapshotLayerInner>> {
/*
// quick exit if already loaded
let mut inner = self.inner.lock().unwrap();
@@ -469,6 +480,9 @@ impl SnapshotLayer {
};
Ok(inner)
*/
todo!()
}
/// Create SnapshotLayers representing all files on disk
@@ -479,6 +493,7 @@ impl SnapshotLayer {
timelineid: ZTimelineId,
tenantid: ZTenantId,
) -> Result<Vec<Arc<SnapshotLayer>>> {
/*
let path = conf.timeline_path(&timelineid, &tenantid);
let mut snapfiles: Vec<Arc<SnapshotLayer>> = Vec::new();
@@ -506,6 +521,8 @@ impl SnapshotLayer {
}
}
return Ok(snapfiles);
*/
todo!()
}
pub fn delete(&self) -> Result<()> {
@@ -519,11 +536,14 @@ impl SnapshotLayer {
/// it will need to be loaded back.
///
pub fn unload(&self) -> Result<()> {
/*
let mut inner = self.inner.lock().unwrap();
inner.page_versions = BTreeMap::new();
inner.relsizes = BTreeMap::new();
inner.loaded = false;
Ok(())
*/
todo!()
}
/// debugging function to print out the contents of the layer


@@ -663,7 +663,7 @@ impl Timeline for ObjectTimeline {
assert!(old <= lsn);
// Use old value of last_record_lsn as prev_record_lsn
self.prev_record_lsn.fetch_max(old.align());
self.prev_record_lsn.fetch_max(Lsn((old.0 + 7) & !7));
// Also advance last_valid_lsn
let old = self.last_valid_lsn.advance(lsn);
@@ -712,41 +712,6 @@ impl Timeline for ObjectTimeline {
let iter = self.obj_store.objects(self.timelineid, lsn)?;
Ok(Box::new(ObjectHistory { lsn, iter }))
}
//
// Wait until WAL has been received up to the given LSN.
//
fn wait_lsn(&self, req_lsn: Lsn) -> Result<Lsn> {
let mut lsn = req_lsn;
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
// This is necessary for bootstrap.
if lsn == Lsn(0) {
let last_valid_lsn = self.last_valid_lsn.load();
trace!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
last_valid_lsn,
lsn
);
lsn = last_valid_lsn;
}
trace!(
"Start waiting for LSN {}, valid LSN is {}",
lsn,
self.last_valid_lsn.load()
);
self.last_valid_lsn
.wait_for_timeout(lsn, TIMEOUT)
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {} to arrive. valid LSN in {}",
lsn,
self.last_valid_lsn.load(),
)
})?;
//trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
Ok(lsn)
}
}
impl ObjectTimeline {
@@ -855,6 +820,40 @@ impl ObjectTimeline {
}
}
//
// Wait until WAL has been received up to the given LSN.
//
fn wait_lsn(&self, mut lsn: Lsn) -> Result<Lsn> {
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
// This is necessary for bootstrap.
if lsn == Lsn(0) {
let last_valid_lsn = self.last_valid_lsn.load();
trace!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
last_valid_lsn,
lsn
);
lsn = last_valid_lsn;
}
trace!(
"Start waiting for LSN {}, valid LSN is {}",
lsn,
self.last_valid_lsn.load()
);
self.last_valid_lsn
.wait_for_timeout(lsn, TIMEOUT)
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {} to arrive. valid LSN in {}",
lsn,
self.last_valid_lsn.load(),
)
})?;
//trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
Ok(lsn)
}
///
/// Iterate through object versions with given key, in reverse LSN order.
///


@@ -357,13 +357,8 @@ impl PageServerHandler {
/* Send a tarball of the latest snapshot on the timeline */
let req_lsn = match lsn {
Some(lsn) => {
timeline.wait_lsn(lsn)?;
lsn
}
None => timeline.get_last_record_lsn(),
};
let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn());
{
let mut writer = CopyDataSink { pgb };
let mut basebackup = basebackup::Basebackup::new(
@@ -474,7 +469,7 @@ impl postgres_backend::Handler for PageServerHandler {
let (_, params_raw) = query_string.split_at("basebackup ".len());
let params = params_raw.split(" ").collect::<Vec<_>>();
ensure!(
params.len() >= 2,
params.len() == 2,
"invalid param number for basebackup command"
);
@@ -484,7 +479,7 @@ impl postgres_backend::Handler for PageServerHandler {
self.check_permission(Some(tenantid))?;
// TODO are there any tests with lsn option?
let lsn = if params.len() == 3 && params[2].len() != 0 {
let lsn = if params.len() == 3 {
Some(Lsn::from_str(params[2])?)
} else {
None
@@ -580,10 +575,6 @@ impl postgres_backend::Handler for PageServerHandler {
timeline.advance_last_valid_lsn(last_lsn);
break;
}
FeMessage::CopyFailed => {
info!("Copy failed");
break;
}
FeMessage::Sync => {}
_ => bail!("unexpected message {:?}", msg),
}


@@ -203,11 +203,6 @@ pub trait Timeline: Send + Sync {
/// Relation size is increased implicitly and decreased with Truncate updates.
// TODO ordering guarantee?
fn history<'a>(&'a self) -> Result<Box<dyn History + 'a>>;
//
// Wait until WAL has been received up to the given LSN.
//
fn wait_lsn(&self, lsn: Lsn) -> Result<Lsn>;
}
pub trait History: Iterator<Item = Result<Modification>> {


@@ -264,7 +264,7 @@ fn import_slru_file(timeline: &dyn Timeline, lsn: Lsn, slru: SlruKind, path: &Pa
/// Scan PostgreSQL WAL files in given directory
/// and load all records >= 'startpoint' into the repository.
pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: Lsn) -> Result<()> {
let mut waldecoder = WalStreamDecoder::new(startpoint, true);
let mut waldecoder = WalStreamDecoder::new(startpoint);
let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE);
let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
@@ -425,11 +425,9 @@ pub fn save_decoded_record(
let parsed_xact = XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
save_xact_record(timeline, lsn, &parsed_xact, decoded)?;
// Remove twophase file. see RemoveTwoPhaseFile() in postgres code
trace!(
info!(
"unlink twophaseFile for xid {} parsed_xact.xid {} here at {}",
decoded.xl_xid,
parsed_xact.xid,
lsn
decoded.xl_xid, parsed_xact.xid, lsn
);
timeline.put_unlink(
RelishTag::TwoPhase {


@@ -25,13 +25,13 @@ pub type MultiXactStatus = u32;
pub struct WalStreamDecoder {
lsn: Lsn,
startlsn: Lsn, // LSN where this record starts
contlen: u32,
padlen: u32,
inputbuf: BytesMut,
recordbuf: BytesMut,
crc_check: bool,
recordbuf: BytesMut,
}
#[derive(Error, Debug, Clone)]
@@ -46,24 +46,19 @@ pub struct WalDecodeError {
// FIXME: This isn't a proper rust stream
//
impl WalStreamDecoder {
pub fn new(lsn: Lsn, crc_check: bool) -> WalStreamDecoder {
pub fn new(lsn: Lsn) -> WalStreamDecoder {
WalStreamDecoder {
lsn,
startlsn: Lsn(0),
contlen: 0,
padlen: 0,
inputbuf: BytesMut::new(),
recordbuf: BytesMut::new(),
crc_check,
}
}
pub fn available(&self) -> Lsn {
self.lsn + self.inputbuf.remaining() as u64
}
pub fn feed_bytes(&mut self, buf: &[u8]) {
self.inputbuf.extend_from_slice(buf);
}
@@ -97,9 +92,7 @@ impl WalStreamDecoder {
// TODO: verify the remaining fields in the header
self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
if !self.crc_check && self.contlen != hdr.std.xlp_rem_len {
self.contlen = hdr.std.xlp_rem_len; // skip continuation record
}
continue;
} else if self.lsn.block_offset() == 0 {
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
return Ok(None);
@@ -109,19 +102,14 @@ impl WalStreamDecoder {
if hdr.xlp_pageaddr != self.lsn.0 {
return Err(WalDecodeError {
msg: format!(
"invalid xlog page header: xlp_pageaddr={} vs. lsn={}",
hdr.xlp_pageaddr, self.lsn
),
msg: "invalid xlog page header".into(),
lsn: self.lsn,
});
}
// TODO: verify the remaining fields in the header
self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
if !self.crc_check && self.contlen != hdr.xlp_rem_len {
self.contlen = hdr.xlp_rem_len; // skip continuation record
}
continue;
} else if self.padlen > 0 {
if self.inputbuf.remaining() < self.padlen as usize {
return Ok(None);
@@ -139,6 +127,7 @@ impl WalStreamDecoder {
}
// read xl_tot_len FIXME: assumes little-endian
self.startlsn = self.lsn;
let xl_tot_len = self.inputbuf.get_u32_le();
if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
return Err(WalDecodeError {
@@ -153,6 +142,7 @@ impl WalStreamDecoder {
self.recordbuf.put_u32_le(xl_tot_len);
self.contlen = xl_tot_len - 4;
continue;
} else {
// we're continuing a record, possibly from previous page.
let pageleft = self.lsn.remaining_in_block() as u32;
@@ -174,10 +164,17 @@ impl WalStreamDecoder {
let recordbuf = recordbuf.freeze();
let mut buf = recordbuf.clone();
let xlogrec = XLogRecord::from_bytes(&mut buf);
// XLOG_SWITCH records are special. If we see one, we need to skip
// to the next WAL segment.
let xlogrec = XLogRecord::from_bytes(&mut buf);
let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
if crc != xlogrec.xl_crc {
return Err(WalDecodeError {
msg: "WAL record crc mismatch".into(),
lsn: self.lsn,
});
}
if xlogrec.is_xlog_switch_record() {
trace!("saw xlog switch record at {}", self.lsn);
self.padlen =
@@ -187,29 +184,10 @@ impl WalStreamDecoder {
self.padlen = self.lsn.calc_padding(8u32) as u32;
}
// Check record CRC
if self.crc_check {
let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
if crc != xlogrec.xl_crc {
info!("WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}, recordbuf={:?}",
n, recordbuf.len(), self.lsn, xlogrec, recordbuf);
return Err(WalDecodeError {
msg: format!(
"WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}",
n,
buf.len(),
self.lsn,
xlogrec
),
lsn: self.lsn,
});
}
}
let result = (self.lsn.align(), recordbuf);
let result = (self.lsn, recordbuf);
return Ok(Some(result));
}
continue;
}
}
// check record boundaries


@@ -22,6 +22,8 @@ use postgres_types::PgLsn;
use std::cmp::{max, min};
use std::collections::HashMap;
use std::fs;
use std::fs::{File, OpenOptions};
use std::io::{Seek, SeekFrom, Write};
use std::str::FromStr;
use std::sync::Mutex;
use std::thread;
@@ -176,7 +178,7 @@ fn walreceiver_main(
let copy_stream = rclient.copy_both_simple(&query)?;
let mut physical_stream = ReplicationIter::new(copy_stream);
let mut waldecoder = WalStreamDecoder::new(startpoint, true);
let mut waldecoder = WalStreamDecoder::new(startpoint);
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(RelishTag::Checkpoint, 0, startpoint)?;
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
@@ -192,51 +194,45 @@ fn walreceiver_main(
let endlsn = startlsn + data.len() as u64;
let prev_last_rec_lsn = last_rec_lsn;
write_wal_file(
conf,
startlsn,
&timelineid,
pg_constants::WAL_SEGMENT_SIZE,
data,
tenantid,
)?;
trace!("received XLogData between {} and {}", startlsn, endlsn);
waldecoder.feed_bytes(data);
loop {
match waldecoder.poll_decode() {
Ok(Some((lsn, recdata))) => {
// Save old checkpoint value to compare with it after decoding WAL record
let old_checkpoint_bytes = checkpoint.encode();
let decoded = decode_wal_record(recdata.clone());
restore_local_repo::save_decoded_record(
&mut checkpoint,
&*timeline,
&decoded,
recdata,
lsn,
)?;
last_rec_lsn = lsn;
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
// Save old checkpoint value to compare with it after decoding WAL record
let old_checkpoint_bytes = checkpoint.encode();
let decoded = decode_wal_record(recdata.clone());
restore_local_repo::save_decoded_record(
&mut checkpoint,
&*timeline,
&decoded,
recdata,
lsn,
)?;
last_rec_lsn = lsn;
let new_checkpoint_bytes = checkpoint.encode();
// Check if checkpoint data was updated by save_decoded_record
if new_checkpoint_bytes != old_checkpoint_bytes {
timeline.put_page_image(
RelishTag::Checkpoint,
0,
lsn,
new_checkpoint_bytes,
false,
)?;
}
}
Ok(None) => {
trace!(
"End of replication stream {}..{} at {}",
startlsn,
endlsn,
last_rec_lsn
);
break;
}
Err(e) => {
info!("Decode error {}", e);
return Err(e.into());
}
let new_checkpoint_bytes = checkpoint.encode();
// Check if checkpoint data was updated by save_decoded_record
if new_checkpoint_bytes != old_checkpoint_bytes {
timeline.put_page_image(
RelishTag::Checkpoint,
0,
lsn,
new_checkpoint_bytes,
false,
)?;
}
}
// Update the last_valid LSN value in the page cache one more time. We updated
// it in the loop above, between each WAL record, but we might have received
// a partial record after the last completed record. Our page cache's value
@@ -411,3 +407,98 @@ pub fn identify_system(client: &mut Client) -> Result<IdentifySystem, Error> {
Err(IdentifyError.into())
}
}
fn write_wal_file(
conf: &PageServerConf,
startpos: Lsn,
timelineid: &ZTimelineId,
wal_seg_size: usize,
buf: &[u8],
tenantid: &ZTenantId,
) -> anyhow::Result<()> {
let mut bytes_left: usize = buf.len();
let mut bytes_written: usize = 0;
let mut partial;
let mut start_pos = startpos;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
let wal_dir = conf.wal_dir_path(timelineid, tenantid);
/* Extract WAL location for this block */
let mut xlogoff = start_pos.segment_offset(wal_seg_size);
while bytes_left != 0 {
let bytes_to_write;
/*
* If crossing a WAL boundary, only write up until we reach wal
* segment size.
*/
if xlogoff + bytes_left > wal_seg_size {
bytes_to_write = wal_seg_size - xlogoff;
} else {
bytes_to_write = bytes_left;
}
/* Open file */
let segno = start_pos.segment_number(wal_seg_size);
let wal_file_name = XLogFileName(
1, // FIXME: always use Postgres timeline 1
segno,
wal_seg_size,
);
let wal_file_path = wal_dir.join(wal_file_name.clone());
let wal_file_partial_path = wal_dir.join(wal_file_name.clone() + ".partial");
{
let mut wal_file: File;
/* Try to open already completed segment */
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
wal_file = file;
partial = false;
} else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) {
/* Try to open existed partial file */
wal_file = file;
partial = true;
} else {
/* Create and fill new partial file */
partial = true;
match OpenOptions::new()
.create(true)
.write(true)
.open(&wal_file_partial_path)
{
Ok(mut file) => {
for _ in 0..(wal_seg_size / XLOG_BLCKSZ) {
file.write_all(&ZERO_BLOCK)?;
}
wal_file = file;
}
Err(e) => {
error!("Failed to open log file {:?}: {}", &wal_file_path, e);
return Err(e.into());
}
}
}
wal_file.seek(SeekFrom::Start(xlogoff as u64))?;
wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?;
// FIXME: Flush the file
//wal_file.sync_all()?;
}
/* Write was successful, advance our position */
bytes_written += bytes_to_write;
bytes_left -= bytes_to_write;
start_pos += bytes_to_write as u64;
xlogoff += bytes_to_write;
/* Did we reach the end of a WAL segment? */
if start_pos.segment_offset(wal_seg_size) == 0 {
xlogoff = 0;
if partial {
fs::rename(&wal_file_partial_path, &wal_file_path)?;
}
}
}
Ok(())
}


@@ -108,23 +108,17 @@ fn find_end_of_wal_segment(
segno: XLogSegNo,
tli: TimeLineID,
wal_seg_size: usize,
is_partial: bool,
rec_offs: &mut usize,
rec_hdr: &mut [u8; XLOG_SIZE_OF_XLOG_RECORD],
crc: &mut u32,
check_contrec: bool,
) -> u32 {
let mut offs: usize = 0;
let mut contlen: usize = 0;
let mut wal_crc: u32 = 0;
let mut crc: u32 = 0;
let mut rec_offs: usize = 0;
let mut buf = [0u8; XLOG_BLCKSZ];
let file_name = XLogFileName(tli, segno, wal_seg_size);
let mut last_valid_rec_pos: usize = 0;
let file_path = data_dir.join(if is_partial {
file_name.clone() + ".partial"
} else {
file_name
});
let mut file = File::open(&file_path).unwrap();
let mut file = File::open(data_dir.join(file_name.clone() + ".partial")).unwrap();
let mut rec_hdr = [0u8; XLOG_RECORD_CRC_OFFS];
while offs < wal_seg_size {
if offs % XLOG_BLCKSZ == 0 {
@@ -139,33 +133,13 @@ fn find_end_of_wal_segment(
let xlp_info = LittleEndian::read_u16(&buf[2..4]);
let xlp_rem_len = LittleEndian::read_u32(&buf[XLP_REM_LEN_OFFS..XLP_REM_LEN_OFFS + 4]);
if xlp_magic != XLOG_PAGE_MAGIC as u16 {
info!("Invalid WAL file {:?} magic {}", &file_path, xlp_magic);
info!("Invalid WAL file {}.partial magic {}", file_name, xlp_magic);
break;
}
if offs == 0 {
offs = XLOG_SIZE_OF_XLOG_LONG_PHD;
if (xlp_info & XLP_FIRST_IS_CONTRECORD) != 0 {
if check_contrec {
let xl_tot_len = LittleEndian::read_u32(&rec_hdr[0..4]) as usize;
contlen = xlp_rem_len as usize;
if *rec_offs + contlen < xl_tot_len
|| (*rec_offs + contlen != xl_tot_len
&& contlen != XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_LONG_PHD)
{
info!(
"Corrupted continuation record: offs={}, contlen={}, xl_tot_len={}",
*rec_offs, contlen, xl_tot_len
);
return 0;
}
} else {
offs += ((xlp_rem_len + 7) & !7) as usize;
}
} else if *rec_offs != 0 {
// There is incompleted page at previous segment but no cont record:
// it means that current segment is not valid and we have to return back.
info!("CONTRECORD flag is missed in page header");
return 0;
offs += ((xlp_rem_len + 7) & !7) as usize;
}
} else {
offs += XLOG_SIZE_OF_XLOG_SHORT_PHD;
@@ -176,8 +150,9 @@ fn find_end_of_wal_segment(
if xl_tot_len == 0 {
break;
}
last_valid_rec_pos = offs;
offs += 4;
*rec_offs = 4;
rec_offs = 4;
contlen = xl_tot_len - 4;
rec_hdr[0..4].copy_from_slice(&buf[page_offs..page_offs + 4]);
} else {
@@ -187,33 +162,34 @@ fn find_end_of_wal_segment(
// read the rest of the record, or as much as fits on this page.
let n = min(contlen, pageleft);
let mut hdr_len: usize = 0;
if *rec_offs < XLOG_SIZE_OF_XLOG_RECORD {
// copy header
hdr_len = min(XLOG_SIZE_OF_XLOG_RECORD - *rec_offs, n);
rec_hdr[*rec_offs..*rec_offs + hdr_len]
.copy_from_slice(&buf[page_offs..page_offs + hdr_len]);
if rec_offs < XLOG_RECORD_CRC_OFFS {
let len = min(XLOG_RECORD_CRC_OFFS - rec_offs, n);
rec_hdr[rec_offs..rec_offs + len].copy_from_slice(&buf[page_offs..page_offs + len]);
}
*crc = crc32c_append(*crc, &buf[page_offs + hdr_len..page_offs + n]);
*rec_offs += n;
if rec_offs <= XLOG_RECORD_CRC_OFFS && rec_offs + n >= XLOG_SIZE_OF_XLOG_RECORD {
let crc_offs = page_offs - rec_offs + XLOG_RECORD_CRC_OFFS;
wal_crc = LittleEndian::read_u32(&buf[crc_offs..crc_offs + 4]);
crc = crc32c_append(0, &buf[crc_offs + 4..page_offs + n]);
crc = !crc;
} else {
crc ^= 0xFFFFFFFFu32;
crc = crc32c_append(crc, &buf[page_offs..page_offs + n]);
crc = !crc;
}
rec_offs += n;
offs += n;
contlen -= n;
if contlen == 0 {
*crc = crc32c_append(*crc, &rec_hdr[0..XLOG_RECORD_CRC_OFFS]);
crc = !crc;
crc = crc32c_append(crc, &rec_hdr);
offs = (offs + 7) & !7; // pad on 8 bytes boundary */
let wal_crc = LittleEndian::read_u32(
&rec_hdr[XLOG_RECORD_CRC_OFFS..XLOG_RECORD_CRC_OFFS + 4],
);
if *crc == wal_crc {
if crc == wal_crc {
last_valid_rec_pos = offs;
// Reset rec_offs and crc for start of new record
*rec_offs = 0;
*crc = 0;
} else {
info!(
"CRC mismatch {} vs {} at offset {} lsn {}",
*crc, wal_crc, offs, last_valid_rec_pos
"CRC mismatch {} vs {} at {}",
crc, wal_crc, last_valid_rec_pos
);
break;
}
@@ -264,142 +240,20 @@ pub fn find_end_of_wal(
}
if high_segno > 0 {
let mut high_offs = 0;
if precise {
let mut crc: u32 = 0;
let mut rec_offs: usize = 0;
let mut rec_hdr = [0u8; XLOG_SIZE_OF_XLOG_RECORD];
let wal_dir = data_dir.join("pg_wal");
/*
* To be able to calculate CRC of records crossing segment boundary,
* we need to parse previous segments.
* So first traverse segments in backward direction to locate record start
* and then traverse forward, accumulating CRC.
*/
let mut prev_segno = high_segno - 1;
let mut prev_offs: u32 = 0;
while prev_segno > 1 {
// TOFO: first segment constains dummy checkpoint record at the beginning
prev_offs = find_end_of_wal_segment(
data_dir,
prev_segno,
high_tli,
wal_seg_size,
false,
&mut rec_offs,
&mut rec_hdr,
&mut crc,
false,
);
if prev_offs != 0 {
break;
}
prev_segno -= 1;
}
if prev_offs != 0 {
// found start of WAL record
let first_segno = prev_segno;
let first_offs = prev_offs;
while prev_segno + 1 < high_segno {
// now traverse record in forward direction, accumulating CRC
prev_segno += 1;
prev_offs = find_end_of_wal_segment(
data_dir,
prev_segno,
high_tli,
wal_seg_size,
false,
&mut rec_offs,
&mut rec_hdr,
&mut crc,
true,
);
if prev_offs == 0 {
info!("Segment {} is corrupted", prev_segno,);
break;
}
}
if prev_offs != 0 {
high_offs = find_end_of_wal_segment(
data_dir,
high_segno,
high_tli,
wal_seg_size,
high_ispartial,
&mut rec_offs,
&mut rec_hdr,
&mut crc,
true,
);
}
if high_offs == 0 {
// If last segment contais no valid records, then return back
info!("Last WAL segment {} contains no valid record, truncate WAL till {} segment",
high_segno, first_segno);
// Remove last segments containing corrupted WAL record
for segno in first_segno + 1..high_segno {
let file_name = XLogFileName(high_tli, segno, wal_seg_size);
let file_path = wal_dir.join(file_name);
if let Err(e) = fs::remove_file(&file_path) {
info!("Failed to remove file {:?}: {}", &file_path, e);
}
}
let file_name = XLogFileName(high_tli, high_segno, wal_seg_size);
let file_path = if high_ispartial {
wal_dir.join(file_name.clone() + ".partial")
} else {
wal_dir.join(file_name.clone())
};
if let Err(e) = fs::remove_file(&file_path) {
info!("Failed to remove file {:?}: {}", &file_path, e);
}
high_ispartial = false; // previous segment should not be partial
high_segno = first_segno;
high_offs = first_offs;
}
} else {
// failed to locate previous segment
assert!(prev_segno <= 1);
high_offs = find_end_of_wal_segment(
data_dir,
high_segno,
high_tli,
wal_seg_size,
high_ispartial,
&mut rec_offs,
&mut rec_hdr,
&mut crc,
false,
);
}
// If last segment is not marked as partial, it means that next segment
// was not written. Let's make this segment partial once again.
if !high_ispartial {
let file_name = XLogFileName(high_tli, high_segno, wal_seg_size);
if let Err(e) = fs::rename(
wal_dir.join(file_name.clone()),
wal_dir.join(file_name.clone() + ".partial"),
) {
info!(
"Failed to rename {} to {}.partial: {}",
&file_name, &file_name, e
);
}
}
} else {
/*
* Move the starting pointer to the start of the next segment, if the
* highest one we saw was completed.
*/
if !high_ispartial {
high_segno += 1;
}
/*
* Move the starting pointer to the start of the next segment, if the
* highest one we saw was completed.
*/
if !high_ispartial {
high_segno += 1;
} else if precise {
/* otherwise locate last record in last partial segment */
high_offs = find_end_of_wal_segment(data_dir, high_segno, high_tli, wal_seg_size);
}
let high_ptr = XLogSegNoOffsetToRecPtr(high_segno, high_offs, wal_seg_size);
return (high_ptr, high_tli);
}
(0, 1) // First timeline is 1
(0, 0)
}
pub fn main() {
@@ -615,7 +469,7 @@ mod tests {
let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true);
let wal_end = Lsn(wal_end);
println!("wal_end={}, tli={}", wal_end, tli);
assert_eq!(wal_end, "0/1699D10".parse::<Lsn>().unwrap());
assert_eq!(wal_end, "0/2000000".parse::<Lsn>().unwrap());
// 4. Get the actual end of WAL by pg_waldump
let waldump_path = top_path.join("tmp_install/bin/pg_waldump");

snapfile/Cargo.toml (new file)

@@ -0,0 +1,21 @@
[package]
name = "snapfile"
version = "0.1.0"
edition = "2018"
[[bin]]
name = "snaptool"
path = "snaptool/main.rs"
[dependencies]
anyhow = "1.0"
aversion = "0.2"
bookfile = "0.3"
serde = { version = "1.0", features = ["derive"] }
rand = "0.8.3"
structopt = "0.3"
zenith_utils = { path = "../zenith_utils" }
hex = "0.4.3"
[dev-dependencies]
tempfile = "3.2"

snapfile/snaptool/main.rs (new file)

@@ -0,0 +1,64 @@
use anyhow::{Context, Result};
use snapfile::{squash, SnapFile};
use std::env::current_dir;
use std::path::PathBuf;
use structopt::StructOpt;
#[derive(StructOpt)]
#[structopt(about = "A tool for manipulating snapshot files")]
enum Params {
Squash(Squash),
Describe(Describe),
}
#[derive(StructOpt)]
struct Squash {
older: PathBuf,
newer: PathBuf,
}
#[derive(StructOpt)]
struct Describe {
file: PathBuf,
}
fn print_errors(error: anyhow::Error) {
let formatted: Vec<_> = error.chain().map(ToString::to_string).collect();
eprintln!("{}", formatted.join(": "));
}
fn main() {
let res = snaptool_main();
if let Err(e) = res {
print_errors(e);
}
}
fn snaptool_main() -> Result<()> {
let params = Params::from_args();
match &params {
Params::Squash(squash_params) => {
let out_dir = current_dir()?;
squash(&squash_params.older, &squash_params.newer, &out_dir).with_context(|| {
format!(
"squash {} {}",
squash_params.older.to_string_lossy(),
squash_params.newer.to_string_lossy()
)
})?;
}
Params::Describe(describe_params) => {
describe(describe_params)
.with_context(|| format!("describe {}", describe_params.file.to_string_lossy()))?;
}
}
Ok(())
}
fn describe(params: &Describe) -> Result<()> {
let mut snap = SnapFile::new(&params.file)?;
let meta = snap.read_meta()?;
println!("{:?}: {:#?}", params.file, meta);
Ok(())
}

snapfile/src/lib.rs (new file)

@@ -0,0 +1,339 @@
//! A file format for storage a snapshot of pages.
#![warn(missing_docs)]
#![forbid(unsafe_code)]
#![warn(clippy::cast_possible_truncation)]
mod page;
mod squash;
mod versioned;
#[doc(inline)]
pub use page::Page;
#[doc(inline)]
pub use squash::squash;
use anyhow::{bail, Context, Result};
use aversion::group::{DataSink, DataSourceExt};
use aversion::util::cbor::CborData;
use bookfile::{Book, BookWriter, ChapterWriter};
use std::ffi::OsString;
use std::fs::File;
use std::io::Write;
use std::ops::AddAssign;
use std::path::{Path, PathBuf};
pub use versioned::{PageIndex, PageLocation, Predecessor, SnapFileMeta};
use zenith_utils::lsn::Lsn;
impl SnapFileMeta {
pub fn new(previous: Option<SnapFileMeta>, timeline: [u8; 16], lsn: Lsn) -> Self {
// Store the metadata of the predecessor snapshot, if there is one.
let predecessor = previous.map(|prev| Predecessor {
timeline: prev.timeline,
lsn: prev.lsn,
});
SnapFileMeta {
timeline,
predecessor,
lsn: lsn.into(),
}
}
fn to_filename(&self) -> OsString {
let timeline_string = hex::encode(self.timeline);
let pred_lsn = match &self.predecessor {
None => 0,
Some(pred) => pred.lsn,
};
format!("{}_{:x}_{:x}.zdb", timeline_string, pred_lsn, self.lsn).into()
}
}
impl PageIndex {
/// Retrieve the page offset from the index.
///
/// If the page is not in the index, returns `None`.
fn get_page_location(&self, page_num: u64) -> Option<PageLocation> {
self.map.get(&page_num).copied()
}
fn page_count(&self) -> usize {
self.map.len()
}
}
impl PageLocation {
fn to_offset(&self) -> u64 {
// A PageLocation counts whole 8KB pages; multiply to get the byte offset.
self.0 * 8192
}
}
impl AddAssign<u64> for PageLocation {
fn add_assign(&mut self, rhs: u64) {
self.0 += rhs;
}
}
/// A read-only snapshot file.
pub struct SnapFile {
book: Book<File>,
page_index: PageIndex,
}
impl SnapFile {
/// Open a new `SnapFile` for reading.
///
/// This call will validate some of the file's format and read the file's
/// metadata; it may return an error if the file format is invalid.
pub fn new(path: &Path) -> Result<Self> {
let file =
File::open(path).with_context(|| format!("snapfile {}", path.to_string_lossy()))?;
let book = Book::new(file)?;
if book.magic() != versioned::SNAPFILE_MAGIC {
bail!("bad magic number");
}
// Read the page index into memory.
let chapter_reader = book
.chapter_reader(versioned::CHAPTER_PAGE_INDEX)
.context("snapfile missing index chapter")?;
let mut source = CborData::new(chapter_reader);
let page_index: PageIndex = source.expect_message()?;
Ok(SnapFile { book, page_index })
}
/// Read the snapshot metadata.
pub fn read_meta(&mut self) -> Result<SnapFileMeta> {
let chapter_reader = self
.book
.chapter_reader(versioned::CHAPTER_SNAP_META)
.context("snapfile missing meta")?;
let mut source = CborData::new(chapter_reader);
let meta: SnapFileMeta = source.expect_message()?;
Ok(meta)
}
/// Return the number of pages stored in this snapshot.
pub fn page_count(&self) -> usize {
self.page_index.page_count()
}
/// Check if a page exists in this snapshot's index.
///
/// Returns `true` if the given page is stored in this snapshot file,
/// `false` if not.
pub fn has_page(&self, page_num: u64) -> bool {
self.page_index.get_page_location(page_num).is_some()
}
/// Read a page.
///
/// If this returns Ok(None), that means that this file does not store
/// the requested page.
/// This should only fail (returning `Err`) if an IO error occurs.
pub fn read_page(&self, page_num: u64) -> Result<Option<Page>> {
match self.page_index.get_page_location(page_num) {
None => Ok(None),
Some(page_offset) => Ok(Some(self._read_page(page_offset)?)),
}
}
/// Read page data from the file.
///
/// This does the work for read_page and PageIter.
fn _read_page(&self, page_location: PageLocation) -> Result<Page> {
// Compute the true byte offset in the file.
let page_offset = page_location.to_offset();
let chapter_reader = self
.book
.chapter_reader(versioned::CHAPTER_PAGES)
.context("snapfile missing pages chapter")?;
let mut page_data = Page::default();
let bytes_read = chapter_reader.read_at(page_data.as_mut(), page_offset)?;
if bytes_read != 8192 {
bail!("read truncated page");
}
Ok(page_data)
}
/// Iterate over pages.
///
/// Returns an iterator over `(page_num, Page)` pairs, in page-number order.
pub fn all_pages(&self) -> PageIter {
let inner = (&self.page_index.map).into_iter();
PageIter {
snapfile: self,
inner,
}
}
}
/// An iterator over all pages in the snapshot file.
pub struct PageIter<'a> {
snapfile: &'a SnapFile,
inner: std::collections::btree_map::Iter<'a, u64, PageLocation>,
}
impl Iterator for PageIter<'_> {
type Item = Result<(u64, Page)>;
fn next(&mut self) -> Option<Self::Item> {
let (page_num, page_offset) = self.inner.next()?;
let result = self
.snapfile
._read_page(*page_offset)
.map(|page_data| (*page_num, page_data));
Some(result)
}
}
/// `SnapWriter` creates a new snapshot file.
///
/// A SnapWriter is created, has pages written into it, and is then closed.
pub struct SnapWriter {
writer: ChapterWriter<File>,
page_index: PageIndex,
meta: SnapFileMeta,
current_offset: PageLocation,
}
impl SnapWriter {
/// Create a new `SnapWriter`.
///
/// The output file is created in `dir`; its name is derived from the
/// snapshot metadata.
pub fn new(dir: &Path, meta: SnapFileMeta) -> Result<Self> {
let mut path = PathBuf::from(dir);
path.push(meta.to_filename());
let file = File::create(path)?;
let book = BookWriter::new(file, versioned::SNAPFILE_MAGIC)?;
// Write a chapter for the snapshot metadata.
let writer = book.new_chapter(versioned::CHAPTER_SNAP_META);
let mut sink = CborData::new(writer);
sink.write_message(&meta)?;
let book = sink.into_inner().close()?;
// Open a new chapter for raw page data.
let writer = book.new_chapter(versioned::CHAPTER_PAGES);
Ok(SnapWriter {
writer,
page_index: PageIndex::default(),
meta,
current_offset: PageLocation::default(),
})
}
/// Write a page into the snap file.
pub fn write_page<P>(&mut self, page_num: u64, page_data: P) -> Result<()>
where
P: Into<Page>,
{
let page_data: Page = page_data.into();
self.writer.write_all(page_data.as_ref())?;
let prev = self.page_index.map.insert(page_num, self.current_offset);
if prev.is_some() {
panic!("duplicate index for page {}", page_num);
}
self.current_offset += 1;
Ok(())
}
/// Finish writing pages.
///
/// This consumes the `SnapWriter` and completes the snapshot.
pub fn finish(self) -> Result<SnapFileMeta> {
let book = self.writer.close()?;
// Write out a page index and close the book. This will write out any
// necessary file metadata.
// FIXME: these 3 lines could be combined into a single function
// that means "serialize this data structure with this format into this chapter".
let writer = book.new_chapter(versioned::CHAPTER_PAGE_INDEX);
let mut sink = CborData::new(writer);
sink.write_message(&self.page_index)?;
// Close the chapter, then close the book.
sink.into_inner().close()?.close()?;
// Return the snapshot metadata to the caller.
Ok(self.meta)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
use tempfile::TempDir;
const TEST_TIMELINE: [u8; 16] = [99u8; 16];
#[test]
fn snap_two_pages() {
// When `dir` goes out of scope the directory will be unlinked.
let dir = TempDir::new().unwrap();
let snap_meta = {
// Write out a new snapshot file with two pages.
let meta = SnapFileMeta::new(None, TEST_TIMELINE, Lsn(1234));
let mut snap = SnapWriter::new(dir.path(), meta).unwrap();
// Write the pages out of order, because why not?
let page99 = [99u8; 8192];
snap.write_page(99, page99).unwrap();
let page33 = [33u8; 8192];
snap.write_page(33, page33).unwrap();
snap.finish().unwrap()
};
assert_eq!(snap_meta.lsn, 1234);
{
// Read the snapshot file and verify the contents.
let mut path = PathBuf::from(dir.path());
path.push(snap_meta.to_filename());
let mut snap = SnapFile::new(&path).unwrap();
assert_eq!(snap.page_count(), 2);
assert!(!snap.has_page(0));
assert!(snap.has_page(33));
assert!(!snap.has_page(98));
assert!(snap.has_page(99));
assert!(snap.read_page(0).unwrap().is_none());
let page = snap.read_page(33).unwrap().unwrap();
assert_eq!(*page.0, [33u8; 8192]);
let page = snap.read_page(99).unwrap().unwrap();
assert_eq!(*page.0, [99u8; 8192]);
// Make sure the deserialized metadata matches what we think we wrote.
let meta2 = snap.read_meta().unwrap();
assert_eq!(snap_meta, meta2);
}
}
#[test]
fn snap_zero_pages() {
// When `dir` goes out of scope the directory will be unlinked.
let dir = TempDir::new().unwrap();
let snap_meta = {
// Write out a new snapshot file with no pages.
let meta = SnapFileMeta::new(None, TEST_TIMELINE, Lsn(1234));
let snap = SnapWriter::new(dir.path(), meta).unwrap();
snap.finish().unwrap()
};
{
// Read the snapshot file.
let mut path = PathBuf::from(dir.path());
path.push(snap_meta.to_filename());
let snap = SnapFile::new(&path).unwrap();
assert_eq!(snap.page_index.page_count(), 0);
assert!(!snap.has_page(0));
assert!(!snap.has_page(99));
assert!(snap.read_page(0).unwrap().is_none());
assert!(snap.read_page(99).unwrap().is_none());
}
}
}
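Putting the read-side API together: opening a `SnapFile` loads the page index, after which pages can be fetched by number or iterated in order. A minimal usage sketch, not part of the crate; the path argument is hypothetical.

use anyhow::Result;
use snapfile::SnapFile;
use std::path::Path;

fn dump_snapshot(path: &Path) -> Result<()> {
    let mut snap = SnapFile::new(path)?;
    println!("meta: {:?}", snap.read_meta()?);
    println!("{} pages", snap.page_count());
    // Random access by page number; None means the page isn't in this file.
    if let Some(page) = snap.read_page(33)? {
        println!("page 33 starts with byte {}", page.as_ref()[0]);
    }
    // Or walk every (page number, page) pair in page-number order.
    for entry in snap.all_pages() {
        let (page_num, _page) = entry?;
        println!("found page {}", page_num);
    }
    Ok(())
}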

snapfile/src/page.rs (new file, 42 lines)

@@ -0,0 +1,42 @@
/// A single 8KB page.
pub struct Page(pub Box<[u8; 8192]>);
impl Page {
/// Create a page by copying bytes from another slice.
///
/// This is a copy, not a move. If the caller already has
/// an owned array then `From<[u8; 8192]>` can be used instead.
pub fn copy_slice(x: &[u8; 8192]) -> Self {
Page(Box::new(*x))
}
}
impl Default for Page {
fn default() -> Self {
Page(Box::new([0u8; 8192]))
}
}
impl From<[u8; 8192]> for Page {
fn from(array: [u8; 8192]) -> Self {
Page(Box::new(array))
}
}
impl From<Box<[u8; 8192]>> for Page {
fn from(heap_array: Box<[u8; 8192]>) -> Self {
Page(heap_array)
}
}
impl AsRef<[u8; 8192]> for Page {
fn as_ref(&self) -> &[u8; 8192] {
self.0.as_ref()
}
}
impl AsMut<[u8; 8192]> for Page {
fn as_mut(&mut self) -> &mut [u8; 8192] {
self.0.as_mut()
}
}
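A quick sketch of the constructors above: `copy_slice` copies out of a borrowed array, while the `From` impls take ownership of a stack or heap array.

fn page_construction_sketch() {
    let array = [7u8; 8192];
    let copied = Page::copy_slice(&array);            // copies the bytes
    let moved: Page = array.into();                   // moves the owned array
    let boxed: Page = Box::new([1u8; 8192]).into();   // adopts a heap allocation
    assert_eq!(copied.as_ref()[0], 7);
    assert_eq!(moved.as_ref()[0], 7);
    assert_eq!(boxed.as_ref()[0], 1);
}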

snapfile/src/squash.rs (new file, 100 lines)

@@ -0,0 +1,100 @@
use crate::{Page, PageIter, SnapFile, SnapFileMeta, SnapWriter};
use anyhow::{bail, Result};
use std::cmp::Ordering;
use std::path::Path;
// A helper struct that holds an iterator, along with the last
// value taken from the iterator.
struct PageStepper<'a> {
it: PageIter<'a>,
pub cache: Option<(u64, Page)>,
}
impl<'a> PageStepper<'a> {
fn new(snapfile: &'a SnapFile) -> Result<Self> {
let mut it = snapfile.all_pages();
let cache = it.next().transpose()?;
Ok(PageStepper { it, cache })
}
/// Read a new page from the iterator, returning the previous page.
fn step(&mut self) -> Result<Option<(u64, Page)>> {
let mut next = self.it.next().transpose()?;
std::mem::swap(&mut self.cache, &mut next);
Ok(next)
}
}
/// Squash two snapshot files into one.
///
/// The resulting snapshot will contain all of the pages from both files.
/// If the same page number is stored in both, it will keep the page from
/// the newer snapshot.
///
/// The name of the resulting file will be automatically generated from
/// the snapshot metadata.
pub fn squash(older: &Path, newer: &Path, out_dir: &Path) -> Result<()> {
let mut snap1 = SnapFile::new(older)?;
let mut snap2 = SnapFile::new(newer)?;
let meta1 = snap1.read_meta()?;
let meta2 = snap2.read_meta()?;
// Check that snap1 is the predecessor of snap2.
match meta2.predecessor {
Some(pred) if pred.timeline == meta1.timeline => {}
_ => {
bail!(
"snap file {:?} is not the predecessor of {:?}",
&older,
&newer,
);
}
}
// The new combined snapshot will have most fields from meta2 (the later
// snapshot), but will have the predecessor from meta1.
let new_meta = SnapFileMeta {
// There is some danger in squashing snapshots across two timelines,
// in that it's possible to get confused about what the history
// looks like. Ultimately, it should be possible to squash our way
// to a "complete" snapshot (that contains all pages), so this must
// be possible.
timeline: meta2.timeline,
predecessor: meta1.predecessor,
lsn: meta2.lsn,
};
let mut snap_writer = SnapWriter::new(&out_dir, new_meta)?;
let mut iter1 = PageStepper::new(&snap1)?;
let mut iter2 = PageStepper::new(&snap2)?;
loop {
let next_page = match (&iter1.cache, &iter2.cache) {
(None, None) => break,
(Some(_), None) => iter1.step()?,
(None, Some(_)) => iter2.step()?,
(Some(x), Some(y)) => {
// If these are two different page numbers, then advance the iterator
// with the numerically lower number.
// If they are the same page number, then store the one from the newer
// snapshot, and discard the other (advancing both iterators).
match x.0.cmp(&y.0) {
Ordering::Less => iter1.step()?,
Ordering::Greater => iter2.step()?,
Ordering::Equal => {
let _ = iter1.step()?;
iter2.step()?
}
}
}
};
// This can't be None: the (None, None) case was already handled in the
// match statement above.
let (page_num, page_data) = next_page.unwrap();
snap_writer.write_page(page_num, page_data)?;
}
snap_writer.finish()?;
Ok(())
}
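For completeness, a hedged sketch of calling `squash` from code (the same operation `snaptool squash` performs); the file names here are made up, and the call bails out unless the older file is recorded as the newer file's predecessor.

use snapfile::squash;
use std::path::Path;

fn squash_sketch() -> anyhow::Result<()> {
    // The combined snapshot is written into the output directory with a
    // name derived from the merged metadata.
    squash(
        Path::new("older_snapshot.zdb"),
        Path::new("newer_snapshot.zdb"),
        Path::new("."),
    )?;
    Ok(())
}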

snapfile/src/versioned.rs (new file, 88 lines)

@@ -0,0 +1,88 @@
//! Versioned data structures for snapshot files
//!
//! To ensure that future versions of software can read snapshot files,
//! all data structures that are serialized into the snapshot files should
//! live in this module.
//!
//! Once released, versioned data structures should never be modified.
//! Instead, new versions should be created and conversion functions should
//! be defined using the `FromVersion` trait.
use aversion::{assign_message_ids, UpgradeLatest, Versioned};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
// A random constant, to identify this file type.
pub(crate) const SNAPFILE_MAGIC: u32 = 0x7fb8_38a8;
// Constant chapter numbers
// FIXME: the bookfile crate should use something better to index, e.g. strings.
/// Snapshot-specific file metadata
pub(crate) const CHAPTER_SNAP_META: u64 = 1;
/// A packed set of 8KB pages.
pub(crate) const CHAPTER_PAGES: u64 = 2;
/// An index of pages.
pub(crate) const CHAPTER_PAGE_INDEX: u64 = 3;
/// Information about the predecessor snapshot.
///
/// It contains the timeline id of the predecessor snapshot, and the LSN
/// of that snapshot.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct Predecessor {
/// This is the ID number of the predecessor timeline.
///
/// This may match the current snapshot's timeline id, but
/// it may not (if the predecessor was the branch point).
pub timeline: [u8; 16],
/// This is the LSN of the predecessor snapshot.
pub lsn: u64,
}
/// Snapshot file metadata (version 1).
#[derive(Debug, PartialEq, Serialize, Deserialize, Versioned, UpgradeLatest)]
pub struct SnapFileMetaV1 {
/// This is a unique ID number for this timeline.
///
/// This number guarantees that snapshot history is unique.
pub timeline: [u8; 16],
/// Information about the predecessor snapshot.
///
/// If `None`, this snapshot is the start of a new database.
pub predecessor: Option<Predecessor>,
/// This is the last LSN stored in this snapshot.
pub lsn: u64,
}
/// A type alias for the latest version of `SnapFileMeta`.
pub type SnapFileMeta = SnapFileMetaV1;
/// A page location within a file.
///
/// Note: this is an opaque value that may not be the true byte offset;
/// it may be relative to some other location or measured in units other
/// than bytes.
#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
#[serde(transparent)]
pub struct PageLocationV1(pub(crate) u64);
/// A type alias for the latest version of `PageLocation`.
pub type PageLocation = PageLocationV1;
/// An index from page number to offset within the pages chapter.
#[derive(Debug, Default, Serialize, Deserialize, Versioned, UpgradeLatest)]
pub struct PageIndexV1 {
/// A map from page number to file offset.
pub(crate) map: BTreeMap<u64, PageLocationV1>,
}
/// A type alias for the latest version of `PageIndex`.
pub type PageIndex = PageIndexV1;
// Each message gets a unique message id, for tracking by the aversion traits.
assign_message_ids! {
PageIndex: 100,
SnapFileMeta: 101,
}


@@ -1,66 +0,0 @@
import pytest
import random
import time
from contextlib import closing
from multiprocessing import Process, Value
from fixtures.zenith_fixtures import WalAcceptorFactory, ZenithPageserver, PostgresFactory
pytest_plugins = ("fixtures.zenith_fixtures")
# Test restarting the page server: insert enough data that it doesn't fit in
# shared_buffers, restart the page server, and check that the data can still
# be read back through a fresh connection.
def test_pageserver_restart(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, wa_factory: WalAcceptorFactory):
# One safekeeper is enough for this test.
wa_factory.start_n_new(1)
zenith_cli.run(["branch", "test_pageserver_restart", "empty"])
pg = postgres.create_start('test_pageserver_restart',
wal_acceptors=wa_factory.get_connstrs())
pg_conn = pg.connect()
cur = pg_conn.cursor()
# Create table, and insert some rows. Make it big enough that it doesn't fit in
# shared_buffers, otherwise the SELECT after restart will just return answer
# from shared_buffers without hitting the page server, which defeats the point
# of this test.
cur.execute('CREATE TABLE foo (t text)')
cur.execute('''
INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 100000) g
''')
# Verify that the table is larger than shared_buffers
cur.execute('''
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
from pg_settings where name = 'shared_buffers'
''')
row = cur.fetchone()
print("shared_buffers is {}, table size {}", row[0], row[1]);
assert int(row[0]) < int(row[1])
# Stop and restart pageserver. This is a more or less graceful shutdown, although
# the page server doesn't currently have a shutdown routine so there's no difference
# between stopping and crashing.
pageserver.stop();
pageserver.start();
# Stopping the pageserver breaks the connection from the postgres backend to
# the page server, and causes the next query on the connection to fail. Start a new
# postgres connection too, to avoid that error. (Ideally, the compute node would
# handle that and retry internally, without propagating the error to the user, but
# currently it doesn't...)
pg_conn = pg.connect()
cur = pg_conn.cursor()
cur.execute("SELECT count(*) FROM foo")
assert cur.fetchone() == (100000, )
# Stop the page server by force, and restart it
pageserver.stop();
pageserver.start();


@@ -9,7 +9,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
# Test restarting and recreating a postgres instance
#
@pytest.mark.parametrize('with_wal_acceptors', [True, False])
@pytest.mark.parametrize('with_wal_acceptors', [False, True])
def test_restart_compute(
zenith_cli,
pageserver: ZenithPageserver,
@@ -31,56 +31,31 @@ def test_restart_compute(
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute('CREATE TABLE t(key int primary key, value text)')
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
cur.execute('SELECT sum(key) FROM t')
r = cur.fetchone()
assert r == (5000050000, )
print("res = ", r)
# Create table, and insert a row
cur.execute('CREATE TABLE foo (t text)')
cur.execute("INSERT INTO foo VALUES ('bar')")
# Remove data directory and restart
# Stop and restart the Postgres instance
pg.stop_and_destroy().create_start('test_restart_compute',
wal_acceptors=wal_acceptor_connstrs)
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# We can still see the row
cur.execute('SELECT sum(key) FROM t')
r = cur.fetchone()
assert r == (5000050000, )
print("res = ", r)
cur.execute('SELECT count(*) FROM foo')
assert cur.fetchone() == (1, )
# Insert another row
cur.execute("INSERT INTO t VALUES (100001, 'payload2')")
cur.execute('SELECT count(*) FROM t')
cur.execute("INSERT INTO foo VALUES ('bar2')")
cur.execute('SELECT count(*) FROM foo')
assert cur.fetchone() == (2, )
r = cur.fetchone()
assert r == (100001, )
print("res = ", r)
# Again remove data directory and restart
# Stop, and destroy the Postgres instance. Then recreate and restart it.
pg.stop_and_destroy().create_start('test_restart_compute',
wal_acceptors=wal_acceptor_connstrs)
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# We can still see the rows
cur.execute('SELECT count(*) FROM t')
r = cur.fetchone()
assert r == (100001, )
print("res = ", r)
# And again remove data directory and restart
pg.stop_and_destroy().create_start('test_restart_compute',
wal_acceptors=wal_acceptor_connstrs)
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# We can still see the rows
cur.execute('SELECT count(*) FROM t')
r = cur.fetchone()
assert r == (100001, )
print("res = ", r)
cur.execute('SELECT count(*) FROM foo')
assert cur.fetchone() == (2, )


@@ -1,5 +1,3 @@
import os
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
@@ -30,59 +28,24 @@ def test_twophase(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFa
cur.execute("INSERT INTO foo VALUES ('two')")
cur.execute("PREPARE TRANSACTION 'insert_two'")
# Prepare a transaction that will insert a row
cur.execute('BEGIN')
cur.execute("INSERT INTO foo VALUES ('three')")
cur.execute("PREPARE TRANSACTION 'insert_three'")
# Prepare another transaction that will insert a row
cur.execute('BEGIN')
cur.execute("INSERT INTO foo VALUES ('four')")
cur.execute("PREPARE TRANSACTION 'insert_four'")
# On checkpoint state data copied to files in
# pg_twophase directory and fsynced
cur.execute('CHECKPOINT')
twophase_files = os.listdir(pg.pg_twophase_dir_path())
print(twophase_files)
assert len(twophase_files) == 4
cur.execute("COMMIT PREPARED 'insert_three'")
cur.execute("ROLLBACK PREPARED 'insert_four'")
cur.execute('CHECKPOINT')
twophase_files = os.listdir(pg.pg_twophase_dir_path())
print(twophase_files)
assert len(twophase_files) == 2
# Create a branch with the transaction in prepared state
zenith_cli.run(["branch", "test_twophase_prepared", "test_twophase"])
# Create compute node, but don't start.
# We want to observe pgdata before postgres starts
pg2 = postgres.create(
pg2 = postgres.create_start(
'test_twophase_prepared',
config_lines=['max_prepared_transactions=5'],
)
# Check that we restored only needed twophase files
twophase_files2 = os.listdir(pg2.pg_twophase_dir_path())
print(twophase_files2)
assert twophase_files2.sort() == twophase_files.sort()
pg2 = pg2.start()
conn2 = pg2.connect()
cur2 = conn2.cursor()
# On the new branch, commit one of the prepared transactions,
# abort the other one.
# On the new branch, commit one of the prepared transactions, abort the other one.
cur2.execute("COMMIT PREPARED 'insert_one'")
cur2.execute("ROLLBACK PREPARED 'insert_two'")
cur2.execute('SELECT * FROM foo')
assert cur2.fetchall() == [('one',), ('three',)]
assert cur2.fetchall() == [('one', )]
# Only one committed insert is visible on the original branch
# Neither insert is visible on the original branch, the transactions are still
# in prepared state there.
cur.execute('SELECT * FROM foo')
assert cur.fetchall() == [('three',)]
assert cur.fetchall() == []


@@ -249,68 +249,10 @@ def pageserver(zenith_cli: ZenithCli) -> Iterator[ZenithPageserver]:
print('Starting pageserver cleanup')
ps.stop()
class PgBin:
""" A helper class for executing postgres binaries """
def __init__(self, log_dir: str, pg_distrib_dir: str):
self.log_dir = log_dir
self.pg_install_path = pg_distrib_dir
self.pg_bin_path = os.path.join(self.pg_install_path, 'bin')
self.env = os.environ.copy()
self.env['LD_LIBRARY_PATH'] = os.path.join(self.pg_install_path, 'lib')
def _fixpath(self, command: List[str]) -> None:
if '/' not in command[0]:
command[0] = os.path.join(self.pg_bin_path, command[0])
def _build_env(self, env_add: Optional[Env]) -> Env:
if env_add is None:
return self.env
env = self.env.copy()
env.update(env_add)
return env
def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None) -> None:
"""
Run one of the postgres binaries.
The command should be in list form, e.g. ['pgbench', '-p', '55432']
All the necessary environment variables will be set.
If the first argument (the command name) doesn't include a path (no '/'
characters present), then it will be edited to include the correct path.
If you want stdout/stderr captured to files, use `run_capture` instead.
"""
self._fixpath(command)
print('Running command "{}"'.format(' '.join(command)))
env = self._build_env(env)
subprocess.run(command, env=env, cwd=cwd, check=True)
def run_capture(self,
command: List[str],
env: Optional[Env] = None,
cwd: Optional[str] = None) -> None:
"""
Run one of the postgres binaries, with stderr and stdout redirected to a file.
This is just like `run`, but for chatty programs.
"""
self._fixpath(command)
print('Running command "{}"'.format(' '.join(command)))
env = self._build_env(env)
subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True)
@zenfixture
def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin:
return PgBin(test_output_dir, pg_distrib_dir)
class Postgres(PgProtocol):
""" An object representing a running postgres daemon. """
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, tenant_id: str, port: int):
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, tenant_id: str, port: int):
super().__init__(host='localhost', port=port)
self.zenith_cli = zenith_cli
@@ -318,7 +260,6 @@ class Postgres(PgProtocol):
self.repo_dir = repo_dir
self.branch: Optional[str] = None # dubious, see asserts below
self.tenant_id = tenant_id
self.pg_bin = pg_bin
# path to conf is <repo_dir>/pgdatadirs/tenants/<tenant_id>/<branch_name>/postgresql.conf
def create(
@@ -358,32 +299,20 @@ class Postgres(PgProtocol):
"""
assert self.branch is not None
print(f"Starting postgres on brach {self.branch}")
self.zenith_cli.run(['pg', 'start', self.branch, f'--tenantid={self.tenant_id}'])
self.running = True
self.pg_bin.run(['pg_controldata', self.pg_data_dir_path()])
return self
def pg_data_dir_path(self) -> str:
""" Path to data directory """
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch
return os.path.join(self.repo_dir, path)
def pg_xact_dir_path(self) -> str:
""" Path to pg_xact dir """
return os.path.join(self.pg_data_dir_path(), 'pg_xact')
def pg_twophase_dir_path(self) -> str:
""" Path to pg_twophase dir """
return os.path.join(self.pg_data_dir_path(), 'pg_twophase')
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'pg_xact'
return os.path.join(self.repo_dir, path)
def config_file_path(self) -> str:
""" Path to postgresql.conf """
return os.path.join(self.pg_data_dir_path(), 'postgresql.conf')
filename = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'postgresql.conf'
return os.path.join(self.repo_dir, filename)
def adjust_for_wal_acceptors(self, wal_acceptors: str) -> 'Postgres':
"""
@@ -475,14 +404,13 @@ class Postgres(PgProtocol):
class PostgresFactory:
""" An object representing multiple running postgres daemons. """
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, initial_tenant: str, base_port: int = 55431):
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, initial_tenant: str, base_port: int = 55431):
self.zenith_cli = zenith_cli
self.repo_dir = repo_dir
self.num_instances = 0
self.instances: List[Postgres] = []
self.initial_tenant: str = initial_tenant
self.base_port = base_port
self.pg_bin = pg_bin
def create_start(
self,
@@ -495,7 +423,6 @@ class PostgresFactory:
pg = Postgres(
zenith_cli=self.zenith_cli,
repo_dir=self.repo_dir,
pg_bin=self.pg_bin,
tenant_id=tenant_id or self.initial_tenant,
port=self.base_port + self.num_instances + 1,
)
@@ -569,8 +496,8 @@ def initial_tenant(pageserver: ZenithPageserver):
@zenfixture
def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str, pg_bin: PgBin) -> Iterator[PostgresFactory]:
pgfactory = PostgresFactory(zenith_cli, repo_dir, pg_bin, initial_tenant=initial_tenant)
def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str) -> Iterator[PostgresFactory]:
pgfactory = PostgresFactory(zenith_cli, repo_dir, initial_tenant=initial_tenant)
yield pgfactory
@@ -578,6 +505,67 @@ def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str, pg_bin:
print('Starting postgres cleanup')
pgfactory.stop_all()
class PgBin:
""" A helper class for executing postgres binaries """
def __init__(self, log_dir: str, pg_distrib_dir: str):
self.log_dir = log_dir
self.pg_install_path = pg_distrib_dir
self.pg_bin_path = os.path.join(self.pg_install_path, 'bin')
self.env = os.environ.copy()
self.env['LD_LIBRARY_PATH'] = os.path.join(self.pg_install_path, 'lib')
def _fixpath(self, command: List[str]) -> None:
if '/' not in command[0]:
command[0] = os.path.join(self.pg_bin_path, command[0])
def _build_env(self, env_add: Optional[Env]) -> Env:
if env_add is None:
return self.env
env = self.env.copy()
env.update(env_add)
return env
def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None) -> None:
"""
Run one of the postgres binaries.
The command should be in list form, e.g. ['pgbench', '-p', '55432']
All the necessary environment variables will be set.
If the first argument (the command name) doesn't include a path (no '/'
characters present), then it will be edited to include the correct path.
If you want stdout/stderr captured to files, use `run_capture` instead.
"""
self._fixpath(command)
print('Running command "{}"'.format(' '.join(command)))
env = self._build_env(env)
subprocess.run(command, env=env, cwd=cwd, check=True)
def run_capture(self,
command: List[str],
env: Optional[Env] = None,
cwd: Optional[str] = None) -> None:
"""
Run one of the postgres binaries, with stderr and stdout redirected to a file.
This is just like `run`, but for chatty programs.
"""
self._fixpath(command)
print('Running command "{}"'.format(' '.join(command)))
env = self._build_env(env)
subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True)
@zenfixture
def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin:
return PgBin(test_output_dir, pg_distrib_dir)
def read_pid(path: Path):
""" Read content of file into number """
return int(path.read_text())


@@ -2,7 +2,7 @@
//!
//! FIXME: better description needed here
use anyhow::{bail, Context, Result};
use anyhow::{bail, Result};
use bincode::config::Options;
use bytes::{Buf, Bytes};
use log::*;
@@ -27,7 +27,6 @@ use crate::replication::HotStandbyFeedback;
use crate::send_wal::SendWalHandler;
use crate::timeline::{Timeline, TimelineTools};
use crate::WalAcceptorConf;
use pageserver::waldecoder::WalStreamDecoder;
use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ};
pub const SK_MAGIC: u32 = 0xcafeceefu32;
@@ -237,9 +236,7 @@ impl<'pg> ReceiveWalConn<'pg> {
.write_message(&BeMessage::CopyBothResponse)?;
// Receive information about server
let server_info = self
.read_msg::<ServerInfo>()
.context("Failed to receive server info")?;
let server_info = self.read_msg::<ServerInfo>()?;
info!(
"Start handshake with wal_proposer {} sysid {} timeline {} tenant {}",
self.peer_addr, server_info.system_id, server_info.timeline_id, server_info.tenant_id,
@@ -287,9 +284,7 @@ impl<'pg> ReceiveWalConn<'pg> {
self.write_msg(&my_info)?;
/* Wait for vote request */
let prop = self
.read_msg::<RequestVote>()
.context("Failed to read vote request")?;
let prop = self.read_msg::<RequestVote>()?;
/* This is Paxos check which should ensure that only one master can perform commits */
if prop.node_id < my_info.server.node_id {
/* Send my node-id to inform proposer that it's candidate was rejected */
@@ -301,8 +296,8 @@ impl<'pg> ReceiveWalConn<'pg> {
);
}
my_info.server.node_id = prop.node_id;
/* Need to persist our vote first */
this_timeline.get().set_info(&my_info);
/* Need to persist our vote first */
this_timeline.get().save_control_file(true)?;
let mut flushed_restart_lsn = Lsn(0);
@@ -323,11 +318,9 @@ impl<'pg> ReceiveWalConn<'pg> {
}
info!(
"Start accepting WAL for timeline {} tenant {} address {:?} flush_lsn={}",
server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn,
"Start streaming from timeline {} tenant {} address {:?} flush_lsn={}",
server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn
);
let mut last_rec_lsn = Lsn(0);
let mut decoder = WalStreamDecoder::new(last_rec_lsn, false);
// Main loop
loop {
@@ -337,8 +330,7 @@ impl<'pg> ReceiveWalConn<'pg> {
let msg_bytes = self.read_msg_bytes()?;
let mut msg_reader = msg_bytes.reader();
let req = SafeKeeperRequest::des_from(&mut msg_reader)
.context("Failed to get WAL message header")?;
let req = SafeKeeperRequest::des_from(&mut msg_reader)?;
if req.sender_id != my_info.server.node_id {
bail!("Sender NodeId is changed");
}
@@ -350,52 +342,27 @@ impl<'pg> ReceiveWalConn<'pg> {
let end_pos = req.end_lsn;
let rec_size = end_pos.checked_sub(start_pos).unwrap().0 as usize;
assert!(rec_size <= MAX_SEND_SIZE);
if rec_size != 0 {
debug!(
"received for {} bytes between {} and {}",
rec_size, start_pos, end_pos,
);
/* Receive message body (from the rest of the message) */
let mut buf = Vec::with_capacity(rec_size);
msg_reader.read_to_end(&mut buf)?;
assert_eq!(buf.len(), rec_size);
debug!(
"received for {} bytes between {} and {}",
rec_size, start_pos, end_pos,
);
if decoder.available() != start_pos {
info!(
"Restart decoder from {} to {}",
decoder.available(),
start_pos
);
decoder = WalStreamDecoder::new(start_pos, false);
}
decoder.feed_bytes(&buf);
loop {
match decoder.poll_decode() {
Err(e) => info!("Decode error {}", e),
Ok(None) => {},
Ok(Some((lsn, _rec))) => {
last_rec_lsn = lsn;
continue;
}
}
break;
}
info!(
"Receive WAL {}..{} last_rec_lsn={}",
start_pos, end_pos, last_rec_lsn
);
/* Receive message body (from the rest of the message) */
let mut buf = Vec::with_capacity(rec_size);
msg_reader.read_to_end(&mut buf)?;
assert_eq!(buf.len(), rec_size);
/* Save message in file */
Self::write_wal_file(
swh,
start_pos,
timeline_id,
this_timeline.get(),
wal_seg_size,
&buf,
)?;
/* Save message in file */
Self::write_wal_file(
swh,
start_pos,
timeline_id,
this_timeline.get(),
wal_seg_size,
&buf,
)?;
}
my_info.restart_lsn = req.restart_lsn;
my_info.commit_lsn = req.commit_lsn;
@@ -405,13 +372,13 @@ impl<'pg> ReceiveWalConn<'pg> {
* maximum (vcl) determined by WAL proposer during handshake.
* Switching epoch means that the node has completed recovery and has started writing new data to the WAL.
*/
if my_info.epoch < prop.epoch && end_pos >= max(my_info.flush_lsn, prop.vcl) {
if my_info.epoch < prop.epoch && end_pos > max(my_info.flush_lsn, prop.vcl) {
info!("Switch to new epoch {}", prop.epoch);
my_info.epoch = prop.epoch; /* bump epoch */
sync_control_file = true;
}
if last_rec_lsn > my_info.flush_lsn {
my_info.flush_lsn = last_rec_lsn;
if end_pos > my_info.flush_lsn {
my_info.flush_lsn = end_pos;
}
/*
* Update restart LSN in control file.
@@ -419,7 +386,6 @@ impl<'pg> ReceiveWalConn<'pg> {
* when restart_lsn delta exceeds WAL segment size.
*/
sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn;
this_timeline.get().set_info(&my_info);
this_timeline.get().save_control_file(sync_control_file)?;
if sync_control_file {
@@ -430,7 +396,7 @@ impl<'pg> ReceiveWalConn<'pg> {
//info!("Confirm LSN: {:X}/{:>08X}", (end_pos>>32) as u32, end_pos as u32);
let resp = SafeKeeperResponse {
epoch: my_info.epoch,
flush_lsn: my_info.flush_lsn,
flush_lsn: end_pos,
hs_feedback: this_timeline.get().get_hs_feedback(),
};
self.write_msg(&resp)?;
@@ -439,15 +405,9 @@ impl<'pg> ReceiveWalConn<'pg> {
* Ping wal sender that new data is available.
* FlushLSN (end_pos) can be smaller than commitLSN if this safekeeper is still catching up.
*/
trace!(
"Notify WAL senders min({}, {})={}",
req.commit_lsn,
my_info.flush_lsn,
min(req.commit_lsn, my_info.flush_lsn)
);
this_timeline
.get()
.notify_wal_senders(min(req.commit_lsn, my_info.flush_lsn));
.notify_wal_senders(min(req.commit_lsn, end_pos));
}
Ok(())


@@ -76,10 +76,8 @@ impl ReplicationConn {
let feedback = HotStandbyFeedback::des(&m)?;
subscriber.add_hs_feedback(feedback);
}
FeMessage::Sync => {}
FeMessage::CopyFailed => return Err(anyhow!("Copy failed")),
_ => {
// We only handle `CopyData`, 'Sync', 'CopyFailed' messages. Anything else is ignored.
// We only handle `CopyData` messages. Anything else is ignored.
info!("unexpected message {:?}", msg);
}
}
@@ -217,15 +215,10 @@ impl ReplicationConn {
data: &file_buf,
}))?;
debug!(
"Sent WAL to page server {}..{}, end_pos={}",
start_pos,
start_pos + send_size as u64,
end_pos
);
start_pos += send_size as u64;
debug!("Sent WAL to page server up to {}", end_pos);
// Decide whether to reuse this file. If we don't set wal_file here
// a new file will be opened next time.
if start_pos.segment_offset(wal_seg_size) != 0 {


@@ -175,7 +175,7 @@ impl Timeline {
}
}
pub fn stop_wal_senders(&self) {
fn _stop_wal_senders(&self) {
self.notify_wal_senders(END_REPLICATION_MARKER);
}


@@ -24,11 +24,6 @@ impl Lsn {
/// Maximum possible value for an LSN
pub const MAX: Lsn = Lsn(u64::MAX);
/// Align LSN on 8-byte boundary (alignment of WAL records).
pub fn align(&self) -> Lsn {
Lsn((self.0 + 7) & !7)
}
/// Subtract a number, returning None on overflow.
pub fn checked_sub<T: Into<u64>>(self, other: T) -> Option<Lsn> {
let other: u64 = other.into();


@@ -301,9 +301,8 @@ impl PostgresBackend {
FeMessage::Query(m) => {
trace!("got query {:?}", m.body);
// xxx distinguish fatal and recoverable errors?
if let Err(e) = handler.process_query(self, m.body.clone()) {
if let Err(e) = handler.process_query(self, m.body) {
let errmsg = format!("{}", e);
warn!("query handler for {:?} failed: {}", m.body, errmsg);
self.write_message_noflush(&BeMessage::ErrorResponse(errmsg))?;
}
self.write_message(&BeMessage::ReadyForQuery)?;
@@ -341,7 +340,7 @@ impl PostgresBackend {
// We prefer explicit pattern matching to wildcards, because
// this helps us spot the places where new variants are missing
FeMessage::CopyData(_) | FeMessage::CopyDone | FeMessage::CopyFailed => {
FeMessage::CopyData(_) | FeMessage::CopyDone => {
bail!("unexpected message type: {:?}", msg);
}
}


@@ -31,7 +31,6 @@ pub enum FeMessage {
Terminate,
CopyData(Bytes),
CopyDone,
CopyFailed,
PasswordMessage(Bytes),
}
@@ -139,7 +138,6 @@ impl FeMessage {
b'X' => Ok(Some(FeMessage::Terminate)),
b'd' => Ok(Some(FeMessage::CopyData(body))),
b'c' => Ok(Some(FeMessage::CopyDone)),
b'f' => Ok(Some(FeMessage::CopyFailed)),
b'p' => Ok(Some(FeMessage::PasswordMessage(body))),
tag => Err(anyhow!("unknown message tag: {},'{:?}'", tag, body)),
}
@@ -340,7 +338,6 @@ pub enum BeMessage<'a> {
ControlFile,
CopyData(&'a [u8]),
CopyDone,
CopyFailed,
CopyInResponse,
CopyOutResponse,
CopyBothResponse,
@@ -549,11 +546,6 @@ impl<'a> BeMessage<'a> {
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
}
BeMessage::CopyFailed => {
buf.put_u8(b'f');
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
}
BeMessage::CopyInResponse => {
buf.put_u8(b'G');
write_body(buf, |buf| {