Compare commits

..

19 Commits

Author SHA1 Message Date
Thang Pham
e9766fb883 test 2022-07-14 14:34:11 -04:00
Thang Pham
57cfee1d67 test 2022-07-14 14:34:10 -04:00
Thang Pham
28e2c0a542 test 2022-07-14 14:34:09 -04:00
Thang Pham
35e4aad392 test 2022-07-14 14:34:08 -04:00
Thang Pham
1877ecc175 test 2022-07-14 14:34:07 -04:00
Thang Pham
69f8812960 add thread limit for test_branching_with_pgbench 2022-07-14 14:33:59 -04:00
Thang Pham
7a0297dd76 test 2022-07-14 13:09:22 -04:00
Thang Pham
936e6869bc test 2022-07-14 13:09:21 -04:00
Thang Pham
d0635939d1 test 2022-07-14 13:09:20 -04:00
Thang Pham
0f5474e305 test 2022-07-14 13:09:19 -04:00
Thang Pham
c00177d526 test 2022-07-14 13:09:17 -04:00
Thang Pham
f7ba424481 Merge branch 'main' into fix-flaky-branch-tests 2022-07-14 13:09:13 -04:00
Thang Pham
35b7f14a34 add back test_branching_with_pgbench 2022-07-14 12:32:37 -04:00
Thang Pham
2d6ec3b8b0 skip test_branching_with_pgbench 2022-07-14 10:43:44 -04:00
Thang Pham
adcda6d4e5 run multiple test_ancestor_branch 2022-07-13 17:34:06 -04:00
Thang Pham
8eef47eeb4 only run test_ancestor_branch 2022-07-13 16:21:43 -04:00
Thang Pham
9536d17b04 remove sleep failpoints 2022-07-13 15:53:19 -04:00
Thang Pham
d28fe732c5 skip branch and gc test 2022-07-13 15:33:58 -04:00
Thang Pham
5fb2e6df50 update test_branch_and_gc test:
- add manual compaction
- reduce compaction frequency
2022-07-13 15:11:19 -04:00
46 changed files with 1095 additions and 2050 deletions

View File

@@ -1,13 +0,0 @@
# The binaries are really slow, if you compile them in 'dev' mode with the defaults.
# Enable some optimizations even in 'dev' mode, to make tests faster. The basic
# optimizations enabled by "opt-level=1" don't affect debuggability too much.
#
# See https://www.reddit.com/r/rust/comments/gvrgca/this_is_a_neat_trick_for_getting_good_runtime/
#
[profile.dev.package."*"]
# Set the default for dependencies in Development mode.
opt-level = 3
[profile.dev]
# Turn on a small amount of optimization in Development mode.
opt-level = 1

View File

