Compare commits


2 Commits

Author SHA1 Message Date
Heikki Linnakangas
1bc18f2cf2 Separate Postgres build dir from installation dir.
Previously, Postgres was built in 'tmp_install/build', and installed
into 'tmp_install'. In the CI, the 'build' directory was included in
the final neon.tar.zst artifact that includes all the necessary
binaries. That was unnecessary: the intermediate build results are not
needed, only the final binaries. Separate the build directory so that
the Postgres build happens in 'build', and it is installed into
'tmp_install'. That makes the final neon.tar.zst artifact smaller.

The changes to the python tests are needed to find the 'pg_regress'
binary in the installation directory. Previously, they would use the
'pg_regress' binary from the build directory, not the final
installation location.
2022-07-20 15:39:51 +03:00
Heikki Linnakangas
e7c9d66956 Update references to Zenith to Neon in Makefile. 2022-07-20 10:59:21 +03:00
88 changed files with 4062 additions and 5031 deletions
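To make the first commit's layout change concrete, here is a minimal sketch of what the two directories hold after a build (a sketch only; target names come from the Makefile changes later in this diff, and the exact contents are an assumption):

```bash
git clone --recursive https://github.com/neondatabase/neon.git
cd neon
make postgres -j"$(nproc)"

ls build/            # intermediate build tree: config.status, object files, ...
ls tmp_install/bin/  # final binaries only; this is what neon.tar.zst should package

# The python tests now have to locate pg_regress under the installation
# prefix rather than in the build tree:
find tmp_install -name pg_regress
```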

.circleci/config.yml

@@ -0,0 +1,369 @@
version: 2.1
executors:
neon-xlarge-executor:
resource_class: xlarge
docker:
# NB: when changed, do not forget to update rust image tag in all Dockerfiles
- image: neondatabase/rust:1.58
neon-executor:
docker:
- image: neondatabase/rust:1.58
jobs:
# A job to build postgres
build-postgres:
executor: neon-xlarge-executor
parameters:
build_type:
type: enum
enum: ["debug", "release"]
environment:
BUILD_TYPE: << parameters.build_type >>
steps:
# Checkout the git repo (circleci doesn't have a flag to enable submodules here)
- checkout
# Grab the postgres git revision to build a cache key.
# Append the Makefile, as it could change the way postgres is built.
# Note this works even though the submodule hasn't been checked out yet.
- run:
name: Get postgres cache key
command: |
git rev-parse HEAD:vendor/postgres > /tmp/cache-key-postgres
cat Makefile >> /tmp/cache-key-postgres
- restore_cache:
name: Restore postgres cache
keys:
# Restore ONLY if the rev key matches exactly
- v05-postgres-cache-<< parameters.build_type >>-{{ checksum "/tmp/cache-key-postgres" }}
# Build postgres if the restore_cache didn't find a build.
# `make` can't figure out whether the cache is valid, since
# it only compares file timestamps.
- run:
name: build postgres
command: |
if [ ! -e tmp_install/bin/postgres ]; then
# "depth 1" saves some time by not cloning the whole repo
git submodule update --init --depth 1
# bail out on any warnings
COPT='-Werror' mold -run make postgres -j$(nproc)
fi
- save_cache:
name: Save postgres cache
key: v05-postgres-cache-<< parameters.build_type >>-{{ checksum "/tmp/cache-key-postgres" }}
paths:
- tmp_install
# A job to build Neon rust code
build-neon:
executor: neon-xlarge-executor
parameters:
build_type:
type: enum
enum: ["debug", "release"]
environment:
BUILD_TYPE: << parameters.build_type >>
steps:
# Checkout the git repo (without submodules)
- checkout
# Grab the postgres git revision to build a cache key.
# Append the Makefile, as it could change the way postgres is built.
# Note this works even though the submodule hasn't been checked out yet.
- run:
name: Get postgres cache key
command: |
git rev-parse HEAD:vendor/postgres > /tmp/cache-key-postgres
cat Makefile >> /tmp/cache-key-postgres
- restore_cache:
name: Restore postgres cache
keys:
# Restore ONLY if the rev key matches exactly
- v05-postgres-cache-<< parameters.build_type >>-{{ checksum "/tmp/cache-key-postgres" }}
- restore_cache:
name: Restore rust cache
keys:
# Require an exact match. While an out-of-date cache might speed up the build,
# there's no way to clean out old packages, so the cache grows every time
# something changes.
- v05-rust-cache-deps-<< parameters.build_type >>-{{ checksum "Cargo.lock" }}
# Build the rust code, including test binaries
- run:
name: Rust build << parameters.build_type >>
command: |
if [[ $BUILD_TYPE == "debug" ]]; then
CARGO_FLAGS=
elif [[ $BUILD_TYPE == "release" ]]; then
CARGO_FLAGS="--release --features profiling"
fi
export CARGO_INCREMENTAL=0
export CACHEPOT_BUCKET=zenith-rust-cachepot
export RUSTC_WRAPPER=""
export AWS_ACCESS_KEY_ID="${CACHEPOT_AWS_ACCESS_KEY_ID}"
export AWS_SECRET_ACCESS_KEY="${CACHEPOT_AWS_SECRET_ACCESS_KEY}"
mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests
cachepot -s
- save_cache:
name: Save rust cache
key: v05-rust-cache-deps-<< parameters.build_type >>-{{ checksum "Cargo.lock" }}
paths:
- ~/.cargo/registry
- ~/.cargo/git
- target
# Run rust unit tests
- run:
name: cargo test
command: |
if [[ $BUILD_TYPE == "debug" ]]; then
CARGO_FLAGS=
elif [[ $BUILD_TYPE == "release" ]]; then
CARGO_FLAGS=--release
fi
cargo test $CARGO_FLAGS
# Install the rust binaries, for use by test jobs
- run:
name: Install rust binaries
command: |
binaries=$(
cargo metadata --format-version=1 --no-deps |
jq -r '.packages[].targets[] | select(.kind | index("bin")) | .name'
)
mkdir -p /tmp/zenith/bin
mkdir -p /tmp/zenith/test_bin
mkdir -p /tmp/zenith/etc
# Install target binaries
for bin in $binaries; do
SRC=target/$BUILD_TYPE/$bin
DST=/tmp/zenith/bin/$bin
cp $SRC $DST
done
# Install the postgres binaries, for use by test jobs
- run:
name: Install postgres binaries
command: |
cp -a tmp_install /tmp/zenith/pg_install
# Save rust binaries for other jobs in the workflow
- persist_to_workspace:
root: /tmp/zenith
paths:
- "*"
check-codestyle-python:
executor: neon-executor
steps:
- checkout
- restore_cache:
keys:
- v2-python-deps-{{ checksum "poetry.lock" }}
- run:
name: Install deps
command: ./scripts/pysync
- save_cache:
key: v2-python-deps-{{ checksum "poetry.lock" }}
paths:
- /home/circleci/.cache/pypoetry/virtualenvs
- run:
name: Print versions
when: always
command: |
poetry run python --version
poetry show
- run:
name: Run yapf to ensure code format
when: always
command: poetry run yapf --recursive --diff .
- run:
name: Run mypy to check types
when: always
command: poetry run mypy .
run-pytest:
executor: neon-executor
parameters:
# Select the type of Rust build. Must be "release" or "debug".
build_type:
type: string
default: "debug"
# pytest args to specify the tests to run.
#
# This can be a test file name, e.g. 'test_pgbench.py', or a subdirectory,
# or '-k foobar' to run tests containing the string 'foobar'. See the pytest man page,
# section SPECIFYING TESTS / SELECTING TESTS, for details.
#
# This parameter is required, to prevent the mistake of running all tests in one job.
test_selection:
type: string
default: ""
# Arbitrary parameters to pytest. For example "-s" to prevent capturing stdout/stderr
extra_params:
type: string
default: ""
needs_postgres_source:
type: boolean
default: false
run_in_parallel:
type: boolean
default: true
save_perf_report:
type: boolean
default: false
environment:
BUILD_TYPE: << parameters.build_type >>
steps:
- attach_workspace:
at: /tmp/zenith
- checkout
- when:
condition: << parameters.needs_postgres_source >>
steps:
- run: git submodule update --init --depth 1
- restore_cache:
keys:
- v2-python-deps-{{ checksum "poetry.lock" }}
- run:
name: Install deps
command: ./scripts/pysync
- save_cache:
key: v2-python-deps-{{ checksum "poetry.lock" }}
paths:
- /home/circleci/.cache/pypoetry/virtualenvs
- run:
name: Run pytest
# pytest doesn't output test logs in real time, so the CI job may fail with a
# `Too long with no output` error if a test runs for a long time. To avoid that,
# tests should have internal timeouts that are shorter than the
# no_output_timeout specified here.
no_output_timeout: 10m
environment:
- NEON_BIN: /tmp/zenith/bin
- POSTGRES_DISTRIB_DIR: /tmp/zenith/pg_install
- TEST_OUTPUT: /tmp/test_output
# this variable will be embedded in the perf test report
# and is needed to distinguish between different environments
- PLATFORM: zenith-local-ci
command: |
PERF_REPORT_DIR="$(realpath test_runner/perf-report-local)"
rm -rf $PERF_REPORT_DIR
TEST_SELECTION="test_runner/<< parameters.test_selection >>"
EXTRA_PARAMS="<< parameters.extra_params >>"
if [ -z "$TEST_SELECTION" ]; then
echo "test_selection must be set"
exit 1
fi
if << parameters.run_in_parallel >>; then
EXTRA_PARAMS="-n4 $EXTRA_PARAMS"
fi
if << parameters.save_perf_report >>; then
if [[ $CIRCLE_BRANCH == "main" ]]; then
mkdir -p "$PERF_REPORT_DIR"
EXTRA_PARAMS="--out-dir $PERF_REPORT_DIR $EXTRA_PARAMS"
fi
fi
export GITHUB_SHA=$CIRCLE_SHA1
# Run the tests.
#
# The junit.xml file allows CircleCI to display more fine-grained test information
# in its "Tests" tab in the results page.
# --verbose prints the name of each test (helpful when there are
# multiple tests in one file)
# -rA prints a summary at the end
# -n4 uses four processes to run tests via pytest-xdist
# -s is not passed, so pytest captures output; the tests run in parallel,
# and uncaptured logs from different tests would be interleaved
./scripts/pytest \
--junitxml=$TEST_OUTPUT/junit.xml \
--tb=short \
--verbose \
-m "not remote_cluster" \
-rA $TEST_SELECTION $EXTRA_PARAMS
if << parameters.save_perf_report >>; then
if [[ $CIRCLE_BRANCH == "main" ]]; then
export REPORT_FROM="$PERF_REPORT_DIR"
export REPORT_TO=local
scripts/generate_and_push_perf_report.sh
fi
fi
- run:
# CircleCI artifacts are preserved one file at a time, so skipping
# this step isn't a good idea. If you want to extract the
# pageserver state, perhaps a tarball would be a better idea.
name: Delete all data but logs
when: always
command: |
du -sh /tmp/test_output/*
find /tmp/test_output -type f ! -name "*.log" ! -name "regression.diffs" ! -name "junit.xml" ! -name "*.filediff" ! -name "*.stdout" ! -name "*.stderr" ! -name "flamegraph.svg" ! -name "*.metrics" -delete
du -sh /tmp/test_output/*
- store_artifacts:
path: /tmp/test_output
# The store_test_results step tells CircleCI where to find the junit.xml file.
- store_test_results:
path: /tmp/test_output
# Save data (if any)
- persist_to_workspace:
root: /tmp/zenith
paths:
- "*"
workflows:
build_and_test:
jobs:
- check-codestyle-python
- build-postgres:
name: build-postgres-<< matrix.build_type >>
matrix:
parameters:
build_type: ["debug", "release"]
- build-neon:
name: build-neon-<< matrix.build_type >>
matrix:
parameters:
build_type: ["debug", "release"]
requires:
- build-postgres-<< matrix.build_type >>
- run-pytest:
name: pg_regress-tests-<< matrix.build_type >>
matrix:
parameters:
build_type: ["debug", "release"]
test_selection: batch_pg_regress
needs_postgres_source: true
requires:
- build-neon-<< matrix.build_type >>
- run-pytest:
name: other-tests-<< matrix.build_type >>
matrix:
parameters:
build_type: ["debug", "release"]
test_selection: batch_others
requires:
- build-neon-<< matrix.build_type >>
- run-pytest:
name: benchmarks
context: PERF_TEST_RESULT_CONNSTR
build_type: release
test_selection: performance
run_in_parallel: false
save_perf_report: true
requires:
- build-neon-release


@@ -31,6 +31,13 @@ inputs:
runs:
using: "composite"
steps:
- name: Checkout
if: inputs.needs_postgres_source == 'true'
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 1
- name: Get Neon artifact for restoration
uses: actions/download-artifact@v3
with:
@@ -44,12 +51,11 @@ runs:
tar -xf ./neon-artifact/neon.tar.zst -C /tmp/neon/
rm -rf ./neon-artifact/
- name: Checkout
if: inputs.needs_postgres_source == 'true'
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 1
# Restore the parts of the 'build' directory that were included in the
# tarball. This includes the regression test modules in
# src/test/regress/*.so.
mkdir -p build/
cp -a /tmp/neon/pg_build/* build/
- name: Cache poetry deps
id: cache_poetry
@@ -99,7 +105,7 @@ runs:
# Run the tests.
#
# The junit.xml file allows CI tools to display more fine-grained test information
# The junit.xml file allows CircleCI to display more fine-grained test information
# in its "Tests" tab in the results page.
# --verbose prints name of each test (helpful when there are
# multiple tests in one file)


@@ -17,4 +17,4 @@ env_name = prod-1
console_mgmt_base_url = http://console-release.local
bucket_name = zenith-storage-oregon
bucket_region = us-west-2
etcd_endpoints = zenith-1-etcd.local:2379
etcd_endpoints = etcd-release.local:2379


@@ -12,9 +12,10 @@ cat <<EOF | tee /tmp/payload
"version": 1,
"host": "${HOST}",
"port": 6500,
"http_port": 7676,
"region_id": {{ console_region_id }},
"instance_id": "${INSTANCE_ID}"
"instance_id": "${INSTANCE_ID}",
"http_host": "${HOST}",
"http_port": 7676
}
EOF
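To illustrate the change above, this sketch shows the payload the heredoc now produces, with hypothetical values substituted for the shell and template variables:

```bash
# Assuming HOST=sk-1.local, INSTANCE_ID=i-abc123, and console_region_id=42
# (all hypothetical), /tmp/payload would contain:
# {
#   "version": 1,
#   "host": "sk-1.local",
#   "port": 6500,
#   "region_id": 42,
#   "instance_id": "i-abc123",
#   "http_host": "sk-1.local",
#   "http_port": 7676
# }
```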


@@ -17,4 +17,4 @@ env_name = us-stage
console_mgmt_base_url = http://console-staging.local
bucket_name = zenith-staging-storage-us-east-1
bucket_region = us-east-1
etcd_endpoints = zenith-us-stage-etcd.local:2379
etcd_endpoints = etcd-staging.local:2379


@@ -88,7 +88,9 @@ jobs:
id: cache_pg
uses: actions/cache@v3
with:
path: tmp_install/
path: |
tmp_install/
build/src/test/regress/*.so
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_ver.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Build postgres
@@ -143,7 +145,12 @@ jobs:
fi
- name: Install postgres binaries
run: cp -a tmp_install /tmp/neon/pg_install
run: |
cp -a tmp_install /tmp/neon/pg_install
# Include modules needed by the Postgres regression tests
mkdir -p /tmp/neon/pg_build/src/test/regress
cp -a build/src/test/regress/*.so /tmp/neon/pg_build/src/test/regress
- name: Prepare neon artifact
run: ZSTD_NBTHREADS=0 tar -C /tmp/neon/ -cf ./neon.tar.zst --zstd .
@@ -517,7 +524,7 @@ jobs:
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
STAGING='{"env_name": "staging", "proxy_job": "neon-proxy", "proxy_config": "staging.proxy", "kubeconfig_secret": "STAGING_KUBECONFIG_DATA"}'
NEON_STRESS='{"env_name": "neon-stress", "proxy_job": "neon-stress-proxy", "proxy_config": "neon-stress.proxy", "kubeconfig_secret": "NEON_STRESS_KUBECONFIG_DATA"}'
echo "::set-output name=include::[$STAGING, $NEON_STRESS]"
echo "::set-output name=include::[$STAGING]"
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
PRODUCTION='{"env_name": "production", "proxy_job": "neon-proxy", "proxy_config": "production.proxy", "kubeconfig_secret": "PRODUCTION_KUBECONFIG_DATA"}'
echo "::set-output name=include::[$PRODUCTION]"

Cargo.lock

@@ -467,6 +467,7 @@ dependencies = [
"clap 3.2.12",
"env_logger",
"hyper",
"libc",
"log",
"postgres",
"regex",
@@ -516,6 +517,7 @@ dependencies = [
"tar",
"thiserror",
"toml",
"url",
"utils",
"workspace_hack",
]
@@ -1602,6 +1604,7 @@ version = "0.1.0"
dependencies = [
"lazy_static",
"libc",
"once_cell",
"prometheus",
"workspace_hack",
]
@@ -1674,6 +1677,7 @@ dependencies = [
"git-version",
"pageserver",
"postgres",
"postgres_ffi",
"safekeeper",
"serde_json",
"utils",
@@ -1901,6 +1905,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-postgres",
"tokio-stream",
"toml_edit",
"tracing",
"url",
@@ -2759,6 +2764,7 @@ dependencies = [
"daemonize",
"etcd_broker",
"fs2",
"futures",
"git-version",
"hex",
"humantime",
@@ -2778,10 +2784,12 @@ dependencies = [
"tempfile",
"tokio",
"tokio-postgres",
"tokio-util",
"toml_edit",
"tracing",
"url",
"utils",
"walkdir",
"workspace_hack",
]


@@ -17,10 +17,6 @@ RUN set -e \
FROM neondatabase/rust:1.58 AS build
ARG GIT_VERSION=local
# Enable https://github.com/paritytech/cachepot to cache Rust crates' compilation results in Docker builds.
# Set up cachepot to use an AWS S3 bucket for cache results, to reuse it between `docker build` invocations.
# cachepot falls back to local filesystem if S3 is misconfigured, not failing the build.
ARG RUSTC_WRAPPER=cachepot
ARG CACHEPOT_BUCKET=zenith-rust-cachepot
ARG AWS_ACCESS_KEY_ID
ARG AWS_SECRET_ACCESS_KEY


@@ -1,11 +1,7 @@
# First transient image to build compute_tools binaries
# NB: keep in sync with rust image version in .github/workflows/build_and_test.yml
# NB: keep in sync with rust image version in .circleci/config.yml
FROM neondatabase/rust:1.58 AS rust-build
# Enable https://github.com/paritytech/cachepot to cache Rust crates' compilation results in Docker builds.
# Set up cachepot to use an AWS S3 bucket for cache results, to reuse it between `docker build` invocations.
# cachepot falls back to local filesystem if S3 is misconfigured, not failing the build.
ARG RUSTC_WRAPPER=cachepot
ARG CACHEPOT_BUCKET=zenith-rust-cachepot
ARG AWS_ACCESS_KEY_ID
ARG AWS_SECRET_ACCESS_KEY


@@ -29,11 +29,9 @@ else
endif
# macOS with brew-installed openssl requires explicit paths
# It can be configured with OPENSSL_PREFIX variable
UNAME_S := $(shell uname -s)
ifeq ($(UNAME_S),Darwin)
OPENSSL_PREFIX ?= $(shell brew --prefix openssl@3)
PG_CONFIGURE_OPTS += --with-includes=$(OPENSSL_PREFIX)/include --with-libraries=$(OPENSSL_PREFIX)/lib
PG_CONFIGURE_OPTS += --with-includes=$(HOMEBREW_PREFIX)/opt/openssl/include --with-libraries=$(HOMEBREW_PREFIX)/opt/openssl/lib
endif
# Choose whether we should be silent or verbose
@@ -48,24 +46,28 @@ CARGO_CMD_PREFIX += $(if $(filter n,$(MAKEFLAGS)),,+)
CARGO_CMD_PREFIX += CARGO_TERM_PROGRESS_WHEN=never CI=1
#
# Top level Makefile to build Zenith and PostgreSQL
# Top level Makefile to build Neon and PostgreSQL
#
.PHONY: all
all: zenith postgres
all: neon postgres
### Zenith Rust bits
### Neon Rust bits
#
# The 'postgres_ffi' depends on the Postgres headers.
.PHONY: zenith
zenith: postgres-headers
+@echo "Compiling Zenith"
.PHONY: neon
neon: postgres-headers
+@echo "Compiling Neon"
$(CARGO_CMD_PREFIX) cargo build $(CARGO_BUILD_FLAGS)
### PostgreSQL parts
$(POSTGRES_INSTALL_DIR)/build/config.status:
#
# Postgres is built in the 'build' directory, and installed into
# $(POSTGRES_INSTALL_DIR), which defaults to 'tmp_install'
#
build/config.status:
+@echo "Configuring postgres build"
mkdir -p $(POSTGRES_INSTALL_DIR)/build
(cd $(POSTGRES_INSTALL_DIR)/build && \
mkdir -p build
(cd build && \
$(ROOT_PROJECT_DIR)/vendor/postgres/configure CFLAGS='$(PG_CFLAGS)' \
$(PG_CONFIGURE_OPTS) \
$(SECCOMP) \
@@ -73,44 +75,44 @@ $(POSTGRES_INSTALL_DIR)/build/config.status:
# nicer alias for running 'configure'
.PHONY: postgres-configure
postgres-configure: $(POSTGRES_INSTALL_DIR)/build/config.status
postgres-configure: build/config.status
# Install the PostgreSQL header files into $(POSTGRES_INSTALL_DIR)/include
.PHONY: postgres-headers
postgres-headers: postgres-configure
+@echo "Installing PostgreSQL headers"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/src/include MAKELEVEL=0 install
$(MAKE) -C build/src/include MAKELEVEL=0 install
# Compile and install PostgreSQL and contrib/neon
.PHONY: postgres
postgres: postgres-configure \
postgres-headers # to prevent `make install` conflicts with zenith's `postgres-headers`
postgres-headers # to prevent `make install` conflicts with neon's `postgres-headers`
+@echo "Compiling PostgreSQL"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build MAKELEVEL=0 install
$(MAKE) -C build MAKELEVEL=0 install
+@echo "Compiling contrib/neon"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/contrib/neon install
$(MAKE) -C build/contrib/neon install
+@echo "Compiling contrib/neon_test_utils"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/contrib/neon_test_utils install
$(MAKE) -C build/contrib/neon_test_utils install
+@echo "Compiling pg_buffercache"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/contrib/pg_buffercache install
$(MAKE) -C build/contrib/pg_buffercache install
+@echo "Compiling pageinspect"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/contrib/pageinspect install
$(MAKE) -C build/contrib/pageinspect install
.PHONY: postgres-clean
postgres-clean:
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build MAKELEVEL=0 clean
$(MAKE) -C build MAKELEVEL=0 clean
# This doesn't remove the effects of 'configure'.
.PHONY: clean
clean:
cd $(POSTGRES_INSTALL_DIR)/build && $(MAKE) clean
cd build && $(MAKE) clean
$(CARGO_CMD_PREFIX) cargo clean
# This removes everything
.PHONY: distclean
distclean:
rm -rf $(POSTGRES_INSTALL_DIR)
rm -rf build $(POSTGRES_INSTALL_DIR)
$(CARGO_CMD_PREFIX) cargo clean
.PHONY: fmt
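Given the targets above, a quick sanity check of the new split between the build tree and the install tree might look like this (a sketch; directory names as in the rules above, with POSTGRES_INSTALL_DIR left at its default):

```bash
make postgres-configure  # runs configure inside ./build
make postgres            # builds in ./build, installs into ./tmp_install
make postgres-clean      # cleans ./build, leaving the install dir in place
make distclean           # removes both ./build and $(POSTGRES_INSTALL_DIR)
```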


@@ -1,6 +1,6 @@
# Neon
Neon is a serverless open-source alternative to AWS Aurora Postgres. It separates storage and compute and substitutes the PostgreSQL storage layer by redistributing data across a cluster of nodes.
Neon is a serverless open source alternative to AWS Aurora Postgres. It separates storage and compute and substitutes PostgreSQL storage layer by redistributing data across a cluster of nodes.
The project used to be called "Zenith". Many of the commands and code comments
still refer to "zenith", but we are in the process of renaming things.
@@ -12,31 +12,32 @@ Alternatively, compile and run the project [locally](#running-local-installation
## Architecture overview
A Neon installation consists of compute nodes and a Neon storage engine.
A Neon installation consists of compute nodes and Neon storage engine.
Compute nodes are stateless PostgreSQL nodes backed by the Neon storage engine.
Compute nodes are stateless PostgreSQL nodes, backed by Neon storage engine.
The Neon storage engine consists of two major components:
- Pageserver. Scalable storage backend for the compute nodes.
- WAL service. The service receives WAL from the compute node and ensures that it is stored durably.
Neon storage engine consists of two major components:
- Pageserver. Scalable storage backend for compute nodes.
- WAL service. The service that receives WAL from compute node and ensures that it is stored durably.
Pageserver consists of:
- Repository - Neon storage implementation.
- WAL receiver - service that receives WAL from WAL service and stores it in the repository.
- Page service - service that communicates with compute nodes and responds with pages from the repository.
- WAL redo - service that builds pages from base images and WAL records on Page service request
- WAL redo - service that builds pages from base images and WAL records on Page service request.
## Running local installation
#### Installing dependencies on Linux
1. Install build dependencies and other applicable packages
1. Install build dependencies and other useful packages
* On Ubuntu or Debian, this set of packages should be sufficient to build the code:
* On Ubuntu or Debian this set of packages should be sufficient to build the code:
```bash
apt install build-essential libtool libreadline-dev zlib1g-dev flex bison libseccomp-dev \
libssl-dev clang pkg-config libpq-dev etcd cmake postgresql-client
```
* On Fedora, these packages are needed:
* On Fedora these packages are needed:
```bash
dnf install flex bison readline-devel zlib-devel openssl-devel \
libseccomp-devel perl clang cmake etcd postgresql postgresql-contrib
@@ -68,7 +69,7 @@ brew install libpq
brew link --force libpq
```
#### Building on Linux
#### Building on Linux and OSX
1. Build neon and patched postgres
```
@@ -79,35 +80,19 @@ cd neon
# The preferred and default is to make a debug build. This will create a
# demonstrably slower build than a release build. If you want to use a release
# build, utilize "BUILD_TYPE=release make -j`nproc`"
# build, utilize "`BUILD_TYPE=release make -j`nproc``"
make -j`nproc`
```
#### Building on OSX
1. Build neon and patched postgres
```
# Note: The path to the neon sources can not contain a space.
git clone --recursive https://github.com/neondatabase/neon.git
cd neon
# The preferred and default is to make a debug build. This will create a
# demonstrably slower build than a release build. If you want to use a release
# build, utilize "BUILD_TYPE=release make -j`sysctl -n hw.logicalcpu`"
make -j`sysctl -n hw.logicalcpu`
```
#### Dependency installation notes
#### dependency installation notes
To run the `psql` client, install the `postgresql-client` package or modify `PATH` and `LD_LIBRARY_PATH` to include `tmp_install/bin` and `tmp_install/lib`, respectively.
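For the `PATH`/`LD_LIBRARY_PATH` route, a minimal sketch (assuming the default `tmp_install` prefix in the repository root):

```bash
export PATH="$PWD/tmp_install/bin:$PATH"
export LD_LIBRARY_PATH="$PWD/tmp_install/lib:${LD_LIBRARY_PATH:-}"
psql --version  # should now resolve to the locally built client
```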
To run the integration tests or Python scripts (not required to use the code), install
Python (3.9 or higher), and install python3 packages using `./scripts/pysync` (requires [poetry](https://python-poetry.org/)) in the project directory.
Python (3.9 or higher), and install python3 packages using `./scripts/pysync` (requires poetry) in the project directory.
#### Running neon database
#### running neon database
1. Start pageserver and postgres on top of it (should be called from repo root):
```sh
# Create repository in .neon with proper paths to binaries and data
@@ -138,7 +123,7 @@ Starting postgres node at 'host=127.0.0.1 port=55432 user=cloud_admin dbname=pos
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16B5BA8 running
```
2. Now, it is possible to connect to postgres and run some queries:
2. Now it is possible to connect to postgres and run some queries:
```text
> psql -p55432 -h 127.0.0.1 -U cloud_admin postgres
postgres=# CREATE TABLE t(key int primary key, value text);
@@ -196,8 +181,8 @@ postgres=# select * from t;
(1 row)
```
4. If you want to run tests afterward (see below), you must stop all the running of the pageserver, safekeeper, and postgres instances
you have just started. You can terminate them all with one command:
4. If you want to run tests afterwards (see below), you have to stop all the running the pageserver, safekeeper and postgres instances
you have just started. You can stop them all with one command:
```sh
> ./target/debug/neon_local stop
```
@@ -220,8 +205,8 @@ To view your `rustdoc` documentation in a browser, try running `cargo doc --no-d
### Postgres-specific terms
Due to Neon's very close relation with PostgreSQL internals, numerous specific terms are used.
The same applies to certain spelling: i.e. we use MB to denote 1024 * 1024 bytes, while MiB would be technically more correct, it's inconsistent with what PostgreSQL code and its documentation use.
Due to Neon's very close relation with PostgreSQL internals, there are numerous specific terms used.
Same applies to certain spelling: i.e. we use MB to denote 1024 * 1024 bytes, while MiB would be technically more correct, it's inconsistent with what PostgreSQL code and its documentation use.
To get more familiar with this aspect, refer to:


@@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2021"
[dependencies]
libc = "0.2"
anyhow = "1.0"
chrono = "0.4"
clap = "3.0"


@@ -157,7 +157,7 @@ fn main() -> Result<()> {
exit(code)
}
Err(error) => {
error!("could not start the compute node: {:?}", error);
error!("could not start the compute node: {}", error);
let mut state = compute.state.write().unwrap();
state.error = Some(format!("{:?}", error));


@@ -14,6 +14,7 @@ regex = "1"
anyhow = "1.0"
thiserror = "1"
nix = "0.23"
url = "2.2.2"
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
pageserver = { path = "../pageserver" }


@@ -304,9 +304,10 @@ impl SafekeeperNode {
Ok(self
.http_request(
Method::POST,
format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
format!("{}/{}", self.http_base_url, "timeline"),
)
.json(&TimelineCreateRequest {
tenant_id,
timeline_id,
peer_ids,
})


@@ -12,9 +12,9 @@ use anyhow::{bail, Context};
use nix::errno::Errno;
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use pageserver::http::models::{
TenantConfigRequest, TenantCreateRequest, TenantInfo, TimelineCreateRequest, TimelineInfo,
};
use pageserver::http::models::{TenantConfigRequest, TenantCreateRequest, TimelineCreateRequest};
use pageserver::tenant_mgr::TenantInfo;
use pageserver::timelines::TimelineInfo;
use postgres::{Config, NoTls};
use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};


@@ -7,4 +7,5 @@ edition = "2021"
prometheus = {version = "0.13", default_features=false, features = ["process"]} # removes protobuf dependency
libc = "0.2"
lazy_static = "1.4"
once_cell = "1.8.0"
workspace_hack = { version = "0.1", path = "../../workspace_hack" }


@@ -3,9 +3,6 @@
//! Otherwise, we might not see all metrics registered via
//! a default registry.
use lazy_static::lazy_static;
use prometheus::core::{AtomicU64, GenericGauge, GenericGaugeVec};
pub use prometheus::opts;
pub use prometheus::register;
pub use prometheus::{core, default_registry, proto};
pub use prometheus::{exponential_buckets, linear_buckets};
pub use prometheus::{register_gauge, Gauge};
@@ -21,17 +18,6 @@ pub use prometheus::{Encoder, TextEncoder};
mod wrappers;
pub use wrappers::{CountedReader, CountedWriter};
pub type UIntGauge = GenericGauge<AtomicU64>;
pub type UIntGaugeVec = GenericGaugeVec<AtomicU64>;
#[macro_export]
macro_rules! register_uint_gauge_vec {
($NAME:expr, $HELP:expr, $LABELS_NAMES:expr $(,)?) => {{
let gauge_vec = UIntGaugeVec::new($crate::opts!($NAME, $HELP), $LABELS_NAMES).unwrap();
$crate::register(Box::new(gauge_vec.clone())).map(|_| gauge_vec)
}};
}
/// Gathers all Prometheus metrics and records the I/O stats just before that.
///
/// Metrics gathering is a relatively simple and standalone operation, so


@@ -47,12 +47,10 @@ pub enum FeStartupPacket {
StartupMessage {
major_version: u32,
minor_version: u32,
params: StartupMessageParams,
params: HashMap<String, String>,
},
}
pub type StartupMessageParams = HashMap<String, String>;
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct CancelKeyData {
pub backend_pid: i32,


@@ -15,5 +15,6 @@ git-version = "0.3.5"
pageserver = { path = "../pageserver" }
control_plane = { path = "../control_plane" }
safekeeper = { path = "../safekeeper" }
postgres_ffi = { path = "../libs/postgres_ffi" }
utils = { path = "../libs/utils" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }


@@ -9,7 +9,6 @@ use pageserver::config::defaults::{
DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR,
DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR,
};
use pageserver::http::models::TimelineInfo;
use safekeeper::defaults::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
@@ -26,6 +25,8 @@ use utils::{
zid::{NodeId, ZTenantId, ZTenantTimelineId, ZTimelineId},
};
use pageserver::timelines::TimelineInfo;
// Default id of a safekeeper node, if not specified on the command line.
const DEFAULT_SAFEKEEPER_ID: NodeId = NodeId(1);
const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);


@@ -29,6 +29,7 @@ postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d
postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
tokio-stream = "0.1.8"
anyhow = { version = "1.0", features = ["backtrace"] }
crc32c = "0.6.0"
thiserror = "1.0"


@@ -23,7 +23,8 @@ use tar::{Builder, EntryType, Header};
use tracing::*;
use crate::reltag::{RelTag, SlruKind};
use crate::DatadirTimeline;
use crate::repository::Timeline;
use crate::DatadirTimelineImpl;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::*;
use utils::lsn::Lsn;
@@ -31,13 +32,12 @@ use utils::lsn::Lsn;
/// This is a short-lived object, existing only for the duration of tarball creation,
/// created mostly to avoid passing a lot of parameters between the various functions
/// used for constructing the tarball.
pub struct Basebackup<'a, W, T>
pub struct Basebackup<'a, W>
where
W: Write,
T: DatadirTimeline,
{
ar: Builder<AbortableWrite<W>>,
timeline: &'a Arc<T>,
timeline: &'a Arc<DatadirTimelineImpl>,
pub lsn: Lsn,
prev_record_lsn: Lsn,
full_backup: bool,
@@ -52,18 +52,17 @@ where
// * When working without safekeepers. In this situation it is important to match the lsn
// we are taking basebackup on with the lsn that is used in pageserver's walreceiver
// to start the replication.
impl<'a, W, T> Basebackup<'a, W, T>
impl<'a, W> Basebackup<'a, W>
where
W: Write,
T: DatadirTimeline,
{
pub fn new(
write: W,
timeline: &'a Arc<T>,
timeline: &'a Arc<DatadirTimelineImpl>,
req_lsn: Option<Lsn>,
prev_lsn: Option<Lsn>,
full_backup: bool,
) -> Result<Basebackup<'a, W, T>> {
) -> Result<Basebackup<'a, W>> {
// Compute postgres doesn't have any previous WAL files, but the first
// record that it's going to write needs to include the LSN of the
// previous record (xl_prev). We include prev_record_lsn in the
@@ -80,13 +79,13 @@ where
let (backup_prev, backup_lsn) = if let Some(req_lsn) = req_lsn {
// Backup was requested at a particular LSN. Wait for it to arrive.
info!("waiting for {}", req_lsn);
timeline.wait_lsn(req_lsn)?;
timeline.tline.wait_lsn(req_lsn)?;
// If the requested point is the end of the timeline, we can
// provide prev_lsn. (get_last_record_rlsn() might return it as
// zero, though, if no WAL has been generated on this timeline
// yet.)
let end_of_timeline = timeline.get_last_record_rlsn();
let end_of_timeline = timeline.tline.get_last_record_rlsn();
if req_lsn == end_of_timeline.last {
(end_of_timeline.prev, req_lsn)
} else {
@@ -94,7 +93,7 @@ where
}
} else {
// Backup was requested at end of the timeline.
let end_of_timeline = timeline.get_last_record_rlsn();
let end_of_timeline = timeline.tline.get_last_record_rlsn();
(end_of_timeline.prev, end_of_timeline.last)
};
@@ -372,7 +371,7 @@ where
// add zenith.signal file
let mut zenith_signal = String::new();
if self.prev_record_lsn == Lsn(0) {
if self.lsn == self.timeline.get_ancestor_lsn() {
if self.lsn == self.timeline.tline.get_ancestor_lsn() {
write!(zenith_signal, "PREV LSN: none")?;
} else {
write!(zenith_signal, "PREV LSN: invalid")?;
@@ -403,10 +402,9 @@ where
}
}
impl<'a, W, T> Drop for Basebackup<'a, W, T>
impl<'a, W> Drop for Basebackup<'a, W>
where
W: Write,
T: DatadirTimeline,
{
/// If the basebackup was not finished, prevent the Archive::drop() from
/// writing the end-of-archive marker.


@@ -7,10 +7,6 @@ use utils::{
zid::{NodeId, ZTenantId, ZTimelineId},
};
// These enums are used in the API response fields.
use crate::repository::LocalTimelineState;
use crate::tenant_mgr::TenantState;
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
@@ -101,59 +97,14 @@ impl TenantConfigRequest {
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
pub struct TenantInfo {
#[serde_as(as = "DisplayFromStr")]
pub id: ZTenantId,
pub state: Option<TenantState>,
pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
pub has_in_progress_downloads: Option<bool>,
}
/// A WAL receiver's data stored inside the global `WAL_RECEIVERS`.
/// We keep one WAL receiver active per timeline.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LocalTimelineInfo {
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_timeline_id: Option<ZTimelineId>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_lsn: Option<Lsn>,
#[serde_as(as = "DisplayFromStr")]
pub last_record_lsn: Lsn,
#[serde_as(as = "Option<DisplayFromStr>")]
pub prev_record_lsn: Option<Lsn>,
#[serde_as(as = "DisplayFromStr")]
pub latest_gc_cutoff_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub disk_consistent_lsn: Lsn,
pub current_logical_size: Option<usize>, // is None when timeline is Unloaded
pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
pub current_logical_size_non_incremental: Option<usize>,
pub current_physical_size_non_incremental: Option<u64>,
pub timeline_state: LocalTimelineState,
pub wal_source_connstr: Option<String>,
pub struct WalReceiverEntry {
pub wal_producer_connstr: Option<String>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub last_received_msg_lsn: Option<Lsn>,
/// the timestamp (in microseconds) of the last received message
pub last_received_msg_ts: Option<u128>,
}
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RemoteTimelineInfo {
#[serde_as(as = "DisplayFromStr")]
pub remote_consistent_lsn: Lsn,
pub awaits_download: bool,
}
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TimelineInfo {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: ZTenantId,
#[serde_as(as = "DisplayFromStr")]
pub timeline_id: ZTimelineId,
pub local: Option<LocalTimelineInfo>,
pub remote: Option<RemoteTimelineInfo>,
}


@@ -78,11 +78,6 @@ paths:
schema:
type: string
description: Controls calculation of current_logical_size_non_incremental
- name: include-non-incremental-physical-size
in: query
schema:
type: string
description: Controls calculation of current_physical_size_non_incremental
get:
description: Get timelines for tenant
responses:
@@ -141,11 +136,6 @@ paths:
schema:
type: string
description: Controls calculation of current_logical_size_non_incremental
- name: include-non-incremental-physical-size
in: query
schema:
type: string
description: Controls calculation of current_physical_size_non_incremental
responses:
"200":
description: TimelineInfo
@@ -207,6 +197,54 @@ paths:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/timeline/{timeline_id}/wal_receiver:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
- name: timeline_id
in: path
required: true
schema:
type: string
format: hex
get:
description: Get wal receiver's data attached to the timeline
responses:
"200":
description: WalReceiverEntry
content:
application/json:
schema:
$ref: "#/components/schemas/WalReceiverEntry"
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"404":
description: Error when no wal receiver is running or found
content:
application/json:
schema:
$ref: "#/components/schemas/NotFoundError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
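# Example call for this endpoint (a sketch; host, port, and IDs are hypothetical):
#   curl -s "http://<pageserver>:<http_port>/v1/tenant/<tenant_id>/timeline/<timeline_id>/wal_receiver"
# A 404 is returned when no WAL receiver is currently registered for the timeline.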
/v1/tenant/{tenant_id}/attach:
parameters:
- name: tenant_id
@@ -539,8 +577,6 @@ components:
type: string
state:
type: string
current_physical_size:
type: integer
has_in_progress_downloads:
type: boolean
TenantCreateInfo:
@@ -635,13 +671,18 @@ components:
format: hex
current_logical_size:
type: integer
current_physical_size:
type: integer
current_logical_size_non_incremental:
type: integer
current_physical_size_non_incremental:
WalReceiverEntry:
type: object
required:
- thread_id
- wal_producer_connstr
properties:
thread_id:
type: integer
wal_source_connstr:
wal_producer_connstr:
type: string
last_received_msg_lsn:
type: string


@@ -6,19 +6,16 @@ use hyper::{Body, Request, Response, Uri};
use remote_storage::GenericRemoteStorage;
use tracing::*;
use super::models::{LocalTimelineInfo, RemoteTimelineInfo, TimelineInfo};
use super::models::{
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse,
TimelineCreateRequest,
};
use crate::layered_repository::metadata::TimelineMetadata;
use crate::pgdatadir_mapping::DatadirTimeline;
use crate::repository::{LocalTimelineState, RepositoryTimeline};
use crate::repository::{Repository, Timeline};
use crate::repository::Repository;
use crate::storage_sync;
use crate::storage_sync::index::{RemoteIndex, RemoteTimeline};
use crate::tenant_config::TenantConfOpt;
use crate::TimelineImpl;
use crate::tenant_mgr::TenantInfo;
use crate::timelines::{LocalTimelineInfo, RemoteTimelineInfo, TimelineInfo};
use crate::{config::PageServerConf, tenant_mgr, timelines};
use utils::{
auth::JwtAuth,
@@ -29,7 +26,6 @@ use utils::{
request::parse_request_param,
RequestExt, RouterBuilder,
},
lsn::Lsn,
zid::{ZTenantId, ZTenantTimelineId, ZTimelineId},
};
@@ -83,123 +79,6 @@ fn get_config(request: &Request<Body>) -> &'static PageServerConf {
get_state(request).conf
}
// Helper functions to construct a LocalTimelineInfo struct for a timeline
fn local_timeline_info_from_loaded_timeline(
timeline: &TimelineImpl,
include_non_incremental_logical_size: bool,
include_non_incremental_physical_size: bool,
) -> anyhow::Result<LocalTimelineInfo> {
let last_record_lsn = timeline.get_last_record_lsn();
let (wal_source_connstr, last_received_msg_lsn, last_received_msg_ts) = {
let guard = timeline.last_received_wal.lock().unwrap();
if let Some(info) = guard.as_ref() {
(
Some(info.wal_source_connstr.clone()),
Some(info.last_received_msg_lsn),
Some(info.last_received_msg_ts),
)
} else {
(None, None, None)
}
};
let info = LocalTimelineInfo {
ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
ancestor_lsn: {
match timeline.get_ancestor_lsn() {
Lsn(0) => None,
lsn @ Lsn(_) => Some(lsn),
}
},
disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
last_record_lsn,
prev_record_lsn: Some(timeline.get_prev_record_lsn()),
latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
timeline_state: LocalTimelineState::Loaded,
current_logical_size: Some(timeline.get_current_logical_size()),
current_physical_size: Some(timeline.get_physical_size()),
current_logical_size_non_incremental: if include_non_incremental_logical_size {
Some(timeline.get_current_logical_size_non_incremental(last_record_lsn)?)
} else {
None
},
current_physical_size_non_incremental: if include_non_incremental_physical_size {
Some(timeline.get_physical_size_non_incremental()?)
} else {
None
},
wal_source_connstr,
last_received_msg_lsn,
last_received_msg_ts,
};
Ok(info)
}
fn local_timeline_info_from_unloaded_timeline(metadata: &TimelineMetadata) -> LocalTimelineInfo {
LocalTimelineInfo {
ancestor_timeline_id: metadata.ancestor_timeline(),
ancestor_lsn: {
match metadata.ancestor_lsn() {
Lsn(0) => None,
lsn @ Lsn(_) => Some(lsn),
}
},
disk_consistent_lsn: metadata.disk_consistent_lsn(),
last_record_lsn: metadata.disk_consistent_lsn(),
prev_record_lsn: metadata.prev_record_lsn(),
latest_gc_cutoff_lsn: metadata.latest_gc_cutoff_lsn(),
timeline_state: LocalTimelineState::Unloaded,
current_logical_size: None,
current_physical_size: None,
current_logical_size_non_incremental: None,
current_physical_size_non_incremental: None,
wal_source_connstr: None,
last_received_msg_lsn: None,
last_received_msg_ts: None,
}
}
fn local_timeline_info_from_repo_timeline(
repo_timeline: &RepositoryTimeline<TimelineImpl>,
include_non_incremental_logical_size: bool,
include_non_incremental_physical_size: bool,
) -> anyhow::Result<LocalTimelineInfo> {
match repo_timeline {
RepositoryTimeline::Loaded(timeline) => local_timeline_info_from_loaded_timeline(
&*timeline,
include_non_incremental_logical_size,
include_non_incremental_physical_size,
),
RepositoryTimeline::Unloaded { metadata } => {
Ok(local_timeline_info_from_unloaded_timeline(metadata))
}
}
}
fn list_local_timelines(
tenant_id: ZTenantId,
include_non_incremental_logical_size: bool,
include_non_incremental_physical_size: bool,
) -> Result<Vec<(ZTimelineId, LocalTimelineInfo)>> {
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)
.with_context(|| format!("Failed to get repo for tenant {}", tenant_id))?;
let repo_timelines = repo.list_timelines();
let mut local_timeline_info = Vec::with_capacity(repo_timelines.len());
for (timeline_id, repository_timeline) in repo_timelines {
local_timeline_info.push((
timeline_id,
local_timeline_info_from_repo_timeline(
&repository_timeline,
include_non_incremental_logical_size,
include_non_incremental_physical_size,
)?,
))
}
Ok(local_timeline_info)
}
// healthcheck handler
async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let config = get_config(&request);
@@ -214,30 +93,16 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
let new_timeline_info = tokio::task::spawn_blocking(move || {
let _enter = info_span!("/timeline_create", tenant = %tenant_id, new_timeline = ?request_data.new_timeline_id, lsn=?request_data.ancestor_start_lsn).entered();
match timelines::create_timeline(
timelines::create_timeline(
get_config(&request),
tenant_id,
request_data.new_timeline_id.map(ZTimelineId::from),
request_data.ancestor_timeline_id.map(ZTimelineId::from),
request_data.ancestor_start_lsn,
) {
Ok(Some((new_timeline_id, new_timeline))) => {
// Created. Construct a TimelineInfo for it.
let local_info = local_timeline_info_from_loaded_timeline(new_timeline.as_ref(), false, false)?;
Ok(Some(TimelineInfo {
tenant_id,
timeline_id: new_timeline_id,
local: Some(local_info),
remote: None,
}))
}
Ok(None) => Ok(None), // timeline already exists
Err(err) => Err(err),
}
)
})
.await
.map_err(ApiError::from_err)??;
.map_err(ApiError::from_err)??;
Ok(match new_timeline_info {
Some(info) => json_response(StatusCode::CREATED, info)?,
@@ -248,17 +113,10 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let include_non_incremental_logical_size =
query_param_present(&request, "include-non-incremental-logical-size");
let include_non_incremental_physical_size =
query_param_present(&request, "include-non-incremental-physical-size");
let include_non_incremental_logical_size = get_include_non_incremental_logical_size(&request);
let local_timeline_infos = tokio::task::spawn_blocking(move || {
let _enter = info_span!("timeline_list", tenant = %tenant_id).entered();
list_local_timelines(
tenant_id,
include_non_incremental_logical_size,
include_non_incremental_physical_size,
)
crate::timelines::get_local_timelines(tenant_id, include_non_incremental_logical_size)
})
.await
.map_err(ApiError::from_err)??;
@@ -287,15 +145,17 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
json_response(StatusCode::OK, response_data)
}
/// Checks if a query param is present in the request's URL
fn query_param_present(request: &Request<Body>, param: &str) -> bool {
// Gate non-incremental logical size calculation behind a flag:
// after `pgbench -i -s100`, the calculation took 28ms, so multiplied by the number of
// timelines and tenants it can take a noticeable amount of time. Also, the value is
// currently only used in tests.
fn get_include_non_incremental_logical_size(request: &Request<Body>) -> bool {
request
.uri()
.query()
.map(|v| {
url::form_urlencoded::parse(v.as_bytes())
.into_owned()
.any(|(p, _)| p == param)
.any(|(param, _)| param == "include-non-incremental-logical-size")
})
.unwrap_or(false)
}
@@ -305,10 +165,7 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
check_permission(&request, Some(tenant_id))?;
let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
let include_non_incremental_logical_size =
query_param_present(&request, "include-non-incremental-logical-size");
let include_non_incremental_physical_size =
query_param_present(&request, "include-non-incremental-physical-size");
let include_non_incremental_logical_size = get_include_non_incremental_logical_size(&request);
let (local_timeline_info, remote_timeline_info) = async {
// any error here will render local timeline as None
@@ -319,10 +176,11 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
repo.get_timeline(timeline_id)
.as_ref()
.map(|timeline| {
local_timeline_info_from_repo_timeline(
LocalTimelineInfo::from_repo_timeline(
tenant_id,
timeline_id,
timeline,
include_non_incremental_logical_size,
include_non_incremental_physical_size,
)
})
.transpose()?
@@ -367,6 +225,23 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
json_response(StatusCode::OK, timeline_info)
}
async fn wal_receiver_get_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
let wal_receiver_entry = crate::walreceiver::get_wal_receiver_entry(tenant_id, timeline_id)
.instrument(info_span!("wal_receiver_get", tenant = %tenant_id, timeline = %timeline_id))
.await
.ok_or_else(|| {
ApiError::NotFound(format!(
"WAL receiver data not found for tenant {tenant_id} and timeline {timeline_id}"
))
})?;
json_response(StatusCode::OK, &wal_receiver_entry)
}
// TODO makes sense to provide tenant config right away the same way as it handled in tenant_create
async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
@@ -554,36 +429,14 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
let index_accessor = remote_index.read().await;
let has_in_progress_downloads = index_accessor
.tenant_entry(&tenant_id)
.map(|t| t.has_in_progress_downloads())
.unwrap_or_else(|| {
info!("Tenant {tenant_id} not found in remote index");
false
});
let current_physical_size =
match tokio::task::spawn_blocking(move || list_local_timelines(tenant_id, false, false))
.await
.map_err(ApiError::from_err)?
{
Err(err) => {
// Getting local timelines can fail when no local repo is on disk (e.g, when tenant data is being downloaded).
// In that case, put a warning message into log and operate normally.
warn!("Failed to get local timelines for tenant {tenant_id}: {err}");
None
}
Ok(local_timeline_infos) => Some(
local_timeline_infos
.into_iter()
.fold(0, |acc, x| acc + x.1.current_physical_size.unwrap()),
),
};
.ok_or_else(|| ApiError::NotFound("Tenant not found in remote index".to_string()))?
.has_in_progress_downloads();
json_response(
StatusCode::OK,
TenantInfo {
id: tenant_id,
state: tenant_state,
current_physical_size,
has_in_progress_downloads: Some(has_in_progress_downloads),
},
)
@@ -753,5 +606,9 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id/detach",
timeline_delete_handler,
)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/wal_receiver",
wal_receiver_get_handler,
)
.any(handler_404))
}


@@ -13,8 +13,9 @@ use walkdir::WalkDir;
use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind};
use crate::repository::Repository;
use crate::repository::Timeline;
use crate::walingest::WalIngest;
use crate::walrecord::DecodedWALRecord;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::waldecoder::*;
use postgres_ffi::xlog_utils::*;
@@ -28,16 +29,16 @@ use utils::lsn::Lsn;
/// This is currently only used to import a cluster freshly created by initdb.
/// The code that deals with the checkpoint would not work right if the
/// cluster was not shut down cleanly.
pub fn import_timeline_from_postgres_datadir<T: DatadirTimeline>(
pub fn import_timeline_from_postgres_datadir<R: Repository>(
path: &Path,
tline: &T,
tline: &mut DatadirTimeline<R>,
lsn: Lsn,
) -> Result<()> {
let mut pg_control: Option<ControlFileData> = None;
// TODO this should be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
// Then fishing out pg_control would be unnecessary
let mut modification = tline.begin_modification();
let mut modification = tline.begin_modification(lsn);
modification.init_empty()?;
// Import all but pg_wal
@@ -56,12 +57,12 @@ pub fn import_timeline_from_postgres_datadir<T: DatadirTimeline>(
if let Some(control_file) = import_file(&mut modification, relative_path, file, len)? {
pg_control = Some(control_file);
}
modification.flush(lsn)?;
modification.flush()?;
}
}
// We're done importing all the data files.
modification.commit(lsn)?;
modification.commit()?;
// We expect the Postgres server to be shut down cleanly.
let pg_control = pg_control.context("pg_control file not found")?;
@@ -88,8 +89,8 @@ pub fn import_timeline_from_postgres_datadir<T: DatadirTimeline>(
}
// subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
fn import_rel<T: DatadirTimeline, Reader: Read>(
modification: &mut DatadirModification<T>,
fn import_rel<R: Repository, Reader: Read>(
modification: &mut DatadirModification<R>,
path: &Path,
spcoid: Oid,
dboid: Oid,
@@ -168,8 +169,8 @@ fn import_rel<T: DatadirTimeline, Reader: Read>(
/// Import an SLRU segment file
///
fn import_slru<T: DatadirTimeline, Reader: Read>(
modification: &mut DatadirModification<T>,
fn import_slru<R: Repository, Reader: Read>(
modification: &mut DatadirModification<R>,
slru: SlruKind,
path: &Path,
mut reader: Reader,
@@ -224,9 +225,9 @@ fn import_slru<T: DatadirTimeline, Reader: Read>(
/// Scan PostgreSQL WAL files in given directory and load all records between
/// 'startpoint' and 'endpoint' into the repository.
fn import_wal<T: DatadirTimeline>(
fn import_wal<R: Repository>(
walpath: &Path,
tline: &T,
tline: &mut DatadirTimeline<R>,
startpoint: Lsn,
endpoint: Lsn,
) -> Result<()> {
@@ -267,11 +268,9 @@ fn import_wal<T: DatadirTimeline>(
waldecoder.feed_bytes(&buf);
let mut nrecords = 0;
let mut modification = tline.begin_modification();
let mut decoded = DecodedWALRecord::default();
while last_lsn <= endpoint {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?;
walingest.ingest_record(tline, recdata, lsn)?;
last_lsn = lsn;
nrecords += 1;
@@ -295,13 +294,13 @@ fn import_wal<T: DatadirTimeline>(
Ok(())
}
pub fn import_basebackup_from_tar<T: DatadirTimeline, Reader: Read>(
tline: &T,
pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
tline: &mut DatadirTimeline<R>,
reader: Reader,
base_lsn: Lsn,
) -> Result<()> {
info!("importing base at {}", base_lsn);
let mut modification = tline.begin_modification();
let mut modification = tline.begin_modification(base_lsn);
modification.init_empty()?;
let mut pg_control: Option<ControlFileData> = None;
@@ -319,7 +318,7 @@ pub fn import_basebackup_from_tar<T: DatadirTimeline, Reader: Read>(
// We found the pg_control file.
pg_control = Some(res);
}
modification.flush(base_lsn)?;
modification.flush()?;
}
tar::EntryType::Directory => {
debug!("directory {:?}", file_path);
@@ -333,12 +332,12 @@ pub fn import_basebackup_from_tar<T: DatadirTimeline, Reader: Read>(
// sanity check: ensure that pg_control is loaded
let _pg_control = pg_control.context("pg_control file not found")?;
modification.commit(base_lsn)?;
modification.commit()?;
Ok(())
}
pub fn import_wal_from_tar<T: DatadirTimeline, Reader: Read>(
tline: &T,
pub fn import_wal_from_tar<R: Repository, Reader: Read>(
tline: &mut DatadirTimeline<R>,
reader: Reader,
start_lsn: Lsn,
end_lsn: Lsn,
@@ -385,11 +384,9 @@ pub fn import_wal_from_tar<T: DatadirTimeline, Reader: Read>(
waldecoder.feed_bytes(&bytes[offset..]);
let mut modification = tline.begin_modification();
let mut decoded = DecodedWALRecord::default();
while last_lsn <= end_lsn {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?;
walingest.ingest_record(tline, recdata, lsn)?;
last_lsn = lsn;
debug!("imported record at {} (end {})", lsn, end_lsn);
@@ -418,8 +415,8 @@ pub fn import_wal_from_tar<T: DatadirTimeline, Reader: Read>(
Ok(())
}
pub fn import_file<T: DatadirTimeline, Reader: Read>(
modification: &mut DatadirModification<T>,
pub fn import_file<R: Repository, Reader: Read>(
modification: &mut DatadirModification<R>,
file_path: &Path,
reader: Reader,
len: usize,
@@ -538,7 +535,7 @@ pub fn import_file<T: DatadirTimeline, Reader: Read>(
// zenith.signal is not necessarily the last file, that we handle
// but it is ok to call `finish_write()`, because final `modification.commit()`
// will update lsn once more to the final one.
let writer = modification.tline.writer();
let writer = modification.tline.tline.writer();
writer.finish_write(prev_lsn);
debug!("imported zenith signal {}", prev_lsn);

File diff suppressed because it is too large.


@@ -316,18 +316,6 @@ impl Layer for DeltaLayer {
}
}
fn key_iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Key, Lsn, u64)> + 'a> {
let inner = match self.load() {
Ok(inner) => inner,
Err(e) => panic!("Failed to load a delta layer: {e:?}"),
};
match DeltaKeyIter::new(inner) {
Ok(iter) => Box::new(iter),
Err(e) => panic!("Layer index is corrupted: {e:?}"),
}
}
fn delete(&self) -> Result<()> {
// delete underlying file
fs::remove_file(self.path())?;
@@ -672,21 +660,11 @@ impl DeltaLayerWriter {
/// The values must be appended in key, lsn order.
///
pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> Result<()> {
self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
}
pub fn put_value_bytes(
&mut self,
key: Key,
lsn: Lsn,
val: &[u8],
will_init: bool,
) -> Result<()> {
assert!(self.lsn_range.start <= lsn);
let off = self.blob_writer.write_blob(val)?;
let off = self.blob_writer.write_blob(&Value::ser(&val)?)?;
let blob_ref = BlobRef::new(off, will_init);
let blob_ref = BlobRef::new(off, val.will_init());
let delta_key = DeltaKey::from_key_lsn(&key, lsn);
self.tree.append(&delta_key.0, blob_ref.0)?;
@@ -844,75 +822,3 @@ impl<'a> DeltaValueIter<'a> {
}
}
}
///
/// Iterator over all keys stored in a delta layer
///
/// FIXME: This creates a Vector to hold all keys.
/// That takes up quite a lot of memory. Should do this in a more streaming
/// fashion.
///
struct DeltaKeyIter {
all_keys: Vec<(DeltaKey, u64)>,
next_idx: usize,
}
impl Iterator for DeltaKeyIter {
type Item = (Key, Lsn, u64);
fn next(&mut self) -> Option<Self::Item> {
if self.next_idx < self.all_keys.len() {
let (delta_key, size) = &self.all_keys[self.next_idx];
let key = delta_key.key();
let lsn = delta_key.lsn();
self.next_idx += 1;
Some((key, lsn, *size))
} else {
None
}
}
}
impl<'a> DeltaKeyIter {
fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result<Self> {
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
file,
);
let mut all_keys: Vec<(DeltaKey, u64)> = Vec::new();
tree_reader.visit(
&[0u8; DELTA_KEY_SIZE],
VisitDirection::Forwards,
|key, value| {
let delta_key = DeltaKey::from_slice(key);
let pos = BlobRef(value).pos();
if let Some(last) = all_keys.last_mut() {
if last.0.key() == delta_key.key() {
return true;
} else {
// subtract the offset of this key's first blob from the offset of the
// new key's blob, to get the total size of values associated with this key
let first_pos = last.1;
last.1 = pos - first_pos;
}
}
all_keys.push((delta_key, pos));
true
},
)?;
if let Some(last) = all_keys.last_mut() {
// Last key occupies all space till end of layer
last.1 = std::fs::metadata(&file.file.path)?.len() - last.1;
}
let iter = DeltaKeyIter {
all_keys,
next_idx: 0,
};
Ok(iter)
}
}
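For intuition, the size bookkeeping in DeltaKeyIter::new derives each key's total value size from consecutive blob offsets; a minimal illustration with assumed offsets (not taken from this diff):

    // Blobs for key A start at offset 0, key B at offset 100, file length 260.
    // The visit pass pushes (A, 0) and (B, 100); on seeing B, A's entry becomes
    // 100 - 0 = 100 bytes, and the final fixup sets B to 260 - 100 = 160 bytes.
    let (a_start, b_start, file_len) = (0u64, 100u64, 260u64);
    assert_eq!(b_start - a_start, 100); // size recorded for key A
    assert_eq!(file_len - b_start, 160); // size recorded for key B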


@@ -43,7 +43,7 @@ pub struct EphemeralFile {
_timelineid: ZTimelineId,
file: Arc<VirtualFile>,
pub size: u64,
size: u64,
}
impl EphemeralFile {


@@ -15,7 +15,6 @@ use crate::layered_repository::storage_layer::{
use crate::repository::{Key, Value};
use crate::walrecord;
use anyhow::{bail, ensure, Result};
use std::cell::RefCell;
use std::collections::HashMap;
use tracing::*;
use utils::{
@@ -31,12 +30,6 @@ use std::ops::Range;
use std::path::PathBuf;
use std::sync::RwLock;
thread_local! {
/// A buffer for serializing objects during [`InMemoryLayer::put_value`].
/// This buffer is reused for each serialization to avoid additional malloc calls.
static SER_BUFFER: RefCell<Vec<u8>> = RefCell::new(Vec::new());
}
pub struct InMemoryLayer {
conf: &'static PageServerConf,
tenantid: ZTenantId,
@@ -240,14 +233,6 @@ impl Layer for InMemoryLayer {
}
impl InMemoryLayer {
///
/// Get layer size on the disk
///
pub fn size(&self) -> Result<u64> {
let inner = self.inner.read().unwrap();
Ok(inner.file.size)
}
///
/// Create a new, empty, in-memory layer
///
@@ -285,17 +270,10 @@ impl InMemoryLayer {
pub fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> {
trace!("put_value key {} at {}/{}", key, self.timelineid, lsn);
let mut inner = self.inner.write().unwrap();
inner.assert_writeable();
let off = {
SER_BUFFER.with(|x| -> Result<_> {
let mut buf = x.borrow_mut();
buf.clear();
val.ser_into(&mut (*buf))?;
let off = inner.file.write_blob(&buf)?;
Ok(off)
})?
};
let off = inner.file.write_blob(&Value::ser(val)?)?;
let vec_map = inner.index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
@@ -364,8 +342,8 @@ impl InMemoryLayer {
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf)?;
let will_init = Value::des(&buf)?.will_init();
delta_layer_writer.put_value_bytes(key, *lsn, &buf, will_init)?;
let val = Value::des(&buf)?;
delta_layer_writer.put_value(key, *lsn, val)?;
}
}


@@ -10,9 +10,9 @@
//! corresponding files are written to disk.
//!
use crate::layered_repository::inmemory_layer::InMemoryLayer;
use crate::layered_repository::storage_layer::Layer;
use crate::layered_repository::storage_layer::{range_eq, range_overlaps};
use crate::layered_repository::InMemoryLayer;
use crate::repository::Key;
use anyhow::Result;
use lazy_static::lazy_static;


@@ -139,12 +139,6 @@ pub trait Layer: Send + Sync {
/// Iterate through all keys and values stored in the layer
fn iter(&self) -> Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>> + '_>;
/// Iterate through all keys stored in the layer. Returns key, lsn and value size.
/// It is used only for compaction, and so is currently implemented only for DeltaLayer.
fn key_iter(&self) -> Box<dyn Iterator<Item = (Key, Lsn, u64)> + '_> {
panic!("Not implemented")
}
/// Permanently remove this layer from disk.
fn delete(&self) -> Result<()>;

File diff suppressed because it is too large


@@ -63,7 +63,8 @@ pub enum CheckpointConfig {
}
pub type RepositoryImpl = LayeredRepository;
pub type TimelineImpl = <LayeredRepository as repository::Repository>::Timeline;
pub type DatadirTimelineImpl = DatadirTimeline<RepositoryImpl>;
pub fn shutdown_pageserver(exit_code: i32) {
// Shut down the libpq endpoint thread. This prevents new connections from


@@ -30,6 +30,7 @@ use utils::{
use crate::basebackup;
use crate::config::{PageServerConf, ProfilingConfig};
use crate::import_datadir::{import_basebackup_from_tar, import_wal_from_tar};
use crate::layered_repository::LayeredRepository;
use crate::pgdatadir_mapping::{DatadirTimeline, LsnForTimestamp};
use crate::profiling::profpoint_start;
use crate::reltag::RelTag;
@@ -554,6 +555,9 @@ impl PageServerHandler {
info!("creating new timeline");
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let timeline = repo.create_empty_timeline(timeline_id, base_lsn)?;
let repartition_distance = repo.get_checkpoint_distance();
let mut datadir_timeline =
DatadirTimeline::<LayeredRepository>::new(timeline, repartition_distance);
// TODO mark timeline as not ready until it reaches end_lsn.
// We might have some wal to import as well, and we should prevent compute
@@ -569,7 +573,7 @@ impl PageServerHandler {
info!("importing basebackup");
pgb.write_message(&BeMessage::CopyInResponse)?;
let reader = CopyInReader::new(pgb);
import_basebackup_from_tar(&*timeline, reader, base_lsn)?;
import_basebackup_from_tar(&mut datadir_timeline, reader, base_lsn)?;
// TODO check checksum
// Meanwhile you can verify client-side by taking fullbackup
@@ -579,7 +583,7 @@ impl PageServerHandler {
// Flush data to disk, then upload to s3
info!("flushing layers");
timeline.checkpoint(CheckpointConfig::Flush)?;
datadir_timeline.tline.checkpoint(CheckpointConfig::Flush)?;
info!("done");
Ok(())
@@ -601,6 +605,10 @@ impl PageServerHandler {
let timeline = repo.get_timeline_load(timeline_id)?;
ensure!(timeline.get_last_record_lsn() == start_lsn);
let repartition_distance = repo.get_checkpoint_distance();
let mut datadir_timeline =
DatadirTimeline::<LayeredRepository>::new(timeline, repartition_distance);
// TODO leave clean state on error. For now you can use detach to clean
// up broken state from a failed import.
@@ -608,16 +616,16 @@ impl PageServerHandler {
info!("importing wal");
pgb.write_message(&BeMessage::CopyInResponse)?;
let reader = CopyInReader::new(pgb);
import_wal_from_tar(&*timeline, reader, start_lsn, end_lsn)?;
import_wal_from_tar(&mut datadir_timeline, reader, start_lsn, end_lsn)?;
// TODO Does it make sense to overshoot?
ensure!(timeline.get_last_record_lsn() >= end_lsn);
ensure!(datadir_timeline.tline.get_last_record_lsn() >= end_lsn);
// Flush data to disk, then upload to s3. No need for a forced checkpoint.
// We only want to persist the data, and it doesn't matter if it's in the
// shape of deltas or images.
info!("flushing layers");
timeline.checkpoint(CheckpointConfig::Flush)?;
datadir_timeline.tline.checkpoint(CheckpointConfig::Flush)?;
info!("done");
Ok(())
@@ -635,8 +643,8 @@ impl PageServerHandler {
/// In either case, if the page server hasn't received the WAL up to the
/// requested LSN yet, we will wait for it to arrive. The return value is
/// the LSN that should be used to look up the page versions.
fn wait_or_get_last_lsn<T: DatadirTimeline>(
timeline: &T,
fn wait_or_get_last_lsn<R: Repository>(
timeline: &DatadirTimeline<R>,
mut lsn: Lsn,
latest: bool,
latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
@@ -663,7 +671,7 @@ impl PageServerHandler {
if lsn <= last_record_lsn {
lsn = last_record_lsn;
} else {
timeline.wait_lsn(lsn)?;
timeline.tline.wait_lsn(lsn)?;
// Since we waited for 'lsn' to arrive, that is now the last
// record LSN. (Or close enough for our purposes; the
// last-record LSN can advance immediately after we return
@@ -673,7 +681,7 @@ impl PageServerHandler {
if lsn == Lsn(0) {
bail!("invalid LSN(0) in request");
}
timeline.wait_lsn(lsn)?;
timeline.tline.wait_lsn(lsn)?;
}
ensure!(
lsn >= **latest_gc_cutoff_lsn,
@@ -683,14 +691,14 @@ impl PageServerHandler {
Ok(lsn)
}
fn handle_get_rel_exists_request<T: DatadirTimeline>(
fn handle_get_rel_exists_request<R: Repository>(
&self,
timeline: &T,
timeline: &DatadirTimeline<R>,
req: &PagestreamExistsRequest,
) -> Result<PagestreamBeMessage> {
let _enter = info_span!("get_rel_exists", rel = %req.rel, req_lsn = %req.lsn).entered();
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = timeline.tline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
let exists = timeline.get_rel_exists(req.rel, lsn)?;
@@ -700,13 +708,13 @@ impl PageServerHandler {
}))
}
fn handle_get_nblocks_request<T: DatadirTimeline>(
fn handle_get_nblocks_request<R: Repository>(
&self,
timeline: &T,
timeline: &DatadirTimeline<R>,
req: &PagestreamNblocksRequest,
) -> Result<PagestreamBeMessage> {
let _enter = info_span!("get_nblocks", rel = %req.rel, req_lsn = %req.lsn).entered();
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = timeline.tline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
let n_blocks = timeline.get_rel_size(req.rel, lsn)?;
@@ -716,13 +724,13 @@ impl PageServerHandler {
}))
}
fn handle_db_size_request<T: DatadirTimeline>(
fn handle_db_size_request<R: Repository>(
&self,
timeline: &T,
timeline: &DatadirTimeline<R>,
req: &PagestreamDbSizeRequest,
) -> Result<PagestreamBeMessage> {
let _enter = info_span!("get_db_size", dbnode = %req.dbnode, req_lsn = %req.lsn).entered();
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = timeline.tline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
let total_blocks =
@@ -735,14 +743,14 @@ impl PageServerHandler {
}))
}
fn handle_get_page_at_lsn_request<T: DatadirTimeline>(
fn handle_get_page_at_lsn_request<R: Repository>(
&self,
timeline: &T,
timeline: &DatadirTimeline<R>,
req: &PagestreamGetPageRequest,
) -> Result<PagestreamBeMessage> {
let _enter = info_span!("get_page", rel = %req.rel, blkno = &req.blkno, req_lsn = %req.lsn)
.entered();
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = timeline.tline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
/*
// Add a 1s delay to some requests. The delay causes the requests to
@@ -775,7 +783,7 @@ impl PageServerHandler {
// check that the timeline exists
let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid)
.context("Cannot load local timeline")?;
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = timeline.tline.get_latest_gc_cutoff_lsn();
if let Some(lsn) = lsn {
timeline
.check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
@@ -913,7 +921,7 @@ impl postgres_backend::Handler for PageServerHandler {
let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid)
.context("Cannot load local timeline")?;
let end_of_timeline = timeline.get_last_record_rlsn();
let end_of_timeline = timeline.tline.get_last_record_rlsn();
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::text_col(b"prev_lsn"),
@@ -1131,7 +1139,7 @@ impl postgres_backend::Handler for PageServerHandler {
let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid)
.context("Couldn't load timeline")?;
timeline.compact()?;
timeline.tline.compact()?;
pgb.write_message_noflush(&SINGLE_COL_ROWDESC)?
.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
@@ -1151,8 +1159,13 @@ impl postgres_backend::Handler for PageServerHandler {
let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid)
.context("Cannot load local timeline")?;
// Checkpoint the timeline and also compact it (due to `CheckpointConfig::Forced`).
timeline.checkpoint(CheckpointConfig::Forced)?;
timeline.tline.checkpoint(CheckpointConfig::Forced)?;
// Also compact it.
//
// FIXME: This probably shouldn't be part of a "checkpoint" command, but a
// separate operation. Update the tests if you change this.
timeline.tline.compact()?;
pgb.write_message_noflush(&SINGLE_COL_ROWDESC)?
.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;


@@ -6,10 +6,10 @@
//! walingest.rs handles a few things like implicit relation creation and extension.
//! Clarify that)
//!
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceAccum};
use crate::reltag::{RelTag, SlruKind};
use crate::repository::Timeline;
use crate::repository::*;
use crate::repository::{Repository, Timeline};
use crate::walrecord::ZenithWalRecord;
use anyhow::{bail, ensure, Result};
use bytes::{Buf, Bytes};
@@ -18,12 +18,34 @@ use postgres_ffi::{pg_constants, Oid, TransactionId};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::ops::Range;
use tracing::{debug, trace, warn};
use std::sync::atomic::{AtomicIsize, Ordering};
use std::sync::{Arc, Mutex, RwLockReadGuard};
use tracing::{debug, error, trace, warn};
use utils::{bin_ser::BeSer, lsn::Lsn};
/// Block number within a relation or SLRU. This matches PostgreSQL's BlockNumber type.
pub type BlockNumber = u32;
pub struct DatadirTimeline<R>
where
R: Repository,
{
/// The underlying key-value store. Callers should not read or modify the
/// data in the underlying store directly. However, it is exposed to have
/// access to information like last-LSN, ancestor, and operations like
/// compaction.
pub tline: Arc<R::Timeline>,
/// When did we last calculate the partitioning?
partitioning: Mutex<(KeyPartitioning, Lsn)>,
/// Configuration: how often should the partitioning be recalculated.
repartition_threshold: u64,
/// Current logical size of the "datadir", at the last LSN.
current_logical_size: AtomicIsize,
}
#[derive(Debug)]
pub enum LsnForTimestamp {
Present(Lsn),
@@ -32,50 +54,49 @@ pub enum LsnForTimestamp {
NoData(Lsn),
}
///
/// This trait provides all the functionality to store PostgreSQL relations, SLRUs,
/// and other special kinds of files, in a versioned key-value store. The
/// Timeline trait provides the key-value store.
///
/// This is a trait, so that we can easily include all these functions in a Timeline
/// implementation. You're not expected to have different implementations of this trait;
/// rather, it provides an interface and an implementation over Timeline.
///
/// If you wanted to store other kinds of data in the Neon repository, e.g.
/// flat files or MySQL, you would create a new trait like this, with all the
/// functions that make sense for the kind of data you're storing. For flat files,
/// for example, you might have a function like "fn read(path, offset, size)".
/// We might also have that situation in the future, to support multiple PostgreSQL
/// versions, if there are big changes in how the data is organized in the data
/// directory, or if new special files are introduced.
///
pub trait DatadirTimeline: Timeline {
impl<R: Repository> DatadirTimeline<R> {
pub fn new(tline: Arc<R::Timeline>, repartition_threshold: u64) -> Self {
DatadirTimeline {
tline,
partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
current_logical_size: AtomicIsize::new(0),
repartition_threshold,
}
}
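A usage sketch, mirroring the PageServerHandler call sites earlier in this diff (`repo` and `timeline_id` are assumed to be in scope):

    let timeline = repo.get_timeline_load(timeline_id)?;
    let repartition_distance = repo.get_checkpoint_distance();
    let datadir_timeline = DatadirTimeline::<LayeredRepository>::new(timeline, repartition_distance);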
/// (Re-)calculate the logical size of the database at the latest LSN.
///
/// This can be a slow operation.
pub fn init_logical_size(&self) -> Result<()> {
let last_lsn = self.tline.get_last_record_lsn();
self.current_logical_size.store(
self.get_current_logical_size_non_incremental(last_lsn)? as isize,
Ordering::SeqCst,
);
Ok(())
}
/// Start ingesting a WAL record, or other atomic modification of
/// the timeline.
///
/// This provides a transaction-like interface to perform a bunch
/// of modifications atomically.
/// of modifications atomically, all stamped with one LSN.
///
/// To ingest a WAL record, call begin_modification() to get a
/// To ingest a WAL record, call begin_modification(lsn) to get a
/// DatadirModification object. Use the functions in the object to
/// modify the repository state, updating all the pages and metadata
/// that the WAL record affects. When you're done, call commit(lsn) to
/// commit the changes. All the changes will be stamped with the specified LSN.
///
/// Calling commit(lsn) will flush all the changes and reset the state,
/// so the `DatadirModification` struct can be reused to perform the next modification.
/// that the WAL record affects. When you're done, call commit() to
/// commit the changes.
///
/// Note that any pending modifications you make through the
/// modification object won't be visible to calls to the 'get' and list
/// functions of the timeline until you finish! And if you update the
/// same page twice, the last update wins.
///
fn begin_modification(&self) -> DatadirModification<Self>
where
Self: Sized,
{
pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification<R> {
DatadirModification {
tline: self,
lsn,
pending_updates: HashMap::new(),
pending_deletions: Vec::new(),
pending_nblocks: 0,
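Put together, the new lifecycle fixes the LSN at creation time; a minimal sketch, following create_test_timeline further down:

    let mut modification = tline.begin_modification(lsn);
    modification.init_empty()?; // or any of the other put-/delete-functions
    modification.commit()?; // stamps all pending changes with `lsn` and flushes them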
@@ -87,7 +108,7 @@ pub trait DatadirTimeline: Timeline {
//------------------------------------------------------------------------------
/// Look up given page version.
fn get_rel_page_at_lsn(&self, tag: RelTag, blknum: BlockNumber, lsn: Lsn) -> Result<Bytes> {
pub fn get_rel_page_at_lsn(&self, tag: RelTag, blknum: BlockNumber, lsn: Lsn) -> Result<Bytes> {
ensure!(tag.relnode != 0, "invalid relnode");
let nblocks = self.get_rel_size(tag, lsn)?;
@@ -100,11 +121,11 @@ pub trait DatadirTimeline: Timeline {
}
let key = rel_block_to_key(tag, blknum);
self.get(key, lsn)
self.tline.get(key, lsn)
}
// Get size of a database in blocks
fn get_db_size(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<usize> {
pub fn get_db_size(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<usize> {
let mut total_blocks = 0;
let rels = self.list_rels(spcnode, dbnode, lsn)?;
@@ -117,7 +138,7 @@ pub trait DatadirTimeline: Timeline {
}
/// Get size of a relation file
fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<BlockNumber> {
pub fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<BlockNumber> {
ensure!(tag.relnode != 0, "invalid relnode");
if (tag.forknum == pg_constants::FSM_FORKNUM
@@ -132,17 +153,17 @@ pub trait DatadirTimeline: Timeline {
}
let key = rel_size_to_key(tag);
let mut buf = self.get(key, lsn)?;
let mut buf = self.tline.get(key, lsn)?;
Ok(buf.get_u32_le())
}
/// Does relation exist?
fn get_rel_exists(&self, tag: RelTag, lsn: Lsn) -> Result<bool> {
pub fn get_rel_exists(&self, tag: RelTag, lsn: Lsn) -> Result<bool> {
ensure!(tag.relnode != 0, "invalid relnode");
// fetch directory listing
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
let buf = self.get(key, lsn)?;
let buf = self.tline.get(key, lsn)?;
let dir = RelDirectory::des(&buf)?;
let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
@@ -151,10 +172,10 @@ pub trait DatadirTimeline: Timeline {
}
/// Get a list of all existing relations in given tablespace and database.
fn list_rels(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<HashSet<RelTag>> {
pub fn list_rels(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<HashSet<RelTag>> {
// fetch directory listing
let key = rel_dir_to_key(spcnode, dbnode);
let buf = self.get(key, lsn)?;
let buf = self.tline.get(key, lsn)?;
let dir = RelDirectory::des(&buf)?;
let rels: HashSet<RelTag> =
@@ -169,7 +190,7 @@ pub trait DatadirTimeline: Timeline {
}
/// Look up given SLRU page version.
fn get_slru_page_at_lsn(
pub fn get_slru_page_at_lsn(
&self,
kind: SlruKind,
segno: u32,
@@ -177,21 +198,26 @@ pub trait DatadirTimeline: Timeline {
lsn: Lsn,
) -> Result<Bytes> {
let key = slru_block_to_key(kind, segno, blknum);
self.get(key, lsn)
self.tline.get(key, lsn)
}
/// Get size of an SLRU segment
fn get_slru_segment_size(&self, kind: SlruKind, segno: u32, lsn: Lsn) -> Result<BlockNumber> {
pub fn get_slru_segment_size(
&self,
kind: SlruKind,
segno: u32,
lsn: Lsn,
) -> Result<BlockNumber> {
let key = slru_segment_size_to_key(kind, segno);
let mut buf = self.get(key, lsn)?;
let mut buf = self.tline.get(key, lsn)?;
Ok(buf.get_u32_le())
}
/// Does the SLRU segment exist?
fn get_slru_segment_exists(&self, kind: SlruKind, segno: u32, lsn: Lsn) -> Result<bool> {
pub fn get_slru_segment_exists(&self, kind: SlruKind, segno: u32, lsn: Lsn) -> Result<bool> {
// fetch directory listing
let key = slru_dir_to_key(kind);
let buf = self.get(key, lsn)?;
let buf = self.tline.get(key, lsn)?;
let dir = SlruSegmentDirectory::des(&buf)?;
let exists = dir.segments.get(&segno).is_some();
@@ -205,10 +231,10 @@ pub trait DatadirTimeline: Timeline {
/// so it's not well defined which LSN you get if there were multiple commits
/// "in flight" at that point in time.
///
fn find_lsn_for_timestamp(&self, search_timestamp: TimestampTz) -> Result<LsnForTimestamp> {
let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
pub fn find_lsn_for_timestamp(&self, search_timestamp: TimestampTz) -> Result<LsnForTimestamp> {
let gc_cutoff_lsn_guard = self.tline.get_latest_gc_cutoff_lsn();
let min_lsn = *gc_cutoff_lsn_guard;
let max_lsn = self.get_last_record_lsn();
let max_lsn = self.tline.get_last_record_lsn();
// LSNs are always 8-byte aligned. low/mid/high represent the
// LSN divided by 8.
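The search body itself falls outside this hunk; schematically it has this shape (`timestamp_cmp_at` is a hypothetical probe helper, not part of this diff):

    let mut low = min_lsn.0 / 8;
    let mut high = max_lsn.0 / 8 + 1;
    while low < high {
        let mid = (low + high) / 2;
        // Probe the commit timestamps visible at Lsn(mid * 8) and narrow
        // the range toward the last LSN written at search_timestamp.
        if timestamp_cmp_at(Lsn(mid * 8), search_timestamp)? {
            low = mid + 1;
        } else {
            high = mid;
        }
    }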
@@ -299,51 +325,88 @@ pub trait DatadirTimeline: Timeline {
}
/// Get a list of SLRU segments
fn list_slru_segments(&self, kind: SlruKind, lsn: Lsn) -> Result<HashSet<u32>> {
pub fn list_slru_segments(&self, kind: SlruKind, lsn: Lsn) -> Result<HashSet<u32>> {
// fetch directory entry
let key = slru_dir_to_key(kind);
let buf = self.get(key, lsn)?;
let buf = self.tline.get(key, lsn)?;
let dir = SlruSegmentDirectory::des(&buf)?;
Ok(dir.segments)
}
fn get_relmap_file(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<Bytes> {
pub fn get_relmap_file(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<Bytes> {
let key = relmap_file_key(spcnode, dbnode);
let buf = self.get(key, lsn)?;
let buf = self.tline.get(key, lsn)?;
Ok(buf)
}
fn list_dbdirs(&self, lsn: Lsn) -> Result<HashMap<(Oid, Oid), bool>> {
pub fn list_dbdirs(&self, lsn: Lsn) -> Result<HashMap<(Oid, Oid), bool>> {
// fetch directory entry
let buf = self.get(DBDIR_KEY, lsn)?;
let buf = self.tline.get(DBDIR_KEY, lsn)?;
let dir = DbDirectory::des(&buf)?;
Ok(dir.dbdirs)
}
fn get_twophase_file(&self, xid: TransactionId, lsn: Lsn) -> Result<Bytes> {
pub fn get_twophase_file(&self, xid: TransactionId, lsn: Lsn) -> Result<Bytes> {
let key = twophase_file_key(xid);
let buf = self.get(key, lsn)?;
let buf = self.tline.get(key, lsn)?;
Ok(buf)
}
fn list_twophase_files(&self, lsn: Lsn) -> Result<HashSet<TransactionId>> {
pub fn list_twophase_files(&self, lsn: Lsn) -> Result<HashSet<TransactionId>> {
// fetch directory entry
let buf = self.get(TWOPHASEDIR_KEY, lsn)?;
let buf = self.tline.get(TWOPHASEDIR_KEY, lsn)?;
let dir = TwoPhaseDirectory::des(&buf)?;
Ok(dir.xids)
}
fn get_control_file(&self, lsn: Lsn) -> Result<Bytes> {
self.get(CONTROLFILE_KEY, lsn)
pub fn get_control_file(&self, lsn: Lsn) -> Result<Bytes> {
self.tline.get(CONTROLFILE_KEY, lsn)
}
fn get_checkpoint(&self, lsn: Lsn) -> Result<Bytes> {
self.get(CHECKPOINT_KEY, lsn)
pub fn get_checkpoint(&self, lsn: Lsn) -> Result<Bytes> {
self.tline.get(CHECKPOINT_KEY, lsn)
}
/// Get the LSN of the last ingested WAL record.
///
/// This is just a convenience wrapper that calls through to the underlying
/// repository.
pub fn get_last_record_lsn(&self) -> Lsn {
self.tline.get_last_record_lsn()
}
/// Check that it is valid to request operations with that lsn.
///
/// This is just a convenience wrapper that calls through to the underlying
/// repository.
pub fn check_lsn_is_in_scope(
&self,
lsn: Lsn,
latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
) -> Result<()> {
self.tline.check_lsn_is_in_scope(lsn, latest_gc_cutoff_lsn)
}
/// Retrieve current logical size of the timeline
///
/// NOTE: counted incrementally; includes ancestors.
pub fn get_current_logical_size(&self) -> usize {
let current_logical_size = self.current_logical_size.load(Ordering::Acquire);
match usize::try_from(current_logical_size) {
Ok(sz) => sz,
Err(_) => {
error!(
"current_logical_size is out of range: {}",
current_logical_size
);
0
}
}
}
/// Does the same as get_current_logical_size but counted on demand.
@@ -351,16 +414,16 @@ pub trait DatadirTimeline: Timeline {
///
/// Only relation blocks are counted currently. That excludes metadata,
/// SLRUs, twophase files etc.
fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize> {
pub fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize> {
// Fetch list of database dirs and iterate them
let buf = self.get(DBDIR_KEY, lsn)?;
let buf = self.tline.get(DBDIR_KEY, lsn)?;
let dbdir = DbDirectory::des(&buf)?;
let mut total_size: usize = 0;
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
for rel in self.list_rels(*spcnode, *dbnode, lsn)? {
let relsize_key = rel_size_to_key(rel);
let mut buf = self.get(relsize_key, lsn)?;
let mut buf = self.tline.get(relsize_key, lsn)?;
let relsize = buf.get_u32_le();
total_size += relsize as usize;
@@ -381,7 +444,7 @@ pub trait DatadirTimeline: Timeline {
result.add_key(DBDIR_KEY);
// Fetch list of database dirs and iterate them
let buf = self.get(DBDIR_KEY, lsn)?;
let buf = self.tline.get(DBDIR_KEY, lsn)?;
let dbdir = DbDirectory::des(&buf)?;
let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
@@ -398,7 +461,7 @@ pub trait DatadirTimeline: Timeline {
rels.sort_unstable();
for rel in rels {
let relsize_key = rel_size_to_key(rel);
let mut buf = self.get(relsize_key, lsn)?;
let mut buf = self.tline.get(relsize_key, lsn)?;
let relsize = buf.get_u32_le();
result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
@@ -414,13 +477,13 @@ pub trait DatadirTimeline: Timeline {
] {
let slrudir_key = slru_dir_to_key(kind);
result.add_key(slrudir_key);
let buf = self.get(slrudir_key, lsn)?;
let buf = self.tline.get(slrudir_key, lsn)?;
let dir = SlruSegmentDirectory::des(&buf)?;
let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
segments.sort_unstable();
for segno in segments {
let segsize_key = slru_segment_size_to_key(kind, segno);
let mut buf = self.get(segsize_key, lsn)?;
let mut buf = self.tline.get(segsize_key, lsn)?;
let segsize = buf.get_u32_le();
result.add_range(
@@ -432,7 +495,7 @@ pub trait DatadirTimeline: Timeline {
// Then pg_twophase
result.add_key(TWOPHASEDIR_KEY);
let buf = self.get(TWOPHASEDIR_KEY, lsn)?;
let buf = self.tline.get(TWOPHASEDIR_KEY, lsn)?;
let twophase_dir = TwoPhaseDirectory::des(&buf)?;
let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
xids.sort_unstable();
@@ -445,17 +508,32 @@ pub trait DatadirTimeline: Timeline {
Ok(result.to_keyspace())
}
pub fn repartition(&self, lsn: Lsn, partition_size: u64) -> Result<(KeyPartitioning, Lsn)> {
let mut partitioning_guard = self.partitioning.lock().unwrap();
if partitioning_guard.1 == Lsn(0)
|| lsn.0 - partitioning_guard.1 .0 > self.repartition_threshold
{
let keyspace = self.collect_keyspace(lsn)?;
let partitioning = keyspace.partition(partition_size);
*partitioning_guard = (partitioning, lsn);
return Ok((partitioning_guard.0.clone(), lsn));
}
Ok((partitioning_guard.0.clone(), partitioning_guard.1))
}
}
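For context, a caller-side sketch of the caching contract: the partitioning is recomputed from collect_keyspace only once the LSN has advanced more than repartition_threshold past the cached one; otherwise the cached copy is returned. The partition size here is an assumed value, not from this diff:

    let last_lsn = datadir_timeline.get_last_record_lsn();
    let (partitioning, at_lsn) = datadir_timeline.repartition(last_lsn, 64 * 1024 * 1024)?;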
/// DatadirModification represents an operation to ingest an atomic set of
/// updates to the repository. It is created by the 'begin_modification'
/// function. It is called for each WAL record, so that all the modifications
/// by one WAL record appear atomic.
pub struct DatadirModification<'a, T: DatadirTimeline> {
pub struct DatadirModification<'a, R: Repository> {
/// The timeline this modification applies to. You can access this to
/// read the state, but note that any pending updates are *not* reflected
/// in the state in 'tline' yet.
pub tline: &'a T,
pub tline: &'a DatadirTimeline<R>,
lsn: Lsn,
// The modifications are not applied directly to the underlying key-value store.
// The put-functions add the modifications here, and they are flushed to the
@@ -465,7 +543,7 @@ pub struct DatadirModification<'a, T: DatadirTimeline> {
pending_nblocks: isize,
}
impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
impl<'a, R: Repository> DatadirModification<'a, R> {
/// Initialize a completely new repository.
///
/// This inserts the directory metadata entries that are assumed to
@@ -842,7 +920,7 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
/// retains all the metadata, but data pages are flushed. That's again OK
/// for bulk import, where you are just loading data pages and won't try to
/// modify the same pages twice.
pub fn flush(&mut self, lsn: Lsn) -> Result<()> {
pub fn flush(&mut self) -> Result<()> {
// Unless we have accumulated a decent amount of changes, it's not worth it
// to scan through the pending_updates list.
let pending_nblocks = self.pending_nblocks;
@@ -850,13 +928,13 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
return Ok(());
}
let writer = self.tline.writer();
let writer = self.tline.tline.writer();
// Flush relation and SLRU data blocks, keep metadata.
let mut result: Result<()> = Ok(());
self.pending_updates.retain(|&key, value| {
if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) {
result = writer.put(key, lsn, value);
result = writer.put(key, self.lsn, value);
false
} else {
true
@@ -865,7 +943,10 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
result?;
if pending_nblocks != 0 {
writer.update_current_logical_size(pending_nblocks * pg_constants::BLCKSZ as isize);
self.tline.current_logical_size.fetch_add(
pending_nblocks * pg_constants::BLCKSZ as isize,
Ordering::SeqCst,
);
self.pending_nblocks = 0;
}
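This matches the bulk-import pattern in import_basebackup_from_tar earlier in this diff: flush() after each file to bound memory, a single commit() at the end. A sketch, with `files` assumed to yield (path, reader, len) entries from the tar:

    let mut modification = tline.begin_modification(base_lsn);
    for (path, reader, len) in files {
        import_file(&mut modification, &path, reader, len)?;
        modification.flush()?; // data pages are persisted; metadata stays pending
    }
    modification.commit()?; // writes the metadata and advances the last-record LSN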
@@ -875,25 +956,26 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
///
/// Finish this atomic update, writing all the updated keys to the
/// underlying timeline.
/// All the modifications in this atomic update are stamped by the specified LSN.
///
pub fn commit(&mut self, lsn: Lsn) -> Result<()> {
let writer = self.tline.writer();
pub fn commit(self) -> Result<()> {
let writer = self.tline.tline.writer();
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;
for (key, value) in self.pending_updates.drain() {
writer.put(key, lsn, &value)?;
for (key, value) in self.pending_updates {
writer.put(key, self.lsn, &value)?;
}
for key_range in self.pending_deletions.drain(..) {
writer.delete(key_range, lsn)?;
for key_range in self.pending_deletions {
writer.delete(key_range.clone(), self.lsn)?;
}
writer.finish_write(lsn);
writer.finish_write(self.lsn);
if pending_nblocks != 0 {
writer.update_current_logical_size(pending_nblocks * pg_constants::BLCKSZ as isize);
self.tline.current_logical_size.fetch_add(
pending_nblocks * pg_constants::BLCKSZ as isize,
Ordering::SeqCst,
);
}
Ok(())
@@ -920,7 +1002,7 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
}
} else {
let last_lsn = self.tline.get_last_record_lsn();
self.tline.get(key, last_lsn)
self.tline.tline.get(key, last_lsn)
}
}
@@ -1322,12 +1404,13 @@ fn is_slru_block_key(key: Key) -> bool {
pub fn create_test_timeline<R: Repository>(
repo: R,
timeline_id: utils::zid::ZTimelineId,
) -> Result<std::sync::Arc<R::Timeline>> {
) -> Result<Arc<crate::DatadirTimeline<R>>> {
let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?;
let mut m = tline.begin_modification();
let tline = DatadirTimeline::new(tline, 256 * 1024);
let mut m = tline.begin_modification(Lsn(8));
m.init_empty()?;
m.commit(Lsn(8))?;
Ok(tline)
m.commit()?;
Ok(Arc::new(tline))
}
#[allow(clippy::bool_assert_comparison)]
@@ -1400,7 +1483,7 @@ mod tests {
.contains(&TESTREL_A));
// Run checkpoint and garbage collection and check that it's still not visible
newtline.checkpoint(CheckpointConfig::Forced)?;
newtline.tline.checkpoint(CheckpointConfig::Forced)?;
repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
assert!(!newtline


@@ -185,7 +185,7 @@ impl Value {
/// A repository corresponds to one .neon directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
pub trait Repository: Send + Sync {
type Timeline: crate::DatadirTimeline;
type Timeline: Timeline;
/// Updates timeline based on the `TimelineSyncStatusUpdate`, received from the remote storage synchronization.
/// See [`crate::remote_storage`] for more details about the synchronization.
@@ -277,6 +277,15 @@ pub enum LocalTimelineState {
Unloaded,
}
impl<'a, T> From<&'a RepositoryTimeline<T>> for LocalTimelineState {
fn from(local_timeline_entry: &'a RepositoryTimeline<T>) -> Self {
match local_timeline_entry {
RepositoryTimeline::Loaded(_) => LocalTimelineState::Loaded,
RepositoryTimeline::Unloaded { .. } => LocalTimelineState::Unloaded,
}
}
}
///
/// Result of performing GC
///
@@ -373,11 +382,6 @@ pub trait Timeline: Send + Sync {
lsn: Lsn,
latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
) -> Result<()>;
/// Get the physical size of the timeline at the latest LSN
fn get_physical_size(&self) -> u64;
/// Get the physical size of the timeline at the latest LSN non incrementally
fn get_physical_size_non_incremental(&self) -> Result<u64>;
}
/// Various functions to mutate the timeline.
@@ -401,8 +405,6 @@ pub trait TimelineWriter<'a> {
/// the 'lsn' or anything older. The previous last record LSN is stored alongside
/// the latest and can be read.
fn finish_write(&self, lsn: Lsn);
fn update_current_logical_size(&self, delta: isize);
}
#[cfg(test)]


@@ -176,6 +176,7 @@ use crate::{
layered_repository::{
ephemeral_file::is_ephemeral_file,
metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME},
LayeredRepository,
},
storage_sync::{self, index::RemoteIndex},
tenant_mgr::attach_downloaded_tenants,
@@ -1120,7 +1121,7 @@ where
.instrument(info_span!("download_timeline_data")),
);
if let Some(mut delete_data) = batch.delete {
if let Some(delete_data) = batch.delete {
if upload_result.is_some() {
match validate_task_retries(delete_data, max_sync_errors)
.instrument(info_span!("retries_validation"))
@@ -1153,7 +1154,6 @@ where
}
}
} else {
delete_data.retries += 1;
sync_queue.push(sync_id, SyncTask::Delete(delete_data));
warn!("Skipping delete task due to failed upload tasks, reenqueuing");
}
@@ -1257,13 +1257,7 @@ async fn update_local_metadata(
timeline_id,
} = sync_id;
tokio::task::spawn_blocking(move || {
crate::layered_repository::save_metadata(
conf,
timeline_id,
tenant_id,
&cloned_metadata,
true,
)
LayeredRepository::save_metadata(conf, timeline_id, tenant_id, &cloned_metadata, true)
})
.await
.with_context(|| {


@@ -2,8 +2,8 @@
//! page server.
use crate::config::PageServerConf;
use crate::http::models::TenantInfo;
use crate::layered_repository::{load_metadata, LayeredRepository};
use crate::pgdatadir_mapping::DatadirTimeline;
use crate::repository::Repository;
use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex};
use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
@@ -12,9 +12,10 @@ use crate::thread_mgr::ThreadKind;
use crate::timelines::CreateRepo;
use crate::walredo::PostgresRedoManager;
use crate::{thread_mgr, timelines, walreceiver};
use crate::{RepositoryImpl, TimelineImpl};
use crate::{DatadirTimelineImpl, RepositoryImpl};
use anyhow::Context;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt;
@@ -100,7 +101,7 @@ struct Tenant {
///
/// Local timelines have more metadata loaded into memory; it is located in
/// the `repo.timelines` field, [`crate::layered_repository::LayeredTimelineEntry`].
local_timelines: HashMap<ZTimelineId, Arc<<RepositoryImpl as Repository>::Timeline>>,
local_timelines: HashMap<ZTimelineId, Arc<DatadirTimelineImpl>>,
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
@@ -177,7 +178,7 @@ pub enum LocalTimelineUpdate {
},
Attach {
id: ZTenantTimelineId,
datadir: Arc<<RepositoryImpl as Repository>::Timeline>,
datadir: Arc<DatadirTimelineImpl>,
},
}
@@ -381,7 +382,7 @@ pub fn get_repository_for_tenant(tenant_id: ZTenantId) -> anyhow::Result<Arc<Rep
pub fn get_local_timeline_with_load(
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
) -> anyhow::Result<Arc<TimelineImpl>> {
) -> anyhow::Result<Arc<DatadirTimelineImpl>> {
let mut m = tenants_state::write_tenants();
let tenant = m
.get_mut(&tenant_id)
@@ -488,23 +489,34 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any
fn load_local_timeline(
repo: &RepositoryImpl,
timeline_id: ZTimelineId,
) -> anyhow::Result<Arc<TimelineImpl>> {
) -> anyhow::Result<Arc<DatadirTimeline<LayeredRepository>>> {
let inmem_timeline = repo.get_timeline_load(timeline_id).with_context(|| {
format!("Inmem timeline {timeline_id} not found in tenant's repository")
})?;
inmem_timeline.init_logical_size()?;
let repartition_distance = repo.get_checkpoint_distance() / 10;
let page_tline = Arc::new(DatadirTimelineImpl::new(
inmem_timeline,
repartition_distance,
));
page_tline.init_logical_size()?;
tenants_state::try_send_timeline_update(LocalTimelineUpdate::Attach {
id: ZTenantTimelineId::new(repo.tenant_id(), timeline_id),
datadir: Arc::clone(&inmem_timeline),
datadir: Arc::clone(&page_tline),
});
Ok(inmem_timeline)
Ok(page_tline)
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
pub struct TenantInfo {
#[serde_as(as = "DisplayFromStr")]
pub id: ZTenantId,
pub state: Option<TenantState>,
pub has_in_progress_downloads: Option<bool>,
}
///
/// Get list of tenants, for the mgmt API
///
pub fn list_tenants(remote_index: &RemoteTimelineIndex) -> Vec<TenantInfo> {
tenants_state::read_tenants()
.iter()
@@ -520,7 +532,6 @@ pub fn list_tenants(remote_index: &RemoteTimelineIndex) -> Vec<TenantInfo> {
TenantInfo {
id: *id,
state: Some(tenant.state),
current_physical_size: None,
has_in_progress_downloads,
}
})


@@ -120,10 +120,6 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("tenant-task-worker")
.enable_all()
.on_thread_start(|| {
thread_mgr::register(ThreadKind::TenantTaskWorker, "tenant-task-worker")
})
.on_thread_stop(thread_mgr::deregister)
.build()?;
let (gc_send, mut gc_recv) = mpsc::channel::<ZTenantId>(100);


@@ -97,9 +97,6 @@ pub enum ThreadKind {
// Thread that schedules new compaction and gc jobs
TenantTaskManager,
// Worker thread for tenant tasks thread pool
TenantTaskWorker,
// Thread that flushes frozen in-memory layers to disk
LayerFlushThread,
@@ -108,20 +105,18 @@ pub enum ThreadKind {
StorageSync,
}
#[derive(Default)]
struct MutableThreadState {
/// Tenant and timeline that this thread is associated with.
tenant_id: Option<ZTenantId>,
timeline_id: Option<ZTimelineId>,
/// Handle for waiting for the thread to exit. It can be None, if the
/// the thread has already exited. OR if this thread is managed externally
/// and was not spawned through thread_mgr.rs::spawn function.
/// the thread has already exited.
join_handle: Option<JoinHandle<()>>,
}
struct PageServerThread {
thread_id: u64,
_thread_id: u64,
kind: ThreadKind,
@@ -152,7 +147,7 @@ where
let (shutdown_tx, shutdown_rx) = watch::channel(());
let thread_id = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);
let thread = Arc::new(PageServerThread {
thread_id,
_thread_id: thread_id,
kind,
name: name.to_string(),
shutdown_requested: AtomicBool::new(false),
@@ -320,10 +315,8 @@ pub fn shutdown_threads(
drop(thread_mut);
let _ = join_handle.join();
} else {
// Possibly one of:
// * The thread had not even fully started yet.
// * It was shut down concurrently and already exited
// * Is managed through `register`/`deregister` fns without providing a join handle
// The thread had not even fully started yet. Or it was shut down
// concurrently and already exited
}
}
}
@@ -355,56 +348,3 @@ pub fn is_shutdown_requested() -> bool {
}
})
}
/// Needed to register threads that were not spawned through spawn function.
/// For example tokio blocking threads. This function is expected to be used
/// in tandem with `deregister`.
/// NOTE: threads registered through this function cannot be joined
pub fn register(kind: ThreadKind, name: &str) {
CURRENT_THREAD.with(|ct| {
let mut borrowed = ct.borrow_mut();
if borrowed.is_some() {
panic!("thread already registered")
};
let (shutdown_tx, shutdown_rx) = watch::channel(());
let thread_id = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);
let thread = Arc::new(PageServerThread {
thread_id,
kind,
name: name.to_owned(),
shutdown_requested: AtomicBool::new(false),
shutdown_tx,
mutable: Mutex::new(MutableThreadState {
tenant_id: None,
timeline_id: None,
join_handle: None,
}),
});
*borrowed = Some(Arc::clone(&thread));
SHUTDOWN_RX.with(|rx| {
*rx.borrow_mut() = Some(shutdown_rx);
});
THREADS.lock().unwrap().insert(thread_id, thread);
});
}
// Expected to be used in tandem with `register`. See the doc for `register` for more details
pub fn deregister() {
CURRENT_THREAD.with(|ct| {
let mut borrowed = ct.borrow_mut();
let thread = match borrowed.take() {
Some(thread) => thread,
None => panic!("calling deregister on unregistered thread"),
};
SHUTDOWN_RX.with(|rx| {
*rx.borrow_mut() = None;
});
THREADS.lock().unwrap().remove(&thread.thread_id)
});
}


@@ -4,6 +4,8 @@
use anyhow::{bail, ensure, Context, Result};
use postgres_ffi::ControlFileData;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::{
fs,
path::Path,
@@ -18,15 +20,123 @@ use utils::{
zid::{ZTenantId, ZTimelineId},
};
use crate::tenant_mgr;
use crate::{
config::PageServerConf, repository::Repository, storage_sync::index::RemoteIndex,
tenant_config::TenantConfOpt, RepositoryImpl, TimelineImpl,
config::PageServerConf,
layered_repository::metadata::TimelineMetadata,
repository::{LocalTimelineState, Repository},
storage_sync::index::RemoteIndex,
tenant_config::TenantConfOpt,
DatadirTimeline, RepositoryImpl,
};
use crate::{import_datadir, LOG_FILE_NAME};
use crate::{layered_repository::LayeredRepository, walredo::WalRedoManager};
use crate::{repository::RepositoryTimeline, tenant_mgr};
use crate::{repository::Timeline, CheckpointConfig};
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LocalTimelineInfo {
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_timeline_id: Option<ZTimelineId>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_lsn: Option<Lsn>,
#[serde_as(as = "DisplayFromStr")]
pub last_record_lsn: Lsn,
#[serde_as(as = "Option<DisplayFromStr>")]
pub prev_record_lsn: Option<Lsn>,
#[serde_as(as = "DisplayFromStr")]
pub latest_gc_cutoff_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub disk_consistent_lsn: Lsn,
pub current_logical_size: Option<usize>, // is None when timeline is Unloaded
pub current_logical_size_non_incremental: Option<usize>,
pub timeline_state: LocalTimelineState,
}
impl LocalTimelineInfo {
pub fn from_loaded_timeline<R: Repository>(
datadir_tline: &DatadirTimeline<R>,
include_non_incremental_logical_size: bool,
) -> anyhow::Result<Self> {
let last_record_lsn = datadir_tline.tline.get_last_record_lsn();
let info = LocalTimelineInfo {
ancestor_timeline_id: datadir_tline.tline.get_ancestor_timeline_id(),
ancestor_lsn: {
match datadir_tline.tline.get_ancestor_lsn() {
Lsn(0) => None,
lsn @ Lsn(_) => Some(lsn),
}
},
disk_consistent_lsn: datadir_tline.tline.get_disk_consistent_lsn(),
last_record_lsn,
prev_record_lsn: Some(datadir_tline.tline.get_prev_record_lsn()),
latest_gc_cutoff_lsn: *datadir_tline.tline.get_latest_gc_cutoff_lsn(),
timeline_state: LocalTimelineState::Loaded,
current_logical_size: Some(datadir_tline.get_current_logical_size()),
current_logical_size_non_incremental: if include_non_incremental_logical_size {
Some(datadir_tline.get_current_logical_size_non_incremental(last_record_lsn)?)
} else {
None
},
};
Ok(info)
}
pub fn from_unloaded_timeline(metadata: &TimelineMetadata) -> Self {
LocalTimelineInfo {
ancestor_timeline_id: metadata.ancestor_timeline(),
ancestor_lsn: {
match metadata.ancestor_lsn() {
Lsn(0) => None,
lsn @ Lsn(_) => Some(lsn),
}
},
disk_consistent_lsn: metadata.disk_consistent_lsn(),
last_record_lsn: metadata.disk_consistent_lsn(),
prev_record_lsn: metadata.prev_record_lsn(),
latest_gc_cutoff_lsn: metadata.latest_gc_cutoff_lsn(),
timeline_state: LocalTimelineState::Unloaded,
current_logical_size: None,
current_logical_size_non_incremental: None,
}
}
pub fn from_repo_timeline<T>(
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
repo_timeline: &RepositoryTimeline<T>,
include_non_incremental_logical_size: bool,
) -> anyhow::Result<Self> {
match repo_timeline {
RepositoryTimeline::Loaded(_) => {
let datadir_tline =
tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id)?;
Self::from_loaded_timeline(&datadir_tline, include_non_incremental_logical_size)
}
RepositoryTimeline::Unloaded { metadata } => Ok(Self::from_unloaded_timeline(metadata)),
}
}
}
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RemoteTimelineInfo {
#[serde_as(as = "DisplayFromStr")]
pub remote_consistent_lsn: Lsn,
pub awaits_download: bool,
}
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TimelineInfo {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: ZTenantId,
#[serde_as(as = "DisplayFromStr")]
pub timeline_id: ZTimelineId,
pub local: Option<LocalTimelineInfo>,
pub remote: Option<RemoteTimelineInfo>,
}
#[derive(Debug, Clone, Copy)]
pub struct PointInTime {
pub timeline_id: ZTimelineId,
@@ -188,18 +298,19 @@ fn bootstrap_timeline<R: Repository>(
// The initdb LSN will be equal to last_record_lsn, which will be set after import.
// Because we know it upfront, we avoid an Option or a dummy zero value by passing it to create_empty_timeline.
let timeline = repo.create_empty_timeline(tli, lsn)?;
import_datadir::import_timeline_from_postgres_datadir(&pgdata_path, &*timeline, lsn)?;
let mut page_tline: DatadirTimeline<R> = DatadirTimeline::new(timeline, u64::MAX);
import_datadir::import_timeline_from_postgres_datadir(&pgdata_path, &mut page_tline, lsn)?;
fail::fail_point!("before-checkpoint-new-timeline", |_| {
bail!("failpoint before-checkpoint-new-timeline");
});
timeline.checkpoint(CheckpointConfig::Forced)?;
page_tline.tline.checkpoint(CheckpointConfig::Forced)?;
info!(
"created root timeline {} timeline.lsn {}",
tli,
timeline.get_last_record_lsn()
page_tline.tline.get_last_record_lsn()
);
// Remove temp dir. We don't need it anymore
@@ -208,22 +319,36 @@ fn bootstrap_timeline<R: Repository>(
Ok(())
}
///
/// Create a new timeline.
///
/// Returns the new timeline ID and a reference to its Timeline object.
///
/// If the caller specified the timeline ID to use (`new_timeline_id`) and a timeline with
/// the same timeline ID already exists, returns None. If `new_timeline_id` is not given,
/// a new unique ID is generated.
///
pub(crate) fn get_local_timelines(
tenant_id: ZTenantId,
include_non_incremental_logical_size: bool,
) -> Result<Vec<(ZTimelineId, LocalTimelineInfo)>> {
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)
.with_context(|| format!("Failed to get repo for tenant {}", tenant_id))?;
let repo_timelines = repo.list_timelines();
let mut local_timeline_info = Vec::with_capacity(repo_timelines.len());
for (timeline_id, repository_timeline) in repo_timelines {
local_timeline_info.push((
timeline_id,
LocalTimelineInfo::from_repo_timeline(
tenant_id,
timeline_id,
&repository_timeline,
include_non_incremental_logical_size,
)?,
))
}
Ok(local_timeline_info)
}
pub(crate) fn create_timeline(
conf: &'static PageServerConf,
tenant_id: ZTenantId,
new_timeline_id: Option<ZTimelineId>,
ancestor_timeline_id: Option<ZTimelineId>,
mut ancestor_start_lsn: Option<Lsn>,
) -> Result<Option<(ZTimelineId, Arc<TimelineImpl>)>> {
) -> Result<Option<TimelineInfo>> {
let new_timeline_id = new_timeline_id.unwrap_or_else(ZTimelineId::generate);
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
@@ -232,7 +357,7 @@ pub(crate) fn create_timeline(
return Ok(None);
}
let _new_timeline = match ancestor_timeline_id {
let new_timeline_info = match ancestor_timeline_id {
Some(ancestor_timeline_id) => {
let ancestor_timeline = repo
.get_timeline_load(ancestor_timeline_id)
@@ -260,13 +385,26 @@ pub(crate) fn create_timeline(
}
}
repo.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)?
repo.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)?;
// load the timeline into memory
let loaded_timeline =
tenant_mgr::get_local_timeline_with_load(tenant_id, new_timeline_id)?;
LocalTimelineInfo::from_loaded_timeline(&loaded_timeline, false)
.context("cannot fill timeline info")?
}
None => {
bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())?;
// load the timeline into memory
let new_timeline =
tenant_mgr::get_local_timeline_with_load(tenant_id, new_timeline_id)?;
LocalTimelineInfo::from_loaded_timeline(&new_timeline, false)
.context("cannot fill timeline info")?
}
None => bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())?,
};
// load the timeline into memory
let loaded_timeline = tenant_mgr::get_local_timeline_with_load(tenant_id, new_timeline_id)?;
Ok(Some((new_timeline_id, loaded_timeline)))
Ok(Some(TimelineInfo {
tenant_id,
timeline_id: new_timeline_id,
local: Some(new_timeline_info),
remote: None,
}))
}


@@ -34,6 +34,7 @@ use std::collections::HashMap;
use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind};
use crate::repository::Repository;
use crate::walrecord::*;
use postgres_ffi::nonrelfile_utils::mx_offset_to_member_segment;
use postgres_ffi::xlog_utils::*;
@@ -43,8 +44,8 @@ use utils::lsn::Lsn;
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
pub struct WalIngest<'a, T: DatadirTimeline> {
timeline: &'a T,
pub struct WalIngest<'a, R: Repository> {
timeline: &'a DatadirTimeline<R>,
checkpoint: CheckPoint,
checkpoint_modified: bool,
@@ -52,8 +53,8 @@ pub struct WalIngest<'a, T: DatadirTimeline> {
relsize_cache: HashMap<RelTag, BlockNumber>,
}
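The corresponding driver loop appears in import_wal_from_tar near the top of this diff; a condensed sketch (the waldecoder and LSNs are assumed to be in scope):

    let mut walingest = WalIngest::new(&datadir_timeline, start_lsn)?;
    while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
        // Each record becomes one DatadirModification, stamped with its own
        // LSN and committed inside ingest_record.
        walingest.ingest_record(&datadir_timeline, recdata, lsn)?;
    }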
impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
pub fn new(timeline: &T, startpoint: Lsn) -> Result<WalIngest<T>> {
impl<'a, R: Repository> WalIngest<'a, R> {
pub fn new(timeline: &DatadirTimeline<R>, startpoint: Lsn) -> Result<WalIngest<R>> {
// Fetch the latest checkpoint into memory, so that we can compare with it
// quickly in `ingest_record` and update it when it changes.
let checkpoint_bytes = timeline.get_checkpoint(startpoint)?;
@@ -77,13 +78,13 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
///
pub fn ingest_record(
&mut self,
timeline: &DatadirTimeline<R>,
recdata: Bytes,
lsn: Lsn,
modification: &mut DatadirModification<T>,
decoded: &mut DecodedWALRecord,
) -> Result<()> {
decode_wal_record(recdata, decoded).context("failed decoding wal record")?;
let mut modification = timeline.begin_modification(lsn);
let mut decoded = decode_wal_record(recdata).context("failed decoding wal record")?;
let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset);
@@ -97,7 +98,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
if decoded.xl_rmid == pg_constants::RM_HEAP_ID
|| decoded.xl_rmid == pg_constants::RM_HEAP2_ID
{
self.ingest_heapam_record(&mut buf, modification, decoded)?;
self.ingest_heapam_record(&mut buf, &mut modification, &mut decoded)?;
}
// Handle other special record types
if decoded.xl_rmid == pg_constants::RM_SMGR_ID
@@ -105,19 +106,19 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
== pg_constants::XLOG_SMGR_CREATE
{
let create = XlSmgrCreate::decode(&mut buf);
self.ingest_xlog_smgr_create(modification, &create)?;
self.ingest_xlog_smgr_create(&mut modification, &create)?;
} else if decoded.xl_rmid == pg_constants::RM_SMGR_ID
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_SMGR_TRUNCATE
{
let truncate = XlSmgrTruncate::decode(&mut buf);
self.ingest_xlog_smgr_truncate(modification, &truncate)?;
self.ingest_xlog_smgr_truncate(&mut modification, &truncate)?;
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID {
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_DBASE_CREATE
{
let createdb = XlCreateDatabase::decode(&mut buf);
self.ingest_xlog_dbase_create(modification, &createdb)?;
self.ingest_xlog_dbase_create(&mut modification, &createdb)?;
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_DBASE_DROP
{
@@ -136,7 +137,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
modification,
&mut modification,
SlruKind::Clog,
segno,
rpageno,
@@ -145,7 +146,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
} else {
assert!(info == pg_constants::CLOG_TRUNCATE);
let xlrec = XlClogTruncate::decode(&mut buf);
self.ingest_clog_truncate_record(modification, &xlrec)?;
self.ingest_clog_truncate_record(&mut modification, &xlrec)?;
}
} else if decoded.xl_rmid == pg_constants::RM_XACT_ID {
let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
@@ -153,7 +154,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
let parsed_xact =
XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
self.ingest_xact_record(
modification,
&mut modification,
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT,
)?;
@@ -163,7 +164,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
let parsed_xact =
XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
self.ingest_xact_record(
modification,
&mut modification,
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
)?;
@@ -186,7 +187,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
modification,
&mut modification,
SlruKind::MultiXactOffsets,
segno,
rpageno,
@@ -197,7 +198,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
modification,
&mut modification,
SlruKind::MultiXactMembers,
segno,
rpageno,
@@ -205,14 +206,14 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
)?;
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);
self.ingest_multixact_create_record(modification, &xlrec)?;
self.ingest_multixact_create_record(&mut modification, &xlrec)?;
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
let xlrec = XlMultiXactTruncate::decode(&mut buf);
self.ingest_multixact_truncate_record(modification, &xlrec)?;
self.ingest_multixact_truncate_record(&mut modification, &xlrec)?;
}
} else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID {
let xlrec = XlRelmapUpdate::decode(&mut buf);
self.ingest_relmap_page(modification, &xlrec, decoded)?;
self.ingest_relmap_page(&mut modification, &xlrec, &decoded)?;
} else if decoded.xl_rmid == pg_constants::RM_XLOG_ID {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_NEXTOID {
@@ -247,7 +248,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
// Iterate through all the blocks that the record modifies, and
// "put" a separate copy of the record for each block.
for blk in decoded.blocks.iter() {
self.ingest_decoded_block(modification, lsn, decoded, blk)?;
self.ingest_decoded_block(&mut modification, lsn, &decoded, blk)?;
}
// If checkpoint data was updated, store the new version in the repository
@@ -260,14 +261,14 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
modification.commit(lsn)?;
modification.commit()?;
Ok(())
}
fn ingest_decoded_block(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification<R>,
lsn: Lsn,
decoded: &DecodedWALRecord,
blk: &DecodedBkpBlock,
@@ -327,7 +328,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn ingest_heapam_record(
&mut self,
buf: &mut Bytes,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification<R>,
decoded: &mut DecodedWALRecord,
) -> Result<()> {
// Handle VM bit updates that are implicitly part of heap records.
@@ -471,7 +472,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
/// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
fn ingest_xlog_dbase_create(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification<R>,
rec: &XlCreateDatabase,
) -> Result<()> {
let db_id = rec.db_id;
@@ -538,7 +539,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn ingest_xlog_smgr_create(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification<R>,
rec: &XlSmgrCreate,
) -> Result<()> {
let rel = RelTag {
@@ -556,7 +557,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
/// This is the same logic as in PostgreSQL's smgr_redo() function.
fn ingest_xlog_smgr_truncate(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification<R>,
rec: &XlSmgrTruncate,
) -> Result<()> {
let spcnode = rec.rnode.spcnode;
@@ -621,7 +622,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
///
fn ingest_xact_record(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification<R>,
parsed: &XlXactParsedRecord,
is_commit: bool,
) -> Result<()> {
@@ -690,7 +691,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn ingest_clog_truncate_record(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification<R>,
xlrec: &XlClogTruncate,
) -> Result<()> {
info!(
@@ -748,7 +749,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn ingest_multixact_create_record(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification<R>,
xlrec: &XlMultiXactCreate,
) -> Result<()> {
// Create WAL record for updating the multixact-offsets page
@@ -827,7 +828,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn ingest_multixact_truncate_record(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification<R>,
xlrec: &XlMultiXactTruncate,
) -> Result<()> {
self.checkpoint.oldestMulti = xlrec.end_trunc_off;
@@ -861,7 +862,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn ingest_relmap_page(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification<R>,
xlrec: &XlRelmapUpdate,
decoded: &DecodedWALRecord,
) -> Result<()> {
@@ -877,7 +878,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn put_rel_creation(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification<R>,
rel: RelTag,
) -> Result<()> {
self.relsize_cache.insert(rel, 0);
@@ -887,7 +888,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn put_rel_page_image(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification<R>,
rel: RelTag,
blknum: BlockNumber,
img: Bytes,
@@ -899,7 +900,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn put_rel_wal_record(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification<R>,
rel: RelTag,
blknum: BlockNumber,
rec: ZenithWalRecord,
@@ -911,7 +912,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn put_rel_truncation(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification<R>,
rel: RelTag,
nblocks: BlockNumber,
) -> Result<()> {
@@ -922,7 +923,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn put_rel_drop(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification<R>,
rel: RelTag,
) -> Result<()> {
modification.put_rel_drop(rel)?;
@@ -947,7 +948,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn handle_rel_extend(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification<R>,
rel: RelTag,
blknum: BlockNumber,
) -> Result<()> {
@@ -985,7 +986,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn put_slru_page_image(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification<R>,
kind: SlruKind,
segno: u32,
blknum: BlockNumber,
@@ -998,7 +999,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn handle_slru_extend(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification<R>,
kind: SlruKind,
segno: u32,
blknum: BlockNumber,
@@ -1051,7 +1052,6 @@ mod tests {
use super::*;
use crate::pgdatadir_mapping::create_test_timeline;
use crate::repository::repo_harness::*;
use crate::repository::Timeline;
use postgres_ffi::pg_constants;
/// Arbitrary relation tag, for testing.
@@ -1062,17 +1062,17 @@ mod tests {
forknum: 0,
};
fn assert_current_logical_size<T: Timeline>(_timeline: &T, _lsn: Lsn) {
fn assert_current_logical_size<R: Repository>(_timeline: &DatadirTimeline<R>, _lsn: Lsn) {
// TODO
}
static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
fn init_walingest_test<T: DatadirTimeline>(tline: &T) -> Result<WalIngest<T>> {
let mut m = tline.begin_modification();
fn init_walingest_test<R: Repository>(tline: &DatadirTimeline<R>) -> Result<WalIngest<R>> {
let mut m = tline.begin_modification(Lsn(0x10));
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_relmap_file(0, 111, Bytes::from(""))?; // dummy relmapper file
m.commit(Lsn(0x10))?;
m.commit()?;
let walingest = WalIngest::new(tline, Lsn(0x10))?;
Ok(walingest)
@@ -1082,23 +1082,23 @@ mod tests {
fn test_relsize() -> Result<()> {
let repo = RepoHarness::create("test_relsize")?.load();
let tline = create_test_timeline(repo, TIMELINE_ID)?;
let mut walingest = init_walingest_test(&*tline)?;
let mut walingest = init_walingest_test(&tline)?;
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(0x20));
walingest.put_rel_creation(&mut m, TESTREL_A)?;
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
m.commit(Lsn(0x20))?;
let mut m = tline.begin_modification();
m.commit()?;
let mut m = tline.begin_modification(Lsn(0x30));
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"))?;
m.commit(Lsn(0x30))?;
let mut m = tline.begin_modification();
m.commit()?;
let mut m = tline.begin_modification(Lsn(0x40));
walingest.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"))?;
m.commit(Lsn(0x40))?;
let mut m = tline.begin_modification();
m.commit()?;
let mut m = tline.begin_modification(Lsn(0x50));
walingest.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"))?;
m.commit(Lsn(0x50))?;
m.commit()?;
assert_current_logical_size(&*tline, Lsn(0x50));
assert_current_logical_size(&tline, Lsn(0x50));
// The relation was created at LSN 2, not visible at LSN 1 yet.
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x10))?, false);
@@ -1142,10 +1142,10 @@ mod tests {
);
// Truncate last block
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(0x60));
walingest.put_rel_truncation(&mut m, TESTREL_A, 2)?;
m.commit(Lsn(0x60))?;
assert_current_logical_size(&*tline, Lsn(0x60));
m.commit()?;
assert_current_logical_size(&tline, Lsn(0x60));
// Check reported size and contents after truncation
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x60))?, 2);
@@ -1166,15 +1166,15 @@ mod tests {
);
// Truncate to zero length
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(0x68));
walingest.put_rel_truncation(&mut m, TESTREL_A, 0)?;
m.commit(Lsn(0x68))?;
m.commit()?;
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x68))?, 0);
// Extend from 0 to 2 blocks, leaving a gap
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(0x70));
walingest.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"))?;
m.commit(Lsn(0x70))?;
m.commit()?;
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x70))?, 2);
assert_eq!(
tline.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x70))?,
@@ -1186,9 +1186,9 @@ mod tests {
);
// Extend a lot more, leaving a big gap that spans across segments
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(0x80));
walingest.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"))?;
m.commit(Lsn(0x80))?;
m.commit()?;
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80))?, 1501);
for blk in 2..1500 {
assert_eq!(
@@ -1210,20 +1210,20 @@ mod tests {
fn test_drop_extend() -> Result<()> {
let repo = RepoHarness::create("test_drop_extend")?.load();
let tline = create_test_timeline(repo, TIMELINE_ID)?;
let mut walingest = init_walingest_test(&*tline)?;
let mut walingest = init_walingest_test(&tline)?;
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(0x20));
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
m.commit(Lsn(0x20))?;
m.commit()?;
// Check that rel exists and size is correct
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x20))?, true);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x20))?, 1);
// Drop rel
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(0x30));
walingest.put_rel_drop(&mut m, TESTREL_A)?;
m.commit(Lsn(0x30))?;
m.commit()?;
// Check that rel is not visible anymore
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x30))?, false);
@@ -1232,9 +1232,9 @@ mod tests {
//assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30))?.is_none());
// Re-create it
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(0x40));
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"))?;
m.commit(Lsn(0x40))?;
m.commit()?;
// Check that rel exists and size is correct
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x40))?, true);
@@ -1250,16 +1250,16 @@ mod tests {
fn test_truncate_extend() -> Result<()> {
let repo = RepoHarness::create("test_truncate_extend")?.load();
let tline = create_test_timeline(repo, TIMELINE_ID)?;
let mut walingest = init_walingest_test(&*tline)?;
let mut walingest = init_walingest_test(&tline)?;
// Create a 20 MB relation (the size is arbitrary)
let relsize = 20 * 1024 * 1024 / 8192;
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(0x20));
for blkno in 0..relsize {
let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
walingest.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data))?;
}
m.commit(Lsn(0x20))?;
m.commit()?;
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x10))?, false);
@@ -1280,9 +1280,9 @@ mod tests {
// Truncate relation so that second segment was dropped
// - only leave one page
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(0x60));
walingest.put_rel_truncation(&mut m, TESTREL_A, 1)?;
m.commit(Lsn(0x60))?;
m.commit()?;
// Check reported size and contents after truncation
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x60))?, 1);
@@ -1310,12 +1310,12 @@ mod tests {
// Extend relation again.
// Add enough blocks to create second segment
let lsn = Lsn(0x80);
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(lsn);
for blkno in 0..relsize {
let data = format!("foo blk {} at {}", blkno, lsn);
walingest.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data))?;
}
m.commit(lsn)?;
m.commit()?;
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x80))?, true);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80))?, relsize);
@@ -1338,18 +1338,18 @@ mod tests {
fn test_large_rel() -> Result<()> {
let repo = RepoHarness::create("test_large_rel")?.load();
let tline = create_test_timeline(repo, TIMELINE_ID)?;
let mut walingest = init_walingest_test(&*tline)?;
let mut walingest = init_walingest_test(&tline)?;
let mut lsn = 0x10;
for blknum in 0..pg_constants::RELSEG_SIZE + 1 {
lsn += 0x10;
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(lsn));
let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
walingest.put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img)?;
m.commit(Lsn(lsn))?;
m.commit()?;
}
assert_current_logical_size(&*tline, Lsn(lsn));
assert_current_logical_size(&tline, Lsn(lsn));
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
@@ -1358,34 +1358,34 @@ mod tests {
// Truncate one block
lsn += 0x10;
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(lsn));
walingest.put_rel_truncation(&mut m, TESTREL_A, pg_constants::RELSEG_SIZE)?;
m.commit(Lsn(lsn))?;
m.commit()?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
pg_constants::RELSEG_SIZE
);
assert_current_logical_size(&*tline, Lsn(lsn));
assert_current_logical_size(&tline, Lsn(lsn));
// Truncate another block
lsn += 0x10;
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(lsn));
walingest.put_rel_truncation(&mut m, TESTREL_A, pg_constants::RELSEG_SIZE - 1)?;
m.commit(Lsn(lsn))?;
m.commit()?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
pg_constants::RELSEG_SIZE - 1
);
assert_current_logical_size(&*tline, Lsn(lsn));
assert_current_logical_size(&tline, Lsn(lsn));
// Truncate to 1500, and then truncate all the way down to 0, one block at a time
// This tests the behavior at segment boundaries
let mut size: i32 = 3000;
while size >= 0 {
lsn += 0x10;
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(lsn));
walingest.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber)?;
m.commit(Lsn(lsn))?;
m.commit()?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
size as BlockNumber
@@ -1393,7 +1393,7 @@ mod tests {
size -= 1;
}
assert_current_logical_size(&*tline, Lsn(lsn));
assert_current_logical_size(&tline, Lsn(lsn));
Ok(())
}
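The hunks above consistently move the LSN argument from commit() to begin_modification() and thread &mut modification through every ingest helper. A minimal sketch of the resulting call pattern, assuming the signatures shown in this diff (helper names as in the tests above, not a verbatim excerpt):

fn put_one_page<R: Repository>(
    tline: &DatadirTimeline<R>,
    walingest: &mut WalIngest<R>,
) -> Result<()> {
    // The LSN is bound once, when the modification is opened...
    let mut m = tline.begin_modification(Lsn(0x20));
    walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
    // ...so commit() no longer takes an LSN argument.
    m.commit()
}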

View File

@@ -26,6 +26,7 @@ mod walreceiver_connection;
use anyhow::{ensure, Context};
use etcd_broker::Client;
use itertools::Itertools;
use once_cell::sync::Lazy;
use std::cell::Cell;
use std::collections::{hash_map, HashMap, HashSet};
use std::future::Future;
@@ -35,13 +36,14 @@ use std::thread_local;
use std::time::Duration;
use tokio::{
select,
sync::{mpsc, watch},
sync::{mpsc, watch, RwLock},
task::JoinHandle,
};
use tracing::*;
use url::Url;
use crate::config::PageServerConf;
use crate::http::models::WalReceiverEntry;
use crate::tenant_mgr::{self, LocalTimelineUpdate, TenantState};
use crate::thread_mgr::{self, ThreadKind};
use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
@@ -53,6 +55,23 @@ thread_local! {
pub(crate) static IS_WAL_RECEIVER: Cell<bool> = Cell::new(false);
}
/// WAL receiver state for sharing with the outside world.
/// Only entries for timelines currently available in pageserver are stored.
static WAL_RECEIVER_ENTRIES: Lazy<RwLock<HashMap<ZTenantTimelineId, WalReceiverEntry>>> =
Lazy::new(|| RwLock::new(HashMap::new()));
/// Gets the public WAL streaming entry for a certain timeline.
pub async fn get_wal_receiver_entry(
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
) -> Option<WalReceiverEntry> {
WAL_RECEIVER_ENTRIES
.read()
.await
.get(&ZTenantTimelineId::new(tenant_id, timeline_id))
.cloned()
}
/// Sets up the main WAL receiver thread that manages the rest of the subtasks inside of it, per timeline.
/// See comments in [`wal_receiver_main_thread_loop_step`] for more details on per timeline activities.
pub fn init_wal_receiver_main_thread(
@@ -262,10 +281,13 @@ async fn wal_receiver_main_thread_loop_step<'a>(
}
None => warn!("Timeline {id} does not have a tenant entry in wal receiver main thread"),
};
if let Err(e) = join_confirmation_sender.send(()) {
warn!("cannot send wal_receiver shutdown confirmation {e}")
} else {
info!("confirm walreceiver shutdown for {id}");
{
WAL_RECEIVER_ENTRIES.write().await.remove(&id);
if let Err(e) = join_confirmation_sender.send(()) {
warn!("cannot send wal_receiver shutdown confirmation {e}")
} else {
info!("confirm walreceiver shutdown for {id}");
}
}
}
// Timeline got attached, retrieve all necessary information to start its broker loop and maintain this loop endlessly.
@@ -300,6 +322,17 @@ async fn wal_receiver_main_thread_loop_step<'a>(
}
};
{
WAL_RECEIVER_ENTRIES.write().await.insert(
id,
WalReceiverEntry {
wal_producer_connstr: None,
last_received_msg_lsn: None,
last_received_msg_ts: None,
},
);
}
vacant_connection_manager_entry.insert(
connection_manager::spawn_connection_manager_task(
id,
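Aside: the WAL_RECEIVER_ENTRIES pattern above, a process-wide map behind once_cell::sync::Lazy and tokio's async RwLock, can be sketched in isolation with toy key/value types (u64 and String stand in for the real ZTenantTimelineId and WalReceiverEntry):

use once_cell::sync::Lazy;
use std::collections::HashMap;
use tokio::sync::RwLock;

// Toy stand-in for WAL_RECEIVER_ENTRIES: created on first access, shared by
// every task in the process.
static ENTRIES: Lazy<RwLock<HashMap<u64, String>>> =
    Lazy::new(|| RwLock::new(HashMap::new()));

async fn get_entry(id: u64) -> Option<String> {
    // Shared read lock; cloning the value lets the guard drop before return,
    // mirroring the .cloned() call in get_wal_receiver_entry above.
    ENTRIES.read().await.get(&id).cloned()
}

async fn set_entry(id: u64, value: String) {
    // Exclusive write lock, held only for the duration of the insert.
    ENTRIES.write().await.insert(id, value);
}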

View File

@@ -25,8 +25,7 @@ use etcd_broker::{
use tokio::select;
use tracing::*;
use crate::repository::{Repository, Timeline};
use crate::{RepositoryImpl, TimelineImpl};
use crate::DatadirTimelineImpl;
use utils::{
lsn::Lsn,
pq_proto::ReplicationFeedback,
@@ -40,7 +39,7 @@ pub(super) fn spawn_connection_manager_task(
id: ZTenantTimelineId,
broker_loop_prefix: String,
mut client: Client,
local_timeline: Arc<TimelineImpl>,
local_timeline: Arc<DatadirTimelineImpl>,
wal_connect_timeout: Duration,
lagging_wal_timeout: Duration,
max_lsn_wal_lag: NonZeroU64,
@@ -168,7 +167,7 @@ async fn connection_manager_loop_step(
walreceiver_state
.change_connection(
new_candidate.safekeeper_id,
new_candidate.wal_source_connstr,
new_candidate.wal_producer_connstr,
)
.await
}
@@ -230,8 +229,8 @@ async fn subscribe_for_timeline_updates(
}
}
const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1;
const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0;
const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 2.0;
const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 60.0;
async fn exponential_backoff(n: u32, base: f64, max_seconds: f64) {
if n == 0 {
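The retry defaults grow from 0.1s/3.0s to 2.0s/60.0s. The function body is truncated in this hunk, so the following is only an assumed shape of a capped exponential backoff using the new constants:

use std::time::Duration;

// Assumed shape of exponential_backoff (the real body is not shown here):
// sleep for base * 2^(n-1) seconds, clamped to max_seconds; n == 0 retries
// immediately.
async fn exponential_backoff_sketch(n: u32, base: f64, max_seconds: f64) {
    if n == 0 {
        return;
    }
    // With the new defaults: 2s, 4s, 8s, ..., capped at 60s.
    let seconds = (base * 2.0_f64.powi(n as i32 - 1)).min(max_seconds);
    tokio::time::sleep(Duration::from_secs_f64(seconds)).await;
}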
@@ -246,7 +245,7 @@ async fn exponential_backoff(n: u32, base: f64, max_seconds: f64) {
struct WalreceiverState {
id: ZTenantTimelineId,
/// Use pageserver data about the timeline to filter out some of the safekeepers.
local_timeline: Arc<TimelineImpl>,
local_timeline: Arc<DatadirTimelineImpl>,
/// The timeout on the connection to safekeeper for WAL streaming.
wal_connect_timeout: Duration,
/// The timeout to use to determine when the current connection is "stale" and reconnect to the other one.
@@ -284,7 +283,7 @@ struct EtcdSkTimeline {
impl WalreceiverState {
fn new(
id: ZTenantTimelineId,
local_timeline: Arc<<RepositoryImpl as Repository>::Timeline>,
local_timeline: Arc<DatadirTimelineImpl>,
wal_connect_timeout: Duration,
lagging_wal_timeout: Duration,
max_lsn_wal_lag: NonZeroU64,
@@ -302,7 +301,7 @@ impl WalreceiverState {
}
/// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
async fn change_connection(&mut self, new_sk_id: NodeId, new_wal_source_connstr: String) {
async fn change_connection(&mut self, new_sk_id: NodeId, new_wal_producer_connstr: String) {
if let Some(old_connection) = self.wal_connection.take() {
old_connection.connection_task.shutdown().await
}
@@ -324,7 +323,7 @@ impl WalreceiverState {
.await;
super::walreceiver_connection::handle_walreceiver_connection(
id,
&new_wal_source_connstr,
&new_wal_producer_connstr,
events_sender.as_ref(),
cancellation,
connect_timeout,
@@ -387,7 +386,7 @@ impl WalreceiverState {
Some(existing_wal_connection) => {
let connected_sk_node = existing_wal_connection.sk_id;
let (new_sk_id, new_safekeeper_etcd_data, new_wal_source_connstr) =
let (new_sk_id, new_safekeeper_etcd_data, new_wal_producer_connstr) =
self.select_connection_candidate(Some(connected_sk_node))?;
let now = Utc::now().naive_utc();
@@ -397,7 +396,7 @@ impl WalreceiverState {
if latest_interaciton > self.lagging_wal_timeout {
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_source_connstr: new_wal_source_connstr,
wal_producer_connstr: new_wal_producer_connstr,
reason: ReconnectReason::NoWalTimeout {
last_wal_interaction: Some(
existing_wal_connection.latest_connection_update,
@@ -423,7 +422,7 @@ impl WalreceiverState {
return Some(
NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_source_connstr: new_wal_source_connstr,
wal_producer_connstr: new_wal_producer_connstr,
reason: ReconnectReason::LaggingWal { current_lsn, new_lsn, threshold: self.max_lsn_wal_lag },
});
}
@@ -434,18 +433,18 @@ impl WalreceiverState {
None => {
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_source_connstr: new_wal_source_connstr,
wal_producer_connstr: new_wal_producer_connstr,
reason: ReconnectReason::NoEtcdDataForExistingConnection,
})
}
}
}
None => {
let (new_sk_id, _, new_wal_source_connstr) =
let (new_sk_id, _, new_wal_producer_connstr) =
self.select_connection_candidate(None)?;
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_source_connstr: new_wal_source_connstr,
wal_producer_connstr: new_wal_producer_connstr,
reason: ReconnectReason::NoExistingConnection,
});
}
@@ -546,7 +545,7 @@ impl WalreceiverState {
#[derive(Debug, PartialEq, Eq)]
struct NewWalConnectionCandidate {
safekeeper_id: NodeId,
wal_source_connstr: String,
wal_producer_connstr: String,
reason: ReconnectReason,
}
@@ -803,7 +802,7 @@ mod tests {
"Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
);
assert!(only_candidate
.wal_source_connstr
.wal_producer_connstr
.contains(DUMMY_SAFEKEEPER_CONNSTR));
let selected_lsn = 100_000;
@@ -868,7 +867,7 @@ mod tests {
"Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
);
assert!(biggest_wal_candidate
.wal_source_connstr
.wal_producer_connstr
.contains(DUMMY_SAFEKEEPER_CONNSTR));
Ok(())
@@ -985,7 +984,7 @@ mod tests {
"Should select new safekeeper due to missing etcd data, even if there's an existing connection with this safekeeper"
);
assert!(only_candidate
.wal_source_connstr
.wal_producer_connstr
.contains(DUMMY_SAFEKEEPER_CONNSTR));
Ok(())
@@ -1067,7 +1066,7 @@ mod tests {
"Should select bigger WAL safekeeper if it starts to lag enough"
);
assert!(over_threshcurrent_candidate
.wal_source_connstr
.wal_producer_connstr
.contains("advanced by Lsn safekeeper"));
Ok(())
@@ -1134,7 +1133,7 @@ mod tests {
unexpected => panic!("Unexpected reason: {unexpected:?}"),
}
assert!(over_threshcurrent_candidate
.wal_source_connstr
.wal_producer_connstr
.contains(DUMMY_SAFEKEEPER_CONNSTR));
Ok(())
@@ -1190,7 +1189,7 @@ mod tests {
unexpected => panic!("Unexpected reason: {unexpected:?}"),
}
assert!(over_threshcurrent_candidate
.wal_source_connstr
.wal_producer_connstr
.contains(DUMMY_SAFEKEEPER_CONNSTR));
Ok(())
@@ -1204,10 +1203,13 @@ mod tests {
tenant_id: harness.tenant_id,
timeline_id: TIMELINE_ID,
},
local_timeline: harness
.load()
.create_empty_timeline(TIMELINE_ID, Lsn(0))
.expect("Failed to create an empty timeline for dummy wal connection manager"),
local_timeline: Arc::new(DatadirTimelineImpl::new(
harness
.load()
.create_empty_timeline(TIMELINE_ID, Lsn(0))
.expect("Failed to create an empty timeline for dummy wal connection manager"),
10_000,
)),
wal_connect_timeout: Duration::from_secs(1),
lagging_wal_timeout: Duration::from_secs(1),
max_lsn_wal_lag: NonZeroU64::new(1).unwrap(),

View File

@@ -9,38 +9,36 @@ use std::{
use anyhow::{bail, ensure, Context};
use bytes::BytesMut;
use fail::fail_point;
use futures::StreamExt;
use postgres::{SimpleQueryMessage, SimpleQueryRow};
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use tokio::{pin, select, sync::watch, time};
use tokio_postgres::{replication::ReplicationStream, Client};
use tokio_stream::StreamExt;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use super::TaskEvent;
use crate::{
layered_repository::WalReceiverInfo,
pgdatadir_mapping::DatadirTimeline,
http::models::WalReceiverEntry,
repository::{Repository, Timeline},
tenant_mgr,
walingest::WalIngest,
walrecord::DecodedWALRecord,
};
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId};
/// Open a connection to the given safekeeper and receive WAL, sending back progress
/// messages as we go.
/// Opens a connection to the given WAL producer and streams the WAL, sending progress messages during streaming.
pub async fn handle_walreceiver_connection(
id: ZTenantTimelineId,
wal_source_connstr: &str,
wal_producer_connstr: &str,
events_sender: &watch::Sender<TaskEvent<ReplicationFeedback>>,
mut cancellation: watch::Receiver<()>,
connect_timeout: Duration,
) -> anyhow::Result<()> {
// Connect to the database in replication mode.
info!("connecting to {wal_source_connstr}");
let connect_cfg = format!("{wal_source_connstr} application_name=pageserver replication=true");
info!("connecting to {wal_producer_connstr}");
let connect_cfg =
format!("{wal_producer_connstr} application_name=pageserver replication=true");
let (mut replication_client, connection) = time::timeout(
connect_timeout,
@@ -152,25 +150,19 @@ pub async fn handle_walreceiver_connection(
waldecoder.feed_bytes(data);
{
let mut decoded = DecodedWALRecord::default();
let mut modification = timeline.begin_modification();
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
// let _enter = info_span!("processing record", lsn = %lsn).entered();
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let _enter = info_span!("processing record", lsn = %lsn).entered();
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hitting a deadlock.
ensure!(lsn.is_aligned());
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hitting a deadlock.
ensure!(lsn.is_aligned());
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded)
.context("could not ingest record at {lsn}")?;
walingest.ingest_record(&timeline, recdata, lsn)?;
fail_point!("walreceiver-after-ingest");
fail_point!("walreceiver-after-ingest");
last_rec_lsn = lsn;
}
last_rec_lsn = lsn;
}
if !caught_up && endlsn >= end_of_wal {
@@ -178,7 +170,7 @@ pub async fn handle_walreceiver_connection(
caught_up = true;
}
let timeline_to_check = Arc::clone(&timeline);
let timeline_to_check = Arc::clone(&timeline.tline);
tokio::task::spawn_blocking(move || timeline_to_check.check_checkpoint_distance())
.await
.with_context(|| {
@@ -226,22 +218,27 @@ pub async fn handle_walreceiver_connection(
// The last LSN we processed. It is not guaranteed to survive pageserver crash.
let write_lsn = u64::from(last_lsn);
// `disk_consistent_lsn` is the LSN at which page server guarantees local persistence of all received data
let flush_lsn = u64::from(timeline.get_disk_consistent_lsn());
let flush_lsn = u64::from(timeline.tline.get_disk_consistent_lsn());
// The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash
// Used by safekeepers to remove WAL preceding `remote_consistent_lsn`.
let apply_lsn = u64::from(timeline_remote_consistent_lsn);
let ts = SystemTime::now();
// Update the status about what we just received. This is shown in the mgmt API.
let last_received_wal = WalReceiverInfo {
wal_source_connstr: wal_source_connstr.to_owned(),
last_received_msg_lsn: last_lsn,
last_received_msg_ts: ts
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Received message time should be before UNIX EPOCH!")
.as_micros(),
};
*timeline.last_received_wal.lock().unwrap() = Some(last_received_wal);
// Update the current WAL receiver's data stored inside the global hash table `WAL_RECEIVERS`
{
super::WAL_RECEIVER_ENTRIES.write().await.insert(
id,
WalReceiverEntry {
wal_producer_connstr: Some(wal_producer_connstr.to_owned()),
last_received_msg_lsn: Some(last_lsn),
last_received_msg_ts: Some(
ts.duration_since(SystemTime::UNIX_EPOCH)
.expect("Received message time should be before UNIX EPOCH!")
.as_micros(),
),
},
);
}
// Send zenith feedback message.
// Regular standby_status_update fields are put into this message.
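The three LSNs above report distinct durability levels back to the safekeeper; restated compactly (field names are assumptions, ReplicationFeedback's real layout is not shown in this hunk):

// Field names are illustrative, not the actual ReplicationFeedback definition.
struct FeedbackLsns {
    write_lsn: u64, // last LSN processed; not guaranteed to survive a crash
    flush_lsn: u64, // disk_consistent_lsn: locally durable up to this point
    apply_lsn: u64, // remote_consistent_lsn: safekeepers may trim WAL before it
}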

View File

@@ -96,7 +96,6 @@ impl DecodedBkpBlock {
}
}
#[derive(Default)]
pub struct DecodedWALRecord {
pub xl_xid: TransactionId,
pub xl_info: u8,
@@ -506,17 +505,7 @@ impl XlMultiXactTruncate {
// block data
// ...
// main data
//
//
// For performance reasons, the caller provides the DecodedWALRecord struct and the function just fills it in.
// It would be more natural for this function to return a DecodedWALRecord as return value,
// but reusing the caller-supplied struct avoids an allocation.
// This code is in the hot path for digesting incoming WAL, and is very performance sensitive.
//
pub fn decode_wal_record(
record: Bytes,
decoded: &mut DecodedWALRecord,
) -> Result<(), DeserializeError> {
pub fn decode_wal_record(record: Bytes) -> Result<DecodedWALRecord, DeserializeError> {
let mut rnode_spcnode: u32 = 0;
let mut rnode_dbnode: u32 = 0;
let mut rnode_relnode: u32 = 0;
@@ -545,7 +534,7 @@ pub fn decode_wal_record(
let mut blocks_total_len: u32 = 0;
let mut main_data_len = 0;
let mut datatotal: u32 = 0;
decoded.blocks.clear();
let mut blocks: Vec<DecodedBkpBlock> = Vec::new();
// 2. Decode the headers.
// XLogRecordBlockHeaders if any,
@@ -724,7 +713,7 @@ pub fn decode_wal_record(
blk.blkno
);
decoded.blocks.push(blk);
blocks.push(blk);
}
_ => {
@@ -735,7 +724,7 @@ pub fn decode_wal_record(
// 3. Decode blocks.
let mut ptr = record.len() - buf.remaining();
for blk in decoded.blocks.iter_mut() {
for blk in blocks.iter_mut() {
if blk.has_image {
blk.bimg_offset = ptr as u32;
ptr += blk.bimg_len as usize;
@@ -755,13 +744,14 @@ pub fn decode_wal_record(
assert_eq!(buf.remaining(), main_data_len as usize);
}
decoded.xl_xid = xlogrec.xl_xid;
decoded.xl_info = xlogrec.xl_info;
decoded.xl_rmid = xlogrec.xl_rmid;
decoded.record = record;
decoded.main_data_offset = main_data_offset;
Ok(())
Ok(DecodedWALRecord {
xl_xid: xlogrec.xl_xid,
xl_info: xlogrec.xl_info,
xl_rmid: xlogrec.xl_rmid,
record,
blocks,
main_data_offset,
})
}
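The comment block removed earlier in this file explained why decode_wal_record used to fill a caller-supplied struct. A toy contrast of the two shapes, with a hypothetical Decoded type:

struct Decoded {
    blocks: Vec<u8>,
}

// Old shape: the caller owns the buffer; clear() keeps the Vec's capacity,
// so steady-state decoding performs no allocation.
fn decode_into(record: &[u8], out: &mut Decoded) {
    out.blocks.clear();
    out.blocks.extend_from_slice(record);
}

// New shape (this commit): allocate and return a fresh value on every call,
// trading one allocation per record for the more natural signature.
fn decode(record: &[u8]) -> Decoded {
    Decoded { blocks: record.to_vec() }
}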
///

View File

@@ -1,14 +1,11 @@
//! Client authentication mechanisms.
pub mod backend;
pub use backend::{BackendType, DatabaseInfo};
pub use backend::DatabaseInfo;
mod credentials;
pub use credentials::ClientCredentials;
mod password_hack;
use password_hack::PasswordHackPayload;
mod flow;
pub use flow::*;
@@ -32,8 +29,9 @@ pub enum AuthErrorImpl {
#[error(transparent)]
Sasl(#[from] crate::sasl::Error),
#[error("Malformed password message: {0}")]
MalformedPassword(&'static str),
/// For passwords that couldn't be processed by [`backend::legacy_console::parse_password`].
#[error("Malformed password message")]
MalformedPassword,
/// Errors produced by [`crate::stream::PqStream`].
#[error(transparent)]
@@ -78,7 +76,7 @@ impl UserFacingError for AuthError {
Console(e) => e.to_string_client(),
GetAuthInfo(e) => e.to_string_client(),
Sasl(e) => e.to_string_client(),
MalformedPassword(_) => self.to_string(),
MalformedPassword => self.to_string(),
_ => "Internal error".to_string(),
}
}

View File

@@ -1,14 +1,16 @@
mod legacy_console;
mod link;
mod postgres;
pub mod console;
mod legacy_console;
pub use legacy_console::{AuthError, AuthErrorImpl};
use super::ClientCredentials;
use crate::{
auth::{self, AuthFlow, ClientCredentials},
compute, config, mgmt,
compute,
config::{AuthBackendType, ProxyConfig},
mgmt,
stream::PqStream,
waiters::{self, Waiter, Waiters},
};
@@ -76,158 +78,32 @@ impl From<DatabaseInfo> for tokio_postgres::Config {
}
}
/// This type serves two purposes:
///
/// * When `T` is `()`, it's just a regular auth backend selector
/// which we use in [`crate::config::ProxyConfig`].
///
/// * However, when we substitute `T` with [`ClientCredentials`],
/// this helps us provide the credentials only to those auth
/// backends which require them for the authentication process.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum BackendType<T> {
/// Legacy Cloud API (V1) + link auth.
LegacyConsole(T),
/// Current Cloud API (V2).
Console(T),
/// Local mock of Cloud API (V2).
Postgres(T),
/// Authentication via a web browser.
Link,
}
impl<T> BackendType<T> {
/// Very similar to [`std::option::Option::map`].
/// Maps [`BackendType<T>`] to [`BackendType<R>`] by applying
/// a function to a contained value.
pub fn map<R>(self, f: impl FnOnce(T) -> R) -> BackendType<R> {
use BackendType::*;
match self {
LegacyConsole(x) => LegacyConsole(f(x)),
Console(x) => Console(f(x)),
Postgres(x) => Postgres(f(x)),
Link => Link,
pub(super) async fn handle_user(
config: &ProxyConfig,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin + Send>,
creds: ClientCredentials,
) -> super::Result<compute::NodeInfo> {
use AuthBackendType::*;
match config.auth_backend {
LegacyConsole => {
legacy_console::handle_user(
&config.auth_endpoint,
&config.auth_link_uri,
client,
&creds,
)
.await
}
}
}
impl<T, E> BackendType<Result<T, E>> {
/// Very similar to [`std::option::Option::transpose`].
/// This is most useful for error handling.
pub fn transpose(self) -> Result<BackendType<T>, E> {
use BackendType::*;
match self {
LegacyConsole(x) => x.map(LegacyConsole),
Console(x) => x.map(Console),
Postgres(x) => x.map(Postgres),
Link => Ok(Link),
}
}
}
impl BackendType<ClientCredentials> {
/// Authenticate the client via the requested backend, possibly using credentials.
pub async fn authenticate(
mut self,
urls: &config::AuthUrls,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin + Send>,
) -> super::Result<compute::NodeInfo> {
use BackendType::*;
if let Console(creds) | Postgres(creds) = &mut self {
// If there's no project so far, that entails that client doesn't
// support SNI or other means of passing the project name.
// We now expect to see a very specific payload in the place of password.
if creds.project().is_none() {
let payload = AuthFlow::new(client)
.begin(auth::PasswordHack)
.await?
.authenticate()
.await?;
// Finally we may finish the initialization of `creds`.
// TODO: add missing type safety to ClientCredentials.
creds.project = Some(payload.project);
let mut config = match &self {
Console(creds) => {
console::Api::new(&urls.auth_endpoint, creds)
.wake_compute()
.await?
}
Postgres(creds) => {
postgres::Api::new(&urls.auth_endpoint, creds)
.wake_compute()
.await?
}
_ => unreachable!("see the patterns above"),
};
// We should use a password from payload as well.
config.password(payload.password);
return Ok(compute::NodeInfo {
reported_auth_ok: false,
config,
});
}
}
match self {
LegacyConsole(creds) => {
legacy_console::handle_user(
&urls.auth_endpoint,
&urls.auth_link_uri,
&creds,
client,
)
Console => {
console::Api::new(&config.auth_endpoint, &creds)?
.handle_user(client)
.await
}
Console(creds) => {
console::Api::new(&urls.auth_endpoint, &creds)
.handle_user(client)
.await
}
Postgres(creds) => {
postgres::Api::new(&urls.auth_endpoint, &creds)
.handle_user(client)
.await
}
// NOTE: this auth backend doesn't use client credentials.
Link => link::handle_user(&urls.auth_link_uri, client).await,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_backend_type_map() {
let values = [
BackendType::LegacyConsole(0),
BackendType::Console(0),
BackendType::Postgres(0),
BackendType::Link,
];
for value in values {
assert_eq!(value.map(|x| x), value);
}
}
#[test]
fn test_backend_type_transpose() {
let values = [
BackendType::LegacyConsole(Ok::<_, ()>(0)),
BackendType::Console(Ok(0)),
BackendType::Postgres(Ok(0)),
BackendType::Link,
];
for value in values {
assert_eq!(value.map(Result::unwrap), value.transpose().unwrap());
}
Postgres => {
postgres::Api::new(&config.auth_endpoint, &creds)?
.handle_user(client)
.await
}
Link => link::handle_user(&config.auth_link_uri, client).await,
}
}

View File

@@ -1,17 +1,18 @@
//! Cloud API V2.
use crate::{
auth::{self, AuthFlow, ClientCredentials},
compute::{self, ComputeConnCfg},
error::{io_error, UserFacingError},
auth::{self, AuthFlow, ClientCredentials, DatabaseInfo},
compute,
error::UserFacingError,
scram,
stream::PqStream,
url::ApiUrl,
};
use serde::{Deserialize, Serialize};
use std::future::Future;
use std::{future::Future, io};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use utils::pq_proto::{BeMessage as Be, BeParameterStatusMessage};
pub type Result<T> = std::result::Result<T, ConsoleAuthError>;
@@ -83,8 +84,8 @@ pub(super) struct Api<'a> {
impl<'a> Api<'a> {
/// Construct an API object containing the auth parameters.
pub(super) fn new(endpoint: &'a ApiUrl, creds: &'a ClientCredentials) -> Self {
Self { endpoint, creds }
pub(super) fn new(endpoint: &'a ApiUrl, creds: &'a ClientCredentials) -> Result<Self> {
Ok(Self { endpoint, creds })
}
/// Authenticate the existing user or throw an error.
@@ -99,7 +100,7 @@ impl<'a> Api<'a> {
let mut url = self.endpoint.clone();
url.path_segments_mut().push("proxy_get_role_secret");
url.query_pairs_mut()
.append_pair("project", self.creds.project().expect("impossible"))
.append_pair("project", self.creds.project_name.as_ref()?)
.append_pair("role", &self.creds.user);
// TODO: use a proper logger
@@ -119,11 +120,11 @@ impl<'a> Api<'a> {
}
/// Wake up the compute node and return the corresponding connection info.
pub(super) async fn wake_compute(&self) -> Result<ComputeConnCfg> {
async fn wake_compute(&self) -> Result<DatabaseInfo> {
let mut url = self.endpoint.clone();
url.path_segments_mut().push("proxy_wake_compute");
url.query_pairs_mut()
.append_pair("project", self.creds.project().expect("impossible"));
let project_name = self.creds.project_name.as_ref()?;
url.query_pairs_mut().append_pair("project", project_name);
// TODO: use a proper logger
println!("cplane request: {url}");
@@ -136,20 +137,16 @@ impl<'a> Api<'a> {
let response: GetWakeComputeResponse =
serde_json::from_str(&resp.text().await.map_err(io_error)?)?;
// Unfortunately, ownership won't let us use `Option::ok_or` here.
let (host, port) = match parse_host_port(&response.address) {
None => return Err(ConsoleAuthError::BadComputeAddress(response.address)),
Some(x) => x,
};
let (host, port) = parse_host_port(&response.address)
.ok_or(ConsoleAuthError::BadComputeAddress(response.address))?;
let mut config = ComputeConnCfg::new();
config
.host(host)
.port(port)
.dbname(&self.creds.dbname)
.user(&self.creds.user);
Ok(config)
Ok(DatabaseInfo {
host,
port,
dbname: self.creds.dbname.to_owned(),
user: self.creds.user.to_owned(),
password: None,
})
}
}
@@ -163,7 +160,7 @@ pub(super) async fn handle_user<'a, Endpoint, GetAuthInfo, WakeCompute>(
) -> auth::Result<compute::NodeInfo>
where
GetAuthInfo: Future<Output = Result<AuthInfo>>,
WakeCompute: Future<Output = Result<ComputeConnCfg>>,
WakeCompute: Future<Output = Result<DatabaseInfo>>,
{
let auth_info = get_auth_info(endpoint).await?;
@@ -182,18 +179,48 @@ where
}
};
let mut config = wake_compute(endpoint).await?;
if let Some(keys) = scram_keys {
config.auth_keys(tokio_postgres::config::AuthKeys::ScramSha256(keys));
}
client
.write_message_noflush(&Be::AuthenticationOk)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?;
Ok(compute::NodeInfo {
reported_auth_ok: false,
config,
db_info: wake_compute(endpoint).await?,
scram_keys,
})
}
fn parse_host_port(input: &str) -> Option<(&str, u16)> {
let (host, port) = input.split_once(':')?;
Some((host, port.parse().ok()?))
/// Upcast (almost) any error into an opaque [`io::Error`].
pub(super) fn io_error(e: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}
fn parse_host_port(input: &str) -> Option<(String, u16)> {
let (host, port) = input.split_once(':')?;
Some((host.to_owned(), port.parse().ok()?))
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn parse_db_info() -> anyhow::Result<()> {
let _: DatabaseInfo = serde_json::from_value(json!({
"host": "localhost",
"port": 5432,
"dbname": "postgres",
"user": "john_doe",
"password": "password",
}))?;
let _: DatabaseInfo = serde_json::from_value(json!({
"host": "localhost",
"port": 5432,
"dbname": "postgres",
"user": "john_doe",
}))?;
Ok(())
}
}
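A quick check of parse_host_port as defined above:

#[test]
fn parse_host_port_examples() {
    assert_eq!(
        parse_host_port("localhost:5432"),
        Some(("localhost".to_owned(), 5432))
    );
    assert_eq!(parse_host_port("no-port-here"), None); // no ':' separator
    assert_eq!(parse_host_port("host:abc"), None); // port fails to parse
}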

View File

@@ -11,7 +11,7 @@ use crate::{
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use utils::pq_proto::BeMessage as Be;
use utils::pq_proto::{BeMessage as Be, BeParameterStatusMessage};
#[derive(Debug, Error)]
pub enum AuthErrorImpl {
@@ -76,12 +76,6 @@ enum ProxyAuthResponse {
NotReady { ready: bool }, // TODO: get rid of `ready`
}
impl ClientCredentials {
fn is_existing_user(&self) -> bool {
self.user.ends_with("@zenith")
}
}
async fn authenticate_proxy_client(
auth_endpoint: &reqwest::Url,
creds: &ClientCredentials,
@@ -106,7 +100,7 @@ async fn authenticate_proxy_client(
}
let auth_info: ProxyAuthResponse = serde_json::from_str(resp.text().await?.as_str())?;
println!("got auth info: {:?}", auth_info);
println!("got auth info: #{:?}", auth_info);
use ProxyAuthResponse::*;
let db_info = match auth_info {
@@ -134,9 +128,7 @@ async fn handle_existing_user(
// Read client's password hash
let msg = client.read_password_message().await?;
let md5_response = parse_password(&msg).ok_or(auth::AuthErrorImpl::MalformedPassword(
"the password should be a valid null-terminated utf-8 string",
))?;
let md5_response = parse_password(&msg).ok_or(auth::AuthErrorImpl::MalformedPassword)?;
let db_info = authenticate_proxy_client(
auth_endpoint,
@@ -147,17 +139,21 @@ async fn handle_existing_user(
)
.await?;
client
.write_message_noflush(&Be::AuthenticationOk)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?;
Ok(compute::NodeInfo {
reported_auth_ok: false,
config: db_info.into(),
db_info,
scram_keys: None,
})
}
pub async fn handle_user(
auth_endpoint: &reqwest::Url,
auth_link_uri: &reqwest::Url,
creds: &ClientCredentials,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin + Send>,
creds: &ClientCredentials,
) -> auth::Result<compute::NodeInfo> {
if creds.is_existing_user() {
handle_existing_user(auth_endpoint, client, creds).await
@@ -205,24 +201,4 @@ mod tests {
.unwrap();
assert!(matches!(auth, ProxyAuthResponse::NotReady { .. }));
}
#[test]
fn parse_db_info() -> anyhow::Result<()> {
let _: DatabaseInfo = serde_json::from_value(json!({
"host": "localhost",
"port": 5432,
"dbname": "postgres",
"user": "john_doe",
"password": "password",
}))?;
let _: DatabaseInfo = serde_json::from_value(json!({
"host": "localhost",
"port": 5432,
"dbname": "postgres",
"user": "john_doe",
}))?;
Ok(())
}
}

View File

@@ -41,7 +41,7 @@ pub async fn handle_user(
client.write_message_noflush(&Be::NoticeResponse("Connecting to database."))?;
Ok(compute::NodeInfo {
reported_auth_ok: true,
config: db_info.into(),
db_info,
scram_keys: None,
})
}

View File

@@ -3,12 +3,10 @@
use crate::{
auth::{
self,
backend::console::{self, AuthInfo, Result},
ClientCredentials,
backend::console::{self, io_error, AuthInfo, Result},
ClientCredentials, DatabaseInfo,
},
compute::{self, ComputeConnCfg},
error::io_error,
scram,
compute, scram,
stream::PqStream,
url::ApiUrl,
};
@@ -22,8 +20,8 @@ pub(super) struct Api<'a> {
impl<'a> Api<'a> {
/// Construct an API object containing the auth parameters.
pub(super) fn new(endpoint: &'a ApiUrl, creds: &'a ClientCredentials) -> Self {
Self { endpoint, creds }
pub(super) fn new(endpoint: &'a ApiUrl, creds: &'a ClientCredentials) -> Result<Self> {
Ok(Self { endpoint, creds })
}
/// Authenticate the existing user or throw an error.
@@ -58,10 +56,7 @@ impl<'a> Api<'a> {
// We shouldn't get more than one row anyway.
[row, ..] => {
let entry = row
.try_get("rolpassword")
.map_err(|e| io_error(format!("failed to read user's password: {e}")))?;
let entry = row.try_get(0).map_err(io_error)?;
scram::ServerSecret::parse(entry)
.map(AuthInfo::Scram)
.or_else(|| {
@@ -80,14 +75,14 @@ impl<'a> Api<'a> {
}
/// We don't need to wake anything locally, so we just return the connection info.
pub(super) async fn wake_compute(&self) -> Result<ComputeConnCfg> {
let mut config = ComputeConnCfg::new();
config
.host(self.endpoint.host_str().unwrap_or("localhost"))
.port(self.endpoint.port().unwrap_or(5432))
.dbname(&self.creds.dbname)
.user(&self.creds.user);
Ok(config)
async fn wake_compute(&self) -> Result<DatabaseInfo> {
Ok(DatabaseInfo {
// TODO: handle that near CLI params parsing
host: self.endpoint.host_str().unwrap_or("localhost").to_owned(),
port: self.endpoint.port().unwrap_or(5432),
dbname: self.creds.dbname.to_owned(),
user: self.creds.user.to_owned(),
password: None,
})
}
}

View File

@@ -1,25 +1,39 @@
//! User credentials used in authentication.
use crate::compute;
use crate::config::ProxyConfig;
use crate::error::UserFacingError;
use crate::stream::PqStream;
use std::collections::HashMap;
use thiserror::Error;
use utils::pq_proto::StartupMessageParams;
use tokio::io::{AsyncRead, AsyncWrite};
#[derive(Debug, Error, PartialEq, Eq, Clone)]
pub enum ClientCredsParseError {
#[error("Parameter '{0}' is missing in startup packet.")]
#[error("Parameter `{0}` is missing in startup packet.")]
MissingKey(&'static str),
#[error(
"Project name is not specified. \
EITHER please upgrade the postgres client library (libpq) for SNI support \
OR pass the project name as a parameter: '&options=project%3D<project-name>'."
)]
MissingSNIAndProjectName,
#[error("Inconsistent project name inferred from SNI ('{0}') and project option ('{1}').")]
InconsistentProjectNames(String, String),
InconsistentProjectNameAndSNI(String, String),
#[error("Common name is not set.")]
CommonNameNotSet,
#[error(
"SNI ('{1}') inconsistently formatted with respect to common name ('{0}'). \
SNI should be formatted as '<project-name>.{0}'."
SNI should be formatted as '<project-name>.<common-name>'."
)]
InconsistentSni(String, String),
InconsistentCommonNameAndSNI(String, String),
#[error("Project name ('{0}') must contain only alphanumeric characters and hyphen.")]
MalformedProjectName(String),
#[error("Project name ('{0}') must contain only alphanumeric characters and hyphens ('-').")]
ProjectNameContainsIllegalChars(String),
}
impl UserFacingError for ClientCredsParseError {}
@@ -30,171 +44,286 @@ impl UserFacingError for ClientCredsParseError {}
pub struct ClientCredentials {
pub user: String,
pub dbname: String,
pub project: Option<String>,
pub project_name: Result<String, ClientCredsParseError>,
}
impl ClientCredentials {
pub fn project(&self) -> Option<&str> {
self.project.as_deref()
pub fn is_existing_user(&self) -> bool {
// This logic will likely change in the future.
self.user.ends_with("@zenith")
}
}
impl ClientCredentials {
pub fn parse(
mut options: StartupMessageParams,
sni: Option<&str>,
mut options: HashMap<String, String>,
sni_data: Option<&str>,
common_name: Option<&str>,
) -> Result<Self, ClientCredsParseError> {
use ClientCredsParseError::*;
let mut get_param = |key| {
options
.remove(key)
.ok_or(ClientCredsParseError::MissingKey(key))
};
// Some parameters are absolutely necessary, others not so much.
let mut get_param = |key| options.remove(key).ok_or(MissingKey(key));
// Some parameters are stored in the startup message.
let user = get_param("user")?;
let dbname = get_param("database")?;
let project_a = get_param("project").ok();
// Alternative project name is in fact a subdomain from SNI.
// NOTE: we do not consider SNI if `common_name` is missing.
let project_b = sni
.zip(common_name)
.map(|(sni, cn)| {
// TODO: what if SNI is present but just a common name?
subdomain_from_sni(sni, cn)
.ok_or_else(|| InconsistentSni(sni.to_owned(), cn.to_owned()))
})
.transpose()?;
let project = match (project_a, project_b) {
// Invariant: if we have both project name variants, they should match.
(Some(a), Some(b)) if a != b => Some(Err(InconsistentProjectNames(a, b))),
(a, b) => a.or(b).map(|name| {
// Invariant: project name may not contain certain characters.
check_project_name(name).map_err(MalformedProjectName)
}),
}
.transpose()?;
let project_name = get_param("project").ok();
let project_name = get_project_name(sni_data, common_name, project_name.as_deref());
Ok(Self {
user,
dbname,
project,
project_name,
})
}
}
fn check_project_name(name: String) -> Result<String, String> {
if name.chars().all(|c| c.is_alphanumeric() || c == '-') {
Ok(name)
} else {
Err(name)
/// Use credentials to authenticate the user.
pub async fn authenticate(
self,
config: &ProxyConfig,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin + Send>,
) -> super::Result<compute::NodeInfo> {
// This method is just a convenient facade for `handle_user`
super::backend::handle_user(config, client, self).await
}
}
fn subdomain_from_sni(sni: &str, common_name: &str) -> Option<String> {
sni.strip_suffix(common_name)?
.strip_suffix('.')
.map(str::to_owned)
/// Infers the project name from `sni_data`.
fn project_name_from_sni_data(
sni_data: &str,
common_name: &str,
) -> Result<String, ClientCredsParseError> {
let common_name_with_dot = format!(".{common_name}");
// check that common_name_with_dot (i.e. ".{common_name}") is the actual suffix of sni_data
if !sni_data.ends_with(&common_name_with_dot) {
return Err(ClientCredsParseError::InconsistentCommonNameAndSNI(
common_name.to_string(),
sni_data.to_string(),
));
}
// return sni_data without the common name suffix.
Ok(sni_data
.strip_suffix(&common_name_with_dot)
.unwrap()
.to_string())
}
#[cfg(test)]
mod tests {
mod tests_for_project_name_from_sni_data {
use super::*;
fn make_options<'a, const N: usize>(pairs: [(&'a str, &'a str); N]) -> StartupMessageParams {
StartupMessageParams::from(pairs.map(|(k, v)| (k.to_owned(), v.to_owned())))
#[test]
fn passing() {
let target_project_name = "my-project-123";
let common_name = "localtest.me";
let sni_data = format!("{target_project_name}.{common_name}");
assert_eq!(
project_name_from_sni_data(&sni_data, common_name),
Ok(target_project_name.to_string())
);
}
#[test]
#[ignore = "TODO: fix how database is handled"]
fn parse_bare_minimum() -> anyhow::Result<()> {
// According to postgresql, only `user` should be required.
let options = make_options([("user", "john_doe")]);
// TODO: check that `creds.dbname` is None.
let creds = ClientCredentials::parse(options, None, None)?;
assert_eq!(creds.user, "john_doe");
Ok(())
}
#[test]
fn parse_missing_project() -> anyhow::Result<()> {
let options = make_options([("user", "john_doe"), ("database", "world")]);
let creds = ClientCredentials::parse(options, None, None)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.dbname, "world");
assert_eq!(creds.project, None);
Ok(())
}
#[test]
fn parse_project_from_sni() -> anyhow::Result<()> {
let options = make_options([("user", "john_doe"), ("database", "world")]);
let sni = Some("foo.localhost");
let common_name = Some("localhost");
let creds = ClientCredentials::parse(options, sni, common_name)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.dbname, "world");
assert_eq!(creds.project.as_deref(), Some("foo"));
Ok(())
}
#[test]
fn parse_project_from_options() -> anyhow::Result<()> {
let options = make_options([
("user", "john_doe"),
("database", "world"),
("project", "bar"),
]);
let creds = ClientCredentials::parse(options, None, None)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.dbname, "world");
assert_eq!(creds.project.as_deref(), Some("bar"));
Ok(())
}
#[test]
fn parse_projects_identical() -> anyhow::Result<()> {
let options = make_options([
("user", "john_doe"),
("database", "world"),
("project", "baz"),
]);
let sni = Some("baz.localhost");
let common_name = Some("localhost");
let creds = ClientCredentials::parse(options, sni, common_name)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.dbname, "world");
assert_eq!(creds.project.as_deref(), Some("baz"));
Ok(())
}
#[test]
fn parse_projects_different() {
let options = make_options([
("user", "john_doe"),
("database", "world"),
("project", "first"),
]);
let sni = Some("second.localhost");
let common_name = Some("localhost");
assert!(matches!(
ClientCredentials::parse(options, sni, common_name).expect_err("should fail"),
ClientCredsParseError::InconsistentProjectNames(_, _)
));
fn throws_inconsistent_common_name_and_sni_data() {
let target_project_name = "my-project-123";
let common_name = "localtest.me";
let wrong_suffix = "wrongtest.me";
assert_eq!(common_name.len(), wrong_suffix.len());
let wrong_common_name = format!("wrong{wrong_suffix}");
let sni_data = format!("{target_project_name}.{wrong_common_name}");
assert_eq!(
project_name_from_sni_data(&sni_data, common_name),
Err(ClientCredsParseError::InconsistentCommonNameAndSNI(
common_name.to_string(),
sni_data
))
);
}
}
/// Determine project name from SNI or from project_name parameter from options argument.
fn get_project_name(
sni_data: Option<&str>,
common_name: Option<&str>,
project_name: Option<&str>,
) -> Result<String, ClientCredsParseError> {
// determine the project name from sni_data if it exists, otherwise from project_name.
let ret = match sni_data {
Some(sni_data) => {
let common_name = common_name.ok_or(ClientCredsParseError::CommonNameNotSet)?;
let project_name_from_sni = project_name_from_sni_data(sni_data, common_name)?;
// check invariant: project name from options and from sni should match
if let Some(project_name) = &project_name {
if !project_name_from_sni.eq(project_name) {
return Err(ClientCredsParseError::InconsistentProjectNameAndSNI(
project_name_from_sni,
project_name.to_string(),
));
}
}
project_name_from_sni
}
None => project_name
.ok_or(ClientCredsParseError::MissingSNIAndProjectName)?
.to_string(),
};
// check formatting invariant: project name must contain only alphanumeric characters and hyphens.
if !ret.chars().all(|x: char| x.is_alphanumeric() || x == '-') {
return Err(ClientCredsParseError::ProjectNameContainsIllegalChars(ret));
}
Ok(ret)
}
#[cfg(test)]
mod tests_for_project_name_only {
use super::*;
#[test]
fn passing_from_sni_data_only() {
let target_project_name = "my-project-123";
let common_name = "localtest.me";
let sni_data = format!("{target_project_name}.{common_name}");
assert_eq!(
get_project_name(Some(&sni_data), Some(common_name), None),
Ok(target_project_name.to_string())
);
}
#[test]
fn throws_project_name_contains_illegal_chars_from_sni_data_only() {
let project_name_prefix = "my-project";
let project_name_suffix = "123";
let common_name = "localtest.me";
for illegal_char_id in 0..256 {
let illegal_char = char::from_u32(illegal_char_id).unwrap();
if !(illegal_char.is_alphanumeric() || illegal_char == '-')
&& illegal_char.to_string().len() == 1
{
let target_project_name =
format!("{project_name_prefix}{illegal_char}{project_name_suffix}");
let sni_data = format!("{target_project_name}.{common_name}");
assert_eq!(
get_project_name(Some(&sni_data), Some(common_name), None),
Err(ClientCredsParseError::ProjectNameContainsIllegalChars(
target_project_name
))
);
}
}
}
#[test]
fn passing_from_project_name_only() {
let target_project_name = "my-project-123";
let common_names = [Some("localtest.me"), None];
for common_name in common_names {
assert_eq!(
get_project_name(None, common_name, Some(target_project_name)),
Ok(target_project_name.to_string())
);
}
}
#[test]
fn throws_project_name_contains_illegal_chars_from_project_name_only() {
let project_name_prefix = "my-project";
let project_name_suffix = "123";
let common_names = [Some("localtest.me"), None];
for common_name in common_names {
for illegal_char_id in 0..256 {
let illegal_char: char = char::from_u32(illegal_char_id).unwrap();
if !(illegal_char.is_alphanumeric() || illegal_char == '-')
&& illegal_char.to_string().len() == 1
{
let target_project_name =
format!("{project_name_prefix}{illegal_char}{project_name_suffix}");
assert_eq!(
get_project_name(None, common_name, Some(&target_project_name)),
Err(ClientCredsParseError::ProjectNameContainsIllegalChars(
target_project_name
))
);
}
}
}
}
#[test]
fn passing_from_sni_data_and_project_name() {
let target_project_name = "my-project-123";
let common_name = "localtest.me";
let sni_data = format!("{target_project_name}.{common_name}");
assert_eq!(
get_project_name(
Some(&sni_data),
Some(common_name),
Some(target_project_name)
),
Ok(target_project_name.to_string())
);
}
#[test]
fn throws_inconsistent_project_name_and_sni() {
let project_name_param = "my-project-123";
let wrong_project_name = "not-my-project-123";
let common_name = "localtest.me";
let sni_data = format!("{wrong_project_name}.{common_name}");
assert_eq!(
get_project_name(Some(&sni_data), Some(common_name), Some(project_name_param)),
Err(ClientCredsParseError::InconsistentProjectNameAndSNI(
wrong_project_name.to_string(),
project_name_param.to_string()
))
);
}
#[test]
fn throws_common_name_not_set() {
let target_project_name = "my-project-123";
let wrong_project_name = "not-my-project-123";
let common_name = "localtest.me";
let sni_datas = [
Some(format!("{wrong_project_name}.{common_name}")),
Some(format!("{target_project_name}.{common_name}")),
];
let project_names = [None, Some(target_project_name)];
for sni_data in sni_datas {
for project_name_param in project_names {
assert_eq!(
get_project_name(sni_data.as_deref(), None, project_name_param),
Err(ClientCredsParseError::CommonNameNotSet)
);
}
}
}
#[test]
fn throws_inconsistent_common_name_and_sni_data() {
let target_project_name = "my-project-123";
let wrong_project_name = "not-my-project-123";
let common_name = "localtest.me";
let wrong_suffix = "wrongtest.me";
assert_eq!(common_name.len(), wrong_suffix.len());
let wrong_common_name = format!("wrong{wrong_suffix}");
let sni_datas = [
Some(format!("{wrong_project_name}.{wrong_common_name}")),
Some(format!("{target_project_name}.{wrong_common_name}")),
];
let project_names = [None, Some(target_project_name)];
for project_name_param in project_names {
for sni_data in &sni_datas {
assert_eq!(
get_project_name(sni_data.as_deref(), Some(common_name), project_name_param),
Err(ClientCredsParseError::InconsistentCommonNameAndSNI(
common_name.to_string(),
sni_data.clone().unwrap().to_string()
))
);
}
}
}
}
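A condensed recap of the resolution rules the tests above pin down, written as one extra test. This is only a sketch for readability: it reuses this module's own get_project_name and ClientCredsParseError and asserts nothing beyond what the tests above already cover.
#[test]
fn resolution_rules_recap() {
    // SNI-derived name wins, but requires a configured common name.
    assert_eq!(
        get_project_name(Some("p.localtest.me"), Some("localtest.me"), None),
        Ok("p".to_string())
    );
    // Without SNI, the explicit project parameter is used as-is.
    assert_eq!(
        get_project_name(None, Some("localtest.me"), Some("p")),
        Ok("p".to_string())
    );
    // SNI without a common name is always rejected.
    assert_eq!(
        get_project_name(Some("p.localtest.me"), None, Some("p")),
        Err(ClientCredsParseError::CommonNameNotSet)
    );
}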

View File

@@ -1,7 +1,8 @@
//! Main authentication flow.
use super::{AuthErrorImpl, PasswordHackPayload};
use crate::{sasl, scram, stream::PqStream};
use super::AuthErrorImpl;
use crate::stream::PqStream;
use crate::{sasl, scram};
use std::io;
use tokio::io::{AsyncRead, AsyncWrite};
use utils::pq_proto::{BeAuthenticationSaslMessage, BeMessage, BeMessage as Be};
@@ -26,17 +27,6 @@ impl AuthMethod for Scram<'_> {
}
}
/// Use an ad hoc auth flow (for clients which don't support SNI) proposed in
/// <https://github.com/neondatabase/cloud/issues/1620#issuecomment-1165332290>.
pub struct PasswordHack;
impl AuthMethod for PasswordHack {
#[inline(always)]
fn first_message(&self) -> BeMessage<'_> {
Be::AuthenticationCleartextPassword
}
}
/// This wrapper for [`PqStream`] performs client authentication.
#[must_use]
pub struct AuthFlow<'a, Stream, State> {
@@ -67,34 +57,13 @@ impl<'a, S: AsyncWrite + Unpin> AuthFlow<'a, S, Begin> {
}
}
impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, PasswordHack> {
/// Perform user authentication. Raise an error in case authentication failed.
pub async fn authenticate(self) -> super::Result<PasswordHackPayload> {
let msg = self.stream.read_password_message().await?;
let password = msg
.strip_suffix(&[0])
.ok_or(AuthErrorImpl::MalformedPassword("missing terminator"))?;
// The so-called "password" should contain a base64-encoded json.
// We will use it later to route the client to their project.
let bytes = base64::decode(password)
.map_err(|_| AuthErrorImpl::MalformedPassword("bad encoding"))?;
let payload = serde_json::from_slice(&bytes)
.map_err(|_| AuthErrorImpl::MalformedPassword("invalid payload"))?;
Ok(payload)
}
}
/// Stream wrapper for handling [SCRAM](crate::scram) auth.
impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, Scram<'_>> {
/// Perform user authentication. Raise an error in case authentication failed.
pub async fn authenticate(self) -> super::Result<scram::ScramKey> {
// Initial client message contains the chosen auth method's name.
let msg = self.stream.read_password_message().await?;
let sasl = sasl::FirstMessage::parse(&msg)
.ok_or(AuthErrorImpl::MalformedPassword("bad sasl message"))?;
let sasl = sasl::FirstMessage::parse(&msg).ok_or(AuthErrorImpl::MalformedPassword)?;
// Currently, the only supported SASL method is SCRAM.
if !scram::METHODS.contains(&sasl.method) {

View File

@@ -1,102 +0,0 @@
//! Payload for ad hoc authentication method for clients that don't support SNI.
//! See the `impl` for [`super::backend::BackendType<ClientCredentials>`].
//! Read more: <https://github.com/neondatabase/cloud/issues/1620#issuecomment-1165332290>.
use serde::{de, Deserialize, Deserializer};
use std::fmt;
#[derive(Deserialize)]
#[serde(untagged)]
pub enum Password {
/// A regular string for utf-8 encoded passwords.
Simple { password: String },
/// Password is base64-encoded because it may contain arbitrary byte sequences.
Encoded {
#[serde(rename = "password_", deserialize_with = "deserialize_base64")]
password: Vec<u8>,
},
}
impl AsRef<[u8]> for Password {
fn as_ref(&self) -> &[u8] {
match self {
Password::Simple { password } => password.as_ref(),
Password::Encoded { password } => password.as_ref(),
}
}
}
#[derive(Deserialize)]
pub struct PasswordHackPayload {
pub project: String,
#[serde(flatten)]
pub password: Password,
}
fn deserialize_base64<'a, D: Deserializer<'a>>(des: D) -> Result<Vec<u8>, D::Error> {
// It's very tempting to replace this with
//
// ```
// let base64: &str = Deserialize::deserialize(des)?;
// base64::decode(base64).map_err(serde::de::Error::custom)
// ```
//
// Unfortunately, we can't always deserialize into `&str`, so we'd
// have to use an allocating `String` instead. Thus, a visitor is better.
struct Visitor;
impl<'de> de::Visitor<'de> for Visitor {
type Value = Vec<u8>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a string")
}
fn visit_str<E: de::Error>(self, v: &str) -> Result<Self::Value, E> {
base64::decode(v).map_err(de::Error::custom)
}
}
des.deserialize_str(Visitor)
}
#[cfg(test)]
mod tests {
use super::*;
use rstest::rstest;
use serde_json::json;
#[test]
fn parse_password() -> anyhow::Result<()> {
let password: Password = serde_json::from_value(json!({
"password": "foo",
}))?;
assert_eq!(password.as_ref(), "foo".as_bytes());
let password: Password = serde_json::from_value(json!({
"password_": base64::encode("foo"),
}))?;
assert_eq!(password.as_ref(), "foo".as_bytes());
Ok(())
}
#[rstest]
#[case("password", str::to_owned)]
#[case("password_", base64::encode)]
fn parse(#[case] key: &str, #[case] encode: fn(&'static str) -> String) -> anyhow::Result<()> {
let (password, project) = ("password", "pie-in-the-sky");
let payload = json!({
"project": project,
key: encode(password),
});
let payload: PasswordHackPayload = serde_json::from_value(payload)?;
assert_eq!(payload.password.as_ref(), password.as_bytes());
assert_eq!(payload.project, project);
Ok(())
}
}
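For illustration, this is how a client that cannot send SNI could build the "password" it sends to the proxy. A sketch only, assuming the base64 and serde_json crates used above; the field names are the ones PasswordHackPayload accepts, and make_password_hack is a hypothetical helper.
fn make_password_hack(project: &str, password: &[u8]) -> String {
    // The proxy treats the cleartext "password" as base64-encoded JSON,
    // so the whole payload is encoded once more after serialization.
    let payload = serde_json::json!({
        "project": project,
        "password_": base64::encode(password),
    });
    base64::encode(payload.to_string())
}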

View File

@@ -1,6 +1,8 @@
use crate::{cancellation::CancelClosure, error::UserFacingError};
use futures::TryFutureExt;
use std::{io, net::SocketAddr};
use crate::auth::DatabaseInfo;
use crate::cancellation::CancelClosure;
use crate::error::UserFacingError;
use std::io;
use std::net::SocketAddr;
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_postgres::NoTls;
@@ -19,96 +21,44 @@ pub enum ConnectionError {
FailedToFetchPgVersion,
}
impl UserFacingError for ConnectionError {
fn to_string_client(&self) -> String {
use ConnectionError::*;
match self {
// This helps us drop irrelevant library-specific prefixes.
// TODO: propagate severity level and other parameters.
Postgres(err) => match err.as_db_error() {
Some(err) => err.message().to_string(),
None => err.to_string(),
},
other => other.to_string(),
}
}
}
impl UserFacingError for ConnectionError {}
/// PostgreSQL version as [`String`].
pub type Version = String;
/// A pair of `ClientKey` & `ServerKey` for `SCRAM-SHA-256`.
pub type ScramKeys = tokio_postgres::config::ScramKeys<32>;
pub type ComputeConnCfg = tokio_postgres::Config;
/// Various compute node info for establishing connection etc.
/// Compute node connection params.
pub struct NodeInfo {
/// Did we send [`utils::pq_proto::BeMessage::AuthenticationOk`]?
pub reported_auth_ok: bool,
/// Compute node connection params.
pub config: tokio_postgres::Config,
pub db_info: DatabaseInfo,
pub scram_keys: Option<ScramKeys>,
}
impl NodeInfo {
async fn connect_raw(&self) -> io::Result<(SocketAddr, TcpStream)> {
use tokio_postgres::config::Host;
let host_port = (self.db_info.host.as_str(), self.db_info.port);
let socket = TcpStream::connect(host_port).await?;
let socket_addr = socket.peer_addr()?;
socket2::SockRef::from(&socket).set_keepalive(true)?;
let connect_once = |host, port| {
TcpStream::connect((host, port)).and_then(|socket| async {
let socket_addr = socket.peer_addr()?;
// This prevents load balancer from severing the connection.
socket2::SockRef::from(&socket).set_keepalive(true)?;
Ok((socket_addr, socket))
})
};
// We can't reuse connection establishing logic from `tokio_postgres` here,
// because it has no means of extracting the underlying socket, which we
// require for our business.
let mut connection_error = None;
let ports = self.config.get_ports();
for (i, host) in self.config.get_hosts().iter().enumerate() {
let port = ports.get(i).or_else(|| ports.get(0)).unwrap_or(&5432);
let host = match host {
Host::Tcp(host) => host.as_str(),
Host::Unix(_) => continue, // unix sockets are not welcome here
};
// TODO: maybe we should add a timeout.
match connect_once(host, *port).await {
Ok(socket) => return Ok(socket),
Err(err) => {
// We can't throw an error here, as there might be more hosts to try.
println!("failed to connect to compute `{host}:{port}`: {err}");
connection_error = Some(err);
}
}
}
Err(connection_error.unwrap_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
format!("couldn't connect: bad compute config: {:?}", self.config),
)
}))
Ok((socket_addr, socket))
}
}
pub struct PostgresConnection {
/// Socket connected to a compute node.
pub stream: TcpStream,
/// PostgreSQL version of this instance.
pub version: String,
}
impl NodeInfo {
/// Connect to a corresponding compute node.
pub async fn connect(&self) -> Result<(PostgresConnection, CancelClosure), ConnectionError> {
let (socket_addr, mut stream) = self
pub async fn connect(self) -> Result<(TcpStream, Version, CancelClosure), ConnectionError> {
let (socket_addr, mut socket) = self
.connect_raw()
.await
.map_err(|_| ConnectionError::FailedToConnectToCompute)?;
let mut config = tokio_postgres::Config::from(self.db_info);
if let Some(scram_keys) = self.scram_keys {
config.auth_keys(tokio_postgres::config::AuthKeys::ScramSha256(scram_keys));
}
// TODO: establish a secure connection to the DB
let (client, conn) = self.config.connect_raw(&mut stream, NoTls).await?;
let (client, conn) = config.connect_raw(&mut socket, NoTls).await?;
let version = conn
.parameter("server_version")
.ok_or(ConnectionError::FailedToFetchPgVersion)?
@@ -116,8 +66,6 @@ impl NodeInfo {
let cancel_closure = CancelClosure::new(socket_addr, client.cancel_token());
let db = PostgresConnection { stream, version };
Ok((db, cancel_closure))
Ok((socket, version, cancel_closure))
}
}
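The host/port pairing rule in the loop above is compact enough to miss. Isolated as a sketch (port_for_host is a hypothetical helper; the logic matches the ports.get(i) expression above):
fn port_for_host(ports: &[u16], host_index: usize) -> u16 {
    // Pair hosts and ports by index; fall back to the first port,
    // then to the default PostgreSQL port.
    *ports.get(host_index).or_else(|| ports.first()).unwrap_or(&5432)
}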

View File

@@ -1,16 +1,28 @@
use crate::{auth, url::ApiUrl};
use crate::url::ApiUrl;
use anyhow::{bail, ensure, Context};
use std::{str::FromStr, sync::Arc};
impl FromStr for auth::BackendType<()> {
#[derive(Debug)]
pub enum AuthBackendType {
/// Legacy Cloud API (V1).
LegacyConsole,
/// Authentication via a web browser.
Link,
/// Current Cloud API (V2).
Console,
/// Local mock of Cloud API (V2).
Postgres,
}
impl FromStr for AuthBackendType {
type Err = anyhow::Error;
fn from_str(s: &str) -> anyhow::Result<Self> {
use auth::BackendType::*;
use AuthBackendType::*;
Ok(match s {
"legacy" => LegacyConsole(()),
"console" => Console(()),
"postgres" => Postgres(()),
"legacy" => LegacyConsole,
"console" => Console,
"postgres" => Postgres,
"link" => Link,
_ => bail!("Invalid option `{s}` for auth method"),
})
@@ -19,11 +31,7 @@ impl FromStr for auth::BackendType<()> {
pub struct ProxyConfig {
pub tls_config: Option<TlsConfig>,
pub auth_backend: auth::BackendType<()>,
pub auth_urls: AuthUrls,
}
pub struct AuthUrls {
pub auth_backend: AuthBackendType,
pub auth_endpoint: ApiUrl,
pub auth_link_uri: ApiUrl,
}
@@ -79,8 +87,10 @@ pub fn configure_tls(key_path: &str, cert_path: &str) -> anyhow::Result<TlsConfi
"Failed to parse PEM object from bytes from file at '{cert_path}'."
))?
.1;
let common_name = pem.parse_x509()?.subject().to_string();
common_name.strip_prefix("CN=*.").map(|s| s.to_string())
let almost_common_name = pem.parse_x509()?.tbs_certificate.subject.to_string();
let expected_prefix = "CN=*.";
let common_name = almost_common_name.strip_prefix(expected_prefix);
common_name.map(str::to_string)
};
Ok(TlsConfig {
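The certificate handling above reduces to a small string rule. As a sketch (common_name_from_subject is a hypothetical helper; the real code applies the same prefix-stripping to the x509 subject string shown above):
fn common_name_from_subject(subject: &str) -> Option<String> {
    // "CN=*.localtest.me" -> Some("localtest.me");
    // any subject without the wildcard prefix yields None.
    subject.strip_prefix("CN=*.").map(str::to_string)
}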

View File

@@ -1,5 +1,3 @@
use std::io;
/// Marks errors that may be safely shown to a client.
/// This trait can be seen as a specialized version of [`ToString`].
///
@@ -17,8 +15,3 @@ pub trait UserFacingError: ToString {
self.to_string()
}
}
/// Upcast (almost) any error into an opaque [`io::Error`].
pub fn io_error(e: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}

View File

@@ -118,15 +118,11 @@ async fn main() -> anyhow::Result<()> {
let mgmt_address: SocketAddr = arg_matches.value_of("mgmt").unwrap().parse()?;
let http_address: SocketAddr = arg_matches.value_of("http").unwrap().parse()?;
let auth_urls = config::AuthUrls {
auth_endpoint: arg_matches.value_of("auth-endpoint").unwrap().parse()?,
auth_link_uri: arg_matches.value_of("uri").unwrap().parse()?,
};
let config: &ProxyConfig = Box::leak(Box::new(ProxyConfig {
tls_config,
auth_backend: arg_matches.value_of("auth-backend").unwrap().parse()?,
auth_urls,
auth_endpoint: arg_matches.value_of("auth-endpoint").unwrap().parse()?,
auth_link_uri: arg_matches.value_of("uri").unwrap().parse()?,
}));
println!("Version: {GIT_VERSION}");

View File

@@ -82,22 +82,11 @@ async fn handle_client(
}
let tls = config.tls_config.as_ref();
let (mut stream, params) = match handshake(stream, tls, cancel_map).await? {
let (stream, creds) = match handshake(stream, tls, cancel_map).await? {
Some(x) => x,
None => return Ok(()), // it's a cancellation request
};
let creds = {
let sni = stream.get_ref().sni_hostname();
let common_name = tls.and_then(|tls| tls.common_name.as_deref());
let result = config
.auth_backend
.map(|_| auth::ClientCredentials::parse(params, sni, common_name))
.transpose();
async { result }.or_else(|e| stream.throw_error(e)).await?
};
let client = Client::new(stream, creds);
cancel_map
.with_session(|session| client.connect_to_db(config, session))
@@ -112,10 +101,12 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
stream: S,
mut tls: Option<&TlsConfig>,
cancel_map: &CancelMap,
) -> anyhow::Result<Option<(PqStream<Stream<S>>, StartupMessageParams)>> {
) -> anyhow::Result<Option<(PqStream<Stream<S>>, auth::ClientCredentials)>> {
// Client may try upgrading to each protocol only once
let (mut tried_ssl, mut tried_gss) = (false, false);
let common_name = tls.and_then(|cfg| cfg.common_name.as_deref());
let mut stream = PqStream::new(Stream::from_raw(stream));
loop {
let msg = stream.read_startup_packet().await?;
@@ -156,7 +147,18 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
stream.throw_error_str(ERR_INSECURE_CONNECTION).await?;
}
break Ok(Some((stream, params)));
// Get SNI info when available
let sni_data = match stream.get_ref() {
Stream::Tls { tls } => tls.get_ref().1.sni_hostname().map(|s| s.to_owned()),
_ => None,
};
// Construct credentials
let creds =
auth::ClientCredentials::parse(params, sni_data.as_deref(), common_name);
let creds = async { creds }.or_else(|e| stream.throw_error(e)).await?;
break Ok(Some((stream, creds)));
}
CancelRequest(cancel_key_data) => {
cancel_map.cancel_session(cancel_key_data).await?;
@@ -172,12 +174,12 @@ struct Client<S> {
/// The underlying libpq protocol stream.
stream: PqStream<S>,
/// Client credentials that we care about.
creds: auth::BackendType<auth::ClientCredentials>,
creds: auth::ClientCredentials,
}
impl<S> Client<S> {
/// Construct a new connection context.
fn new(stream: PqStream<S>, creds: auth::BackendType<auth::ClientCredentials>) -> Self {
fn new(stream: PqStream<S>, creds: auth::ClientCredentials) -> Self {
Self { stream, creds }
}
}
@@ -192,22 +194,16 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<S> {
let Self { mut stream, creds } = self;
// Authenticate and connect to a compute node.
let auth = creds.authenticate(&config.auth_urls, &mut stream).await;
let auth = creds.authenticate(config, &mut stream).await;
let node = async { auth }.or_else(|e| stream.throw_error(e)).await?;
let (db, cancel_closure) = node.connect().or_else(|e| stream.throw_error(e)).await?;
let (db, version, cancel_closure) =
node.connect().or_else(|e| stream.throw_error(e)).await?;
let cancel_key_data = session.enable_cancellation(cancel_closure);
// Report authentication success if we haven't done this already.
if !node.reported_auth_ok {
stream
.write_message_noflush(&Be::AuthenticationOk)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?;
}
stream
.write_message_noflush(&BeMessage::ParameterStatus(
BeParameterStatusMessage::ServerVersion(&db.version),
BeParameterStatusMessage::ServerVersion(&version),
))?
.write_message_noflush(&Be::BackendKeyData(cancel_key_data))?
.write_message(&BeMessage::ReadyForQuery)
@@ -221,7 +217,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<S> {
}
// Starting from here we only proxy the client's traffic.
let mut db = MetricsStream::new(db.stream, inc_proxied);
let mut db = MetricsStream::new(db, inc_proxied);
let mut client = MetricsStream::new(stream.into_inner(), inc_proxied);
let _ = tokio::io::copy_bidirectional(&mut client, &mut db).await?;
@@ -283,13 +279,9 @@ mod tests {
let config = rustls::ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(vec![cert], key)?
.into();
.with_single_cert(vec![cert], key)?;
TlsConfig {
config,
common_name: Some(common_name.to_string()),
}
config.into()
};
let client_config = {
@@ -305,6 +297,11 @@ mod tests {
ClientConfig { config, hostname }
};
let tls_config = TlsConfig {
config: tls_config,
common_name: Some(common_name.to_string()),
};
Ok((client_config, tls_config))
}
@@ -360,7 +357,7 @@ mod tests {
auth: impl TestAuth + Send,
) -> anyhow::Result<()> {
let cancel_map = CancelMap::default();
let (mut stream, _params) = handshake(client, tls.as_ref(), &cancel_map)
let (mut stream, _creds) = handshake(client, tls.as_ref(), &cancel_map)
.await?
.context("handshake failed")?;
@@ -439,6 +436,32 @@ mod tests {
proxy.await?
}
#[tokio::test]
async fn give_user_an_error_for_bad_creds() -> anyhow::Result<()> {
let (client, server) = tokio::io::duplex(1024);
let proxy = tokio::spawn(dummy_proxy(client, None, NoAuth));
let client_err = tokio_postgres::Config::new()
.ssl_mode(SslMode::Disable)
.connect_raw(server, NoTls)
.await
.err() // -> Option<E>
.context("client shouldn't be able to connect")?;
// TODO: this is ugly, but `format!` won't allow us to extract fmt string
assert!(client_err.to_string().contains("missing in startup packet"));
let server_err = proxy
.await?
.err() // -> Option<E>
.context("server shouldn't accept client")?;
assert!(client_err.to_string().contains(&server_err.to_string()));
Ok(())
}
#[tokio::test]
async fn keepalive_is_inherited() -> anyhow::Result<()> {
use tokio::net::{TcpListener, TcpStream};

View File

@@ -145,14 +145,6 @@ impl<S> Stream<S> {
pub fn from_raw(raw: S) -> Self {
Self::Raw { raw }
}
/// Return SNI hostname when it's available.
pub fn sni_hostname(&self) -> Option<&str> {
match self {
Stream::Raw { .. } => None,
Stream::Tls { tls } => tls.get_ref().1.sni_hostname(),
}
}
}
#[derive(Debug, Error)]

View File

@@ -20,6 +20,7 @@ postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8
anyhow = "1.0"
crc32c = "0.6.0"
humantime = "2.1.0"
walkdir = "2"
url = "2.2.2"
signal-hook = "0.3.10"
serde = { version = "1.0", features = ["derive"] }
@@ -27,9 +28,11 @@ serde_with = "1.12.0"
hex = "0.4.3"
const_format = "0.2.21"
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
tokio-util = { version = "0.7", features = ["io"] }
git-version = "0.3.5"
async-trait = "0.1"
once_cell = "1.10.0"
futures = "0.3.13"
toml_edit = { version = "0.13", features = ["easy"] }
postgres_ffi = { path = "../libs/postgres_ffi" }

View File

@@ -1,3 +1,9 @@
use serde::{Deserialize, Serialize};
use utils::zid::{NodeId, ZTimelineId};
use utils::zid::{NodeId, ZTenantId, ZTimelineId};
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
pub tenant_id: ZTenantId,
pub timeline_id: ZTimelineId,
pub peer_ids: Vec<NodeId>,
}

View File

@@ -1,365 +0,0 @@
openapi: "3.0.2"
info:
title: Safekeeper control API
version: "1.0"
servers:
- url: "http://localhost:7676"
paths:
/v1/status:
get:
tags:
- "Info"
summary: Get safekeeper status
description: ""
operationId: v1GetSafekeeperStatus
responses:
"200":
description: Safekeeper status
content:
application/json:
schema:
$ref: "#/components/schemas/SafekeeperStatus"
"403":
$ref: "#/components/responses/ForbiddenError"
default:
$ref: "#/components/responses/GenericError"
/v1/tenant/{tenant_id}:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
delete:
tags:
- "Tenant"
summary: Delete tenant and all its timelines
description: "Deletes tenant and returns a map of timelines that were deleted along with their statuses"
operationId: v1DeleteTenant
responses:
"200":
description: Tenant deleted
content:
application/json:
schema:
$ref: "#/components/schemas/TenantDeleteResult"
"403":
$ref: "#/components/responses/ForbiddenError"
default:
$ref: "#/components/responses/GenericError"
/v1/tenant/{tenant_id}/timeline:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
post:
tags:
- "Timeline"
summary: Register new timeline
description: ""
operationId: v1CreateTenantTimeline
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/TimelineCreateRequest"
responses:
"201":
description: Timeline created
# TODO: return timeline info?
"403":
$ref: "#/components/responses/ForbiddenError"
default:
$ref: "#/components/responses/GenericError"
/v1/tenant/{tenant_id}/timeline/{timeline_id}:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
- name: timeline_id
in: path
required: true
schema:
type: string
format: hex
get:
tags:
- "Timeline"
summary: Get timeline information and status
description: ""
operationId: v1GetTenantTimeline
responses:
"200":
description: Timeline status
content:
application/json:
schema:
$ref: "#/components/schemas/TimelineStatus"
"403":
$ref: "#/components/responses/ForbiddenError"
default:
$ref: "#/components/responses/GenericError"
delete:
tags:
- "Timeline"
summary: Delete timeline
description: ""
operationId: v1DeleteTenantTimeline
responses:
"200":
description: Timeline deleted
content:
application/json:
schema:
$ref: "#/components/schemas/TimelineDeleteResult"
"403":
$ref: "#/components/responses/ForbiddenError"
default:
$ref: "#/components/responses/GenericError"
/v1/record_safekeeper_info/{tenant_id}/{timeline_id}:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
- name: timeline_id
in: path
required: true
schema:
type: string
format: hex
post:
tags:
- "Tests"
summary: Used only in tests to hand-craft required data
description: ""
operationId: v1RecordSafekeeperInfo
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/SkTimelineInfo"
responses:
"200":
description: Timeline info posted
# TODO: return timeline info?
"403":
$ref: "#/components/responses/ForbiddenError"
default:
$ref: "#/components/responses/GenericError"
components:
securitySchemes:
JWT:
type: http
scheme: bearer
bearerFormat: JWT
schemas:
#
# Requests
#
TimelineCreateRequest:
type: object
required:
- timeline_id
- peer_ids
properties:
timeline_id:
type: string
format: hex
peer_ids:
type: array
items:
type: integer
minimum: 0
SkTimelineInfo:
type: object
required:
- last_log_term
- flush_lsn
- commit_lsn
- backup_lsn
- remote_consistent_lsn
- peer_horizon_lsn
- safekeeper_connstr
properties:
last_log_term:
type: integer
minimum: 0
flush_lsn:
type: string
commit_lsn:
type: string
backup_lsn:
type: string
remote_consistent_lsn:
type: string
peer_horizon_lsn:
type: string
safekeeper_connstr:
type: string
#
# Responses
#
SafekeeperStatus:
type: object
required:
- id
properties:
id:
type: integer
minimum: 0 # kind of unsigned integer
TimelineStatus:
type: object
required:
- timeline_id
- tenant_id
properties:
timeline_id:
type: string
format: hex
tenant_id:
type: string
format: hex
acceptor_state:
$ref: '#/components/schemas/AcceptorStateStatus'
flush_lsn:
type: string
timeline_start_lsn:
type: string
local_start_lsn:
type: string
commit_lsn:
type: string
backup_lsn:
type: string
peer_horizon_lsn:
type: string
remote_consistent_lsn:
type: string
AcceptorStateStatus:
type: object
required:
- term
- epoch
properties:
term:
type: integer
minimum: 0 # kind of unsigned integer
epoch:
type: integer
minimum: 0 # kind of unsigned integer
term_history:
type: array
items:
$ref: '#/components/schemas/TermSwitchEntry'
TermSwitchEntry:
type: object
required:
- term
- lsn
properties:
term:
type: integer
minimum: 0 # kind of unsigned integer
lsn:
type: string
TimelineDeleteResult:
type: object
required:
- dir_existed
- was_active
properties:
dir_existed:
type: boolean
was_active:
type: boolean
TenantDeleteResult:
type: object
additionalProperties:
$ref: "#/components/schemas/TimelineDeleteResult"
example:
57fd1b39f23704a63423de0a8435d85c:
dir_existed: true
was_active: false
67fd1b39f23704a63423ab8435d85c33:
dir_existed: false
was_active: false
#
# Errors
#
GenericErrorContent:
type: object
properties:
msg:
type: string
responses:
#
# Errors
#
GenericError:
description: Generic error response
content:
application/json:
schema:
$ref: "#/components/schemas/GenericErrorContent"
ForbiddenError:
description: Forbidden error response
content:
application/json:
schema:
type: object
required:
- msg
properties:
msg:
type: string
security:
- JWT: []

View File

@@ -25,6 +25,8 @@ use utils::{
zid::{NodeId, ZTenantId, ZTenantTimelineId, ZTimelineId},
};
use super::models::TimelineCreateRequest;
#[derive(Debug, Serialize)]
struct SafekeeperStatus {
id: NodeId,
@@ -120,6 +122,20 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
json_response(StatusCode::OK, status)
}
async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let request_data: TimelineCreateRequest = json_request(&mut request).await?;
let zttid = ZTenantTimelineId {
tenant_id: request_data.tenant_id,
timeline_id: request_data.timeline_id,
};
check_permission(&request, Some(zttid.tenant_id))?;
GlobalTimelines::create(get_conf(&request), zttid, request_data.peer_ids)
.map_err(ApiError::from_err)?;
json_response(StatusCode::CREATED, ())
}
/// Deactivates the timeline and removes its data directory.
///
/// It does not try to stop any processing of the timeline; there is no such code at the time of writing.
@@ -198,18 +214,16 @@ pub fn make_router(
}
}))
}
// NB: on any changes do not forget to update the OpenAPI spec
// located nearby (/safekeeper/src/http/openapi_spec.yaml).
router
.data(Arc::new(conf))
.data(auth)
.get("/v1/status", status_handler)
// TODO: update OpenAPI spec
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
"/v1/timeline/:tenant_id/:timeline_id",
timeline_status_handler,
)
// Will be used in the future instead of implicit timeline creation
.post("/v1/timeline", timeline_create_handler)
.delete(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_delete_force_handler,
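For reference, a request body for the new POST /v1/timeline endpoint carries the three TimelineCreateRequest fields. A sketch with hypothetical IDs:
use serde_json::json;

fn main() {
    // Matches TimelineCreateRequest: tenant_id, timeline_id, peer_ids.
    let body = json!({
        "tenant_id": "57fd1b39f23704a63423de0a8435d85c",
        "timeline_id": "67fd1b39f23704a63423ab8435d85c33",
        "peer_ids": [1, 2, 3],
    });
    println!("{body}");
}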

View File

@@ -484,12 +484,8 @@ impl AcceptorProposerMessage {
}
}
/// Safekeeper implements consensus to reliably persist WAL across nodes.
/// It controls all WAL disk writes and updates of control file.
///
/// Currently safekeeper processes:
/// - messages from compute (proposers) and provides replies
/// - messages from etcd peers
/// SafeKeeper which consumes events (messages from compute) and provides
/// replies.
pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
/// Maximum commit_lsn between all nodes, can be ahead of local flush_lsn.
/// Note: be careful to set only if we are sure our WAL (term history) matches
@@ -512,28 +508,20 @@ where
CTRL: control_file::Storage,
WAL: wal_storage::Storage,
{
/// Accepts a control file storage containing the safekeeper state.
/// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
/// and `server` (`wal_seg_size` inside it) fields.
// constructor
pub fn new(
ztli: ZTimelineId,
state: CTRL,
mut wal_store: WAL,
node_id: NodeId,
) -> Result<SafeKeeper<CTRL, WAL>> {
if state.tenant_id == ZTenantId::from([0u8; 16])
|| state.timeline_id == ZTimelineId::from([0u8; 16])
{
bail!(
"Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
state.tenant_id,
state.timeline_id
);
}
if state.server.wal_seg_size == 0 {
bail!("Calling SafeKeeper::new with empty wal_seg_size");
if state.timeline_id != ZTimelineId::from([0u8; 16]) && ztli != state.timeline_id {
bail!("Calling SafeKeeper::new with inconsistent ztli ({}) and SafeKeeperState.server.timeline_id ({})", ztli, state.timeline_id);
}
// initialize wal_store, if state is already initialized
wal_store.init_storage(&state)?;
Ok(SafeKeeper {
global_commit_lsn: state.commit_lsn,
epoch_start_lsn: Lsn(0),
@@ -591,7 +579,7 @@ where
&mut self,
msg: &ProposerGreeting,
) -> Result<Option<AcceptorProposerMessage>> {
// Check protocol compatibility
/* Check protocol compatibility */
if msg.protocol_version != SK_PROTOCOL_VERSION {
bail!(
"incompatible protocol version {}, expected {}",
@@ -599,11 +587,11 @@ where
SK_PROTOCOL_VERSION
);
}
// Postgres upgrade is not treated as fatal error
/* Postgres upgrade is not treated as fatal error */
if msg.pg_version != self.state.server.pg_version
&& self.state.server.pg_version != UNKNOWN_SERVER_VERSION
{
warn!(
info!(
"incompatible server version {}, expected {}",
msg.pg_version, self.state.server.pg_version
);
@@ -622,26 +610,18 @@ where
self.state.timeline_id
);
}
if self.state.server.wal_seg_size != msg.wal_seg_size {
bail!(
"invalid wal_seg_size, got {}, expected {}",
msg.wal_seg_size,
self.state.server.wal_seg_size
);
}
// system_id will be updated on mismatch
if self.state.server.system_id != msg.system_id {
warn!(
"unexpected system ID arrived, got {}, expected {}",
msg.system_id, self.state.server.system_id
);
// set basic info about server, if not yet
// TODO: verify that it doesn't change afterwards
{
let mut state = self.state.clone();
state.server.system_id = msg.system_id;
state.server.wal_seg_size = msg.wal_seg_size;
self.state.persist(&state)?;
}
self.wal_store.init_storage(&self.state)?;
info!(
"processed greeting from proposer {:?}, sending term {:?}",
msg.proposer_id, self.state.acceptor_state.term
@@ -690,6 +670,16 @@ where
Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
}
/// Bump our term if received a note from elected proposer with higher one
fn bump_if_higher(&mut self, term: Term) -> Result<()> {
if self.state.acceptor_state.term < term {
let mut state = self.state.clone();
state.acceptor_state.term = term;
self.state.persist(&state)?;
}
Ok(())
}
/// Form AppendResponse from current state.
fn append_response(&self) -> AppendResponse {
let ar = AppendResponse {
@@ -706,12 +696,7 @@ where
fn handle_elected(&mut self, msg: &ProposerElected) -> Result<Option<AcceptorProposerMessage>> {
info!("received ProposerElected {:?}", msg);
if self.state.acceptor_state.term < msg.term {
let mut state = self.state.clone();
state.acceptor_state.term = msg.term;
self.state.persist(&state)?;
}
self.bump_if_higher(msg.term)?;
// If our term is higher, ignore the message (next feedback will inform the compute)
if self.state.acceptor_state.term > msg.term {
return Ok(None);
@@ -764,7 +749,7 @@ where
}
/// Advance commit_lsn taking into account what we have locally
fn update_commit_lsn(&mut self) -> Result<()> {
pub fn update_commit_lsn(&mut self) -> Result<()> {
let commit_lsn = min(self.global_commit_lsn, self.flush_lsn());
assert!(commit_lsn >= self.inmem.commit_lsn);
@@ -967,6 +952,10 @@ mod tests {
self.lsn
}
fn init_storage(&mut self, _state: &SafeKeeperState) -> Result<()> {
Ok(())
}
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
self.lsn = startpos + buf.len() as u64;
Ok(())

View File

@@ -557,7 +557,6 @@ impl TimelineTools for Option<Arc<Timeline>> {
struct GlobalTimelinesState {
timelines: HashMap<ZTenantTimelineId, Arc<Timeline>>,
wal_backup_launcher_tx: Option<Sender<ZTenantTimelineId>>,
conf: Option<SafeKeeperConf>,
}
lazy_static! {
@@ -577,16 +576,15 @@ pub struct TimelineDeleteForceResult {
pub struct GlobalTimelines;
impl GlobalTimelines {
pub fn init(wal_backup_launcher_tx: Sender<ZTenantTimelineId>, conf: &SafeKeeperConf) {
pub fn init(wal_backup_launcher_tx: Sender<ZTenantTimelineId>) {
let mut state = TIMELINES_STATE.lock().unwrap();
assert!(state.wal_backup_launcher_tx.is_none());
assert!(state.conf.is_none());
state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
state.conf = Some(conf.clone());
}
fn create_internal(
mut state: MutexGuard<GlobalTimelinesState>,
conf: &SafeKeeperConf,
zttid: ZTenantTimelineId,
peer_ids: Vec<NodeId>,
) -> Result<Arc<Timeline>> {
@@ -611,6 +609,15 @@ impl GlobalTimelines {
}
}
pub fn create(
conf: &SafeKeeperConf,
zttid: ZTenantTimelineId,
peer_ids: Vec<NodeId>,
) -> Result<Arc<Timeline>> {
let state = TIMELINES_STATE.lock().unwrap();
GlobalTimelines::create_internal(state, conf, zttid, peer_ids)
}
/// Get a timeline with control file loaded from the global TIMELINES_STATE.timelines map.
/// If control file doesn't exist and create=false, bails out.
pub fn get(
@@ -618,7 +625,7 @@ impl GlobalTimelines {
zttid: ZTenantTimelineId,
create: bool,
) -> Result<Arc<Timeline>> {
let _enter = info_span!("", tenant = %zttid.tenant_id).entered();
let _enter = info_span!("", timeline = %zttid.tenant_id).entered();
let mut state = TIMELINES_STATE.lock().unwrap();

View File

@@ -53,7 +53,7 @@ pub fn wal_backup_launcher_thread_main(
/// Check whether wal backup is required for timeline. If yes, mark that launcher is
/// aware of current status and return the timeline.
fn is_wal_backup_required(zttid: ZTenantTimelineId) -> Option<Arc<Timeline>> {
GlobalTimelines::get_loaded(zttid).filter(|t| t.wal_backup_attend())
GlobalTimelines::get_loaded(zttid).filter(|t| t.wal_backup_attend())
}
struct WalBackupTaskHandle {

View File

@@ -7,7 +7,7 @@
//!
//! Note that last file has `.partial` suffix, that's different from postgres.
use anyhow::{bail, Context, Result};
use anyhow::{anyhow, bail, Context, Result};
use std::io::{self, Seek, SeekFrom};
use std::pin::Pin;
use tokio::io::AsyncRead;
@@ -16,7 +16,7 @@ use lazy_static::lazy_static;
use postgres_ffi::xlog_utils::{
find_end_of_wal, IsPartialXLogFileName, IsXLogFileName, XLogFromFileName, XLogSegNo, PG_TLI,
};
use std::cmp::{max, min};
use std::cmp::min;
use std::fs::{self, remove_file, File, OpenOptions};
use std::io::Write;
@@ -86,6 +86,9 @@ pub trait Storage {
/// LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn;
/// Init storage with wal_seg_size and read WAL from disk to get latest LSN.
fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()>;
/// Write piece of WAL from buf to disk, but not necessarily sync it.
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>;
@@ -101,7 +104,7 @@ pub trait Storage {
}
/// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes
/// for better performance. Storage is initialized in the constructor.
/// for better performance. Storage must be initialized before use.
///
/// WAL is stored in segments, each segment is a file. Last segment has ".partial" suffix in
/// its filename and may be not fully flushed.
@@ -109,15 +112,16 @@ pub trait Storage {
/// Relationship of LSNs:
/// `write_lsn` >= `write_record_lsn` >= `flush_record_lsn`
///
/// When storage is created first time, all LSNs are zeroes and there are no segments on disk.
/// When storage is just created, all LSNs are zeroes and there are no segments on disk.
pub struct PhysicalStorage {
metrics: WalStorageMetrics,
zttid: ZTenantTimelineId,
timeline_dir: PathBuf,
conf: SafeKeeperConf,
/// Size of WAL segment in bytes.
wal_seg_size: usize,
// fields below are filled upon initialization
/// None if uninitialized, Some(usize) if storage is initialized.
wal_seg_size: Option<usize>,
/// Written to disk, but possibly still in the cache and not fully persisted.
/// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
@@ -142,50 +146,25 @@ pub struct PhysicalStorage {
}
impl PhysicalStorage {
pub fn new(
zttid: &ZTenantTimelineId,
conf: &SafeKeeperConf,
state: &SafeKeeperState,
) -> Result<PhysicalStorage> {
pub fn new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> PhysicalStorage {
let timeline_dir = conf.timeline_dir(zttid);
if state.server.wal_seg_size == 0 {
bail!("wal_seg_size must be initialized before creating PhysicalStorage");
}
let wal_seg_size = state.server.wal_seg_size as usize;
// Find out where stored WAL ends, starting at commit_lsn which is a
// known recent record boundary (unless we don't have WAL at all).
let write_lsn = if state.commit_lsn == Lsn(0) {
Lsn(0)
} else {
Lsn(find_end_of_wal(&timeline_dir, wal_seg_size, true, state.commit_lsn)?.0)
};
// TODO: do we really know that write_lsn is fully flushed to disk?
// If not, maybe it's better to call fsync() here to be sure?
let flush_lsn = write_lsn;
info!(
"initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
zttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
);
if flush_lsn < state.commit_lsn || flush_lsn < state.peer_horizon_lsn {
warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", zttid.timeline_id);
}
Ok(PhysicalStorage {
PhysicalStorage {
metrics: WalStorageMetrics::new(zttid),
zttid: *zttid,
timeline_dir,
conf: conf.clone(),
wal_seg_size,
write_lsn,
write_record_lsn: write_lsn,
flush_record_lsn: flush_lsn,
decoder: WalStreamDecoder::new(write_lsn),
wal_seg_size: None,
write_lsn: Lsn(0),
write_record_lsn: Lsn(0),
flush_record_lsn: Lsn(0),
decoder: WalStreamDecoder::new(Lsn(0)),
file: None,
})
}
}
/// Wrapper for flush_lsn updates that also updates metrics.
fn update_flush_lsn(&mut self) {
self.flush_record_lsn = self.write_record_lsn;
}
/// Call fdatasync if config requires so.
@@ -210,9 +189,9 @@ impl PhysicalStorage {
/// Open or create WAL segment file. Caller must call seek to the wanted position.
/// Returns `file` and `is_partial`.
fn open_or_create(&self, segno: XLogSegNo) -> Result<(File, bool)> {
fn open_or_create(&self, segno: XLogSegNo, wal_seg_size: usize) -> Result<(File, bool)> {
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
// Try to open already completed segment
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
@@ -228,18 +207,24 @@ impl PhysicalStorage {
.open(&wal_file_partial_path)
.with_context(|| format!("Failed to open log file {:?}", &wal_file_path))?;
write_zeroes(&mut file, self.wal_seg_size)?;
write_zeroes(&mut file, wal_seg_size)?;
self.fsync_file(&mut file)?;
Ok((file, true))
}
}
/// Write WAL bytes, which are known to be located in a single WAL segment.
fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
fn write_in_segment(
&mut self,
segno: u64,
xlogoff: usize,
buf: &[u8],
wal_seg_size: usize,
) -> Result<()> {
let mut file = if let Some(file) = self.file.take() {
file
} else {
let (mut file, is_partial) = self.open_or_create(segno)?;
let (mut file, is_partial) = self.open_or_create(segno, wal_seg_size)?;
assert!(is_partial, "unexpected write into non-partial segment file");
file.seek(SeekFrom::Start(xlogoff as u64))?;
file
@@ -247,13 +232,13 @@ impl PhysicalStorage {
file.write_all(buf)?;
if xlogoff + buf.len() == self.wal_seg_size {
if xlogoff + buf.len() == wal_seg_size {
// If we reached the end of a WAL segment, flush and close it.
self.fdatasync_file(&mut file)?;
// Rename partial file to completed file
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
fs::rename(&wal_file_partial_path, &wal_file_path)?;
} else {
// otherwise, file can be reused later
@@ -269,6 +254,10 @@ impl PhysicalStorage {
///
/// Updates `write_lsn`.
fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
let wal_seg_size = self
.wal_seg_size
.ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?;
if self.write_lsn != pos {
// need to flush the file before discarding it
if let Some(mut file) = self.file.take() {
@@ -280,17 +269,17 @@ impl PhysicalStorage {
while !buf.is_empty() {
// Extract WAL location for this block
let xlogoff = self.write_lsn.segment_offset(self.wal_seg_size) as usize;
let segno = self.write_lsn.segment_number(self.wal_seg_size);
let xlogoff = self.write_lsn.segment_offset(wal_seg_size) as usize;
let segno = self.write_lsn.segment_number(wal_seg_size);
// If crossing a WAL boundary, only write up until we reach wal segment size.
let bytes_write = if xlogoff + buf.len() > self.wal_seg_size {
self.wal_seg_size - xlogoff
let bytes_write = if xlogoff + buf.len() > wal_seg_size {
wal_seg_size - xlogoff
} else {
buf.len()
};
self.write_in_segment(segno, xlogoff, &buf[..bytes_write])?;
self.write_in_segment(segno, xlogoff, &buf[..bytes_write], wal_seg_size)?;
self.write_lsn += bytes_write as u64;
buf = &buf[bytes_write..];
}
@@ -305,6 +294,53 @@ impl Storage for PhysicalStorage {
self.flush_record_lsn
}
/// Storage needs to know wal_seg_size to know which segment to read/write, but
/// wal_seg_size is not always known at the moment of storage creation. This method
/// allows its initialization to be postponed.
fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()> {
if state.server.wal_seg_size == 0 {
// wal_seg_size is still unknown. This is normally a dead path; it
// should be hit only in tests.
return Ok(());
}
if let Some(wal_seg_size) = self.wal_seg_size {
// physical storage is already initialized
assert_eq!(wal_seg_size, state.server.wal_seg_size as usize);
return Ok(());
}
// initialize physical storage
let wal_seg_size = state.server.wal_seg_size as usize;
self.wal_seg_size = Some(wal_seg_size);
// Find out where stored WAL ends, starting at commit_lsn which is a
// known recent record boundary (unless we don't have WAL at all).
self.write_lsn = if state.commit_lsn == Lsn(0) {
Lsn(0)
} else {
Lsn(find_end_of_wal(&self.timeline_dir, wal_seg_size, true, state.commit_lsn)?.0)
};
self.write_record_lsn = self.write_lsn;
// TODO: do we really know that write_lsn is fully flushed to disk?
// If not, maybe it's better to call fsync() here to be sure?
self.update_flush_lsn();
info!(
"initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
self.zttid.timeline_id, self.flush_record_lsn, state.commit_lsn, state.peer_horizon_lsn,
);
if self.flush_record_lsn < state.commit_lsn
|| self.flush_record_lsn < state.peer_horizon_lsn
{
warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", self.zttid.timeline_id);
}
Ok(())
}
/// Write WAL to disk.
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
// Disallow any non-sequential writes, which can result in gaps or overwrites.
@@ -368,83 +404,80 @@ impl Storage for PhysicalStorage {
// We have unflushed data (write_lsn != flush_lsn), but no file.
// This should only happen if last file was fully written and flushed,
// but haven't updated flush_lsn yet.
if self.write_lsn.segment_offset(self.wal_seg_size) != 0 {
bail!(
"unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
self.write_lsn,
self.flush_record_lsn
);
}
assert!(self.write_lsn.segment_offset(self.wal_seg_size.unwrap()) == 0);
}
// everything is flushed now, let's update flush_lsn
self.flush_record_lsn = self.write_record_lsn;
self.update_flush_lsn();
Ok(())
}
/// Truncate written WAL by removing all WAL segments after the given LSN.
/// end_pos must point to the end of the WAL record.
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
let wal_seg_size = self
.wal_seg_size
.ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?;
// Streaming must not create a hole, so truncate cannot be called on non-written lsn
if self.write_lsn != Lsn(0) && end_pos >= self.write_lsn {
bail!(
"truncate_wal called on non-written WAL, write_lsn={}, end_pos={}",
self.write_lsn,
end_pos
);
}
assert!(self.write_lsn == Lsn(0) || self.write_lsn >= end_pos);
// Close previously opened file, if any
if let Some(mut unflushed_file) = self.file.take() {
self.fdatasync_file(&mut unflushed_file)?;
}
let xlogoff = end_pos.segment_offset(self.wal_seg_size) as usize;
let segno = end_pos.segment_number(self.wal_seg_size);
// Remove all segments after the given LSN.
remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno)?;
let (mut file, is_partial) = self.open_or_create(segno)?;
let xlogoff = end_pos.segment_offset(wal_seg_size) as usize;
let segno = end_pos.segment_number(wal_seg_size);
let (mut file, is_partial) = self.open_or_create(segno, wal_seg_size)?;
// Fill end with zeroes
file.seek(SeekFrom::Start(xlogoff as u64))?;
write_zeroes(&mut file, self.wal_seg_size - xlogoff)?;
write_zeroes(&mut file, wal_seg_size - xlogoff)?;
self.fdatasync_file(&mut file)?;
if !is_partial {
// Make segment partial once again
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
fs::rename(&wal_file_path, &wal_file_partial_path)?;
}
// Remove all subsequent segments
let mut segno = segno;
loop {
segno += 1;
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
// TODO: better use fs::try_exists which is currently available only in nightly build
if wal_file_path.exists() {
fs::remove_file(&wal_file_path)?;
} else if wal_file_partial_path.exists() {
fs::remove_file(&wal_file_partial_path)?;
} else {
break;
}
}
// Update LSNs
self.write_lsn = end_pos;
self.write_record_lsn = end_pos;
self.flush_record_lsn = end_pos;
self.update_flush_lsn();
Ok(())
}
fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>> {
let timeline_dir = self.timeline_dir.clone();
let wal_seg_size = self.wal_seg_size;
let wal_seg_size = self.wal_seg_size.unwrap();
Box::new(move |segno_up_to: XLogSegNo| {
remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to)
remove_up_to(&timeline_dir, wal_seg_size, segno_up_to)
})
}
}
/// Remove all WAL segments in timeline_dir that match the given predicate.
fn remove_segments_from_disk(
timeline_dir: &Path,
wal_seg_size: usize,
remove_predicate: impl Fn(XLogSegNo) -> bool,
) -> Result<()> {
/// Remove all WAL segments in timeline_dir <= given segno.
fn remove_up_to(timeline_dir: &Path, wal_seg_size: usize, segno_up_to: XLogSegNo) -> Result<()> {
let mut n_removed = 0;
let mut min_removed = u64::MAX;
let mut max_removed = u64::MIN;
for entry in fs::read_dir(&timeline_dir)? {
let entry = entry?;
let entry_path = entry.path();
@@ -456,21 +489,19 @@ fn remove_segments_from_disk(
continue;
}
let (segno, _) = XLogFromFileName(fname_str, wal_seg_size);
if remove_predicate(segno) {
if segno <= segno_up_to {
remove_file(entry_path)?;
n_removed += 1;
min_removed = min(min_removed, segno);
max_removed = max(max_removed, segno);
}
}
}
if n_removed > 0 {
info!(
"removed {} WAL segments [{}; {}]",
n_removed, min_removed, max_removed
);
}
let segno_from = segno_up_to - n_removed + 1;
info!(
"removed {} WAL segments [{}; {}]",
n_removed,
XLogFileName(PG_TLI, segno_from, wal_seg_size),
XLogFileName(PG_TLI, segno_up_to, wal_seg_size)
);
Ok(())
}
@@ -480,10 +511,8 @@ pub struct WalReader {
pos: Lsn,
wal_segment: Option<Pin<Box<dyn AsyncRead>>>,
// S3 will be used to read WAL if LSN is not available locally
enable_remote_read: bool,
// We don't have WAL locally if LSN is less than local_start_lsn
// S3 will be used to read WAL if LSN is not available locally
local_start_lsn: Lsn,
}
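The boundary arithmetic used by write_exact above, isolated as a sketch (bytes_writable_in_segment is a hypothetical helper; it is the same expression as the bytes_write computation in the loop):
fn bytes_writable_in_segment(xlogoff: usize, buf_len: usize, wal_seg_size: usize) -> usize {
    // Never write past the end of the current WAL segment; the caller
    // loops, advancing write_lsn by the returned amount.
    if xlogoff + buf_len > wal_seg_size {
        wal_seg_size - xlogoff
    } else {
        buf_len
    }
}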

View File

@@ -1,6 +1,5 @@
import threading
import pytest
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import lsn_from_hex
@@ -139,7 +138,7 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
'image_creation_threshold': '1',
# set PITR interval to be small, so we can do GC
'pitr_interval': '0 s'
'pitr_interval': '1 s'
})
b0 = env.neon_cli.create_branch('b0', tenant_id=tenant)
@@ -163,11 +162,6 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
thread = threading.Thread(target=do_gc, daemon=True)
thread.start()
# because of network latency and other factors, GC iteration might be processed
# after the `create_branch` request. Add a sleep here to make sure that GC is
# always processed before.
time.sleep(1.0)
# The starting LSN is invalid as the corresponding record is scheduled to be removed by in-queue GC.
with pytest.raises(Exception, match="invalid branch start lsn"):
env.neon_cli.create_branch('b1', 'b0', tenant_id=tenant, ancestor_start_lsn=lsn)

View File

@@ -1,82 +0,0 @@
import time
import os
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.log_helper import log
# This test creates a large number of tables, which causes a large catalog.
# Right now Neon serializes a directory as a single key-value storage entry,
# so it leads to a layer filled mostly by one key.
# The original Neon implementation of checkpoint and compaction is not able to split a key,
# which leads to large (several gigabytes) layer files (both ephemeral and delta layers).
# That may cause problems with uploading to S3 and also degrade performance because of ephemeral file swapping.
#
def test_large_schema(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
pg = env.postgres.create_start('main')
conn = pg.connect()
cur = conn.cursor()
tables = 2 # 10 is too much for debug build
partitions = 1000
for i in range(1, tables + 1):
print(f'iteration {i} / {tables}')
# Restart compute. Restart is actually not strictly needed.
# It is done mostly because this test originally tries to model the problem reported by Ketteq.
pg.stop()
# Kill and restart the pageserver.
# env.pageserver.stop(immediate=True)
# env.pageserver.start()
pg.start()
retry_sleep = 0.5
max_retries = 200
retries = 0
while True:
try:
conn = pg.connect()
cur = conn.cursor()
cur.execute(f"CREATE TABLE if not exists t_{i}(pk integer) partition by range (pk)")
for j in range(1, partitions + 1):
cur.execute(
f"create table if not exists p_{i}_{j} partition of t_{i} for values from ({j}) to ({j + 1})"
)
cur.execute(f"insert into t_{i} values (generate_series(1,{partitions}))")
cur.execute("vacuum full")
conn.close()
except Exception as error:
# It's normal that it takes some time for the pageserver to
# restart, and for the connection to fail until it does. It
# should eventually recover, so retry until it succeeds.
print(f'failed: {error}')
if retries < max_retries:
retries += 1
print(f'retry {retries} / {max_retries}')
time.sleep(retry_sleep)
continue
else:
raise
break
conn = pg.connect()
cur = conn.cursor()
for i in range(1, tables + 1):
cur.execute(f"SELECT count(*) FROM t_{i}")
assert cur.fetchone() == (partitions, )
cur.execute("set enable_sort=off")
cur.execute("select * from pg_depend order by refclassid, refobjid, refobjsubid")
# Check layer file sizes
tenant_id = pg.safe_psql("show neon.tenant_id")[0][0]
timeline_id = pg.safe_psql("show neon.timeline_id")[0][0]
timeline_path = "{}/tenants/{}/timelines/{}/".format(env.repo_dir, tenant_id, timeline_id)
for filename in os.listdir(timeline_path):
if filename.startswith('00000'):
log.info(f'layer {filename} size is {os.path.getsize(timeline_path + filename)}')
assert os.path.getsize(timeline_path + filename) < 512_000_000

View File

@@ -47,8 +47,7 @@ def check_client(client: NeonPageserverHttpClient, initial_tenant: UUID):
for timeline in timelines:
timeline_id_str = str(timeline['timeline_id'])
timeline_details = client.timeline_detail(tenant_id=tenant_id,
timeline_id=UUID(timeline_id_str),
include_non_incremental_logical_size=True)
timeline_id=UUID(timeline_id_str))
assert timeline_details['tenant_id'] == tenant_id.hex
assert timeline_details['timeline_id'] == timeline_id_str
@@ -64,19 +63,13 @@ def test_pageserver_http_get_wal_receiver_not_found(neon_simple_env: NeonEnv):
tenant_id, timeline_id = env.neon_cli.create_tenant()
timeline_details = client.timeline_detail(tenant_id=tenant_id,
timeline_id=timeline_id,
include_non_incremental_logical_size=True)
empty_response = client.wal_receiver_get(tenant_id, timeline_id)
assert timeline_details.get('wal_source_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
assert timeline_details.get('last_received_msg_lsn') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
assert timeline_details.get('last_received_msg_ts') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
assert empty_response.get('wal_producer_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
assert empty_response.get('last_received_msg_lsn') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
assert empty_response.get('last_received_msg_ts') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
# Test the WAL-receiver related fields in the response to `timeline_details` API call
#
# These fields used to be returned by a separate API call, but they're part of
# `timeline_details` now.
def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
env = neon_simple_env
client = env.pageserver.http_client()
@@ -85,17 +78,18 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
pg = env.postgres.create_start(DEFAULT_BRANCH_NAME, tenant_id=tenant_id)
def expect_updated_msg_lsn(prev_msg_lsn: Optional[int]) -> int:
timeline_details = client.timeline_detail(tenant_id, timeline_id=timeline_id)
res = client.wal_receiver_get(tenant_id, timeline_id)
# a successful `timeline_details` response must contain the below fields
local_timeline_details = timeline_details['local']
assert "wal_source_connstr" in local_timeline_details.keys()
assert "last_received_msg_lsn" in local_timeline_details.keys()
assert "last_received_msg_ts" in local_timeline_details.keys()
# a successful `wal_receiver_get` response must contain the below fields
assert list(res.keys()) == [
"wal_producer_connstr",
"last_received_msg_lsn",
"last_received_msg_ts",
]
assert local_timeline_details["last_received_msg_lsn"] is not None, "the last received message's LSN is empty"
assert res["last_received_msg_lsn"] is not None, "the last received message's LSN is empty"
last_msg_lsn = lsn_from_hex(local_timeline_details["last_received_msg_lsn"])
last_msg_lsn = lsn_from_hex(res["last_received_msg_lsn"])
assert prev_msg_lsn is None or prev_msg_lsn < last_msg_lsn, \
f"the last received message's LSN {last_msg_lsn} hasn't been updated \
compared to the previous message's LSN {prev_msg_lsn}"
@@ -104,7 +98,7 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
# Wait to make sure that we get a latest WAL receiver data.
# We need to wait here because it's possible that we don't have access to
# the latest WAL yet, when the `timeline_detail` API is first called.
# the latest WAL during the time the `wal_receiver_get` API is called.
# See: https://github.com/neondatabase/neon/issues/1768.
lsn = wait_until(number_of_iterations=5, interval=1, func=lambda: expect_updated_msg_lsn(None))

View File

@@ -1,34 +1,8 @@
import pytest
import json
import base64
def test_proxy_select_1(static_proxy):
static_proxy.safe_psql('select 1', options='project=generic-project-name')
def test_password_hack(static_proxy):
user = 'borat'
password = 'password'
static_proxy.safe_psql(f"create role {user} with login password '{password}'",
options='project=irrelevant')
def encode(s: str) -> str:
return base64.b64encode(s.encode('utf-8')).decode('utf-8')
magic = encode(json.dumps({
'project': 'irrelevant',
'password': password,
}))
static_proxy.safe_psql('select 1', sslsni=0, user=user, password=magic)
magic = encode(json.dumps({
'project': 'irrelevant',
'password_': encode(password),
}))
static_proxy.safe_psql('select 1', sslsni=0, user=user, password=magic)
static_proxy.safe_psql("select 1;", options="project=generic-project-name")
# Pass extra options to the server.
@@ -37,8 +11,8 @@ def test_password_hack(static_proxy):
# See https://github.com/neondatabase/neon/issues/1287
@pytest.mark.xfail
def test_proxy_options(static_proxy):
with static_proxy.connect(options='-cproxytest.option=value') as conn:
with static_proxy.connect(options="-cproxytest.option=value") as conn:
with conn.cursor() as cur:
cur.execute('SHOW proxytest.option')
cur.execute("SHOW proxytest.option;")
value = cur.fetchall()[0][0]
assert value == 'value'

View File

@@ -26,7 +26,7 @@ from fixtures.neon_fixtures import (
wait_for_upload,
wait_until,
)
from fixtures.utils import lsn_from_hex, lsn_to_hex, subprocess_capture
from fixtures.utils import lsn_from_hex, subprocess_capture
def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float):
@@ -268,7 +268,6 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
env.neon_cli.create_branch(
new_branch_name="test_tenant_relocation_second",
ancestor_branch_name="test_tenant_relocation_main",
ancestor_start_lsn=lsn_to_hex(current_lsn_main),
tenant_id=tenant_id,
)
pg_second = env.postgres.create_start(branch_name='test_tenant_relocation_second',

View File

@@ -1,15 +1,10 @@
from contextlib import closing
import random
from uuid import UUID
import re
import psycopg2.extras
import psycopg2.errors
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres, assert_timeline_local
from fixtures.log_helper import log
import time
from fixtures.utils import get_timeline_dir_size
def test_timeline_size(neon_simple_env: NeonEnv):
env = neon_simple_env
@@ -181,163 +176,3 @@ def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder):
cur.execute("SELECT * from pg_size_pretty(pg_cluster_size())")
pg_cluster_size = cur.fetchone()
log.info(f"pg_cluster_size = {pg_cluster_size}")
def test_timeline_physical_size_init(neon_simple_env: NeonEnv):
env = neon_simple_env
new_timeline_id = env.neon_cli.create_branch('test_timeline_physical_size_init')
pg = env.postgres.create_start("test_timeline_physical_size_init")
pg.safe_psql_many([
"CREATE TABLE foo (t text)",
"""INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 1000) g""",
])
# restart the pageserver to force calculating the timeline's initial physical size
env.pageserver.stop()
env.pageserver.start()
assert_physical_size(env, env.initial_tenant, new_timeline_id)
def test_timeline_physical_size_post_checkpoint(neon_simple_env: NeonEnv):
env = neon_simple_env
new_timeline_id = env.neon_cli.create_branch('test_timeline_physical_size_post_checkpoint')
pg = env.postgres.create_start("test_timeline_physical_size_post_checkpoint")
pg.safe_psql_many([
"CREATE TABLE foo (t text)",
"""INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 1000) g""",
])
env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
assert_physical_size(env, env.initial_tenant, new_timeline_id)
def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder):
# Disable background compaction as we don't want it to happen after the `get_physical_size` request
# and before checking the expected size on disk, which would make the assertion fail
neon_env_builder.pageserver_config_override = "tenant_config={checkpoint_distance=100000, compaction_period='10m'}"
env = neon_env_builder.init_start()
new_timeline_id = env.neon_cli.create_branch('test_timeline_physical_size_post_compaction')
pg = env.postgres.create_start("test_timeline_physical_size_post_compaction")
pg.safe_psql_many([
"CREATE TABLE foo (t text)",
"""INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 100000) g""",
])
env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
env.pageserver.safe_psql(f"compact {env.initial_tenant.hex} {new_timeline_id.hex}")
assert_physical_size(env, env.initial_tenant, new_timeline_id)
def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
# Disable background compaction and GC as we don't want them to happen after the `get_physical_size` request
# and before checking the expected size on disk, which would make the assertion fail
neon_env_builder.pageserver_config_override = \
"tenant_config={checkpoint_distance=100000, compaction_period='10m', gc_period='10m', pitr_interval='1s'}"
env = neon_env_builder.init_start()
new_timeline_id = env.neon_cli.create_branch('test_timeline_physical_size_post_gc')
pg = env.postgres.create_start("test_timeline_physical_size_post_gc")
pg.safe_psql_many([
"CREATE TABLE foo (t text)",
"""INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 100000) g""",
])
env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
pg.safe_psql("""
INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 100000) g
""")
env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
env.pageserver.safe_psql(f"do_gc {env.initial_tenant.hex} {new_timeline_id.hex} 0")
assert_physical_size(env, env.initial_tenant, new_timeline_id)
def test_timeline_physical_size_metric(neon_simple_env: NeonEnv):
env = neon_simple_env
new_timeline_id = env.neon_cli.create_branch('test_timeline_physical_size_metric')
pg = env.postgres.create_start("test_timeline_physical_size_metric")
pg.safe_psql_many([
"CREATE TABLE foo (t text)",
"""INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 100000) g""",
])
env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
# get the metrics and parse the metric for the current timeline's physical size
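# A matching exposition line looks like this (values illustrative):
# pageserver_current_physical_size{tenant_id="...",timeline_id="..."} 8388608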
metrics = env.pageserver.http_client().get_metrics()
matches = re.search(
f'^pageserver_current_physical_size{{tenant_id="{env.initial_tenant.hex}",timeline_id="{new_timeline_id.hex}"}} (\\S+)$',
metrics,
re.MULTILINE)
assert matches
# assert that the metric matches the actual physical size on disk
tl_physical_size_metric = int(matches.group(1))
timeline_path = env.timeline_dir(env.initial_tenant, new_timeline_id)
assert tl_physical_size_metric == get_timeline_dir_size(timeline_path)
def test_tenant_physical_size(neon_simple_env: NeonEnv):
random.seed(100)
env = neon_simple_env
client = env.pageserver.http_client()
tenant, timeline = env.neon_cli.create_tenant()
def get_timeline_physical_size(timeline: UUID):
res = client.timeline_detail(tenant, timeline)
return res['local']['current_physical_size_non_incremental']
timeline_total_size = get_timeline_physical_size(timeline)
for i in range(10):
n_rows = random.randint(100, 1000)
timeline = env.neon_cli.create_branch(f"test_tenant_physical_size_{i}", tenant_id=tenant)
pg = env.postgres.create_start(f"test_tenant_physical_size_{i}", tenant_id=tenant)
pg.safe_psql_many([
"CREATE TABLE foo (t text)",
f"INSERT INTO foo SELECT 'long string to consume some space' || g FROM generate_series(1, {n_rows}) g",
])
env.pageserver.safe_psql(f"checkpoint {tenant.hex} {timeline.hex}")
timeline_total_size += get_timeline_physical_size(timeline)
pg.stop()
tenant_physical_size = int(client.tenant_status(tenant_id=tenant)['current_physical_size'])
assert tenant_physical_size == timeline_total_size
def assert_physical_size(env: NeonEnv, tenant_id: UUID, timeline_id: UUID):
"""Check the current physical size returned from timeline API
matches the total physical size of the timeline on disk"""
client = env.pageserver.http_client()
res = assert_timeline_local(client, tenant_id, timeline_id)
timeline_path = env.timeline_dir(tenant_id, timeline_id)
assert res["local"]["current_physical_size"] == res["local"][
"current_physical_size_non_incremental"]
assert res["local"]["current_physical_size"] == get_timeline_dir_size(timeline_path)

View File

@@ -14,43 +14,13 @@ from contextlib import closing
from dataclasses import dataclass, field
from multiprocessing import Process, Value
from pathlib import Path
from fixtures.neon_fixtures import NeonPageserver, PgBin, Etcd, Postgres, RemoteStorageUsers, Safekeeper, NeonEnv, NeonEnvBuilder, PortDistributor, SafekeeperPort, neon_binpath, PgProtocol, wait_for_last_record_lsn, wait_for_upload
from fixtures.neon_fixtures import PgBin, Etcd, Postgres, RemoteStorageUsers, Safekeeper, NeonEnv, NeonEnvBuilder, PortDistributor, SafekeeperPort, neon_binpath, PgProtocol
from fixtures.utils import get_dir_size, lsn_to_hex, lsn_from_hex
from fixtures.log_helper import log
from typing import List, Optional, Any
from uuid import uuid4
def wait_lsn_force_checkpoint(tenant_id: str,
timeline_id: str,
pg: Postgres,
ps: NeonPageserver,
pageserver_conn_options={}):
lsn = lsn_from_hex(pg.safe_psql('SELECT pg_current_wal_flush_lsn()')[0][0])
log.info(f"pg_current_wal_flush_lsn is {lsn_to_hex(lsn)}, waiting for it on pageserver")
auth_token = None
if 'password' in pageserver_conn_options:
auth_token = pageserver_conn_options['password']
# wait for the pageserver to catch up
wait_for_last_record_lsn(ps.http_client(auth_token=auth_token),
uuid.UUID(hex=tenant_id),
uuid.UUID(hex=timeline_id),
lsn)
# force checkpoint to advance remote_consistent_lsn
with closing(ps.connect(**pageserver_conn_options)) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
# ensure that remote_consistent_lsn is advanced
wait_for_upload(ps.http_client(auth_token=auth_token),
uuid.UUID(hex=tenant_id),
uuid.UUID(hex=timeline_id),
lsn)
@dataclass
class TimelineMetrics:
timeline_id: str
@@ -233,6 +203,113 @@ def test_restarts(neon_env_builder: NeonEnvBuilder):
assert cur.fetchone() == (500500, )
start_delay_sec = 2
def delayed_safekeeper_start(wa):
time.sleep(start_delay_sec)
wa.start()
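# Helper pattern for the tests below: restart the stopped safekeeper from a
# separate process after start_delay_sec, so the main thread can issue an
# INSERT and verify that it stayed blocked for at least that long.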
# When a majority of acceptors is offline, commits are expected to be frozen
def test_unavailability(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 2
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_safekeepers_unavailability')
pg = env.postgres.create_start('test_safekeepers_unavailability')
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
pg_conn = pg.connect()
cur = pg_conn.cursor()
# check basic work with table
cur.execute('CREATE TABLE t(key int primary key, value text)')
cur.execute("INSERT INTO t values (1, 'payload')")
# shut down one of the two acceptors, i.e. a majority
env.safekeepers[0].stop()
proc = Process(target=delayed_safekeeper_start, args=(env.safekeepers[0], ))
proc.start()
start = time.time()
cur.execute("INSERT INTO t values (2, 'payload')")
# ensure that the query above was hanging while acceptor was down
assert (time.time() - start) >= start_delay_sec
proc.join()
# for the world's balance, do the same with second acceptor
env.safekeepers[1].stop()
proc = Process(target=delayed_safekeeper_start, args=(env.safekeepers[1], ))
proc.start()
start = time.time()
cur.execute("INSERT INTO t values (3, 'payload')")
# ensure that the query above was hanging while acceptor was down
assert (time.time() - start) >= start_delay_sec
proc.join()
cur.execute("INSERT INTO t values (4, 'payload')")
cur.execute('SELECT sum(key) FROM t')
assert cur.fetchone() == (10, )
# shut down random subset of acceptors, sleep, wake them up, rinse, repeat
def xmas_garland(acceptors, stop):
while not bool(stop.value):
victims = []
for wa in acceptors:
if random.random() >= 0.5:
victims.append(wa)
for v in victims:
v.stop()
time.sleep(1)
for v in victims:
v.start()
time.sleep(1)
# shared flag which gets set on fixture exit, telling background workers to stop
@pytest.fixture
def stop_value():
stop = Value('i', 0)
yield stop
stop.value = 1
# do inserts while concurrently bringing subsets of acceptors down and up
def test_race_conditions(neon_env_builder: NeonEnvBuilder, stop_value):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_safekeepers_race_conditions')
pg = env.postgres.create_start('test_safekeepers_race_conditions')
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
pg_conn = pg.connect()
cur = pg_conn.cursor()
cur.execute('CREATE TABLE t(key int primary key, value text)')
proc = Process(target=xmas_garland, args=(env.safekeepers, stop_value))
proc.start()
for i in range(1000):
cur.execute("INSERT INTO t values (%s, 'payload');", (i + 1, ))
cur.execute('SELECT sum(key) FROM t')
assert cur.fetchone() == (500500, )
stop_value.value = 1
proc.join()
# Test that safekeepers push their info to the broker and learn peer status from it
def test_broker(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
@@ -253,10 +330,10 @@ def test_broker(neon_env_builder: NeonEnvBuilder):
log.info(f"statuses is {stat_before}")
pg.safe_psql("INSERT INTO t SELECT generate_series(1,100), 'payload'")
# force checkpoint in pageserver to advance remote_consistent_lsn
wait_lsn_force_checkpoint(tenant_id, timeline_id, pg, env.pageserver)
# force checkpoint to advance remote_consistent_lsn
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
# and wait till remote_consistent_lsn propagates to all safekeepers
started_at = time.time()
while True:
@@ -300,7 +377,9 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
pageserver_conn_options = {}
if auth_enabled:
pageserver_conn_options['password'] = env.auth_keys.generate_tenant_token(tenant_id)
wait_lsn_force_checkpoint(tenant_id, timeline_id, pg, env.pageserver, pageserver_conn_options)
with closing(env.pageserver.connect(**pageserver_conn_options)) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
# We will wait for the first segment's removal. Make sure the segments exist to begin with.
first_segments = [

View File

@@ -9,7 +9,6 @@ from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres, Safekeeper
from fixtures.log_helper import getLogger
from fixtures.utils import lsn_from_hex, lsn_to_hex
from typing import List, Optional
from dataclasses import dataclass
log = getLogger('root.safekeeper_async')
@@ -147,8 +146,9 @@ async def run_restarts_under_load(env: NeonEnv,
max_transfer=100,
period_time=4,
iterations=10):
# Set the timeout for this test to 5 minutes. It should be enough for the test to complete,
# taking into account that this timeout is checked only at the beginning of every iteration.
# Set the timeout for this test to 5 minutes. It should be enough for the test to complete
# and less than CircleCI's no_output_timeout, taking into account that this timeout
# is checked only at the beginning of every iteration.
test_timeout_at = time.monotonic() + 5 * 60
pg_conn = await pg.connect_async()
@@ -404,119 +404,3 @@ def test_concurrent_computes(neon_env_builder: NeonEnvBuilder):
env.neon_cli.create_branch('test_concurrent_computes')
asyncio.run(run_concurrent_computes(env))
# Stop safekeeper and check that query cannot be executed while safekeeper is down.
# Query will insert a single row into a table.
async def check_unavailability(sk: Safekeeper,
conn: asyncpg.Connection,
key: int,
start_delay_sec: int = 2):
# shut down one of the two acceptors, i.e. a majority
sk.stop()
bg_query = asyncio.create_task(conn.execute(f"INSERT INTO t values ({key}, 'payload')"))
await asyncio.sleep(start_delay_sec)
# ensure that the query has not been executed yet
assert not bg_query.done()
# start safekeeper and await the query
sk.start()
await bg_query
assert bg_query.done()
async def run_unavailability(env: NeonEnv, pg: Postgres):
conn = await pg.connect_async()
# check basic work with table
await conn.execute('CREATE TABLE t(key int primary key, value text)')
await conn.execute("INSERT INTO t values (1, 'payload')")
# stop safekeeper and check that query cannot be executed while safekeeper is down
await check_unavailability(env.safekeepers[0], conn, 2)
# for the world's balance, do the same with second safekeeper
await check_unavailability(env.safekeepers[1], conn, 3)
# check that we can execute queries after restart
await conn.execute("INSERT INTO t values (4, 'payload')")
result_sum = await conn.fetchval('SELECT sum(key) FROM t')
assert result_sum == 10
# When a majority of acceptors is offline, commits are expected to be frozen
def test_unavailability(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 2
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_safekeepers_unavailability')
pg = env.postgres.create_start('test_safekeepers_unavailability')
asyncio.run(run_unavailability(env, pg))
@dataclass
class RaceConditionTest:
iteration: int
is_stopped: bool
# shut down a random subset of safekeepers, sleep, wake them up, rinse, repeat
async def xmas_garland(safekeepers: List[Safekeeper], data: RaceConditionTest):
while not data.is_stopped:
data.iteration += 1
victims = []
for sk in safekeepers:
if random.random() >= 0.5:
victims.append(sk)
log.info(
f'Iteration {data.iteration}: stopping {list(map(lambda sk: sk.id, victims))} safekeepers'
)
for v in victims:
v.stop()
await asyncio.sleep(1)
for v in victims:
v.start()
log.info(f'Iteration {data.iteration} finished')
await asyncio.sleep(1)
async def run_race_conditions(env: NeonEnv, pg: Postgres):
conn = await pg.connect_async()
await conn.execute('CREATE TABLE t(key int primary key, value text)')
data = RaceConditionTest(0, False)
bg_xmas = asyncio.create_task(xmas_garland(env.safekeepers, data))
n_iterations = 5
expected_sum = 0
i = 1
while data.iteration <= n_iterations:
await asyncio.sleep(0.005)
await conn.execute(f"INSERT INTO t values ({i}, 'payload')")
expected_sum += i
i += 1
log.info(f'Executed {i-1} queries')
res = await conn.fetchval('SELECT sum(key) FROM t')
assert res == expected_sum
data.is_stopped = True
await bg_xmas
# do inserts while concurrently bringing subsets of acceptors down and up
def test_race_conditions(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_safekeepers_race_conditions')
pg = env.postgres.create_start('test_safekeepers_race_conditions')
asyncio.run(run_race_conditions(env, pg))

View File

@@ -20,18 +20,22 @@ def test_isolation(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, caps
runpath = test_output_dir / 'regress'
(runpath / 'testtablespace').mkdir(parents=True)
# Find the pg_isolation_regress binary
proc = pg_bin.run(['pg_config', '--libdir'], capture_output=True)
libdir = proc.stdout.decode().strip()
proc = pg_bin.run(['pg_config', '--bindir'], capture_output=True)
bindir = proc.stdout.decode().strip()
pg_isolation_regress = os.path.join(libdir,
'postgresql/pgxs/src/test/isolation/pg_isolation_regress')
# Compute all the file locations that pg_isolation_regress will need.
build_path = os.path.join(pg_distrib_dir, 'build/src/test/isolation')
src_path = os.path.join(base_dir, 'vendor/postgres/src/test/isolation')
bindir = os.path.join(pg_distrib_dir, 'bin')
schedule = os.path.join(src_path, 'isolation_schedule')
pg_isolation_regress = os.path.join(build_path, 'pg_isolation_regress')
pg_isolation_regress_command = [
pg_isolation_regress,
'--use-existing',
'--bindir={}'.format(bindir),
'--dlpath={}'.format(build_path),
'--inputdir={}'.format(src_path),
'--schedule={}'.format(schedule),
]

View File

@@ -20,19 +20,22 @@ def test_neon_regress(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, c
runpath = test_output_dir / 'regress'
(runpath / 'testtablespace').mkdir(parents=True)
# Find the pg_regress binary and --bindir option to pass to it.
proc = pg_bin.run(['pg_config', '--libdir'], capture_output=True)
libdir = proc.stdout.decode().strip()
proc = pg_bin.run(['pg_config', '--bindir'], capture_output=True)
bindir = proc.stdout.decode().strip()
pg_regress = os.path.join(libdir, 'postgresql/pgxs/src/test/regress/pg_regress')
# Compute all the file locations that pg_regress will need.
# This test runs Neon-specific tests
build_path = os.path.join(pg_distrib_dir, 'build/src/test/regress')
src_path = os.path.join(base_dir, 'test_runner/neon_regress')
bindir = os.path.join(pg_distrib_dir, 'bin')
schedule = os.path.join(src_path, 'parallel_schedule')
pg_regress = os.path.join(build_path, 'pg_regress')
pg_regress_command = [
pg_regress,
'--use-existing',
'--bindir={}'.format(bindir),
'--dlpath={}'.format(build_path),
'--schedule={}'.format(schedule),
'--inputdir={}'.format(src_path),
]

View File

@@ -19,19 +19,23 @@ def test_pg_regress(neon_simple_env: NeonEnv, test_output_dir: pathlib.Path, pg_
runpath = test_output_dir / 'regress'
(runpath / 'testtablespace').mkdir(parents=True)
# Find the pg_regress binary and --bindir option to pass to it.
proc = pg_bin.run(['pg_config', '--libdir'], capture_output=True)
libdir = proc.stdout.decode().strip()
proc = pg_bin.run(['pg_config', '--bindir'], capture_output=True)
bindir = proc.stdout.decode().strip()
pg_regress = os.path.join(libdir, 'postgresql/pgxs/src/test/regress/pg_regress')
# Compute all the file locations that pg_regress will need.
build_path = os.path.join(pg_distrib_dir, 'build/src/test/regress')
src_path = os.path.join(base_dir, 'vendor/postgres/src/test/regress')
bindir = os.path.join(pg_distrib_dir, 'bin')
schedule = os.path.join(src_path, 'parallel_schedule')
pg_regress = os.path.join(build_path, 'pg_regress')
dlpath = os.path.join(base_dir, 'build/src/test/regress')
pg_regress_command = [
pg_regress,
'--bindir=""',
'--use-existing',
'--bindir={}'.format(bindir),
'--dlpath={}'.format(build_path),
'--dlpath={}'.format(dlpath),
'--schedule={}'.format(schedule),
'--inputdir={}'.format(src_path),
]

View File

@@ -1,5 +1,5 @@
pytest_plugins = ("fixtures.neon_fixtures",
"fixtures.benchmark_fixture",
"fixtures.pg_stats",
"fixtures.compare_fixtures",
"fixtures.slow")
"fixtures.slow",
"fixtures.pg_stats")

View File

@@ -30,7 +30,7 @@ from dataclasses import dataclass
# Type-related stuff
from psycopg2.extensions import connection as PgConnection
from psycopg2.extensions import make_dsn, parse_dsn
from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar, cast, Union, Tuple
from typing import Any, Callable, Dict, Iterator, List, Optional, Type, TypeVar, cast, Union, Tuple
from typing_extensions import Literal
import requests
@@ -280,18 +280,20 @@ class PgProtocol:
return str(make_dsn(**self.conn_options(**kwargs)))
def conn_options(self, **kwargs):
result = self.default_options.copy()
conn_options = self.default_options.copy()
if 'dsn' in kwargs:
result.update(parse_dsn(kwargs['dsn']))
result.update(kwargs)
conn_options.update(parse_dsn(kwargs['dsn']))
conn_options.update(kwargs)
# Individual statement timeout in seconds. 2 minutes should be
# enough for our tests, but if you need a longer one, you can
# change it by calling "SET statement_timeout" after
# connecting.
options = result.get('options', '')
result['options'] = f'-cstatement_timeout=120s {options}'
return result
if 'options' in conn_options:
conn_options['options'] = "-cstatement_timeout=120s " + conn_options['options']
else:
conn_options['options'] = "-cstatement_timeout=120s"
return conn_options
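# For illustration: conn_options(options='-cwork_mem=64MB') yields
# options == '-cstatement_timeout=120s -cwork_mem=64MB', since the code
# above prepends the timeout to any caller-supplied options.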
# autocommit=True here by default because that's what we need most of the time
def connect(self, autocommit=True, **kwargs) -> PgConnection:
@@ -691,10 +693,6 @@ class NeonEnv:
""" Get list of safekeeper endpoints suitable for safekeepers GUC """
return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers])
def timeline_dir(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Path:
"""Get a timeline directory's path based on the repo directory of the test environment"""
return self.repo_dir / "tenants" / tenant_id.hex / "timelines" / timeline_id.hex
@cached_property
def auth_keys(self) -> AuthKeys:
pub = (Path(self.repo_dir) / 'auth_public_key.pem').read_bytes()
@@ -865,24 +863,10 @@ class NeonPageserverHttpClient(requests.Session):
assert isinstance(res_json, dict)
return res_json
def timeline_detail(self,
tenant_id: uuid.UUID,
timeline_id: uuid.UUID,
include_non_incremental_logical_size: bool = False,
include_non_incremental_physical_size: bool = False) -> Dict[Any, Any]:
include_non_incremental_logical_size_str = "0"
if include_non_incremental_logical_size:
include_non_incremental_logical_size_str = "1"
include_non_incremental_physical_size_str = "0"
if include_non_incremental_physical_size:
include_non_incremental_physical_size_str = "1"
def timeline_detail(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Dict[Any, Any]:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}" +
"?include-non-incremental-logical-size={include_non_incremental_logical_size_str}" +
"&include-non-incremental-physical-size={include_non_incremental_physical_size_str}")
f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}?include-non-incremental-logical-size=1"
)
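# Resulting request URL (ids illustrative):
# http://localhost:<port>/v1/tenant/<tenant_hex>/timeline/<timeline_hex>?include-non-incremental-logical-size=1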
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, dict)
@@ -896,6 +880,15 @@ class NeonPageserverHttpClient(requests.Session):
assert res_json is None
return res_json
def wal_receiver_get(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Dict[Any, Any]:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}/wal_receiver"
)
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def get_metrics(self) -> str:
res = self.get(f"http://localhost:{self.port}/metrics")
self.verbose_error(res)
@@ -1379,7 +1372,10 @@ class PgBin:
env.update(env_add)
return env
def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None):
def run(self,
command: List[str],
env: Optional[Env] = None,
**kwargs) -> 'subprocess.CompletedProcess[str]':
"""
Run one of the postgres binaries.
@@ -1396,7 +1392,7 @@ class PgBin:
self._fixpath(command)
log.info('Running command "{}"'.format(' '.join(command)))
env = self._build_env(env)
subprocess.run(command, env=env, cwd=cwd, check=True)
return subprocess.run(command, env=env, check=True, **kwargs)
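# Typical use seen elsewhere in this diff:
#   bindir = pg_bin.run(['pg_config', '--bindir'], capture_output=True).stdout.decode().strip()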
def run_capture(self,
command: List[str],
@@ -1521,25 +1517,29 @@ def remote_pg(test_output_dir: Path) -> Iterator[RemotePostgres]:
class NeonProxy(PgProtocol):
def __init__(self, proxy_port: int, http_port: int, auth_endpoint: str):
super().__init__(dsn=auth_endpoint, port=proxy_port)
self.host = '127.0.0.1'
self.http_port = http_port
self.proxy_port = proxy_port
self.auth_endpoint = auth_endpoint
def __init__(self, port: int, pg_port: int):
super().__init__(host="127.0.0.1",
user="proxy_user",
password="pytest2",
port=port,
dbname='postgres')
self.http_port = 7001
self.host = "127.0.0.1"
self.port = port
self.pg_port = pg_port
self._popen: Optional[subprocess.Popen[bytes]] = None
def start(self) -> None:
assert self._popen is None
# Start proxy
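# The spawned command looks roughly like this (ports illustrative):
#   proxy --http 127.0.0.1:7001 --proxy 127.0.0.1:<port> --auth-backend postgres --auth-endpoint postgres://...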
args = [
os.path.join(str(neon_binpath), 'proxy'),
*["--http", f"{self.host}:{self.http_port}"],
*["--proxy", f"{self.host}:{self.proxy_port}"],
*["--auth-backend", "postgres"],
*["--auth-endpoint", self.auth_endpoint],
]
bin_proxy = os.path.join(str(neon_binpath), 'proxy')
args = [bin_proxy]
args.extend(["--http", f"{self.host}:{self.http_port}"])
args.extend(["--proxy", f"{self.host}:{self.port}"])
args.extend(["--auth-backend", "postgres"])
args.extend(
["--auth-endpoint", f"postgres://proxy_auth:pytest1@localhost:{self.pg_port}/postgres"])
self._popen = subprocess.Popen(args)
self._wait_until_ready()
@@ -1560,21 +1560,13 @@ class NeonProxy(PgProtocol):
@pytest.fixture(scope='function')
def static_proxy(vanilla_pg, port_distributor) -> Iterator[NeonProxy]:
"""Neon proxy that routes directly to vanilla postgres."""
# For simplicity, we use the same user for both `--auth-endpoint` and `safe_psql`
vanilla_pg.start()
vanilla_pg.safe_psql("create user proxy with login superuser password 'password'")
vanilla_pg.safe_psql("create user proxy_auth with password 'pytest1' superuser")
vanilla_pg.safe_psql("create user proxy_user with password 'pytest2'")
port = vanilla_pg.default_options['port']
host = vanilla_pg.default_options['host']
dbname = vanilla_pg.default_options['dbname']
auth_endpoint = f'postgres://proxy:password@{host}:{port}/{dbname}'
proxy_port = port_distributor.get_port()
http_port = port_distributor.get_port()
with NeonProxy(proxy_port=proxy_port, http_port=http_port,
auth_endpoint=auth_endpoint) as proxy:
port = port_distributor.get_port()
pg_port = vanilla_pg.default_options['port']
with NeonProxy(port, pg_port) as proxy:
proxy.start()
yield proxy
@@ -1934,7 +1926,7 @@ class SafekeeperHttpClient(requests.Session):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
def timeline_status(self, tenant_id: str, timeline_id: str) -> SafekeeperTimelineStatus:
res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}")
res = self.get(f"http://localhost:{self.port}/v1/timeline/{tenant_id}/{timeline_id}")
res.raise_for_status()
resj = res.json()
return SafekeeperTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch'],

View File

@@ -1,11 +1,9 @@
import contextlib
import os
import pathlib
import shutil
import subprocess
from pathlib import Path
from typing import Any, List, Tuple
from typing import Any, List
from fixtures.log_helper import log
@@ -91,36 +89,3 @@ def get_dir_size(path: str) -> int:
pass # file could be concurrently removed
return totalbytes
def get_timeline_dir_size(path: pathlib.Path) -> int:
"""Get the timeline directory's total size, which only counts the layer files' size."""
sz = 0
for dir_entry in path.iterdir():
with contextlib.suppress(Exception):
# file is an image layer
_ = parse_image_layer(dir_entry.name)
sz += dir_entry.stat().st_size
continue
with contextlib.suppress(Exception):
# file is a delta layer
_ = parse_delta_layer(dir_entry.name)
sz += dir_entry.stat().st_size
continue
return sz
def parse_image_layer(f_name: str) -> Tuple[int, int, int]:
"""Parse an image layer file name. Return key start, key end, and snapshot lsn"""
parts = f_name.split("__")
key_parts = parts[0].split("-")
return int(key_parts[0], 16), int(key_parts[1], 16), int(parts[1], 16)
def parse_delta_layer(f_name: str) -> Tuple[int, int, int, int]:
"""Parse a delta layer file name. Return key start, key end, lsn start, and lsn end"""
parts = f_name.split("__")
key_parts = parts[0].split("-")
lsn_parts = parts[1].split("-")
return int(key_parts[0], 16), int(key_parts[1], 16), int(lsn_parts[0], 16), int(lsn_parts[1], 16)
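# Illustrative (hypothetical) names the parsers above accept:
#   image layer: "000000000000-FFFFFFFFFFFF__0000016B59D8"
#   delta layer: "000000000000-FFFFFFFFFFFF__0000016B59D8-0000016B5A51"
# i.e. a hex key range, then "__", then a single LSN (image) or an LSN range (delta).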

View File

@@ -1,6 +1,4 @@
import os
import threading
import time
from typing import List
import pytest
@@ -89,34 +87,3 @@ def test_compare_pg_stats_wal_with_pgbench_default(neon_with_baseline: PgCompare
env.pg_bin.run_capture(
['pgbench', f'-T{duration}', f'--random-seed={seed}', env.pg.connstr()])
env.flush()
@pytest.mark.parametrize("n_tables", [1, 10])
@pytest.mark.parametrize("duration", get_durations_matrix(10))
def test_compare_pg_stats_wo_with_heavy_write(neon_with_baseline: PgCompare,
n_tables: int,
duration: int,
pg_stats_wo: List[PgStatTable]):
env = neon_with_baseline
with env.pg.connect().cursor() as cur:
for i in range(n_tables):
cur.execute(
f"CREATE TABLE t{i}(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')"
)
def start_single_table_workload(table_id: int):
start = time.time()
with env.pg.connect().cursor() as cur:
while time.time() - start < duration:
cur.execute(f"INSERT INTO t{table_id} SELECT FROM generate_series(1,1000)")
with env.record_pg_stats(pg_stats_wo):
threads = [
threading.Thread(target=start_single_table_workload, args=(i, ))
for i in range(n_tables)
]
for thread in threads:
thread.start()
for thread in threads:
thread.join()

View File

@@ -1,50 +0,0 @@
import pytest
from contextlib import closing
from fixtures.compare_fixtures import PgCompare
from pytest_lazyfixture import lazy_fixture # type: ignore
@pytest.mark.parametrize(
"env",
[
# The test is too slow to run in CI, but fast enough to run with remote tests
pytest.param(lazy_fixture("neon_compare"), id="neon", marks=pytest.mark.slow),
pytest.param(lazy_fixture("vanilla_compare"), id="vanilla", marks=pytest.mark.slow),
pytest.param(lazy_fixture("remote_compare"), id="remote", marks=pytest.mark.remote_cluster),
])
def test_dup_key(env: PgCompare):
# Update the same page many times, then measure read performance
with closing(env.pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute('drop table if exists t, f;')
cur.execute("SET synchronous_commit=off")
cur.execute("SET statement_timeout=0")
# Write many updates to the same row
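# (each loop iteration below is rolled back, so the table stays tiny while the
# same page accumulates a long chain of WAL records, which presumably stresses
# page reconstruction on the subsequent read)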
with env.record_duration('write'):
cur.execute("create table t (i integer, filler text);")
cur.execute('insert into t values (0);')
cur.execute("""
do $$
begin
for ivar in 1..5000000 loop
update t set i = ivar, filler = repeat('a', 50);
update t set i = ivar, filler = repeat('b', 50);
update t set i = ivar, filler = repeat('c', 50);
update t set i = ivar, filler = repeat('d', 50);
rollback;
end loop;
end;
$$;
""")
# Write 3-4 MB to evict t from compute cache
cur.execute('create table f (i integer);')
cur.execute('insert into f values (generate_series(1,100000));')
# Read
with env.record_duration('read'):
cur.execute('select * from t;')
cur.fetchall()