@@ -31,13 +31,6 @@ inputs:
runs:
using: "composite"
steps:
- name: Checkout
if: inputs.needs_postgres_source == 'true'
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 1
- name: Get Neon artifact for restoration
uses: actions/download-artifact@v3
with:
@@ -48,14 +41,15 @@ runs:
shell: bash -ex {0}
run: |
mkdir -p /tmp/neon/
tar -xf ./neon-artifact/neon.tar.zst -C /tmp/neon/
tar -xf ./neon-artifact/neon.tgz -C /tmp/neon/
rm -rf ./neon-artifact/
# Restore the parts of the 'build' directory that were included in the
# tarball. This includes the regression test modules in
# src/test/regress/*.so.
mkdir -p build/
cp -a /tmp/neon/pg_build/* build/
- name: Checkout
if: inputs.needs_postgres_source == 'true'
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 1
- name: Cache poetry deps
id: cache_poetry

View File

@@ -11,7 +11,7 @@ on:
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '36 4 * * *' # run once a day, timezone is utc
- cron: '36 7 * * *' # run once a day, timezone is utc
workflow_dispatch: # adds ability to run this manually
@@ -104,12 +104,3 @@ jobs:
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
run: |
REPORT_FROM=$(realpath perf-report-staging) REPORT_TO=staging scripts/generate_and_push_perf_report.sh
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic perf testing: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

View File

@@ -1,10 +1,9 @@
name: Test and Deploy
name: Test
on:
push:
branches:
- main
- release
pull_request:
defaults:
@@ -21,8 +20,53 @@ env:
COPT: '-Werror'
jobs:
build-postgres:
runs-on: [ self-hosted, Linux, k8s-runner ]
strategy:
fail-fast: false
matrix:
build_type: [ debug, release ]
rust_toolchain: [ 1.58 ]
env:
BUILD_TYPE: ${{ matrix.build_type }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 1
- name: Set pg revision for caching
id: pg_ver
run: echo ::set-output name=pg_rev::$(git rev-parse HEAD:vendor/postgres)
- name: Cache postgres build
id: cache_pg
uses: actions/cache@v3
with:
path: tmp_install/
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_ver.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Build postgres
if: steps.cache_pg.outputs.cache-hit != 'true'
run: mold -run make postgres -j$(nproc)
# actions/cache@v3 does not allow concurrently using the same cache across job steps, so use a separate cache
- name: Prepare postgres artifact
run: tar -C tmp_install/ -czf ./pg.tgz .
- name: Upload postgres artifact
uses: actions/upload-artifact@v3
with:
retention-days: 7
if-no-files-found: error
name: postgres-${{ runner.os }}-${{ matrix.build_type }}-artifact
path: ./pg.tgz
build-neon:
runs-on: [ self-hosted, Linux, k8s-runner ]
needs: [ build-postgres ]
strategy:
fail-fast: false
matrix:
@@ -39,80 +83,80 @@ jobs:
submodules: true
fetch-depth: 1
- name: Set pg revision for caching
id: pg_ver
run: echo ::set-output name=pg_rev::$(git rev-parse HEAD:vendor/postgres)
# Set some environment variables used by all the steps.
#
# CARGO_FLAGS is extra options to pass to "cargo build", "cargo test" etc.
# It also includes --features, if any
#
# CARGO_FEATURES is passed to "cargo metadata". It is separate from CARGO_FLAGS,
# because "cargo metadata" doesn't accept --release or --debug options
#
- name: Set env variables
- name: Get postgres artifact for restoration
uses: actions/download-artifact@v3
with:
name: postgres-${{ runner.os }}-${{ matrix.build_type }}-artifact
path: ./postgres-artifact/
- name: Extract postgres artifact
run: |
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix="scripts/coverage --profraw-prefix=$GITHUB_JOB --dir=/tmp/coverage run"
CARGO_FEATURES=""
CARGO_FLAGS=""
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=""
CARGO_FEATURES="--features profiling"
CARGO_FLAGS="--release $CARGO_FEATURES"
fi
echo "cov_prefix=${cov_prefix}" >> $GITHUB_ENV
echo "CARGO_FEATURES=${CARGO_FEATURES}" >> $GITHUB_ENV
echo "CARGO_FLAGS=${CARGO_FLAGS}" >> $GITHUB_ENV
mkdir ./tmp_install/
tar -xf ./postgres-artifact/pg.tgz -C ./tmp_install/
rm -rf ./postgres-artifact/
# Don't include the ~/.cargo/registry/src directory. It contains just
# uncompressed versions of the crates in ~/.cargo/registry/cache
# directory, and it's faster to let 'cargo' to rebuild it from the
# compressed crates.
- name: Cache cargo deps
id: cache_cargo
uses: actions/cache@v3
with:
path: |
~/.cargo/registry/
!~/.cargo/registry/src
~/.cargo/git/
target/
# Fall back to older versions of the key, if no cache for current Cargo.lock was found
key: |
v3-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
v3-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-
- name: Cache postgres build
id: cache_pg
uses: actions/cache@v3
with:
path: |
tmp_install/
build/src/test/regress/*.so
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_ver.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Build postgres
if: steps.cache_pg.outputs.cache-hit != 'true'
run: mold -run make postgres -j$(nproc)
v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-
- name: Run cargo build
run: |
${cov_prefix} mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
CARGO_FLAGS=
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
CARGO_FLAGS="--release --features profiling"
fi
"${cov_prefix[@]}" mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests
- name: Run cargo test
run: |
${cov_prefix} cargo test $CARGO_FLAGS
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
CARGO_FLAGS=
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
CARGO_FLAGS=--release
fi
"${cov_prefix[@]}" cargo test $CARGO_FLAGS
- name: Install rust binaries
run: |
# Install target binaries
mkdir -p /tmp/neon/bin/
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
fi
binaries=$(
${cov_prefix} cargo metadata $CARGO_FEATURES --format-version=1 --no-deps |
"${cov_prefix[@]}" cargo metadata --format-version=1 --no-deps |
jq -r '.packages[].targets[] | select(.kind | index("bin")) | .name'
)
test_exe_paths=$(
"${cov_prefix[@]}" cargo test --message-format=json --no-run |
jq -r '.executable | select(. != null)'
)
mkdir -p /tmp/neon/bin/
mkdir -p /tmp/neon/test_bin/
mkdir -p /tmp/neon/etc/
# Keep bloated coverage data files away from the rest of the artifact
mkdir -p /tmp/coverage/
# Install target binaries
for bin in $binaries; do
SRC=target/$BUILD_TYPE/$bin
DST=/tmp/neon/bin/$bin
@@ -121,39 +165,22 @@ jobs:
# Install test executables and write list of all binaries (for code coverage)
if [[ $BUILD_TYPE == "debug" ]]; then
# Keep bloated coverage data files away from the rest of the artifact
mkdir -p /tmp/coverage/
mkdir -p /tmp/neon/test_bin/
test_exe_paths=$(
${cov_prefix} cargo test $CARGO_FLAGS --message-format=json --no-run |
jq -r '.executable | select(. != null)'
)
for bin in $binaries; do
echo "/tmp/neon/bin/$bin" >> /tmp/coverage/binaries.list
done
for bin in $test_exe_paths; do
SRC=$bin
DST=/tmp/neon/test_bin/$(basename $bin)
# We don't need debug symbols for code coverage, so strip them out to make
# the artifact smaller.
strip "$SRC" -o "$DST"
cp "$SRC" "$DST"
echo "$DST" >> /tmp/coverage/binaries.list
done
for bin in $binaries; do
echo "/tmp/neon/bin/$bin" >> /tmp/coverage/binaries.list
done
fi
- name: Install postgres binaries
run: |
cp -a tmp_install /tmp/neon/pg_install
# Include modules needed by the Postgres regression tests
mkdir -p /tmp/neon/pg_build/src/test/regress
cp -a build/src/test/regress/*.so /tmp/neon/pg_build/src/test/regress
run: cp -a tmp_install /tmp/neon/pg_install
- name: Prepare neon artifact
run: ZSTD_NBTHREADS=0 tar -C /tmp/neon/ -cf ./neon.tar.zst --zstd .
run: tar -C /tmp/neon/ -czf ./neon.tgz .
- name: Upload neon binaries
uses: actions/upload-artifact@v3
@@ -161,7 +188,7 @@ jobs:
retention-days: 7
if-no-files-found: error
name: neon-${{ runner.os }}-${{ matrix.build_type }}-${{ matrix.rust_toolchain }}-artifact
path: ./neon.tar.zst
path: ./neon.tgz
# XXX: keep this after the binaries.list is formed, so the coverage can properly work later
- name: Merge and upload coverage data
@@ -272,10 +299,9 @@ jobs:
with:
path: |
~/.cargo/registry/
!~/.cargo/registry/src
~/.cargo/git/
target/
key: v3-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
key: v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
- name: Get Neon artifact for restoration
uses: actions/download-artifact@v3
@@ -286,7 +312,7 @@ jobs:
- name: Extract Neon artifact
run: |
mkdir -p /tmp/neon/
tar -xf ./neon-artifact/neon.tar.zst -C /tmp/neon/
tar -xf ./neon-artifact/neon.tgz -C /tmp/neon/
rm -rf ./neon-artifact/
- name: Restore coverage data
@@ -407,9 +433,9 @@ jobs:
- name: Get legacy build tag
run: |
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
echo "::set-output name=tag::latest"
echo "::set-output name=tag::latest
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
echo "::set-output name=tag::release"
echo "::set-output name=tag::release
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
exit 1
@@ -469,9 +495,9 @@ jobs:
- name: Get legacy build tag
run: |
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
echo "::set-output name=tag::latest"
echo "::set-output name=tag::latest
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
echo "::set-output name=tag::release"
echo "::set-output name=tag::release
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
exit 1
@@ -524,7 +550,7 @@ jobs:
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
STAGING='{"env_name": "staging", "proxy_job": "neon-proxy", "proxy_config": "staging.proxy", "kubeconfig_secret": "STAGING_KUBECONFIG_DATA"}'
NEON_STRESS='{"env_name": "neon-stress", "proxy_job": "neon-stress-proxy", "proxy_config": "neon-stress.proxy", "kubeconfig_secret": "NEON_STRESS_KUBECONFIG_DATA"}'
echo "::set-output name=include::[$STAGING]"
echo "::set-output name=include::[$STAGING, $NEON_STRESS]"
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
PRODUCTION='{"env_name": "production", "proxy_job": "neon-proxy", "proxy_config": "production.proxy", "kubeconfig_secret": "PRODUCTION_KUBECONFIG_DATA"}'
echo "::set-output name=include::[$PRODUCTION]"

View File

@@ -98,10 +98,9 @@ jobs:
with:
path: |
~/.cargo/registry
!~/.cargo/registry/src
~/.cargo/git
target
key: v1-${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust-${{ matrix.rust_toolchain }}
key: ${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust-${{ matrix.rust_toolchain }}
- name: Run cargo clippy
run: ./run_clippy.sh

774
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -46,28 +46,24 @@ CARGO_CMD_PREFIX += $(if $(filter n,$(MAKEFLAGS)),,+)
CARGO_CMD_PREFIX += CARGO_TERM_PROGRESS_WHEN=never CI=1
#
# Top level Makefile to build Neon and PostgreSQL
# Top level Makefile to build Zenith and PostgreSQL
#
.PHONY: all
all: neon postgres
all: zenith postgres
### Neon Rust bits
### Zenith Rust bits
#
# The 'postgres_ffi' depends on the Postgres headers.
.PHONY: neon
neon: postgres-headers
+@echo "Compiling Neon"
.PHONY: zenith
zenith: postgres-headers
+@echo "Compiling Zenith"
$(CARGO_CMD_PREFIX) cargo build $(CARGO_BUILD_FLAGS)
### PostgreSQL parts
#
# Postgres is built in the 'build' directory, and installed into
# $(POSTGRES_INSTALL_DIR), which defaults to 'tmp_install'
#
build/config.status:
$(POSTGRES_INSTALL_DIR)/build/config.status:
+@echo "Configuring postgres build"
mkdir -p build
(cd build && \
mkdir -p $(POSTGRES_INSTALL_DIR)/build
(cd $(POSTGRES_INSTALL_DIR)/build && \
$(ROOT_PROJECT_DIR)/vendor/postgres/configure CFLAGS='$(PG_CFLAGS)' \
$(PG_CONFIGURE_OPTS) \
$(SECCOMP) \
@@ -75,44 +71,44 @@ build/config.status:
# nicer alias for running 'configure'
.PHONY: postgres-configure
postgres-configure: build/config.status
postgres-configure: $(POSTGRES_INSTALL_DIR)/build/config.status
# Install the PostgreSQL header files into $(POSTGRES_INSTALL_DIR)/include
.PHONY: postgres-headers
postgres-headers: postgres-configure
+@echo "Installing PostgreSQL headers"
$(MAKE) -C build/src/include MAKELEVEL=0 install
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/src/include MAKELEVEL=0 install
# Compile and install PostgreSQL and contrib/neon
.PHONY: postgres
postgres: postgres-configure \
postgres-headers # to prevent `make install` conflicts with neon's `postgres-headers`
postgres-headers # to prevent `make install` conflicts with zenith's `postgres-headers`
+@echo "Compiling PostgreSQL"
$(MAKE) -C build MAKELEVEL=0 install
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build MAKELEVEL=0 install
+@echo "Compiling contrib/neon"
$(MAKE) -C build/contrib/neon install
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/contrib/neon install
+@echo "Compiling contrib/neon_test_utils"
$(MAKE) -C build/contrib/neon_test_utils install
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/contrib/neon_test_utils install
+@echo "Compiling pg_buffercache"
$(MAKE) -C build/contrib/pg_buffercache install
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/contrib/pg_buffercache install
+@echo "Compiling pageinspect"
$(MAKE) -C build/contrib/pageinspect install
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/contrib/pageinspect install
.PHONY: postgres-clean
postgres-clean:
$(MAKE) -C build MAKELEVEL=0 clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build MAKELEVEL=0 clean
# This doesn't remove the effects of 'configure'.
.PHONY: clean
clean:
cd build && $(MAKE) clean
cd $(POSTGRES_INSTALL_DIR)/build && $(MAKE) clean
$(CARGO_CMD_PREFIX) cargo clean
# This removes everything
.PHONY: distclean
distclean:
rm -rf build $(POSTGRES_INSTALL_DIR)
rm -rf $(POSTGRES_INSTALL_DIR)
$(CARGO_CMD_PREFIX) cargo clean
.PHONY: fmt

View File

@@ -1,7 +1,8 @@
use std::path::Path;
use anyhow::Result;
use anyhow::{anyhow, Result};
use log::{info, log_enabled, warn, Level};
use postgres::error::SqlState;
use postgres::{Client, NoTls};
use serde::Deserialize;
@@ -394,34 +395,20 @@ pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> {
// This will only change ownership on the schema itself, not the objects
// inside it. Without it owner of the `public` schema will be `cloud_admin`
// and database owner cannot do anything with it. SQL procedure ensures
// that it won't error out if schema `public` doesn't exist.
let alter_query = format!(
"DO $$\n\
DECLARE\n\
schema_owner TEXT;\n\
BEGIN\n\
IF EXISTS(\n\
SELECT nspname\n\
FROM pg_catalog.pg_namespace\n\
WHERE nspname = 'public'\n\
)\n\
THEN\n\
SELECT nspowner::regrole::text\n\
FROM pg_catalog.pg_namespace\n\
WHERE nspname = 'public'\n\
INTO schema_owner;\n\
\n\
IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'\n\
THEN\n\
ALTER SCHEMA public OWNER TO {};\n\
END IF;\n\
END IF;\n\
END\n\
$$;",
db.owner.quote()
);
db_client.simple_query(&alter_query)?;
// and database owner cannot do anything with it.
let alter_query = format!("ALTER SCHEMA public OWNER TO {}", db.owner.quote());
let res = db_client.simple_query(&alter_query);
if let Err(e) = res {
if e.code() == Some(&SqlState::INVALID_SCHEMA_NAME) {
// This is OK, db just don't have a `public` schema.
// Probably user dropped it manually.
info!("no 'public' schema found in the database {}", db.name);
} else {
// Something different happened, propagate the error
return Err(anyhow!(e));
}
}
}
Ok(())

1
docs/.gitignore vendored
View File

@@ -1 +0,0 @@
book

14
docs/README.md Normal file
View File

@@ -0,0 +1,14 @@
# Zenith documentation
## Table of contents
- [authentication.md](authentication.md) — pageserver JWT authentication.
- [docker.md](docker.md) — Docker images and building pipeline.
- [glossary.md](glossary.md) — Glossary of all the terms used in codebase.
- [multitenancy.md](multitenancy.md) — how multitenancy is organized in the pageserver and Zenith CLI.
- [sourcetree.md](sourcetree.md) — Overview of the source tree layout.
- [pageserver/README.md](/pageserver/README.md) — pageserver overview.
- [postgres_ffi/README.md](/libs/postgres_ffi/README.md) — Postgres FFI overview.
- [test_runner/README.md](/test_runner/README.md) — tests infrastructure overview.
- [safekeeper/README.md](/safekeeper/README.md) — WAL service overview.
- [core_changes.md](core_changes.md) - Description of Zenith changes in Postgres core

View File

@@ -1,84 +0,0 @@
# Summary
[Introduction]()
- [Separation of Compute and Storage](./separation-compute-storage.md)
# Architecture
- [Compute]()
- [WAL proposer]()
- [WAL Backpressure]()
- [Postgres changes](./core_changes.md)
- [Pageserver](./pageserver.md)
- [Services](./pageserver-services.md)
- [Thread management](./pageserver-thread-mgmt.md)
- [WAL Redo](./pageserver-walredo.md)
- [Page cache](./pageserver-pagecache.md)
- [Storage](./pageserver-storage.md)
- [Datadir mapping]()
- [Layer files]()
- [Branching]()
- [Garbage collection]()
- [Cloud Storage]()
- [Processing a GetPage request](./pageserver-processing-getpage.md)
- [Processing WAL](./pageserver-processing-wal.md)
- [Management API]()
- [Tenant Rebalancing]()
- [WAL Service](walservice.md)
- [Consensus protocol](safekeeper-protocol.md)
- [Management API]()
- [Rebalancing]()
- [Control Plane]()
- [Proxy]()
- [Source view](./sourcetree.md)
- [docker.md](./docker.md) — Docker images and building pipeline.
- [Error handling and logging]()
- [Testing]()
- [Unit testing]()
- [Integration testing]()
- [Benchmarks]()
- [Glossary](./glossary.md)
# Uncategorized
- [authentication.md](./authentication.md)
- [multitenancy.md](./multitenancy.md) — how multitenancy is organized in the pageserver and Zenith CLI.
- [settings.md](./settings.md)
#FIXME: move these under sourcetree.md
#- [pageserver/README.md](/pageserver/README.md)
#- [postgres_ffi/README.md](/libs/postgres_ffi/README.md)
#- [test_runner/README.md](/test_runner/README.md)
#- [safekeeper/README.md](/safekeeper/README.md)
# RFCs
- [RFCs](./rfcs/README.md)
- [002-storage](rfcs/002-storage.md)
- [003-laptop-cli](rfcs/003-laptop-cli.md)
- [004-durability](rfcs/004-durability.md)
- [005-zenith_local](rfcs/005-zenith_local.md)
- [006-laptop-cli-v2-CLI](rfcs/006-laptop-cli-v2-CLI.md)
- [006-laptop-cli-v2-repository-structure](rfcs/006-laptop-cli-v2-repository-structure.md)
- [007-serverless-on-laptop](rfcs/007-serverless-on-laptop.md)
- [008-push-pull](rfcs/008-push-pull.md)
- [009-snapshot-first-storage-cli](rfcs/009-snapshot-first-storage-cli.md)
- [009-snapshot-first-storage](rfcs/009-snapshot-first-storage.md)
- [009-snapshot-first-storage-pitr](rfcs/009-snapshot-first-storage-pitr.md)
- [010-storage_details](rfcs/010-storage_details.md)
- [011-retention-policy](rfcs/011-retention-policy.md)
- [012-background-tasks](rfcs/012-background-tasks.md)
- [013-term-history](rfcs/013-term-history.md)
- [014-safekeepers-gossip](rfcs/014-safekeepers-gossip.md)
- [014-storage-lsm](rfcs/014-storage-lsm.md)
- [015-storage-messaging](rfcs/015-storage-messaging.md)
- [016-connection-routing](rfcs/016-connection-routing.md)
- [cluster-size-limits](rfcs/cluster-size-limits.md)

View File

@@ -1,5 +0,0 @@
[book]
language = "en"
multilingual = false
src = "."
title = "Neon architecture"

View File

@@ -1,519 +1,202 @@
# Postgres core changes
1. Add t_cid to XLOG record
- Why?
The cmin/cmax on a heap page is a real bummer. I don't see any other way to fix that than bite the bullet and modify the WAL-logging routine to include the cmin/cmax.
This lists all the changes that have been made to the PostgreSQL
source tree, as a somewhat logical set of patches. The long-term goal
is to eliminate all these changes, by submitting patches to upstream
and refactoring code into extensions, so that you can run unmodified
PostgreSQL against Neon storage.
To recap, the problem is that the XLOG_HEAP_INSERT record does not include the command id of the inserted row. And same with deletion/update. So in the primary, a row is inserted with current xmin + cmin. But in the replica, the cmin is always set to 1. That works, because the command id is only relevant to the inserting transaction itself. After commit/abort, no one cares abut it anymore.
In Neon, we run PostgreSQL in the compute nodes, but we also run a special WAL redo process in the
page server. We currently use the same binary for both, with --wal-redo runtime flag to launch it in
the WAL redo mode. Some PostgreSQL changes are needed in the compute node, while others are just for
the WAL redo process.
- Alternatives?
I don't know
In addition to core PostgreSQL changes, there is a Neon extension in contrib/neon, to hook into the
smgr interface. Once all the core changes have been submitted to upstream or eliminated some other
way, the extension could live outside the postgres repository and build against vanilla PostgreSQL.
2. Add PD_WAL_LOGGED.
- Why?
Postgres sometimes writes data to the page before it is wal-logged. If such page ais swapped out, we will loose this change. The problem is currently solved by setting PD_WAL_LOGGED bit in page header. When page without this bit set is written to the SMGR, then it is forced to be written to the WAL as FPI using log_newpage_copy() function.
Below is a list of all the PostgreSQL source code changes, categorized into changes needed for
compute, and changes needed for the WAL redo process:
There was wrong assumption that it can happen only during construction of some exotic indexes (like gist). It is not true. The same situation can happen with COPY,VACUUM and when record hint bits are set.
# Changes for Compute node
- Discussion:
https://discord.com/channels/869525774699462656/882681420986851359
## Add t_cid to heap WAL records
- Alternatives:
Do not store this flag in page header, but associate this bit with shared buffer. Logically it is more correct but in practice we will get not advantages: neither in space, neither in CPU overhead.
```
src/backend/access/heap/heapam.c | 26 +-
src/include/access/heapam_xlog.h | 6 +-
```
We have added a new t_cid field to heap WAL records. This changes the WAL record format, making Neon WAL format incompatible with vanilla PostgreSQL!
3. XLogReadBufferForRedo not always loads and pins requested buffer. So we need to add extra checks that buffer is really pinned. Also do not use BufferGetBlockNumber for buffer returned by XLogReadBufferForRedo.
- Why?
XLogReadBufferForRedo is not pinning pages which are not requested by wal-redo. It is specific only for wal-redo Postgres.
### Problem we're trying to solve
- Alternatives?
No
The problem is that the XLOG_HEAP_INSERT record does not include the command id of the inserted row. And same with deletion/update. So in the primary, a row is inserted with current xmin + cmin. But in the replica, the cmin is always set to 1. That works in PostgreSQL, because the command id is only relevant to the inserting transaction itself. After commit/abort, no one cares about it anymore. But with Neon, we rely on WAL replay to reconstruct the page, even while the original transaction is still running.
### How to get rid of the patch
4. Eliminate reporting of some warnings related with hint bits, for example
"page is not marked all-visible but visibility map bit is set in relation".
- Why?
Hint bit may be not WAL logged.
Bite the bullet and submit the patch to PostgreSQL, to add the t_cid to the WAL records. It makes the WAL records larger, which could make this unpopular in the PostgreSQL community. However, it might simplify some logical decoding code; Andres Freund briefly mentioned in PGCon 2022 discussion on Heikki's Neon presentation that logical decoding currently needs to jump through some hoops to reconstruct the same information.
- Alternative?
Always wal log any page changes.
### Alternatives
Perhaps we could write an extra WAL record with the t_cid information, when a page is evicted that contains rows that were touched a transaction that's still running. However, that seems very complicated.
5. Maintain last written LSN.
- Why?
When compute node requests page from page server, we need to specify LSN. Ideally it should be LSN
of WAL record performing last update of this pages. But we do not know it, because we do not have page.
We can use current WAL flush position, but in this case there is high probability that page server
will be blocked until this peace of WAL is delivered.
As better approximation we can keep max LSN of written page. It will be better to take in account LSNs only of evicted pages,
but SMGR API doesn't provide such knowledge.
## ginfast.c
- Alternatives?
Maintain map of LSNs of evicted pages.
```
diff --git a/src/backend/access/gin/ginfast.c b/src/backend/access/gin/ginfast.c
index e0d9940946..2d964c02e9 100644
--- a/src/backend/access/gin/ginfast.c
+++ b/src/backend/access/gin/ginfast.c
@@ -285,6 +285,17 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
memset(&sublist, 0, sizeof(GinMetaPageData));
makeSublist(index, collector->tuples, collector->ntuples, &sublist);
+ if (metadata->head != InvalidBlockNumber)
+ {
+ /*
+ * ZENITH: Get buffer before XLogBeginInsert() to avoid recursive call
+ * of XLogBeginInsert(). Reading a new buffer might evict a dirty page from
+ * the buffer cache, and if that page happens to be an FSM or VM page, zenith_write()
+ * will try to WAL-log an image of the page.
+ */
+ buffer = ReadBuffer(index, metadata->tail);
+ }
+
if (needWal)
XLogBeginInsert();
@@ -316,7 +327,6 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
data.prevTail = metadata->tail;
data.newRightlink = sublist.head;
- buffer = ReadBuffer(index, metadata->tail);
LockBuffer(buffer, GIN_EXCLUSIVE);
page = BufferGetPage(buffer);
```
The problem is explained in the comment above
6. Launching Postgres without WAL.
- Why?
According to Zenith architecture compute node is stateless. So when we are launching
compute node, we need to provide some dummy PG_DATADIR. Relation pages
can be requested on demand from page server. But Postgres still need some non-relational data:
control and configuration files, SLRUs,...
It is currently implemented using basebackup (do not mix with pg_basebackup) which is created
by pageserver. It includes in this tarball config/control files, SLRUs and required directories.
As far as pageserver do not have original (non-scattered) WAL segments, it includes in
this tarball dummy WAL segment which contains only SHUTDOWN_CHECKPOINT record at the beginning of segment,
which redo field points to the end of wal. It allows to load checkpoint record in more or less
standard way with minimal changes of Postgres, but then some special handling is needed,
including restoring previous record position from zenith.signal file.
Also we have to correctly initialize header of last WAL page (pointed by checkpoint.redo)
to pass checks performed by XLogReader.
### How to get rid of the patch
- Alternatives?
We may not include fake WAL segment in tarball at all and modify xlog.c to load checkpoint record
in special way. But it may only increase number of changes in xlog.c
Can we stop WAL-logging FSM or VM pages? Or delay the WAL logging until we're out of the critical
section or something.
7. Add redo_read_buffer_filter callback to XLogReadBufferForRedoExtended
- Why?
We need a way in wal-redo Postgres to ignore pages which are not requested by pageserver.
So wal-redo Postgres reconstructs only requested page and for all other returns BLK_DONE
which means that recovery for them is not needed.
Maybe some bigger rewrite of FSM and VM would help to avoid WAL-logging FSM and VM page images?
- Alternatives?
No
8. Enforce WAL logging of sequence updates.
- Why?
Due to performance reasons Postgres don't want to log each fetching of a value from a sequence,
so we pre-log a few fetches in advance. In the event of crash we can lose
(skip over) as many values as we pre-logged.
But it doesn't work with Zenith because page with sequence value can be evicted from buffer cache
and we will get a gap in sequence values even without crash.
## Mark index builds that use buffer manager without logging explicitly
- Alternatives:
Do not try to preserve sequential order but avoid performance penalty.
```
src/backend/access/gin/gininsert.c | 7 +
src/backend/access/gist/gistbuild.c | 15 +-
src/backend/access/spgist/spginsert.c | 8 +-
also some changes in src/backend/storage/smgr/smgr.c
```
9. Treat unlogged tables as normal (permanent) tables.
- Why?
Unlogged tables are not transient, so them have to survive node restart (unlike temporary tables).
But as far as compute node is stateless, we need to persist their data to storage node.
And it can only be done through the WAL.
When a GIN index is built, for example, it is built by inserting the entries into the index more or
less normally, but without WAL-logging anything. After the index has been built, we iterate through
all pages and write them to the WAL. That doesn't work for Neon, because if a page is not WAL-logged
and is evicted from the buffer cache, it is lost. We have an check to catch that in the Neon
extension. To fix that, we've added a few functions to track explicitly when we're performing such
an operation: `smgr_start_unlogged_build`, `smgr_finish_unlogged_build_phase_1` and
`smgr_end_unlogged_build`.
### How to get rid of the patch
I think it would make sense to be more explicit about that in PostgreSQL too. So extract these
changes to a patch and post to pgsql-hackers.
- Alternatives?
* Store unlogged tables locally (violates requirement of stateless compute nodes).
* Prohibit unlogged tables at all.
## Track last-written page LSN
10. Support start Postgres in wal-redo mode
- Why?
To be able to apply WAL record and reconstruct pages at page server.
```
src/backend/commands/dbcommands.c | 17 +-
- Alternatives?
* Rewrite redo handlers in Rust
* Do not reconstruct pages at page server at all and do it at compute node.
Also one call to SetLastWrittenPageLSN() in spginsert.c, maybe elsewhere too
```
Whenever a page is evicted from the buffer cache, we remember its LSN, so that we can use the same
LSN in the GetPage@LSN request when reading the page back from the page server. The value is
conservative: it would be correct to always use the last-inserted LSN, but it would be slow because
then the page server would need to wait for the recent WAL to be streamed and processed, before
responding to any GetPage@LSN request.
11. WAL proposer
- Why?
WAL proposer is communicating with safekeeper and ensures WAL durability by quorum writes.
It is currently implemented as patch to standard WAL sender.
The last-written page LSN is mostly tracked in the smgrwrite() function, without core code changes,
but there are a few exceptions where we've had to add explicit calls to the Neon-specific
SetLastWrittenPageLSN() function.
- Alternatives?
Can be moved to extension if some extra callbacks will be added to wal sender code.
There's an open PR to track the LSN in a more-fine grained fashion:
https://github.com/neondatabase/postgres/pull/177
PostgreSQL v15 introduces a new method to do CREATE DATABASE that WAL-logs the database instead of
relying copying files and checkpoint. With that method, we probably won't need any special handling.
The old method is still available, though.
12. Secure Computing BPF API wrapper.
- Why?
Pageserver delegates complex WAL decoding duties to Postgres,
which means that the latter might fall victim to carefully designed
malicious WAL records and start doing harmful things to the system.
To prevent this, it has been decided to limit possible interactions
with the outside world using the Secure Computing BPF mode.
- Alternatives:
* Rewrite redo handlers in Rust.
* Add more checks to guarantee correctness of WAL records.
* Move seccomp.c to extension
* Many other discussed approaches to neutralize incorrect WAL records vulnerabilities.
13. Callbacks for replica feedbacks
- Why?
Allowing waproposer to interact with walsender code.
- Alternatives
Copy walsender code to walproposer.
14. Support multiple SMGR implementations.
- Why?
Postgres provides abstract API for storage manager but it has only one implementation
and provides no way to replace it with custom storage manager.
- Alternatives?
None.
15. Calculate database size as sum of all database relations.
- Why?
Postgres is calculating database size by traversing data directory
but as far as Zenith compute node is stateless we can not do it.
- Alternatives?
Send this request directly to pageserver and calculate real (physical) size
of Zenith representation of database/timeline, rather than sum logical size of all relations.
### How to get rid of the patch
Wait until v15?
-----------------------------------------------
Not currently committed but proposed:
1. Disable ring buffer buffer manager strategies
- Why?
Postgres tries to avoid cache flushing by bulk operations (copy, seqscan, vacuum,...).
Even if there are free space in buffer cache, pages may be evicted.
Negative effect of it can be somehow compensated by file system cache, but in case of Zenith
cost of requesting page from page server is much higher.
## Cache relation sizes
- Alternatives?
Instead of just prohibiting ring buffer we may try to implement more flexible eviction policy,
for example copy evicted page from ring buffer to some other buffer if there is free space
in buffer cache.
The Neon extension contains a little cache for smgrnblocks() and smgrexists() calls, to avoid going
to the page server every time. It might be useful to cache those in PostgreSQL, maybe in the
relcache? (I think we do cache nblocks in relcache already, check why that's not good enough for
Neon)
2. Disable marking page as dirty when hint bits are set.
- Why?
Postgres has to modify page twice: first time when some tuple is updated and second time when
hint bits are set. Wal logging hint bits updates requires FPI which significantly increase size of WAL.
- Alternatives?
Add special WAL record for setting page hints.
## Misc change in vacuumlazy.c
3. Prefetching
- Why?
As far as pages in Zenith are loaded on demand, to reduce node startup time
and also speedup some massive queries we need some mechanism for bulk loading to
reduce page request round-trip overhead.
```
index 8aab6e324e..c684c4fbee 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -1487,7 +1487,10 @@ lazy_scan_heap(LVRelState *vacrel, VacuumParams *params, bool aggressive)
else if (all_visible_according_to_vm && !PageIsAllVisible(page)
&& VM_ALL_VISIBLE(vacrel->rel, blkno, &vmbuffer))
{
- elog(WARNING, "page is not marked all-visible but visibility map bit is set in relation \"%s\" page %u",
+ /* ZENITH-XXX: all visible hint is not wal-logged
+ * FIXME: Replay visibilitymap changes in pageserver
+ */
+ elog(DEBUG1, "page is not marked all-visible but visibility map bit is set in relation \"%s\" page %u",
vacrel->relname, blkno);
visibilitymap_clear(vacrel->rel, blkno, vmbuffer,
VISIBILITYMAP_VALID_BITS);
```
Currently Postgres is supporting prefetching only for bitmap scan.
In Zenith we also use prefetch for sequential and index scan. For sequential scan we prefetch
some number of following pages. For index scan we prefetch pages of heap relation addressed by TIDs.
Is this still needed? If that WARNING happens, it looks like potential corruption that we should
fix!
## Use buffer manager when extending VM or FSM
```
src/backend/storage/freespace/freespace.c | 14 +-
src/backend/access/heap/visibilitymap.c | 15 +-
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index e198df65d8..addfe93eac 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -652,10 +652,19 @@ vm_extend(Relation rel, BlockNumber vm_nblocks)
/* Now extend the file */
while (vm_nblocks_now < vm_nblocks)
{
- PageSetChecksumInplace((Page) pg.data, vm_nblocks_now);
+ /*
+ * ZENITH: Initialize VM pages through buffer cache to prevent loading
+ * them from pageserver.
+ */
+ Buffer buffer = ReadBufferExtended(rel, VISIBILITYMAP_FORKNUM, P_NEW,
+ RBM_ZERO_AND_LOCK, NULL);
+ Page page = BufferGetPage(buffer);
+
+ PageInit((Page) page, BLCKSZ, 0);
+ PageSetChecksumInplace(page, vm_nblocks_now);
+ MarkBufferDirty(buffer);
+ UnlockReleaseBuffer(buffer);
- smgrextend(rel->rd_smgr, VISIBILITYMAP_FORKNUM, vm_nblocks_now,
- pg.data, false);
vm_nblocks_now++;
}
```
### Problem we're trying to solve
???
### How to get rid of the patch
Maybe this would be a reasonable change in PostgreSQL too?
## Allow startup without reading checkpoint record
In Neon, the compute node is stateless. So when we are launching compute node, we need to provide
some dummy PG_DATADIR. Relation pages can be requested on demand from page server. But Postgres
still need some non-relational data: control and configuration files, SLRUs,... It is currently
implemented using basebackup (do not mix with pg_basebackup) which is created by pageserver. It
includes in this tarball config/control files, SLRUs and required directories.
As pageserver does not have the original WAL segments, the basebackup tarball includes an empty WAL
segment to bootstrap the WAL writing, but it doesn't contain the checkpoint record. There are some
changes in xlog.c, to allow starting the compute node without reading the last checkpoint record
from WAL.
This includes code to read the `zenith.signal` file, which tells the startup code the LSN to start
at. When the `zenith.signal` file is present, the startup uses that LSN instead of the last
checkpoint's LSN. The system is known to be consistent at that LSN, without any WAL redo.
### How to get rid of the patch
???
### Alternatives
Include a fake checkpoint record in the tarball. Creating fake WAL is a bit risky, though; I'm
afraid it might accidentally get streamed to the safekeepers and overwrite or corrupt the real WAL.
## Disable sequence caching
```
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 0415df9ccb..9f9db3c8bc 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -53,7 +53,9 @@
* so we pre-log a few fetches in advance. In the event of
* crash we can lose (skip over) as many values as we pre-logged.
*/
-#define SEQ_LOG_VALS 32
+/* Zenith XXX: to ensure sequence order of sequence in Zenith we need to WAL log each sequence update. */
+/* #define SEQ_LOG_VALS 32 */
+#define SEQ_LOG_VALS 0
```
Due to performance reasons Postgres don't want to log each fetching of a value from a sequence, so
it pre-logs a few fetches in advance. In the event of crash we can lose (skip over) as many values
as we pre-logged. But with Neon, because page with sequence value can be evicted from buffer cache,
we can get a gap in sequence values even without crash.
### How to get rid of the patch
Maybe we can just remove it, and accept the gaps. Or add some special handling for sequence
relations in the Neon extension, to WAL log the sequence page when it's about to be evicted. It
would be weird if the sequence moved backwards though, think of PITR.
Or add a GUC for the amount to prefix to PostgreSQL, and force it to 1 in Neon.
## Walproposer
```
src/Makefile | 1 +
src/backend/replication/libpqwalproposer/Makefile | 37 +
src/backend/replication/libpqwalproposer/libpqwalproposer.c | 416 ++++++++++++
src/backend/postmaster/bgworker.c | 4 +
src/backend/postmaster/postmaster.c | 6 +
src/backend/replication/Makefile | 4 +-
src/backend/replication/walproposer.c | 2350 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/backend/replication/walproposer_utils.c | 402 +++++++++++
src/backend/replication/walreceiver.c | 7 +
src/backend/replication/walsender.c | 320 ++++++---
src/backend/storage/ipc/ipci.c | 6 +
src/include/replication/walproposer.h | 565 ++++++++++++++++
```
WAL proposer is communicating with safekeeper and ensures WAL durability by quorum writes. It is
currently implemented as patch to standard WAL sender.
### How to get rid of the patch
Refactor into an extension. Submit hooks or APIs into upstream if necessary.
@MMeent did some work on this already: https://github.com/neondatabase/postgres/pull/96
## Ignore unexpected data beyond EOF in bufmgr.c
```
@@ -922,11 +928,14 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
*/
bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
if (!PageIsNew((Page) bufBlock))
- ereport(ERROR,
+ {
+ // XXX-ZENITH
+ MemSet((char *) bufBlock, 0, BLCKSZ);
+ ereport(DEBUG1,
(errmsg("unexpected data beyond EOF in block %u of relation %s",
blockNum, relpath(smgr->smgr_rnode, forkNum)),
errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
-
+ }
/*
* We *must* do smgrextend before succeeding, else the page will not
* be reserved by the kernel, and the next P_NEW call will decide to
```
PostgreSQL is a bit sloppy with extending relations. Usually, the relation is extended with zeros
first, then the page is filled, and finally the new page WAL-logged. But if multiple backends extend
a relation at the same time, the pages can be WAL-logged in different order.
I'm not sure what scenario exactly required this change in Neon, though.
### How to get rid of the patch
Submit patches to pgsql-hackers, to tighten up the WAL-logging around relation extension. It's a bit
confusing even in PostgreSQL. Maybe WAL log the intention to extend first, then extend the relation,
and finally WAL-log that the extension succeeded.
## Make smgr interface available to extensions
```
src/backend/storage/smgr/smgr.c | 203 +++---
src/include/storage/smgr.h | 72 +-
```
### How to get rid of the patch
Submit to upstream. This could be useful for the Disk Encryption patches too, or for compression.
## Added relpersistence argument to smgropen()
```
src/backend/access/heap/heapam_handler.c | 2 +-
src/backend/catalog/storage.c | 10 +-
src/backend/commands/tablecmds.c | 2 +-
src/backend/storage/smgr/md.c | 4 +-
src/include/utils/rel.h | 3 +-
```
Neon needs to treat unlogged relations differently from others, so the smgrread(), smgrwrite() etc.
implementations need to know the 'relpersistence' of the relation. To get that information where
it's needed, we added the 'relpersistence' field to smgropen().
### How to get rid of the patch
Maybe 'relpersistence' would be useful in PostgreSQL for debugging purposes? Or simply for the
benefit of extensions like Neon. Should consider this in the patch to make smgr API usable to
extensions.
## Alternatives
Currently in Neon, unlogged tables live on local disk in the compute node, and are wiped away on
compute node restart. One alternative would be to instead WAL-log even unlogged tables, essentially
ignoring the UNLOGGED option. Or prohibit UNLOGGED tables completely. But would we still need the
relpersistence argument to handle index builds? See item on "Mark index builds that use buffer
manager without logging explicitly".
## Use smgr and dbsize_hook for size calculations
```
src/backend/utils/adt/dbsize.c | 61 +-
```
In PostgreSQL, the rel and db-size functions scan the data directory directly. That won't work in Neon.
### How to get rid of the patch
Send patch to PostgreSQL, to use smgr API functions for relation size calculation instead. Maybe as
part of the general smgr API patch.
# WAL redo process changes
Pageserver delegates complex WAL decoding duties to Postgres, which means that the latter might fall
victim to carefully designed malicious WAL records and start doing harmful things to the system. To
prevent this, the redo functions are executed in a separate process that is sandboxed with Linux
Secure Computing mode (see seccomp(2) man page).
As an alternative to having a separate WAL redo process, we could rewrite all redo handlers in Rust
This is infeasible. However, it would take a lot of effort to rewrite them, ensure that you've done
the rewrite correctly, and once you've done that, it would be a lot of ongoing maintenance effort to
keep the rewritten code in sync over time, across new PostgreSQL versions. That's why we want to
leverage PostgreSQL code.
Another alternative would be to harden all the PostgreSQL WAL redo functions so that it would be
safe to call them directly from Rust code, without needing the security sandbox. That's not feasible
for similar reasons as rewriting them in Rust.
## Don't replay change in XLogReadBufferForRedo that are not for the target page we're replaying
```
src/backend/access/gin/ginxlog.c | 19 +-
Also some changes in xlog.c and xlogutils.c
Example:
@@ -415,21 +416,27 @@ ginRedoSplit(XLogReaderState *record)
if (!isLeaf)
ginRedoClearIncompleteSplit(record, 3);
- if (XLogReadBufferForRedo(record, 0, &lbuffer) != BLK_RESTORED)
+ action = XLogReadBufferForRedo(record, 0, &lbuffer);
+ if (action != BLK_RESTORED && action != BLK_DONE)
elog(ERROR, "GIN split record did not contain a full-page image of left page");
```
### Problem we're trying to solve
In PostgreSQL, if a WAL redo function calls XLogReadBufferForRead() for a page that has a full-page
image, it always succeeds. However, Neon WAL redo process is only concerned about replaying changes
to a singe page, so replaying any changes for other pages is a waste of cycles. We have modified
XLogReadBufferForRead() to return BLK_DONE for all other pages, to avoid the overhead. That is
unexpected by code like the above.
### How to get rid of the patch
Submit the changes to upstream, hope the community accepts them. There's no harm to PostgreSQL from
these changes, although it doesn't have any benefit either.
To make these changes useful to upstream PostgreSQL, we could implement a feature to look ahead the
WAL, and detect truncated relations. Even in PostgreSQL, it is a waste of cycles to replay changes
to pages that are later truncated away, so we could have XLogReadBufferForRedo() return BLK_DONE or
BLK_NOTFOUND for pages that are known to be truncated away later in the WAL stream.
### Alternatives
Maybe we could revert this optimization, and restore pages other than the target page too.
## Add predefined_sysidentifier flag to initdb
```
src/backend/bootstrap/bootstrap.c | 13 +-
src/bin/initdb/initdb.c | 4 +
And some changes in xlog.c
```
This is used to help with restoring a database when you have all the WAL, all the way back to
initdb, but no backup. You can reconstruct the missing backup by running initdb again, with the same
sysidentifier.
### How to get rid of the patch
Ignore it. This is only needed for disaster recovery, so once we've eliminated all other Postgres
patches, we can just keep it around as a patch or as separate branch in a repo.
# Not currently committed but proposed
## Disable ring buffer buffer manager strategies
### Why?
Postgres tries to avoid cache flushing by bulk operations (copy, seqscan, vacuum,...).
Even if there are free space in buffer cache, pages may be evicted.
Negative effect of it can be somehow compensated by file system cache, but in Neon,
cost of requesting page from page server is much higher.
### Alternatives?
Instead of just prohibiting ring buffer we may try to implement more flexible eviction policy,
for example copy evicted page from ring buffer to some other buffer if there is free space
in buffer cache.
## Disable marking page as dirty when hint bits are set.
### Why?
Postgres has to modify page twice: first time when some tuple is updated and second time when
hint bits are set. Wal logging hint bits updates requires FPI which significantly increase size of WAL.
### Alternatives?
Add special WAL record for setting page hints.
## Prefetching
### Why?
As far as pages in Neon are loaded on demand, to reduce node startup time
and also speedup some massive queries we need some mechanism for bulk loading to
reduce page request round-trip overhead.
Currently Postgres is supporting prefetching only for bitmap scan.
In Neon we should also use prefetch for sequential and index scans, because the OS is not doing it for us.
For sequential scan we could prefetch some number of following pages. For index scan we could prefetch pages
of heap relation addressed by TIDs.
## Prewarming
### Why?
Short downtime (or, in other words, fast compute node restart time) is one of the key feature of Zenith.
But overhead of request-response round-trip for loading pages on demand can make started node warm-up quite slow.
We can capture state of compute node buffer cache and send bulk request for this pages at startup.
4. Prewarming.
- Why?
Short downtime (or, in other words, fast compute node restart time) is one of the key feature of Zenith.
But overhead of request-response round-trip for loading pages on demand can make started node warm-up quite slow.
We can capture state of compute node buffer cache and send bulk request for this pages at startup.

View File

@@ -1,9 +0,0 @@
# Page Service
The Page Service listens for GetPage@LSN requests from the Compute Nodes,
and responds with pages from the repository. On each GetPage@LSN request,
it calls into the Repository function
A separate thread is spawned for each incoming connection to the page
service. The page service uses the libpq protocol to communicate with
the client. The client is a Compute Postgres instance.

View File

@@ -1,8 +0,0 @@
# Page cache
TODO:
- shared across tenants
- store pages from layer files
- store pages from "in-memory layer"
- store materialized pages

View File

@@ -1,4 +0,0 @@
# Processing a GetPage request
TODO:
- sequence diagram that shows how a GetPage@LSN request is processed

View File

@@ -1,5 +0,0 @@
# Processing WAL
TODO:
- diagram that shows how incoming WAL is processed
- explain durability, what is fsync'd when, disk_consistent_lsn

View File

@@ -1,26 +0,0 @@
## Thread management
Each thread in the system is tracked by the `thread_mgr` module. It
maintains a registry of threads, and which tenant or timeline they are
operating on. This is used for safe shutdown of a tenant, or the whole
system.
### Handling shutdown
When a tenant or timeline is deleted, we need to shut down all threads
operating on it, before deleting the data on disk. A thread registered
in the thread registry can check if it has been requested to shut down,
by calling `is_shutdown_requested()`. For async operations, there's also
a `shudown_watcher()` async task that can be used to wake up on shutdown.
### Sync vs async
The primary programming model in the page server is synchronous,
blocking code. However, there are some places where async code is
used. Be very careful when mixing sync and async code.
Async is primarily used to wait for incoming data on network
connections. For example, all WAL receivers have a shared thread pool,
with one async Task for each connection. Once a piece of WAL has been
received from the network, the thread calls the blocking functions in
the Repository to process the WAL.

View File

@@ -1,77 +0,0 @@
# WAL Redo
To reconstruct a particular page version from an image of the page and
some WAL records, the pageserver needs to replay the WAL records. This
happens on-demand, when a GetPage@LSN request comes in, or as part of
background jobs that reorganize data for faster access.
It's important that data cannot leak from one tenant to another, and
that a corrupt WAL record on one timeline doesn't affect other tenants
or timelines.
## Multi-tenant security
If you have direct access to the WAL directory, or if you have
superuser access to a running PostgreSQL server, it's easy to
construct a malicious or corrupt WAL record that causes the WAL redo
functions to crash, or to execute arbitrary code. That is not a
security problem for PostgreSQL; if you have superuser access, you
have full access to the system anyway.
The Neon pageserver, however, is multi-tenant. It needs to execute WAL
belonging to different tenants in the same system, and malicious WAL
in one tenant must not affect other tenants.
A separate WAL redo process is launched for each tenant, and the
process uses the seccomp(2) system call to restrict its access to the
bare minimum needed to replay WAL records. The process does not have
access to the filesystem or network. It can only communicate with the
parent pageserver process through a pipe.
If an attacker creates a malicious WAL record and injects it into the
WAL stream of a timeline, he can take control of the WAL redo process
in the pageserver. However, the WAL redo process cannot access the
rest of the system. And because there is a separate WAL redo process
for each tenant, the hijacked WAL redo process can only see WAL and
data belonging to the same tenant, which the attacker would have
access to anyway.
## WAL-redo process communication
The WAL redo process runs the 'postgres' executable, launched with a
Neon-specific command-line option to put it into WAL-redo process
mode. The pageserver controls the lifetime of the WAL redo processes,
launching them as needed. If a tenant is detached from the pageserver,
any WAL redo processes for that tenant are killed.
The pageserver communicates with each WAL redo process over its
stdin/stdout/stderr. It works in request-response model with a simple
custom protocol, described in walredo.rs. To replay a set of WAL
records for a page, the pageserver sends the "before" image of the
page and the WAL records over 'stdin', followed by a command to
perform the replay. The WAL redo process responds with an "after"
image of the page.
## Special handling of some records
Some WAL record types are handled directly in the pageserver, by
bespoken Rust code, and are not sent over to the WAL redo process.
This includes SLRU-related WAL records, like commit records. SLRUs
don't use the standard Postgres buffer manager, so dealing with them
in the Neon WAL redo mode would require quite a few changes to
Postgres code and special handling in the protocol anyway.
Some record types that include a full-page-image (e.g. XLOG_FPI) are
also handled specially when incoming WAL is processed already, and are
stored as page images rather than WAL records.
## Records that modify multiple pages
Some Postgres WAL records modify multiple pages. Such WAL records are
duplicated, so that a copy is stored for each affected page. This is
somewhat wasteful, but because most WAL records only affect one page,
the overhead is acceptable.
The WAL redo always happens for one particular page. If the WAL record
coantains changes to other pages, they are ignored.

View File

@@ -1,11 +0,0 @@
# Page server architecture
The Page Server has a few different duties:
- Respond to GetPage@LSN requests from the Compute Nodes
- Receive WAL from WAL safekeeper, and store it
- Upload data to S3 to make it durable, download files from S3 as needed
S3 is the main fault-tolerant storage of all data, as there are no Page Server
replicas. We use a separate fault-tolerant WAL service to reduce latency. It
keeps track of WAL records which are not synced to S3 yet.

View File

@@ -1,8 +0,0 @@
# Separation of Compute and Storage
TODO:
- Read path
- Write path
- Durability model
- API auth

View File

@@ -49,12 +49,12 @@ fn main() {
// Finding the location of C headers for the Postgres server:
// - if POSTGRES_INSTALL_DIR is set look into it, otherwise look into `<project_root>/tmp_install`
// - if there's a `bin/pg_config` file use it for getting include server, otherwise use `<project_root>/tmp_install/include/postgresql/server`
let mut pg_install_dir = if let Some(postgres_install_dir) = env::var_os("POSTGRES_INSTALL_DIR")
{
postgres_install_dir.into()
let mut pg_install_dir: PathBuf;
if let Some(postgres_install_dir) = env::var_os("POSTGRES_INSTALL_DIR") {
pg_install_dir = postgres_install_dir.into();
} else {
PathBuf::from("tmp_install")
};
pg_install_dir = PathBuf::from("tmp_install")
}
if pg_install_dir.is_relative() {
let cwd = env::current_dir().unwrap();

View File

@@ -15,7 +15,6 @@ use crate::XLogPageHeaderData;
use crate::XLogRecord;
use crate::XLOG_PAGE_MAGIC;
use crate::pg_constants::WAL_SEGMENT_SIZE;
use anyhow::{bail, ensure};
use byteorder::{ByteOrder, LittleEndian};
use bytes::BytesMut;
@@ -462,7 +461,8 @@ pub fn find_end_of_wal(
pub fn main() {
let mut data_dir = PathBuf::new();
data_dir.push(".");
let (wal_end, tli) = find_end_of_wal(&data_dir, WAL_SEGMENT_SIZE, true, Lsn(0)).unwrap();
let wal_seg_size = 16 * 1024 * 1024;
let (wal_end, tli) = find_end_of_wal(&data_dir, wal_seg_size, true, Lsn(0)).unwrap();
println!(
"wal_end={:>08X}{:>08X}, tli={}",
(wal_end >> 32) as u32,
@@ -606,9 +606,10 @@ mod tests {
fn test_end_of_wal<C: wal_craft::Crafter>(
test_name: &str,
expected_end_of_wal_non_partial: Lsn,
last_segment: &str,
) {
use wal_craft::*;
// Craft some WAL
// 1. Generate some WAL
let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("..")
.join("..");
@@ -621,71 +622,24 @@ mod tests {
}
cfg.initdb().unwrap();
let srv = cfg.start_server().unwrap();
let (intermediate_lsns, expected_end_of_wal_partial) =
C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap();
let intermediate_lsns: Vec<Lsn> = intermediate_lsns
.iter()
.map(|&lsn| u64::from(lsn).into())
.collect();
let expected_end_of_wal_partial: Lsn = u64::from(expected_end_of_wal_partial).into();
let expected_wal_end: Lsn =
u64::from(C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap()).into();
srv.kill();
// Check find_end_of_wal on the initial WAL
let last_segment = cfg
.wal_dir()
.read_dir()
.unwrap()
.map(|f| f.unwrap().file_name().into_string().unwrap())
.filter(|fname| IsXLogFileName(fname))
.max()
.unwrap();
check_pg_waldump_end_of_wal(&cfg, &last_segment, expected_end_of_wal_partial);
for start_lsn in std::iter::once(Lsn(0))
.chain(intermediate_lsns)
.chain(std::iter::once(expected_end_of_wal_partial))
{
// Erase all WAL before `start_lsn` to ensure it's not used by `find_end_of_wal`.
// We assume that `start_lsn` is non-decreasing.
info!(
"Checking with start_lsn={}, erasing WAL before it",
start_lsn
);
for file in fs::read_dir(cfg.wal_dir()).unwrap().flatten() {
let fname = file.file_name().into_string().unwrap();
if !IsXLogFileName(&fname) {
continue;
}
let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE);
let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
if seg_start_lsn > u64::from(start_lsn) {
continue;
}
let mut f = File::options().write(true).open(file.path()).unwrap();
const ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE];
f.write_all(
&ZEROS[0..min(
WAL_SEGMENT_SIZE,
(u64::from(start_lsn) - seg_start_lsn) as usize,
)],
)
.unwrap();
}
check_end_of_wal(
&cfg,
&last_segment,
start_lsn,
expected_end_of_wal_non_partial,
expected_end_of_wal_partial,
);
}
}
// 2. Pick WAL generated by initdb
let wal_dir = cfg.datadir.join("pg_wal");
let wal_seg_size = 16 * 1024 * 1024;
fn check_pg_waldump_end_of_wal(
cfg: &wal_craft::Conf,
last_segment: &str,
expected_end_of_wal: Lsn,
) {
// Get the actual end of WAL by pg_waldump
// 3. Check end_of_wal on non-partial WAL segment (we treat it as fully populated)
let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true, Lsn(0)).unwrap();
let wal_end = Lsn(wal_end);
info!(
"find_end_of_wal returned (wal_end={}, tli={})",
wal_end, tli
);
assert_eq!(wal_end, expected_end_of_wal_non_partial);
// 4. Get the actual end of WAL by pg_waldump
let waldump_output = cfg
.pg_waldump("000000010000000000000001", last_segment)
.unwrap()
@@ -704,57 +658,32 @@ mod tests {
let waldump_wal_end = Lsn::from_str(caps.get(1).unwrap().as_str()).unwrap();
info!(
"waldump erred on {}, expected wal end at {}",
waldump_wal_end, expected_end_of_wal
waldump_wal_end, expected_wal_end
);
assert_eq!(waldump_wal_end, expected_end_of_wal);
}
assert_eq!(waldump_wal_end, expected_wal_end);
fn check_end_of_wal(
cfg: &wal_craft::Conf,
last_segment: &str,
start_lsn: Lsn,
expected_end_of_wal_non_partial: Lsn,
expected_end_of_wal_partial: Lsn,
) {
// Check end_of_wal on non-partial WAL segment (we treat it as fully populated)
let (wal_end, tli) =
find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, true, start_lsn).unwrap();
let wal_end = Lsn(wal_end);
info!(
"find_end_of_wal returned (wal_end={}, tli={}) with non-partial WAL segment",
wal_end, tli
);
assert_eq!(wal_end, expected_end_of_wal_non_partial);
// Rename file to partial to actually find last valid lsn, then rename it back.
// 5. Rename file to partial to actually find last valid lsn
fs::rename(
cfg.wal_dir().join(&last_segment),
cfg.wal_dir().join(format!("{}.partial", last_segment)),
wal_dir.join(last_segment),
wal_dir.join(format!("{}.partial", last_segment)),
)
.unwrap();
let (wal_end, tli) =
find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, true, start_lsn).unwrap();
let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true, Lsn(0)).unwrap();
let wal_end = Lsn(wal_end);
info!(
"find_end_of_wal returned (wal_end={}, tli={}) with partial WAL segment",
"find_end_of_wal returned (wal_end={}, tli={})",
wal_end, tli
);
assert_eq!(wal_end, expected_end_of_wal_partial);
fs::rename(
cfg.wal_dir().join(format!("{}.partial", last_segment)),
cfg.wal_dir().join(last_segment),
)
.unwrap();
assert_eq!(wal_end, waldump_wal_end);
}
const_assert!(WAL_SEGMENT_SIZE == 16 * 1024 * 1024);
#[test]
pub fn test_find_end_of_wal_simple() {
init_logging();
test_end_of_wal::<wal_craft::Simple>(
"test_find_end_of_wal_simple",
"0/2000000".parse::<Lsn>().unwrap(),
"000000010000000000000001",
);
}
@@ -764,6 +693,7 @@ mod tests {
test_end_of_wal::<wal_craft::WalRecordCrossingSegmentFollowedBySmallOne>(
"test_find_end_of_wal_crossing_segment_followed_by_small_one",
"0/3000000".parse::<Lsn>().unwrap(),
"000000010000000000000002",
);
}
@@ -774,6 +704,7 @@ mod tests {
test_end_of_wal::<wal_craft::LastWalRecordCrossingSegment>(
"test_find_end_of_wal_last_crossing_segment",
"0/3000000".parse::<Lsn>().unwrap(),
"000000010000000000000002",
);
}

View File

@@ -55,7 +55,7 @@ fn main() -> Result<()> {
.get_matches();
let wal_craft = |arg_matches: &ArgMatches, client| {
let (intermediate_lsns, end_of_wal_lsn) = match arg_matches.value_of("type").unwrap() {
let lsn = match arg_matches.value_of("type").unwrap() {
Simple::NAME => Simple::craft(client)?,
LastWalRecordXlogSwitch::NAME => LastWalRecordXlogSwitch::craft(client)?,
LastWalRecordXlogSwitchEndsOnPageBoundary::NAME => {
@@ -67,10 +67,7 @@ fn main() -> Result<()> {
LastWalRecordCrossingSegment::NAME => LastWalRecordCrossingSegment::craft(client)?,
a => panic!("Unknown --type argument: {}", a),
};
for lsn in intermediate_lsns {
println!("intermediate_lsn = {}", lsn);
}
println!("end_of_wal = {}", end_of_wal_lsn);
println!("end_of_wal = {}", lsn);
Ok(())
};

View File

@@ -4,7 +4,6 @@ use log::*;
use once_cell::sync::Lazy;
use postgres::types::PgLsn;
use postgres::Client;
use postgres_ffi::pg_constants::WAL_SEGMENT_SIZE;
use postgres_ffi::xlog_utils::{
XLOG_BLCKSZ, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
};
@@ -46,10 +45,6 @@ impl Conf {
self.pg_distrib_dir.join("lib")
}
pub fn wal_dir(&self) -> PathBuf {
self.datadir.join("pg_wal")
}
fn new_pg_command(&self, command: impl AsRef<Path>) -> Result<Command> {
let path = self.pg_bin_dir().join(command);
ensure!(path.exists(), "Command {:?} does not exist", path);
@@ -216,7 +211,7 @@ pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> Result
"Unexpected wal_segment_size unit"
);
ensure!(
wal_segment_size.get::<_, i64>("setting") == WAL_SEGMENT_SIZE as i64,
wal_segment_size.get::<_, i64>("setting") == 16 * 1024 * 1024,
"Unexpected wal_segment_size in bytes"
);
@@ -226,24 +221,20 @@ pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> Result
pub trait Crafter {
const NAME: &'static str;
/// Generates WAL using the client `client`. Returns a pair of:
/// * A vector of some valid "interesting" intermediate LSNs which one may start reading from.
/// May include or exclude Lsn(0) and the end-of-wal.
/// * The expected end-of-wal LSN.
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)>;
/// Generates WAL using the client `client`. Returns the expected end-of-wal LSN.
fn craft(client: &mut impl postgres::GenericClient) -> Result<PgLsn>;
}
fn craft_internal<C: postgres::GenericClient>(
client: &mut C,
f: impl Fn(&mut C, PgLsn) -> Result<(Vec<PgLsn>, Option<PgLsn>)>,
) -> Result<(Vec<PgLsn>, PgLsn)> {
f: impl Fn(&mut C, PgLsn) -> Result<Option<PgLsn>>,
) -> Result<PgLsn> {
ensure_server_config(client)?;
let initial_lsn = client.pg_current_wal_insert_lsn()?;
info!("LSN initial = {}", initial_lsn);
let (mut intermediate_lsns, last_lsn) = f(client, initial_lsn)?;
let last_lsn = match last_lsn {
let last_lsn = match f(client, initial_lsn)? {
None => client.pg_current_wal_insert_lsn()?,
Some(last_lsn) => match last_lsn.cmp(&client.pg_current_wal_insert_lsn()?) {
Ordering::Less => bail!("Some records were inserted after the crafted WAL"),
@@ -251,9 +242,6 @@ fn craft_internal<C: postgres::GenericClient>(
Ordering::Greater => bail!("Reported LSN is greater than insert_lsn"),
},
};
if !intermediate_lsns.starts_with(&[initial_lsn]) {
intermediate_lsns.insert(0, initial_lsn);
}
// Some records may be not flushed, e.g. non-transactional logical messages.
client.execute("select neon_xlogflush(pg_current_wal_insert_lsn())", &[])?;
@@ -262,16 +250,16 @@ fn craft_internal<C: postgres::GenericClient>(
Ordering::Equal => {}
Ordering::Greater => bail!("Reported LSN is greater than flush_lsn"),
}
Ok((intermediate_lsns, last_lsn))
Ok(last_lsn)
}
pub struct Simple;
impl Crafter for Simple {
const NAME: &'static str = "simple";
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
fn craft(client: &mut impl postgres::GenericClient) -> Result<PgLsn> {
craft_internal(client, |client, _| {
client.execute("CREATE table t(x int)", &[])?;
Ok((Vec::new(), None))
Ok(None)
})
}
}
@@ -279,13 +267,12 @@ impl Crafter for Simple {
pub struct LastWalRecordXlogSwitch;
impl Crafter for LastWalRecordXlogSwitch {
const NAME: &'static str = "last_wal_record_xlog_switch";
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
fn craft(client: &mut impl postgres::GenericClient) -> Result<PgLsn> {
// Do not use generate_internal because here we end up with flush_lsn exactly on
// the segment boundary and insert_lsn after the initial page header, which is unusual.
ensure_server_config(client)?;
client.execute("CREATE table t(x int)", &[])?;
let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
let next_segment = PgLsn::from(0x0200_0000);
ensure!(
@@ -294,14 +281,14 @@ impl Crafter for LastWalRecordXlogSwitch {
after_xlog_switch,
next_segment
);
Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
Ok(next_segment)
}
}
pub struct LastWalRecordXlogSwitchEndsOnPageBoundary;
impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary";
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
fn craft(client: &mut impl postgres::GenericClient) -> Result<PgLsn> {
// Do not use generate_internal because here we end up with flush_lsn exactly on
// the segment boundary and insert_lsn after the initial page header, which is unusual.
ensure_server_config(client)?;
@@ -347,7 +334,6 @@ impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
);
// Emit the XLOG_SWITCH
let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
let next_segment = PgLsn::from(0x0200_0000);
ensure!(
@@ -361,14 +347,14 @@ impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
"XLOG_SWITCH message ended not on page boundary: {}",
after_xlog_switch
);
Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
Ok(next_segment)
}
}
fn craft_single_logical_message(
client: &mut impl postgres::GenericClient,
transactional: bool,
) -> Result<(Vec<PgLsn>, PgLsn)> {
) -> Result<PgLsn> {
craft_internal(client, |client, initial_lsn| {
ensure!(
initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024),
@@ -400,9 +386,9 @@ fn craft_single_logical_message(
message_lsn < after_message_lsn,
"No record found after the emitted message"
);
Ok((vec![message_lsn], Some(after_message_lsn)))
Ok(Some(after_message_lsn))
} else {
Ok((Vec::new(), Some(message_lsn)))
Ok(Some(message_lsn))
}
})
}
@@ -410,7 +396,7 @@ fn craft_single_logical_message(
pub struct WalRecordCrossingSegmentFollowedBySmallOne;
impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one";
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
fn craft(client: &mut impl postgres::GenericClient) -> Result<PgLsn> {
craft_single_logical_message(client, true)
}
}
@@ -418,7 +404,7 @@ impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
pub struct LastWalRecordCrossingSegment;
impl Crafter for LastWalRecordCrossingSegment {
const NAME: &'static str = "last_wal_record_crossing_segment";
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
fn craft(client: &mut impl postgres::GenericClient) -> Result<PgLsn> {
craft_single_logical_message(client, false)
}
}

View File

@@ -1,4 +1,15 @@
# Services
## Page server architecture
The Page Server has a few different duties:
- Respond to GetPage@LSN requests from the Compute Nodes
- Receive WAL from WAL safekeeper
- Replay WAL that's applicable to the chunks that the Page Server maintains
- Backup to S3
S3 is the main fault-tolerant storage of all data, as there are no Page Server
replicas. We use a separate fault-tolerant WAL service to reduce latency. It
keeps track of WAL records which are not synced to S3 yet.
The Page Server consists of multiple threads that operate on a shared
repository of page versions:
@@ -10,22 +21,18 @@ repository of page versions:
| WAL receiver |
| |
+--------------+
......
+---------+ +--------+ . .
| | | | . .
GetPage@LSN | | | backup | -------> . S3 .
-------------> | Page | repository | | . .
| Service | +--------+ . .
page | | ......
+----+
+---------+ .......... | |
| | . . | |
GetPage@LSN | | . backup . -------> | S3 |
-------------> | Page | repository . . | |
| Service | .......... | |
page | | +----+
<------------- | |
+---------+ +-----------+ +--------------------+
| WAL redo | | Checkpointing, |
+----------+ | processes | | Garbage collection |
| | +-----------+ +--------------------+
| HTTP |
| mgmt API |
| |
+----------+
+---------+ +--------------------+
| Checkpointing / |
| Garbage collection |
+--------------------+
Legend:
@@ -33,77 +40,28 @@ Legend:
| | A thread or multi-threaded service
+--+
....
. . Component at its early development phase.
....
---> Data flow
<---
```
## Page Service
Page Service
------------
The Page Service listens for GetPage@LSN requests from the Compute Nodes,
and responds with pages from the repository. On each GetPage@LSN request,
it calls into the Repository function
A separate thread is spawned for each incoming connection to the page
service. The page service uses the libpq protocol to communicate with
the client. The client is a Compute Postgres instance.
## WAL Receiver
The WAL receiver connects to the external WAL safekeeping service
using PostgreSQL physical streaming replication, and continuously
receives WAL. It decodes the WAL records, and stores them to the
repository.
and responds with pages from the repository.
## Backup service
WAL Receiver
------------
The backup service, responsible for storing pageserver recovery data externally.
Currently, pageserver stores its files in a filesystem directory it's pointed to.
That working directory could be rather ephemeral for such cases as "a pageserver pod running in k8s with no persistent volumes attached".
Therefore, the server interacts with external, more reliable storage to back up and restore its state.
The code for storage support is extensible and can support arbitrary ones as long as they implement a certain Rust trait.
There are the following implementations present:
* local filesystem — to use in tests mainly
* AWS S3 - to use in production
Implementation details are covered in the [backup readme](./src/remote_storage/README.md) and corresponding Rust file docs, parameters documentation can be found at [settings docs](../docs/settings.md).
The backup service is disabled by default and can be enabled to interact with a single remote storage.
CLI examples:
* Local FS: `${PAGESERVER_BIN} -c "remote_storage={local_path='/some/local/path/'}"`
* AWS S3 : `env AWS_ACCESS_KEY_ID='SOMEKEYAAAAASADSAH*#' AWS_SECRET_ACCESS_KEY='SOMEsEcReTsd292v' ${PAGESERVER_BIN} -c "remote_storage={bucket_name='some-sample-bucket',bucket_region='eu-north-1', prefix_in_bucket='/test_prefix/'}"`
For Amazon AWS S3, a key id and secret access key could be located in `~/.aws/credentials` if awscli was ever configured to work with the desired bucket, on the AWS Settings page for a certain user. Also note, that the bucket names does not contain any protocols when used on AWS.
For local S3 installations, refer to the their documentation for name format and credentials.
Similar to other pageserver settings, toml config file can be used to configure either of the storages as backup targets.
Required sections are:
```toml
[remote_storage]
local_path = '/Users/someonetoignore/Downloads/tmp_dir/'
```
or
```toml
[remote_storage]
bucket_name = 'some-sample-bucket'
bucket_region = 'eu-north-1'
prefix_in_bucket = '/test_prefix/'
```
`AWS_SECRET_ACCESS_KEY` and `AWS_ACCESS_KEY_ID` env variables can be used to specify the S3 credentials if needed.
## Repository background tasks
The Repository also has a few different background threads and tokio tasks that perform
background duties like dumping accumulated WAL data from memory to disk, reorganizing
files for performance (compaction), and garbage collecting old files.
The WAL receiver connects to the external WAL safekeeping service (or
directly to the primary) using PostgreSQL physical streaming
replication, and continuously receives WAL. It decodes the WAL records,
and stores them to the repository.
Repository
@@ -158,6 +116,48 @@ Remove old on-disk layer files that are no longer needed according to the
PITR retention policy
### Backup service
The backup service, responsible for storing pageserver recovery data externally.
Currently, pageserver stores its files in a filesystem directory it's pointed to.
That working directory could be rather ephemeral for such cases as "a pageserver pod running in k8s with no persistent volumes attached".
Therefore, the server interacts with external, more reliable storage to back up and restore its state.
The code for storage support is extensible and can support arbitrary ones as long as they implement a certain Rust trait.
There are the following implementations present:
* local filesystem — to use in tests mainly
* AWS S3 - to use in production
Implementation details are covered in the [backup readme](./src/remote_storage/README.md) and corresponding Rust file docs, parameters documentation can be found at [settings docs](../docs/settings.md).
The backup service is disabled by default and can be enabled to interact with a single remote storage.
CLI examples:
* Local FS: `${PAGESERVER_BIN} -c "remote_storage={local_path='/some/local/path/'}"`
* AWS S3 : `env AWS_ACCESS_KEY_ID='SOMEKEYAAAAASADSAH*#' AWS_SECRET_ACCESS_KEY='SOMEsEcReTsd292v' ${PAGESERVER_BIN} -c "remote_storage={bucket_name='some-sample-bucket',bucket_region='eu-north-1', prefix_in_bucket='/test_prefix/'}"`
For Amazon AWS S3, a key id and secret access key could be located in `~/.aws/credentials` if awscli was ever configured to work with the desired bucket, on the AWS Settings page for a certain user. Also note, that the bucket names does not contain any protocols when used on AWS.
For local S3 installations, refer to the their documentation for name format and credentials.
Similar to other pageserver settings, toml config file can be used to configure either of the storages as backup targets.
Required sections are:
```toml
[remote_storage]
local_path = '/Users/someonetoignore/Downloads/tmp_dir/'
```
or
```toml
[remote_storage]
bucket_name = 'some-sample-bucket'
bucket_region = 'eu-north-1'
prefix_in_bucket = '/test_prefix/'
```
`AWS_SECRET_ACCESS_KEY` and `AWS_ACCESS_KEY_ID` env variables can be used to specify the S3 credentials if needed.
TODO: Sharding
--------------------

View File

@@ -1,8 +1,8 @@
//!
//! Timeline repository implementation that keeps old data in files on disk, and
//! Zenith repository implementation that keeps old data in files on disk, and
//! the recent changes in memory. See layered_repository/*_layer.rs files.
//! The functions here are responsible for locating the correct layer for the
//! get/put call, walking back the timeline branching history as needed.
//! get/put call, tracing timeline branching history as needed.
//!
//! The files are stored in the .neon/tenants/<tenantid>/timelines/<timelineid>
//! directory. See layered_repository/README for how the files are managed.
@@ -281,22 +281,12 @@ impl Repository for LayeredRepository {
// concurrently removes data that is needed by the new timeline.
let _gc_cs = self.gc_cs.lock().unwrap();
// In order for the branch creation task to not wait for GC/compaction,
// we need to make sure that the starting LSN of the child branch is not out of scope midway by
//
// 1. holding the GC lock to prevent overwritting timeline's GC data
// 2. checking both the latest GC cutoff LSN and latest GC info of the source timeline
//
// Step 2 is to avoid initializing the new branch using data removed by past GC iterations
// or in-queue GC iterations.
let mut timelines = self.timelines.lock().unwrap();
let src_timeline = self
.get_timeline_load_internal(src, &mut timelines)
// message about timeline being remote is one .context up in the stack
.context("failed to load timeline for branching")?
.ok_or_else(|| anyhow::anyhow!("unknown timeline id: {}", &src))?;
let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn();
// If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
@@ -306,30 +296,16 @@ impl Repository for LayeredRepository {
lsn
});
// Check if the starting LSN is out of scope because it is less than
// 1. the latest GC cutoff LSN or
// 2. the planned GC cutoff LSN, which is from an in-queue GC iteration.
src_timeline
.check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
.context(format!(
"invalid branch start lsn: less than latest GC cutoff {latest_gc_cutoff_lsn}"
))?;
{
let gc_info = src_timeline.gc_info.read().unwrap();
let cutoff = min(gc_info.pitr_cutoff, gc_info.horizon_cutoff);
if start_lsn < cutoff {
bail!(format!(
"invalid branch start lsn: less than planned GC cutoff {cutoff}"
));
}
}
.context("invalid branch start lsn")?;
// Determine prev-LSN for the new timeline. We can only determine it if
// the timeline was branched at the current end of the source timeline.
let RecordLsn {
last: src_last,
prev: src_prev,
} = src_timeline.get_last_record_rlsn();
// Use src_prev from the source timeline only if we branched at the last record.
let dst_prev = if src_last == start_lsn {
Some(src_prev)
} else {
@@ -338,6 +314,7 @@ impl Repository for LayeredRepository {
// create a new timeline directory
let timelinedir = self.conf.timeline_path(&dst, &self.tenant_id);
crashsafe_dir::create_dir(&timelinedir)?;
// Create the metadata file, noting the ancestor of the new timeline.
@@ -464,7 +441,13 @@ impl Repository for LayeredRepository {
Entry::Vacant(_) => bail!("timeline not found"),
};
let layer_removal_guard = timeline_entry.get().layer_removal_guard()?;
// try to acquire gc and compaction locks to prevent errors from missing files
let _gc_guard = self
.gc_cs
.try_lock()
.map_err(|e| anyhow::anyhow!("cannot acquire gc lock {e}"))?;
let compaction_guard = timeline_entry.get().compaction_guard()?;
let local_timeline_directory = self.conf.timeline_path(&timeline_id, &self.tenant_id);
std::fs::remove_dir_all(&local_timeline_directory).with_context(|| {
@@ -475,7 +458,7 @@ impl Repository for LayeredRepository {
})?;
info!("detach removed files");
drop(layer_removal_guard);
drop(compaction_guard);
timeline_entry.remove();
Ok(())
@@ -542,10 +525,10 @@ impl LayeredTimelineEntry {
}
}
fn layer_removal_guard(&self) -> Result<Option<MutexGuard<()>>, anyhow::Error> {
fn compaction_guard(&self) -> Result<Option<MutexGuard<()>>, anyhow::Error> {
match self {
LayeredTimelineEntry::Loaded(timeline) => timeline
.layer_removal_cs
.compaction_cs
.try_lock()
.map_err(|e| anyhow::anyhow!("cannot lock compaction critical section {e}"))
.map(Some),
@@ -776,7 +759,7 @@ impl LayeredRepository {
// https://github.com/neondatabase/neon/issues/1555
if !target_config_path.exists() {
info!(
"tenant config not found in {}",
"Zenith tenant config is not found in {}",
target_config_path.display()
);
return Ok(Default::default());
@@ -875,7 +858,7 @@ impl LayeredRepository {
// +-----baz-------->
//
//
// 1. Grab 'gc_cs' mutex to prevent new timelines from being created
// 1. Grab a mutex to prevent new timelines from being created
// 2. Scan all timelines, and on each timeline, make note of the
// all the points where other timelines have been branched off.
// We will refrain from removing page versions at those LSNs.
@@ -901,56 +884,56 @@ impl LayeredRepository {
let now = Instant::now();
// grab mutex to prevent new timelines from being created here.
let gc_cs = self.gc_cs.lock().unwrap();
let mut timelines = self.timelines.lock().unwrap();
let _gc_cs = self.gc_cs.lock().unwrap();
// Scan all timelines. For each timeline, remember the timeline ID and
// the branch point where it was created.
let mut all_branchpoints: BTreeSet<(ZTimelineId, Lsn)> = BTreeSet::new();
let timeline_ids = {
if let Some(target_timeline_id) = target_timeline_id.as_ref() {
if timelines.get(target_timeline_id).is_none() {
bail!("gc target timeline does not exist")
}
};
let mut timeline_ids = Vec::new();
let mut timelines = self.timelines.lock().unwrap();
timelines
.iter()
.map(|(timeline_id, timeline_entry)| {
// This is unresolved question for now, how to do gc in presence of remote timelines
// especially when this is combined with branching.
// Somewhat related: https://github.com/zenithdb/zenith/issues/999
if let Some(ancestor_timeline_id) = &timeline_entry.ancestor_timeline_id() {
// If target_timeline is specified, we only need to know branchpoints of its children
if let Some(timelineid) = target_timeline_id {
if ancestor_timeline_id == &timelineid {
all_branchpoints
.insert((*ancestor_timeline_id, timeline_entry.ancestor_lsn()));
}
}
// Collect branchpoints for all timelines
else {
all_branchpoints
.insert((*ancestor_timeline_id, timeline_entry.ancestor_lsn()));
}
}
*timeline_id
})
.collect::<Vec<_>>()
if let Some(target_timeline_id) = target_timeline_id.as_ref() {
if timelines.get(target_timeline_id).is_none() {
bail!("gc target timeline does not exist")
}
};
for (timeline_id, timeline_entry) in timelines.iter() {
timeline_ids.push(*timeline_id);
// This is unresolved question for now, how to do gc in presence of remote timelines
// especially when this is combined with branching.
// Somewhat related: https://github.com/zenithdb/zenith/issues/999
if let Some(ancestor_timeline_id) = &timeline_entry.ancestor_timeline_id() {
// If target_timeline is specified, we only need to know branchpoints of its children
if let Some(timelineid) = target_timeline_id {
if ancestor_timeline_id == &timelineid {
all_branchpoints
.insert((*ancestor_timeline_id, timeline_entry.ancestor_lsn()));
}
}
// Collect branchpoints for all timelines
else {
all_branchpoints.insert((*ancestor_timeline_id, timeline_entry.ancestor_lsn()));
}
}
}
// Ok, we now know all the branch points.
// Update the GC information for each timeline.
let mut gc_timelines = Vec::with_capacity(timeline_ids.len());
for timeline_id in timeline_ids {
// Perform GC for each timeline.
for timeline_id in timeline_ids.into_iter() {
if thread_mgr::is_shutdown_requested() {
// We were requested to shut down. Stop and return with the progress we
// made.
break;
}
// Timeline is known to be local and loaded.
let timeline = self
.get_timeline_load_internal(timeline_id, &mut *timelines)?
.expect("checked above that timeline is local and loaded");
// If target_timeline is specified, ignore all other timelines
// If target_timeline is specified, only GC it
if let Some(target_timelineid) = target_timeline_id {
if timeline_id != target_timelineid {
continue;
@@ -958,6 +941,7 @@ impl LayeredRepository {
}
if let Some(cutoff) = timeline.get_last_record_lsn().checked_sub(horizon) {
drop(timelines);
let branchpoints: Vec<Lsn> = all_branchpoints
.range((
Included((timeline_id, Lsn(0))),
@@ -965,45 +949,21 @@ impl LayeredRepository {
))
.map(|&x| x.1)
.collect();
timeline.update_gc_info(branchpoints, cutoff, pitr)?;
gc_timelines.push(timeline);
// If requested, force flush all in-memory layers to disk first,
// so that they too can be garbage collected. That's
// used in tests, so we want as deterministic results as possible.
if checkpoint_before_gc {
timeline.checkpoint(CheckpointConfig::Forced)?;
info!("timeline {} checkpoint_before_gc done", timeline_id);
}
timeline.update_gc_info(branchpoints, cutoff, pitr);
let result = timeline.gc()?;
totals += result;
timelines = self.timelines.lock().unwrap();
}
}
drop(timelines);
drop(gc_cs);
// Perform GC for each timeline.
//
// Note that we don't hold the GC lock here because we don't want
// to delay the branch creation task, which requires the GC lock.
// A timeline GC iteration can be slow because it may need to wait for
// compaction (both require `layer_removal_cs` lock),
// but the GC iteration can run concurrently with branch creation.
//
// See comments in [`LayeredRepository::branch_timeline`] for more information
// about why branch creation task can run concurrently with timeline's GC iteration.
for timeline in gc_timelines {
if thread_mgr::is_shutdown_requested() {
// We were requested to shut down. Stop and return with the progress we
// made.
break;
}
// If requested, force flush all in-memory layers to disk first,
// so that they too can be garbage collected. That's
// used in tests, so we want as deterministic results as possible.
if checkpoint_before_gc {
timeline.checkpoint(CheckpointConfig::Forced)?;
info!(
"timeline {} checkpoint_before_gc done",
timeline.timeline_id
);
}
let result = timeline.gc()?;
totals += result;
}
totals.elapsed = now.elapsed();
Ok(totals)
@@ -1079,11 +1039,11 @@ pub struct LayeredTimeline {
/// Used to ensure that there is only one thread
layer_flush_lock: Mutex<()>,
/// Layer removal lock.
/// A lock to ensure that no layer of the timeline is removed concurrently by other threads.
/// This lock is acquired in [`LayeredTimeline::gc`], [`LayeredTimeline::compact`],
/// and [`LayeredRepository::delete_timeline`].
layer_removal_cs: Mutex<()>,
// Prevent concurrent compactions.
// Compactions are normally performed by one thread. But compaction can also be manually
// requested by admin (that's used in tests). These forced compactions run in a different
// thread and could be triggered at the same time as a normal, timed compaction.
compaction_cs: Mutex<()>,
// Needed to ensure that we can't create a branch at a point that was already garbage collected
latest_gc_cutoff_lsn: RwLock<Lsn>,
@@ -1120,14 +1080,12 @@ struct GcInfo {
/// last-record LSN
///
/// FIXME: is this inclusive or exclusive?
horizon_cutoff: Lsn,
cutoff: Lsn,
/// In addition to 'retain_lsns' and 'horizon_cutoff', keep everything newer than this
/// point.
/// In addition to 'retain_lsns', keep everything newer than 'SystemTime::now()'
/// minus 'pitr_interval'
///
/// This is calculated by finding a number such that a record is needed for PITR
/// if only if its LSN is larger than 'pitr_cutoff'.
pitr_cutoff: Lsn,
pitr: Duration,
}
/// Public interface functions
@@ -1367,12 +1325,12 @@ impl LayeredTimeline {
write_lock: Mutex::new(()),
layer_flush_lock: Mutex::new(()),
layer_removal_cs: Mutex::new(()),
compaction_cs: Mutex::new(()),
gc_info: RwLock::new(GcInfo {
retain_lsns: Vec::new(),
horizon_cutoff: Lsn(0),
pitr_cutoff: Lsn(0),
cutoff: Lsn(0),
pitr: Duration::ZERO,
}),
latest_gc_cutoff_lsn: RwLock::new(metadata.latest_gc_cutoff_lsn()),
@@ -1810,23 +1768,24 @@ impl LayeredTimeline {
/// Flush one frozen in-memory layer to disk, as a new delta layer.
fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> Result<()> {
let layer_paths_to_upload;
// As a special case, when we have just imported an image into the repository,
// instead of writing out a L0 delta layer, we directly write out image layer
// files instead. This is possible as long as *all* the data imported into the
// repository have the same LSN.
let lsn_range = frozen_layer.get_lsn_range();
let layer_paths_to_upload = if lsn_range.start == self.initdb_lsn
&& lsn_range.end == Lsn(self.initdb_lsn.0 + 1)
{
if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
let pgdir = tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id)?;
let (partitioning, _lsn) =
pgdir.repartition(self.initdb_lsn, self.get_compaction_target_size())?;
self.create_image_layers(&partitioning, self.initdb_lsn, true)?
layer_paths_to_upload =
self.create_image_layers(&partitioning, self.initdb_lsn, true)?;
} else {
// normal case, write out a L0 delta layer file.
let delta_path = self.create_delta_layer(&frozen_layer)?;
HashSet::from([delta_path])
};
layer_paths_to_upload = HashSet::from([delta_path]);
}
fail_point!("flush-frozen-before-sync");
@@ -1993,7 +1952,7 @@ impl LayeredTimeline {
// Below are functions compact_level0() and create_image_layers()
// but they are a bit ad hoc and don't quite work like it's explained
// above. Rewrite it.
let _layer_removal_cs = self.layer_removal_cs.lock().unwrap();
let _compaction_cs = self.compaction_cs.lock().unwrap();
let target_file_size = self.get_checkpoint_distance();
@@ -2310,34 +2269,46 @@ impl LayeredTimeline {
/// TODO: that's wishful thinking, compaction doesn't actually do that
/// currently.
///
/// The caller specifies how much history is needed with the 3 arguments:
/// The caller specifies how much history is needed with the two arguments:
///
/// retain_lsns: keep a version of each page at these LSNs
/// cutoff_horizon: also keep everything newer than this LSN
/// pitr: the time duration required to keep data for PITR
/// cutoff: also keep everything newer than this LSN
///
/// The 'retain_lsns' list is currently used to prevent removing files that
/// are needed by child timelines. In the future, the user might be able to
/// name additional points in time to retain. The caller is responsible for
/// collecting that information.
///
/// The 'cutoff_horizon' point is used to retain recent versions that might still be
/// The 'cutoff' point is used to retain recent versions that might still be
/// needed by read-only nodes. (As of this writing, the caller just passes
/// the latest LSN subtracted by a constant, and doesn't do anything smart
/// to figure out what read-only nodes might actually need.)
///
/// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
/// whether a record is needed for PITR.
fn update_gc_info(
&self,
retain_lsns: Vec<Lsn>,
cutoff_horizon: Lsn,
pitr: Duration,
) -> Result<()> {
fn update_gc_info(&self, retain_lsns: Vec<Lsn>, cutoff: Lsn, pitr: Duration) {
let mut gc_info = self.gc_info.write().unwrap();
gc_info.horizon_cutoff = cutoff_horizon;
gc_info.retain_lsns = retain_lsns;
gc_info.cutoff = cutoff;
gc_info.pitr = pitr;
}
///
/// Garbage collect layer files on a timeline that are no longer needed.
///
/// Currently, we don't make any attempt at removing unneeded page versions
/// within a layer file. We can only remove the whole file if it's fully
/// obsolete.
///
fn gc(&self) -> Result<GcResult> {
let now = SystemTime::now();
let mut result: GcResult = Default::default();
let disk_consistent_lsn = self.get_disk_consistent_lsn();
let _compaction_cs = self.compaction_cs.lock().unwrap();
let gc_info = self.gc_info.read().unwrap();
let retain_lsns = &gc_info.retain_lsns;
let cutoff = min(gc_info.cutoff, disk_consistent_lsn);
let pitr = gc_info.pitr;
// Calculate pitr cutoff point.
// If we cannot determine a cutoff LSN, be conservative and don't GC anything.
@@ -2346,7 +2317,6 @@ impl LayeredTimeline {
if let Ok(timeline) =
tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id)
{
let now = SystemTime::now();
// First, calculate pitr_cutoff_timestamp and then convert it to LSN.
// If we don't have enough data to convert to LSN,
// play safe and don't remove any layers.
@@ -2357,7 +2327,7 @@ impl LayeredTimeline {
LsnForTimestamp::Present(lsn) => pitr_cutoff_lsn = lsn,
LsnForTimestamp::Future(lsn) => {
debug!("future({})", lsn);
pitr_cutoff_lsn = gc_info.horizon_cutoff;
pitr_cutoff_lsn = cutoff;
}
LsnForTimestamp::Past(lsn) => {
debug!("past({})", lsn);
@@ -2371,47 +2341,22 @@ impl LayeredTimeline {
} else if cfg!(test) {
// We don't have local timeline in mocked cargo tests.
// So, just ignore pitr_interval setting in this case.
pitr_cutoff_lsn = gc_info.horizon_cutoff;
pitr_cutoff_lsn = cutoff;
}
gc_info.pitr_cutoff = pitr_cutoff_lsn;
Ok(())
}
///
/// Garbage collect layer files on a timeline that are no longer needed.
///
/// Currently, we don't make any attempt at removing unneeded page versions
/// within a layer file. We can only remove the whole file if it's fully
/// obsolete.
///
fn gc(&self) -> Result<GcResult> {
let mut result: GcResult = Default::default();
let now = SystemTime::now();
fail_point!("before-timeline-gc");
let _layer_removal_cs = self.layer_removal_cs.lock().unwrap();
let gc_info = self.gc_info.read().unwrap();
let horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn());
let pitr_cutoff = gc_info.pitr_cutoff;
let retain_lsns = &gc_info.retain_lsns;
let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
let new_gc_cutoff = Lsn::min(cutoff, pitr_cutoff_lsn);
// Nothing to GC. Return early.
let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
if latest_gc_cutoff >= new_gc_cutoff {
if *self.get_latest_gc_cutoff_lsn() >= new_gc_cutoff {
info!(
"Nothing to GC for timeline {}: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}",
self.timeline_id
"Nothing to GC for timeline {}. cutoff_lsn {}",
self.timeline_id, new_gc_cutoff
);
result.elapsed = now.elapsed()?;
return Ok(result);
}
let _enter = info_span!("garbage collection", timeline = %self.timeline_id, tenant = %self.tenant_id, cutoff = %new_gc_cutoff).entered();
let _enter = info_span!("garbage collection", timeline = %self.timeline_id, tenant = %self.tenant_id, cutoff = %cutoff).entered();
// We need to ensure that no one branches at a point before latest_gc_cutoff_lsn.
// See branch_timeline() for details.
@@ -2445,23 +2390,23 @@ impl LayeredTimeline {
result.layers_total += 1;
// 1. Is it newer than GC horizon cutoff point?
if l.get_lsn_range().end > horizon_cutoff {
// 1. Is it newer than cutoff point?
if l.get_lsn_range().end > cutoff {
debug!(
"keeping {} because it's newer than horizon_cutoff {}",
"keeping {} because it's newer than cutoff {}",
l.filename().display(),
horizon_cutoff
cutoff
);
result.layers_needed_by_cutoff += 1;
continue 'outer;
}
// 2. It is newer than PiTR cutoff point?
if l.get_lsn_range().end > pitr_cutoff {
if l.get_lsn_range().end > pitr_cutoff_lsn {
debug!(
"keeping {} because it's newer than pitr_cutoff {}",
"keeping {} because it's newer than pitr_cutoff_lsn {}",
l.filename().display(),
pitr_cutoff
pitr_cutoff_lsn
);
result.layers_needed_by_pitr += 1;
continue 'outer;
@@ -2880,7 +2825,7 @@ pub mod tests {
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
tline.gc()?;
@@ -2950,7 +2895,7 @@ pub mod tests {
// Perform a cycle of checkpoint, compaction, and GC
println!("checkpointing {}", lsn);
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
tline.gc()?;
@@ -3027,7 +2972,7 @@ pub mod tests {
// Perform a cycle of checkpoint, compaction, and GC
println!("checkpointing {}", lsn);
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
tline.gc()?;

View File

@@ -1,4 +1,4 @@
# Pageserver storage
# Overview
The main responsibility of the Page Server is to process the incoming WAL, and
reprocess it into a format that allows reasonably quick access to any page

View File

@@ -928,7 +928,7 @@ fn storage_sync_loop<P, S>(
);
let mut sync_status_updates: HashMap<ZTenantId, HashSet<ZTimelineId>> =
HashMap::new();
let index_accessor = runtime.block_on(index.read());
let index_accessor = runtime.block_on(index.write());
for tenant_id in updated_tenants {
let tenant_entry = match index_accessor.tenant_entry(&tenant_id) {
Some(tenant_entry) => tenant_entry,
@@ -1557,7 +1557,6 @@ fn schedule_first_sync_tasks(
local_timeline_init_statuses
}
/// bool in return value stands for awaits_download
fn compare_local_and_remote_timeline(
new_sync_tasks: &mut VecDeque<(ZTenantTimelineId, SyncTask)>,
sync_id: ZTenantTimelineId,
@@ -1567,6 +1566,14 @@ fn compare_local_and_remote_timeline(
) -> (LocalTimelineInitStatus, bool) {
let remote_files = remote_entry.stored_files();
// TODO probably here we need more sophisticated logic,
// if more data is available remotely can we just download what's there?
// without trying to upload something. It may be tricky, needs further investigation.
// For now looks strange that we can request upload
// and download for the same timeline simultaneously.
// (upload needs to be only for previously unsynced files, not whole timeline dir).
// If one of the tasks fails they will be reordered in the queue which can lead
// to timeline being stuck in evicted state
let number_of_layers_to_download = remote_files.difference(&local_files).count();
let (initial_timeline_status, awaits_download) = if number_of_layers_to_download > 0 {
new_sync_tasks.push_back((

View File

@@ -3,13 +3,12 @@
use std::{
collections::{HashMap, HashSet},
fmt::Debug,
mem,
path::Path,
};
use anyhow::Context;
use futures::stream::{FuturesUnordered, StreamExt};
use remote_storage::{path_with_suffix_extension, DownloadError, RemoteObjectName, RemoteStorage};
use remote_storage::{path_with_suffix_extension, RemoteObjectName, RemoteStorage};
use tokio::{
fs,
io::{self, AsyncWriteExt},
@@ -28,50 +27,28 @@ use super::{
pub const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
// We collect timelines remotely available for each tenant
// in case we failed to gather all index parts (due to an error)
// Poisoned variant is returned.
// When data is received succesfully without errors Present variant is used.
pub enum TenantIndexParts {
Poisoned {
present: HashMap<ZTimelineId, IndexPart>,
missing: HashSet<ZTimelineId>,
},
Present(HashMap<ZTimelineId, IndexPart>),
}
impl TenantIndexParts {
fn add_poisoned(&mut self, timeline_id: ZTimelineId) {
match self {
TenantIndexParts::Poisoned { missing, .. } => {
missing.insert(timeline_id);
}
TenantIndexParts::Present(present) => {
*self = TenantIndexParts::Poisoned {
present: mem::take(present),
missing: HashSet::from([timeline_id]),
}
}
}
}
}
impl Default for TenantIndexParts {
fn default() -> Self {
TenantIndexParts::Present(HashMap::default())
}
}
/// FIXME: Needs cleanup. Currently it swallows errors. Here we need to ensure that
/// we successfully downloaded all metadata parts for one tenant.
/// And successful includes absence of index_part in the remote. Because it is valid situation
/// when timeline was just created and pageserver restarted before upload of index part was completed.
/// But currently RemoteStorage interface does not provide this knowledge because it uses
/// anyhow::Error as an error type. So this needs a refactoring.
///
/// In other words we need to yield only complete sets of tenant timelines.
/// Failure for one timeline of a tenant should exclude whole tenant from returned hashmap.
/// So there are two requirements: keep everything in one futures unordered
/// to allow higher concurrency. Mark tenants as failed independently.
/// That requires some bookeeping.
pub async fn download_index_parts<P, S>(
conf: &'static PageServerConf,
storage: &S,
keys: HashSet<ZTenantTimelineId>,
) -> HashMap<ZTenantId, TenantIndexParts>
) -> HashMap<ZTenantId, HashMap<ZTimelineId, IndexPart>>
where
P: Debug + Send + Sync + 'static,
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
{
let mut index_parts: HashMap<ZTenantId, TenantIndexParts> = HashMap::new();
let mut index_parts: HashMap<ZTenantId, HashMap<ZTimelineId, IndexPart>> = HashMap::new();
let mut part_downloads = keys
.into_iter()
@@ -82,29 +59,12 @@ where
match part_upload_result {
Ok(index_part) => {
debug!("Successfully fetched index part for {id}");
match index_parts.entry(id.tenant_id).or_default() {
TenantIndexParts::Poisoned { present, .. } => {
present.insert(id.timeline_id, index_part);
}
TenantIndexParts::Present(parts) => {
parts.insert(id.timeline_id, index_part);
}
}
}
Err(download_error) => {
match download_error {
DownloadError::NotFound => {
// thats ok because it means that we didnt upload something we have locally for example
}
e => {
let tenant_parts = index_parts.entry(id.tenant_id).or_default();
tenant_parts.add_poisoned(id.timeline_id);
error!(
"Failed to fetch index part for {id}: {e} poisoning tenant index parts"
);
}
}
index_parts
.entry(id.tenant_id)
.or_default()
.insert(id.timeline_id, index_part);
}
Err(e) => error!("Failed to fetch index part for {id}: {e}"),
}
}
@@ -159,16 +119,12 @@ where
});
}
match download_index_parts(conf, storage, sync_ids)
download_index_parts(conf, storage, sync_ids)
.await
.remove(&tenant_id)
.ok_or_else(|| anyhow::anyhow!("Missing tenant index parts. This is a bug."))?
{
TenantIndexParts::Poisoned { missing, .. } => {
anyhow::bail!("Failed to download index parts for all timelines. Missing {missing:?}")
}
TenantIndexParts::Present(parts) => Ok(parts),
}
.ok_or(anyhow::anyhow!(
"Missing tenant index parts. This is a bug."
))
}
/// Retrieves index data from the remote storage for a given timeline.
@@ -176,7 +132,7 @@ async fn download_index_part<P, S>(
conf: &'static PageServerConf,
storage: &S,
sync_id: ZTenantTimelineId,
) -> Result<IndexPart, DownloadError>
) -> anyhow::Result<IndexPart>
where
P: Debug + Send + Sync + 'static,
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
@@ -191,11 +147,15 @@ where
"Failed to get the index part storage path for local path '{}'",
index_part_path.display()
)
})
.map_err(DownloadError::BadInput)?;
let mut index_part_download = storage.download(&part_storage_path).await?;
})?;
let mut index_part_download =
storage
.download(&part_storage_path)
.await
.with_context(|| {
format!("Failed to open download stream for for storage path {part_storage_path:?}")
})?;
let mut index_part_bytes = Vec::new();
io::copy(
&mut index_part_download.download_stream,
@@ -204,16 +164,11 @@ where
.await
.with_context(|| {
format!("Failed to download an index part from storage path {part_storage_path:?}")
})
.map_err(DownloadError::Other)?;
})?;
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
.with_context(|| {
format!(
"Failed to deserialize index part file from storage path '{part_storage_path:?}'"
)
})
.map_err(DownloadError::Other)?;
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes).with_context(|| {
format!("Failed to deserialize index part file from storage path '{part_storage_path:?}'")
})?;
let missing_files = index_part.missing_files();
if !missing_files.is_empty() {

View File

@@ -13,7 +13,6 @@ use anyhow::{anyhow, Context, Ok};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use tokio::sync::RwLock;
use tracing::log::warn;
use crate::{config::PageServerConf, layered_repository::metadata::TimelineMetadata};
use utils::{
@@ -21,8 +20,6 @@ use utils::{
zid::{ZTenantId, ZTenantTimelineId, ZTimelineId},
};
use super::download::TenantIndexParts;
/// A part of the filesystem path, that needs a root to become a path again.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
@@ -91,27 +88,21 @@ pub struct RemoteIndex(Arc<RwLock<RemoteTimelineIndex>>);
impl RemoteIndex {
pub fn from_parts(
conf: &'static PageServerConf,
index_parts: HashMap<ZTenantId, TenantIndexParts>,
index_parts: HashMap<ZTenantId, HashMap<ZTimelineId, IndexPart>>,
) -> anyhow::Result<Self> {
let mut entries: HashMap<ZTenantId, TenantEntry> = HashMap::new();
for (tenant_id, index_parts) in index_parts {
match index_parts {
// TODO: should we schedule a retry so it can be recovered? otherwise we can revive it only through detach/attach or pageserver restart
TenantIndexParts::Poisoned { missing, ..} => warn!("skipping tenant_id set up for remote index because the index download has failed for timeline(s): {missing:?}"),
TenantIndexParts::Present(timelines) => {
for (timeline_id, index_part) in timelines {
let timeline_path = conf.timeline_path(&timeline_id, &tenant_id);
let remote_timeline =
RemoteTimeline::from_index_part(&timeline_path, index_part)
.context("Failed to restore remote timeline data from index part")?;
for (tenant_id, timelines) in index_parts {
for (timeline_id, index_part) in timelines {
let timeline_path = conf.timeline_path(&timeline_id, &tenant_id);
let remote_timeline =
RemoteTimeline::from_index_part(&timeline_path, index_part)
.context("Failed to restore remote timeline data from index part")?;
entries
.entry(tenant_id)
.or_default()
.insert(timeline_id, remote_timeline);
}
},
entries
.entry(tenant_id)
.or_default()
.insert(timeline_id, remote_timeline);
}
}

52
poetry.lock generated
View File

@@ -544,21 +544,20 @@ test = ["pytest (>=6.2.0)", "pytest-cov", "pytest-subtests", "pytest-xdist", "pr
[[package]]
name = "docker"
version = "4.2.2"
version = "5.0.3"
description = "A Python library for the Docker Engine API."
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
python-versions = ">=3.6"
[package.dependencies]
pypiwin32 = {version = "223", markers = "sys_platform == \"win32\" and python_version >= \"3.6\""}
pywin32 = {version = "227", markers = "sys_platform == \"win32\""}
requests = ">=2.14.2,<2.18.0 || >2.18.0"
six = ">=1.4.0"
websocket-client = ">=0.32.0"
[package.extras]
ssh = ["paramiko (>=2.4.2)"]
tls = ["pyOpenSSL (>=17.5.0)", "cryptography (>=1.3.4)", "idna (>=2.0.0)"]
tls = ["pyOpenSSL (>=17.5.0)", "cryptography (>=3.4.7)", "idna (>=2.0.0)"]
[[package]]
name = "ecdsa"
@@ -1004,17 +1003,6 @@ python-versions = ">=3.6"
[package.extras]
diagrams = ["jinja2", "railroad-diagrams"]
[[package]]
name = "pypiwin32"
version = "223"
description = ""
category = "main"
optional = false
python-versions = "*"
[package.dependencies]
pywin32 = ">=223"
[[package]]
name = "pyrsistent"
version = "0.18.1"
@@ -1136,7 +1124,7 @@ python-versions = "*"
[[package]]
name = "pywin32"
version = "301"
version = "227"
description = "Python for Window Extensions"
category = "main"
optional = false
@@ -1513,8 +1501,8 @@ cryptography = [
{file = "cryptography-36.0.1.tar.gz", hash = "sha256:53e5c1dc3d7a953de055d77bef2ff607ceef7a2aac0353b5d630ab67f7423638"},
]
docker = [
{file = "docker-4.2.2-py2.py3-none-any.whl", hash = "sha256:03a46400c4080cb6f7aa997f881ddd84fef855499ece219d75fbdb53289c17ab"},
{file = "docker-4.2.2.tar.gz", hash = "sha256:26eebadce7e298f55b76a88c4f8802476c5eaddbdbe38dbc6cce8781c47c9b54"},
{file = "docker-5.0.3-py2.py3-none-any.whl", hash = "sha256:7a79bb439e3df59d0a72621775d600bc8bc8b422d285824cb37103eab91d1ce0"},
{file = "docker-5.0.3.tar.gz", hash = "sha256:d916a26b62970e7c2f554110ed6af04c7ccff8e9f81ad17d0d40c75637e227fb"},
]
ecdsa = [
{file = "ecdsa-0.17.0-py2.py3-none-any.whl", hash = "sha256:5cf31d5b33743abe0dfc28999036c849a69d548f994b535e527ee3cb7f3ef676"},
@@ -1814,10 +1802,6 @@ pyparsing = [
{file = "pyparsing-3.0.6-py3-none-any.whl", hash = "sha256:04ff808a5b90911829c55c4e26f75fa5ca8a2f5f36aa3a51f68e27033341d3e4"},
{file = "pyparsing-3.0.6.tar.gz", hash = "sha256:d9bdec0013ef1eb5a84ab39a3b3868911598afa494f5faa038647101504e2b81"},
]
pypiwin32 = [
{file = "pypiwin32-223-py3-none-any.whl", hash = "sha256:67adf399debc1d5d14dffc1ab5acacb800da569754fafdc576b2a039485aa775"},
{file = "pypiwin32-223.tar.gz", hash = "sha256:71be40c1fbd28594214ecaecb58e7aa8b708eabfa0125c8a109ebd51edbd776a"},
]
pyrsistent = [
{file = "pyrsistent-0.18.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:df46c854f490f81210870e509818b729db4488e1f30f2a1ce1698b2295a878d1"},
{file = "pyrsistent-0.18.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5d45866ececf4a5fff8742c25722da6d4c9e180daa7b405dc0a2a2790d668c26"},
@@ -1874,16 +1858,18 @@ pytz = [
{file = "pytz-2021.3.tar.gz", hash = "sha256:acad2d8b20a1af07d4e4c9d2e9285c5ed9104354062f275f3fcd88dcef4f1326"},
]
pywin32 = [
{file = "pywin32-301-cp35-cp35m-win32.whl", hash = "sha256:93367c96e3a76dfe5003d8291ae16454ca7d84bb24d721e0b74a07610b7be4a7"},
{file = "pywin32-301-cp35-cp35m-win_amd64.whl", hash = "sha256:9635df6998a70282bd36e7ac2a5cef9ead1627b0a63b17c731312c7a0daebb72"},
{file = "pywin32-301-cp36-cp36m-win32.whl", hash = "sha256:c866f04a182a8cb9b7855de065113bbd2e40524f570db73ef1ee99ff0a5cc2f0"},
{file = "pywin32-301-cp36-cp36m-win_amd64.whl", hash = "sha256:dafa18e95bf2a92f298fe9c582b0e205aca45c55f989937c52c454ce65b93c78"},
{file = "pywin32-301-cp37-cp37m-win32.whl", hash = "sha256:98f62a3f60aa64894a290fb7494bfa0bfa0a199e9e052e1ac293b2ad3cd2818b"},
{file = "pywin32-301-cp37-cp37m-win_amd64.whl", hash = "sha256:fb3b4933e0382ba49305cc6cd3fb18525df7fd96aa434de19ce0878133bf8e4a"},
{file = "pywin32-301-cp38-cp38-win32.whl", hash = "sha256:88981dd3cfb07432625b180f49bf4e179fb8cbb5704cd512e38dd63636af7a17"},
{file = "pywin32-301-cp38-cp38-win_amd64.whl", hash = "sha256:8c9d33968aa7fcddf44e47750e18f3d034c3e443a707688a008a2e52bbef7e96"},
{file = "pywin32-301-cp39-cp39-win32.whl", hash = "sha256:595d397df65f1b2e0beaca63a883ae6d8b6df1cdea85c16ae85f6d2e648133fe"},
{file = "pywin32-301-cp39-cp39-win_amd64.whl", hash = "sha256:87604a4087434cd814ad8973bd47d6524bd1fa9e971ce428e76b62a5e0860fdf"},
{file = "pywin32-227-cp27-cp27m-win32.whl", hash = "sha256:371fcc39416d736401f0274dd64c2302728c9e034808e37381b5e1b22be4a6b0"},
{file = "pywin32-227-cp27-cp27m-win_amd64.whl", hash = "sha256:4cdad3e84191194ea6d0dd1b1b9bdda574ff563177d2adf2b4efec2a244fa116"},
{file = "pywin32-227-cp35-cp35m-win32.whl", hash = "sha256:f4c5be1a293bae0076d93c88f37ee8da68136744588bc5e2be2f299a34ceb7aa"},
{file = "pywin32-227-cp35-cp35m-win_amd64.whl", hash = "sha256:a929a4af626e530383a579431b70e512e736e9588106715215bf685a3ea508d4"},
{file = "pywin32-227-cp36-cp36m-win32.whl", hash = "sha256:300a2db938e98c3e7e2093e4491439e62287d0d493fe07cce110db070b54c0be"},
{file = "pywin32-227-cp36-cp36m-win_amd64.whl", hash = "sha256:9b31e009564fb95db160f154e2aa195ed66bcc4c058ed72850d047141b36f3a2"},
{file = "pywin32-227-cp37-cp37m-win32.whl", hash = "sha256:47a3c7551376a865dd8d095a98deba954a98f326c6fe3c72d8726ca6e6b15507"},
{file = "pywin32-227-cp37-cp37m-win_amd64.whl", hash = "sha256:31f88a89139cb2adc40f8f0e65ee56a8c585f629974f9e07622ba80199057511"},
{file = "pywin32-227-cp38-cp38-win32.whl", hash = "sha256:7f18199fbf29ca99dff10e1f09451582ae9e372a892ff03a28528a24d55875bc"},
{file = "pywin32-227-cp38-cp38-win_amd64.whl", hash = "sha256:7c1ae32c489dc012930787f06244426f8356e129184a02c25aef163917ce158e"},
{file = "pywin32-227-cp39-cp39-win32.whl", hash = "sha256:c054c52ba46e7eb6b7d7dfae4dbd987a1bb48ee86debe3f245a2884ece46e295"},
{file = "pywin32-227-cp39-cp39-win_amd64.whl", hash = "sha256:f27cec5e7f588c3d1051651830ecc00294f90728d19c3bf6916e6dba93ea357c"},
]
pyyaml = [
{file = "PyYAML-6.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d4db7c7aef085872ef65a8fd7d6d09a14ae91f691dec3e87ee5ee0539d516f53"},

View File

@@ -83,9 +83,7 @@ impl ElectionLeader {
) -> Result<bool> {
let resp = self.client.leader(election_name).await?;
let kv = resp
.kv()
.ok_or_else(|| anyhow!("failed to get leader response"))?;
let kv = resp.kv().ok_or(anyhow!("failed to get leader response"))?;
let leader = kv.value_str()?;
Ok(leader == candidate_name)

View File

@@ -637,17 +637,6 @@ where
&mut self,
msg: &VoteRequest,
) -> Result<Option<AcceptorProposerMessage>> {
// Once voted, we won't accept data from older proposers; flush
// everything we've already received so that new proposer starts
// streaming at end of our WAL, without overlap. Currently we truncate
// WAL at streaming point, so this avoids truncating already committed
// WAL.
//
// TODO: it would be smoother to not truncate committed piece at
// handle_elected instead. Currently not a big deal, as proposer is the
// only source of WAL; with peer2peer recovery it would be more
// important.
self.wal_store.flush_wal()?;
// initialize with refusal
let mut resp = VoteResponse {
term: self.state.acceptor_state.term,

View File

@@ -6,6 +6,7 @@ from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, NeonPageserverApiExc
#
# Create ancestor branches off the main branch.
#
# @pytest.mark.parametrize("x", range(20))
def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
@@ -21,7 +22,7 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
'compaction_target_size': '4194304',
})
env.pageserver.safe_psql("failpoints flush-frozen-before-sync=sleep(10000)")
# env.pageserver.safe_psql("failpoints flush-frozen-before-sync=sleep(10000)")
pg_branch0 = env.postgres.create_start('main', tenant_id=tenant)
branch0_cur = pg_branch0.connect().cursor()

View File

@@ -1,4 +1,3 @@
import threading
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
@@ -42,6 +41,7 @@ from fixtures.utils import lsn_from_hex
# Because the delta layer D covering lsn1 is corrupted, creating a branch
# starting from lsn1 should return an error as follows:
# could not find data for key ... at LSN ..., for request at LSN ...
# @pytest.mark.skip("")
def test_branch_and_gc(neon_simple_env: NeonEnv):
env = neon_simple_env
@@ -58,9 +58,9 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
'compaction_target_size': f'{1024 ** 3}',
# tweak the default settings to allow quickly create image layers and L1 layers
'compaction_period': '1 s',
'compaction_period': '10 s',
'compaction_threshold': '2',
'image_creation_threshold': '1',
'image_creation_threshold': '2',
# set PITR interval to be small, so we can do GC
'pitr_interval': '1 s'
@@ -79,6 +79,9 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
lsn1 = main_cur.fetchone()[0]
log.info(f'LSN1: {lsn1}')
# trigger a manual compaction
env.pageserver.safe_psql(f'''compact {tenant.hex} {timeline_main.hex}''')
main_cur.execute('INSERT INTO foo SELECT FROM generate_series(1, 100000)')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn2 = main_cur.fetchone()[0]
@@ -101,67 +104,3 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
branch_cur.execute('SELECT count(*) FROM foo')
assert branch_cur.fetchone() == (200000, )
# This test simulates a race condition happening when branch creation and GC are performed concurrently.
#
# Suppose we want to create a new timeline 't' from a source timeline 's' starting
# from a lsn 'lsn'. Upon creating 't', if we don't hold the GC lock and compare 'lsn' with
# the latest GC information carefully, it's possible for GC to accidentally remove data
# needed by the new timeline.
#
# In this test, GC is requested before the branch creation but is delayed to happen after branch creation.
# As a result, when doing GC for the source timeline, we don't have any information about
# the upcoming new branches, so it's possible to remove data that may be needed by the new branches.
# It's the branch creation task's job to make sure the starting 'lsn' is not out of scope
# and prevent creating branches with invalid starting LSNs.
#
# For more details, see discussion in https://github.com/neondatabase/neon/pull/2101#issuecomment-1185273447.
def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
env = neon_simple_env
# Disable background GC but set the `pitr_interval` to be small, so GC can delete something
tenant, _ = env.neon_cli.create_tenant(
conf={
# disable background GC
'gc_period': '10 m',
'gc_horizon': f'{10 * 1024 ** 3}',
# small checkpoint distance to create more delta layer files
'checkpoint_distance': f'{1024 ** 2}',
# set the target size to be large to allow the image layer to cover the whole key space
'compaction_target_size': f'{1024 ** 3}',
# tweak the default settings to allow quickly create image layers and L1 layers
'compaction_period': '1 s',
'compaction_threshold': '2',
'image_creation_threshold': '1',
# set PITR interval to be small, so we can do GC
'pitr_interval': '1 s'
})
b0 = env.neon_cli.create_branch('b0', tenant_id=tenant)
pg0 = env.postgres.create_start('b0', tenant_id=tenant)
res = pg0.safe_psql_many(queries=[
"CREATE TABLE t(key serial primary key)",
"INSERT INTO t SELECT FROM generate_series(1, 100000)",
"SELECT pg_current_wal_insert_lsn()",
"INSERT INTO t SELECT FROM generate_series(1, 100000)",
])
lsn = res[2][0][0]
# Use `failpoint=sleep` and `threading` to make the GC iteration triggers *before* the
# branch creation task but the individual timeline GC iteration happens *after*
# the branch creation task.
env.pageserver.safe_psql(f"failpoints before-timeline-gc=sleep(2000)")
def do_gc():
env.pageserver.safe_psql(f"do_gc {tenant.hex} {b0.hex} 0")
thread = threading.Thread(target=do_gc, daemon=True)
thread.start()
# The starting LSN is invalid as the corresponding record is scheduled to be removed by in-queue GC.
with pytest.raises(Exception, match="invalid branch start lsn"):
env.neon_cli.create_branch('b1', 'b0', tenant_id=tenant, ancestor_start_lsn=lsn)

View File

@@ -64,6 +64,7 @@ def test_branching_with_pgbench(neon_simple_env: NeonEnv,
# If the number of concurrent threads exceeds a threshold,
# wait for all the threads to finish before spawning a new one.
#
# Because tests defined in `batch_others` are run concurrently in CI,
# we want to avoid the situation that one test exhausts resources for other tests.
if len(threads) >= thread_limit:

View File

@@ -20,22 +20,18 @@ def test_isolation(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, caps
runpath = test_output_dir / 'regress'
(runpath / 'testtablespace').mkdir(parents=True)
# Find the pg_isolation_regress binary
proc = pg_bin.run(['pg_config', '--libdir'], capture_output=True)
libdir = proc.stdout.decode().strip()
proc = pg_bin.run(['pg_config', '--bindir'], capture_output=True)
bindir = proc.stdout.decode().strip()
pg_isolation_regress = os.path.join(libdir,
'postgresql/pgxs/src/test/isolation/pg_isolation_regress')
# Compute all the file locations that pg_isolation_regress will need.
build_path = os.path.join(pg_distrib_dir, 'build/src/test/isolation')
src_path = os.path.join(base_dir, 'vendor/postgres/src/test/isolation')
bindir = os.path.join(pg_distrib_dir, 'bin')
schedule = os.path.join(src_path, 'isolation_schedule')
pg_isolation_regress = os.path.join(build_path, 'pg_isolation_regress')
pg_isolation_regress_command = [
pg_isolation_regress,
'--use-existing',
'--bindir={}'.format(bindir),
'--dlpath={}'.format(build_path),
'--inputdir={}'.format(src_path),
'--schedule={}'.format(schedule),
]

View File

@@ -20,22 +20,19 @@ def test_neon_regress(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, c
runpath = test_output_dir / 'regress'
(runpath / 'testtablespace').mkdir(parents=True)
# Find the pg_regress binary and --bindir option to pass to it.
proc = pg_bin.run(['pg_config', '--libdir'], capture_output=True)
libdir = proc.stdout.decode().strip()
proc = pg_bin.run(['pg_config', '--bindir'], capture_output=True)
bindir = proc.stdout.decode().strip()
pg_regress = os.path.join(libdir, 'postgresql/pgxs/src/test/regress/pg_regress')
# Compute all the file locations that pg_regress will need.
# This test runs neon specific tests
build_path = os.path.join(pg_distrib_dir, 'build/src/test/regress')
src_path = os.path.join(base_dir, 'test_runner/neon_regress')
bindir = os.path.join(pg_distrib_dir, 'bin')
schedule = os.path.join(src_path, 'parallel_schedule')
pg_regress = os.path.join(build_path, 'pg_regress')
pg_regress_command = [
pg_regress,
'--use-existing',
'--bindir={}'.format(bindir),
'--dlpath={}'.format(build_path),
'--schedule={}'.format(schedule),
'--inputdir={}'.format(src_path),
]

View File

@@ -19,23 +19,19 @@ def test_pg_regress(neon_simple_env: NeonEnv, test_output_dir: pathlib.Path, pg_
runpath = test_output_dir / 'regress'
(runpath / 'testtablespace').mkdir(parents=True)
# Find the pg_regress binary and --bindir option to pass to it.
proc = pg_bin.run(['pg_config', '--libdir'], capture_output=True)
libdir = proc.stdout.decode().strip()
proc = pg_bin.run(['pg_config', '--bindir'], capture_output=True)
bindir = proc.stdout.decode().strip()
pg_regress = os.path.join(libdir, 'postgresql/pgxs/src/test/regress/pg_regress')
# Compute all the file locations that pg_regress will need.
build_path = os.path.join(pg_distrib_dir, 'build/src/test/regress')
src_path = os.path.join(base_dir, 'vendor/postgres/src/test/regress')
bindir = os.path.join(pg_distrib_dir, 'bin')
schedule = os.path.join(src_path, 'parallel_schedule')
dlpath = os.path.join(base_dir, 'build/src/test/regress')
pg_regress = os.path.join(build_path, 'pg_regress')
pg_regress_command = [
pg_regress,
'--bindir=""',
'--use-existing',
'--bindir={}'.format(bindir),
'--dlpath={}'.format(dlpath),
'--dlpath={}'.format(build_path),
'--schedule={}'.format(schedule),
'--inputdir={}'.format(src_path),
]

View File

@@ -1276,9 +1276,12 @@ class WalCraft(AbstractNeonCli):
res.check_returncode()
return res.stdout.split('\n')
def in_existing(self, type: str, connection: str) -> None:
def in_existing(self, type: str, connection: str) -> int:
res = self.raw_cli(["in-existing", type, connection])
res.check_returncode()
m = re.fullmatch(r'end_of_wal = (.*)\n', res.stdout)
assert m
return lsn_from_hex(m.group(1))
class NeonPageserver(PgProtocol):
@@ -1372,10 +1375,7 @@ class PgBin:
env.update(env_add)
return env
def run(self,
command: List[str],
env: Optional[Env] = None,
**kwargs) -> 'subprocess.CompletedProcess[str]':
def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None):
"""
Run one of the postgres binaries.
@@ -1392,7 +1392,7 @@ class PgBin:
self._fixpath(command)
log.info('Running command "{}"'.format(' '.join(command)))
env = self._build_env(env)
return subprocess.run(command, env=env, check=True, **kwargs)
subprocess.run(command, env=env, cwd=cwd, check=True)
def run_capture(self,
command: List[str],

View File

@@ -83,9 +83,6 @@ def get_dir_size(path: str) -> int:
totalbytes = 0
for root, dirs, files in os.walk(path):
for name in files:
try:
totalbytes += os.path.getsize(os.path.join(root, name))
except FileNotFoundError as e:
pass # file could be concurrently removed
totalbytes += os.path.getsize(os.path.join(root, name))
return totalbytes

View File

@@ -1,110 +0,0 @@
import random
import time
import statistics
import threading
import timeit
import pytest
from typing import List
from fixtures.benchmark_fixture import MetricReport
from fixtures.compare_fixtures import NeonCompare
from fixtures.log_helper import log
def _record_branch_creation_durations(neon_compare: NeonCompare, durs: List[float]):
neon_compare.zenbenchmark.record("branch_creation_duration_max",
max(durs),
's',
MetricReport.LOWER_IS_BETTER)
neon_compare.zenbenchmark.record("branch_creation_duration_avg",
statistics.mean(durs),
's',
MetricReport.LOWER_IS_BETTER)
neon_compare.zenbenchmark.record("branch_creation_duration_stdev",
statistics.stdev(durs),
's',
MetricReport.LOWER_IS_BETTER)
@pytest.mark.parametrize("n_branches", [20])
# Test measures the latency of branch creation during a heavy [1] workload.
#
# [1]: to simulate a heavy workload, the test tweaks the GC and compaction settings
# to increase the task's frequency. The test runs `pgbench` in each new branch.
# Each branch is created from a randomly picked source branch.
def test_branch_creation_heavy_write(neon_compare: NeonCompare, n_branches: int):
env = neon_compare.env
pg_bin = neon_compare.pg_bin
# Use aggressive GC and checkpoint settings, so GC and compaction happen more often during the test
tenant, _ = env.neon_cli.create_tenant(
conf={
'gc_period': '5 s',
'gc_horizon': f'{4 * 1024 ** 2}',
'checkpoint_distance': f'{2 * 1024 ** 2}',
'compaction_target_size': f'{1024 ** 2}',
'compaction_threshold': '2',
# set PITR interval to be small, so we can do GC
'pitr_interval': '5 s'
})
def run_pgbench(branch: str):
log.info(f"Start a pgbench workload on branch {branch}")
pg = env.postgres.create_start(branch, tenant_id=tenant)
connstr = pg.connstr()
pg_bin.run_capture(['pgbench', '-i', connstr])
pg_bin.run_capture(['pgbench', '-c10', '-T10', connstr])
pg.stop()
env.neon_cli.create_branch('b0', tenant_id=tenant)
threads: List[threading.Thread] = []
threads.append(threading.Thread(target=run_pgbench, args=('b0', ), daemon=True))
threads[-1].start()
branch_creation_durations = []
for i in range(n_branches):
time.sleep(1.0)
# random a source branch
p = random.randint(0, i)
timer = timeit.default_timer()
env.neon_cli.create_branch('b{}'.format(i + 1), 'b{}'.format(p), tenant_id=tenant)
dur = timeit.default_timer() - timer
log.info(f"Creating branch b{i+1} took {dur}s")
branch_creation_durations.append(dur)
threads.append(threading.Thread(target=run_pgbench, args=(f'b{i+1}', ), daemon=True))
threads[-1].start()
for thread in threads:
thread.join()
_record_branch_creation_durations(neon_compare, branch_creation_durations)
@pytest.mark.parametrize("n_branches", [1024])
# Test measures the latency of branch creation when creating a lot of branches.
def test_branch_creation_many(neon_compare: NeonCompare, n_branches: int):
env = neon_compare.env
env.neon_cli.create_branch('b0')
pg = env.postgres.create_start('b0')
neon_compare.pg_bin.run_capture(['pgbench', '-i', '-s10', pg.connstr()])
branch_creation_durations = []
for i in range(n_branches):
# random a source branch
p = random.randint(0, i)
timer = timeit.default_timer()
env.neon_cli.create_branch('b{}'.format(i + 1), 'b{}'.format(p))
dur = timeit.default_timer() - timer
branch_creation_durations.append(dur)
_record_branch_creation_durations(neon_compare, branch_creation_durations)

View File

@@ -33,9 +33,7 @@ itoa = { version = "0.4", features = ["i128", "std"] }
libc = { version = "0.2", features = ["extra_traits", "std"] }
log = { version = "0.4", default-features = false, features = ["serde", "std"] }
memchr = { version = "2", features = ["std", "use_std"] }
nom = { version = "7", features = ["alloc", "std"] }
num-bigint = { version = "0.4", features = ["std"] }
num-integer = { version = "0.1", default-features = false, features = ["i128", "std"] }
num-integer = { version = "0.1", default-features = false, features = ["i128"] }
num-traits = { version = "0.2", features = ["i128", "std"] }
prost = { version = "0.10", features = ["prost-derive", "std"] }
rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] }
@@ -43,11 +41,10 @@ regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cac
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
scopeguard = { version = "1", features = ["use_std"] }
serde = { version = "1", features = ["alloc", "derive", "serde_derive", "std"] }
time = { version = "0.3", features = ["alloc", "formatting", "itoa", "macros", "parsing", "quickcheck", "quickcheck-dep", "std", "time-macros"] }
tokio = { version = "1", features = ["bytes", "fs", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "process", "rt", "rt-multi-thread", "signal-hook-registry", "socket2", "sync", "time", "tokio-macros", "winapi"] }
tokio = { version = "1", features = ["bytes", "fs", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "process", "rt", "rt-multi-thread", "signal-hook-registry", "socket2", "sync", "time", "tokio-macros"] }
tokio-util = { version = "0.7", features = ["codec", "io"] }
tracing = { version = "0.1", features = ["attributes", "log", "std", "tracing-attributes"] }
tracing-core = { version = "0.1", features = ["lazy_static", "std", "valuable"] }
tracing-core = { version = "0.1", features = ["lazy_static", "std"] }
[build-dependencies]
ahash = { version = "0.7", features = ["std"] }
@@ -60,7 +57,6 @@ indexmap = { version = "1", default-features = false, features = ["std"] }
libc = { version = "0.2", features = ["extra_traits", "std"] }
log = { version = "0.4", default-features = false, features = ["serde", "std"] }
memchr = { version = "2", features = ["std", "use_std"] }
nom = { version = "7", features = ["alloc", "std"] }
prost = { version = "0.10", features = ["prost-derive", "std"] }
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }