Compare commits

..

1 Commits

Author SHA1 Message Date
Dmitry Rodionov
4c1cb890db try to toggle background activity without a race condition 2022-10-20 22:30:42 +03:00
70 changed files with 1229 additions and 3030 deletions

View File

@@ -73,13 +73,6 @@ runs:
shell: bash -euxo pipefail {0}
run: ./scripts/pysync
- name: Download compatibility snapshot for Postgres 14
uses: ./.github/actions/download
with:
name: compatibility-snapshot-${{ inputs.build_type }}-pg14
path: /tmp/compatibility_snapshot_pg14
prefix: latest
- name: Run pytest
env:
NEON_BIN: /tmp/neon/bin
@@ -87,8 +80,6 @@ runs:
BUILD_TYPE: ${{ inputs.build_type }}
AWS_ACCESS_KEY_ID: ${{ inputs.real_s3_access_key_id }}
AWS_SECRET_ACCESS_KEY: ${{ inputs.real_s3_secret_access_key }}
COMPATIBILITY_SNAPSHOT_DIR: /tmp/compatibility_snapshot_pg14
ALLOW_BREAKING_CHANGES: contains(github.event.pull_request.labels.*.name, 'breaking changes')
shell: bash -euxo pipefail {0}
run: |
# PLATFORM will be embedded in the perf test report
@@ -163,15 +154,6 @@ runs:
scripts/generate_and_push_perf_report.sh
fi
- name: Upload compatibility snapshot for Postgres 14
if: github.ref_name == 'release'
uses: ./.github/actions/upload
with:
name: compatibility-snapshot-${{ inputs.build_type }}-pg14-${{ github.run_id }}
# The path includes a test name (test_prepare_snapshot) and directory that the test creates (compatibility_snapshot_pg14), keep the path in sync with the test
path: /tmp/test_output/test_prepare_snapshot/compatibility_snapshot_pg14/
prefix: latest
- name: Create Allure report
if: always()
uses: ./.github/actions/allure-report

View File

@@ -3,6 +3,7 @@ storage:
bucket_name: neon-storage-ireland
bucket_region: eu-west-1
console_mgmt_base_url: http://neon-stress-console.local
env_name: neon-stress
etcd_endpoints: neon-stress-etcd.local:2379
safekeeper_enable_s3_offload: 'false'
pageserver_config_stub:
@@ -11,7 +12,6 @@ storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
prefix_in_bucket: "{{ inventory_hostname }}"
safekeeper_s3_prefix: neon-stress/wal
hostname_suffix: ".local"
remote_user: admin
children:

View File

@@ -1,6 +1,7 @@
---
storage:
vars:
env_name: prod-1
console_mgmt_base_url: http://console-release.local
bucket_name: zenith-storage-oregon
bucket_region: us-west-2
@@ -11,7 +12,6 @@ storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
prefix_in_bucket: "{{ inventory_hostname }}"
safekeeper_s3_prefix: prod-1/wal
hostname_suffix: ".local"
remote_user: admin

View File

@@ -3,6 +3,7 @@ storage:
bucket_name: zenith-staging-storage-us-east-1
bucket_region: us-east-1
console_mgmt_base_url: http://console-staging.local
env_name: us-stage
etcd_endpoints: zenith-us-stage-etcd.local:2379
pageserver_config_stub:
pg_distrib_dir: /usr/local
@@ -10,7 +11,6 @@ storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
prefix_in_bucket: "{{ inventory_hostname }}"
safekeeper_s3_prefix: us-stage/wal
hostname_suffix: ".local"
remote_user: admin

View File

@@ -3,6 +3,7 @@ storage:
bucket_name: neon-staging-storage-us-east-2
bucket_region: us-east-2
console_mgmt_base_url: http://console-staging.local
env_name: us-stage
etcd_endpoints: etcd-0.us-east-2.aws.neon.build:2379
pageserver_config_stub:
pg_distrib_dir: /usr/local
@@ -10,7 +11,6 @@ storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
prefix_in_bucket: "pageserver/v1"
safekeeper_s3_prefix: safekeeper/v1/wal
hostname_suffix: ""
remote_user: ssm-user
ansible_aws_ssm_region: us-east-2

View File

@@ -6,7 +6,7 @@ After=network.target auditd.service
Type=simple
User=safekeeper
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/v14/lib
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}{{ hostname_suffix }}:6500 --listen-http {{ inventory_hostname }}{{ hostname_suffix }}:7676 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="{{ safekeeper_s3_prefix }}"}'
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}{{ hostname_suffix }}:6500 --listen-http {{ inventory_hostname }}{{ hostname_suffix }}:7676 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="{{ env_name }}/wal"}'
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
KillSignal=SIGINT

View File

@@ -1,31 +0,0 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
image:
repository: neondatabase/neon
settings:
authBackend: "console"
authEndpoint: "http://console-staging.local/management/api/v2"
domain: "*.us-east-2.aws.neon.build"
# -- Additional labels for neon-proxy pods
podLabels:
zenith_service: proxy-scram
zenith_env: dev
zenith_region: us-east-2
zenith_region_slug: us-east-2
exposedService:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: us-east-2.aws.neon.build
#metrics:
# enabled: true
# serviceMonitor:
# enabled: true
# selector:
# release: kube-prometheus-stack

View File

@@ -481,7 +481,6 @@ jobs:
neon-image:
runs-on: dev
needs: [ tag ]
container: gcr.io/kaniko-project/executor:v1.9.0-debug
steps:
@@ -495,11 +494,10 @@ jobs:
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build neon
run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}
run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:$GITHUB_RUN_ID
compute-tools-image:
runs-on: dev
needs: [ tag ]
container: gcr.io/kaniko-project/executor:v1.9.0-debug
steps:
@@ -510,12 +508,11 @@ jobs:
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build compute tools
run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-tools --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}}
run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-tools --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:$GITHUB_RUN_ID
compute-node-image:
runs-on: dev
container: gcr.io/kaniko-project/executor:v1.9.0-debug
needs: [ tag ]
steps:
- name: Checkout
uses: actions/checkout@v1 # v3 won't work with kaniko
@@ -530,12 +527,11 @@ jobs:
# cloud repo depends on this image name, thus duplicating it
# remove compute-node when cloud repo is updated
- name: Kaniko build compute node with extensions v14 (compatibility)
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:${{needs.tag.outputs.build-tag}}
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:$GITHUB_RUN_ID
compute-node-image-v14:
runs-on: dev
container: gcr.io/kaniko-project/executor:v1.9.0-debug
needs: [ tag ]
steps:
- name: Checkout
uses: actions/checkout@v1 # v3 won't work with kaniko
@@ -547,13 +543,12 @@ jobs:
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build compute node with extensions v14
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:${{needs.tag.outputs.build-tag}}
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:$GITHUB_RUN_ID
compute-node-image-v15:
runs-on: dev
container: gcr.io/kaniko-project/executor:v1.9.0-debug
needs: [ tag ]
steps:
- name: Checkout
uses: actions/checkout@v1 # v3 won't work with kaniko
@@ -565,11 +560,11 @@ jobs:
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build compute node with extensions v15
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v15 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}}
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v15 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:$GITHUB_RUN_ID
promote-images:
runs-on: dev
needs: [ tag, neon-image, compute-node-image, compute-node-image-v14, compute-node-image-v15, compute-tools-image ]
needs: [ neon-image, compute-node-image, compute-node-image-v14, compute-node-image-v15, compute-tools-image ]
if: github.event_name != 'workflow_dispatch'
container: amazon/aws-cli
strategy:
@@ -582,9 +577,8 @@ jobs:
steps:
- name: Promote image to latest
run: |
export MANIFEST=$(aws ecr batch-get-image --repository-name ${{ matrix.name }} --image-ids imageTag=${{needs.tag.outputs.build-tag}} --query 'images[].imageManifest' --output text)
aws ecr put-image --repository-name ${{ matrix.name }} --image-tag latest --image-manifest "$MANIFEST"
run:
MANIFEST=$(aws ecr batch-get-image --repository-name ${{ matrix.name }} --image-ids imageTag=$GITHUB_RUN_ID --query 'images[].imageManifest' --output text) && aws ecr put-image --repository-name ${{ matrix.name }} --image-tag latest --image-manifest "$MANIFEST"
push-docker-hub:
runs-on: dev
@@ -603,19 +597,19 @@ jobs:
echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json
- name: Pull neon image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} neon
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:latest neon
- name: Pull compute tools image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} compute-tools
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:latest compute-tools
- name: Pull compute node image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:${{needs.tag.outputs.build-tag}} compute-node
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:latest compute-node
- name: Pull compute node v14 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:${{needs.tag.outputs.build-tag}} compute-node-v14
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:latest compute-node-v14
- name: Pull compute node v15 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} compute-node-v15
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:latest compute-node-v15
- name: Pull rust image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned rust
@@ -625,11 +619,11 @@ jobs:
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
run: |
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.us-east-2.amazonaws.com/neon:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-tools:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-node:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-node-v14:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-node-v15:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/neon:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-tools:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-node:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-node-v14:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-node-v15:latest
- name: Configure Docker Hub login
run: |
@@ -825,52 +819,3 @@ jobs:
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
deploy-proxy-new:
runs-on: dev
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
# Compute image isn't strictly required for proxy deploy, but let's still wait for it to run all deploy jobs consistently.
needs: [ push-docker-hub, calculate-deploy-targets, tag, regress-tests ]
if: |
(github.ref_name == 'main') &&
github.event_name != 'workflow_dispatch'
defaults:
run:
shell: bash
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
- name: Configure environment
run: |
helm repo add neondatabase https://neondatabase.github.io/helm-charts
aws --region us-east-2 eks update-kubeconfig --name dev-us-east-2-beta --role-arn arn:aws:iam::369495373322:role/github-runner
- name: Re-deploy proxy
run: |
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade neon-proxy-scram neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install -f .github/helm-values/dev-us-east-2-beta.neon-proxy-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
promote-compatibility-test-snapshot:
runs-on: dev
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
needs: [ deploy, deploy-proxy ]
if: github.ref_name == 'release' && github.event_name != 'workflow_dispatch'
steps:
- name: Promote compatibility snapshot for the release
shell: bash -euxo pipefail {0}
env:
BUCKET: neon-github-public-dev
PREFIX: artifacts/latest
run: |
for build_type in debug release; do
OLD_FILENAME=compatibility-snapshot-${build_type}-pg14-${GITHUB_RUN_ID}.tar.zst
NEW_FILENAME=compatibility-snapshot-${build_type}-pg14.tar.zst
time aws s3 mv --only-show-errors s3://${BUCKET}/${PREFIX}/${OLD_FILENAME} s3://${BUCKET}/${PREFIX}/${NEW_FILENAME}
done

25
Cargo.lock generated
View File

@@ -2170,7 +2170,6 @@ dependencies = [
"serde_json",
"serde_with",
"signal-hook",
"svg_fmt",
"tar",
"tempfile",
"thiserror",
@@ -2189,10 +2188,7 @@ dependencies = [
name = "pageserver_api"
version = "0.1.0"
dependencies = [
"anyhow",
"bytes",
"const_format",
"postgres_ffi",
"serde",
"serde_with",
"utils",
@@ -3465,12 +3461,6 @@ version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]]
name = "svg_fmt"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fb1df15f412ee2e9dfc1c504260fa695c1c3f10fe9f4a6ee2d2184d7d6450e2"
[[package]]
name = "symbolic-common"
version = "8.8.0"
@@ -3942,16 +3932,6 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-serde"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1"
dependencies = [
"serde",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.16"
@@ -3962,15 +3942,12 @@ dependencies = [
"nu-ansi-term",
"once_cell",
"regex",
"serde",
"serde_json",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
"tracing-serde",
]
[[package]]
@@ -4065,8 +4042,6 @@ dependencies = [
"serde_json",
"serde_with",
"signal-hook",
"strum",
"strum_macros",
"tempfile",
"thiserror",
"tokio",

View File

@@ -44,7 +44,7 @@ COPY . .
# Show build caching stats to check if it was used in the end.
# Has to be the part of the same RUN since cachepot daemon is killed in the end of this RUN, losing the compilation stats.
RUN set -e \
&& mold -run cargo build --bin pageserver --bin pageserver_binutils --bin draw_timeline_dir --bin safekeeper --bin proxy --locked --release \
&& mold -run cargo build --bin pageserver --bin pageserver_binutils --bin safekeeper --bin proxy --locked --release \
&& cachepot -s
# Build final image
@@ -65,7 +65,6 @@ RUN set -e \
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver_binutils /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/draw_timeline_dir /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin

View File

@@ -1,26 +1,24 @@
#
# This file is identical to the Dockerfile.compute-node-v15 file
# except for the version of Postgres that is built.
#
ARG TAG=pinned
# apparently, ARGs don't get replaced in RUN commands in kaniko
# ARG POSTGIS_VERSION=3.3.0
# ARG PLV8_VERSION=3.1.4
# ARG PG_VERSION=v14
#########################################################################################
#
# Layer "build-deps"
#
#########################################################################################
FROM debian:bullseye-slim AS build-deps
RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
apt update
RUN apt update && \
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev \
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget pkg-config
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \
libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libglib2.0-dev
#########################################################################################
#
# Layer "pg-build"
# Build Postgres from the neon postgres repository.
#
#########################################################################################
FROM build-deps AS pg-build
COPY vendor/postgres-v14 postgres
RUN cd postgres && \
@@ -31,20 +29,22 @@ RUN cd postgres && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/include install && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/interfaces/libpq install
#########################################################################################
#
# Layer "postgis-build"
# Build PostGIS from the upstream PostGIS mirror.
#
#########################################################################################
# PostGIS compiles against neon postgres sources without changes. Perhaps we
# could even use the upstream binaries, compiled against vanilla Postgres, but
# it would require some investigation to check that it works, and also keeps
# working in the future. So for now, we compile our own binaries.
FROM build-deps AS postgis-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt update && \
apt install -y gdal-bin libgdal-dev libprotobuf-c-dev protobuf-c-compiler xsltproc
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.1.tar.gz && \
tar xvzf postgis-3.3.1.tar.gz && \
cd postgis-3.3.1 && \
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.0.tar.gz && \
tar xvzf postgis-3.3.0.tar.gz && \
cd postgis-3.3.0 && \
./autogen.sh && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
./configure && \
@@ -57,29 +57,19 @@ RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.1.tar.gz && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/postgis_tiger_geocoder.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/postgis_topology.control
#########################################################################################
#
# Layer "plv8-build"
# Build plv8
#
#########################################################################################
FROM build-deps AS plv8-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt update && \
apt install -y ninja-build python3-dev libc++-dev libc++abi-dev libncurses5 binutils
apt install -y ninja-build python3-dev libc++-dev libc++abi-dev libncurses5
# https://github.com/plv8/plv8/issues/475:
# v8 uses gold for linking and sets `--thread-count=4` which breaks
# gold version <= 1.35 (https://sourceware.org/bugzilla/show_bug.cgi?id=23607)
# Install newer gold version manually as debian-testing binutils version updates
# libc version, which in turn breaks other extension built against non-testing libc.
RUN wget https://ftp.gnu.org/gnu/binutils/binutils-2.38.tar.gz && \
tar xvzf binutils-2.38.tar.gz && \
cd binutils-2.38 && \
cd libiberty && ./configure && make -j $(getconf _NPROCESSORS_ONLN) && \
cd ../bfd && ./configure && make bfdver.h && \
cd ../gold && ./configure && make -j $(getconf _NPROCESSORS_ONLN) && make install && \
cp /usr/local/bin/ld.gold /usr/bin/gold
# https://github.com/plv8/plv8/issues/475
# Debian bullseye provides binutils 2.35 when >= 2.38 is necessary
RUN apt update && \
apt install -y --no-install-recommends -t testing binutils
# Sed is used to patch for https://github.com/plv8/plv8/issues/503
RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
@@ -87,25 +77,21 @@ RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
cd plv8-3.1.4 && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
sed -i 's/MemoryContextAlloc(/MemoryContextAllocZero(/' plv8.cc && \
make DOCKER=1 -j $(getconf _NPROCESSORS_ONLN) install && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
rm -rf /plv8-* && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/plv8.control
#########################################################################################
#
# Layer "h3-pg-build"
# Build h3_pg
#
#########################################################################################
FROM build-deps AS h3-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# packaged cmake is too old
RUN wget https://github.com/Kitware/CMake/releases/download/v3.24.2/cmake-3.24.2-linux-x86_64.sh \
-q -O /tmp/cmake-install.sh \
&& chmod u+x /tmp/cmake-install.sh \
&& /tmp/cmake-install.sh --skip-license --prefix=/usr/local/ \
&& rm /tmp/cmake-install.sh
RUN apt update && \
apt install -y --no-install-recommends -t testing cmake
RUN wget https://github.com/uber/h3/archive/refs/tags/v4.0.1.tar.gz -O h3.tgz && \
tar xvzf h3.tgz && \
@@ -124,15 +110,12 @@ RUN wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.0.1.tar.gz -O h3
export PATH="/usr/local/pgsql/bin:$PATH" && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/h3.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/h3_postgis.control
echo 'trusted = true' >> /usr/local/pgsql/share/extension/h3.control
#########################################################################################
#
# Layer "neon-pg-ext-build"
# compile neon extensions
#
#########################################################################################
FROM build-deps AS neon-pg-ext-build
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=plv8-build /usr/local/pgsql/ /usr/local/pgsql/
@@ -145,22 +128,16 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \
-C pgxn/neon \
-s install
#########################################################################################
#
# Compile and run the Neon-specific `compute_ctl` binary
#
#########################################################################################
FROM 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:$TAG AS compute-tools
USER nonroot
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
COPY --chown=nonroot . .
RUN cd compute_tools && cargo build --locked --profile release-line-debug-size-lto
#########################################################################################
#
# Clean up postgres folder before inclusion
#
#########################################################################################
FROM neon-pg-ext-build AS postgres-cleanup-layer
COPY --from=neon-pg-ext-build /usr/local/pgsql /usr/local/pgsql
@@ -178,12 +155,10 @@ RUN rm -r /usr/local/pgsql/lib/pgxs/src
# if they were to be used by other libraries.
RUN rm /usr/local/pgsql/lib/lib*.a
#########################################################################################
#
# Final layer
# Put it all together into the final image
#
#########################################################################################
FROM debian:bullseye-slim
# Add user postgres
RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
@@ -200,6 +175,8 @@ COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-deb
# libreadline8 for psql
# libossp-uuid16 for extension ossp-uuid
# libgeos, libgdal, libproj and libprotobuf-c1 for PostGIS
# GLIBC 2.34 for plv8.
# Debian bullseye provides GLIBC 2.31, so we install the library from testing
#
# Lastly, link compute_ctl into zenith_ctl while we're at it,
# so that we don't need to put this in another layer.
@@ -212,6 +189,12 @@ RUN apt update && \
libproj19 \
libprotobuf-c1 && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
echo "Installing GLIBC 2.34" && \
echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
apt update && \
apt install -y --no-install-recommends -t testing libc6 && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
ln /usr/local/bin/compute_ctl /usr/local/bin/zenith_ctl
USER postgres

View File

@@ -4,23 +4,26 @@
#
ARG TAG=pinned
# apparently, ARGs don't get replaced in RUN commands in kaniko
# ARG POSTGIS_VERSION=3.3.1
# ARG PLV8_VERSION=3.1.4
# ARG PG_VERSION=v15
#########################################################################################
#
# Layer "build-deps"
#
#########################################################################################
FROM debian:bullseye-slim AS build-deps
RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
apt update
RUN apt update && \
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev \
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget pkg-config
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \
libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libglib2.0-dev
#########################################################################################
#
# Layer "pg-build"
# Build Postgres from the neon postgres repository.
#
#########################################################################################
FROM build-deps AS pg-build
COPY vendor/postgres-v15 postgres
RUN cd postgres && \
@@ -31,12 +34,14 @@ RUN cd postgres && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/include install && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/interfaces/libpq install
#########################################################################################
#
# Layer "postgis-build"
# Build PostGIS from the upstream PostGIS mirror.
#
#########################################################################################
# PostGIS compiles against neon postgres sources without changes. Perhaps we
# could even use the upstream binaries, compiled against vanilla Postgres, but
# it would require some investigation to check that it works, and also keeps
# working in the future. So for now, we compile our own binaries.
FROM build-deps AS postgis-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt update && \
@@ -57,29 +62,19 @@ RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.1.tar.gz && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/postgis_tiger_geocoder.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/postgis_topology.control
#########################################################################################
#
# Layer "plv8-build"
# Build plv8
#
#########################################################################################
FROM build-deps AS plv8-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt update && \
apt install -y ninja-build python3-dev libc++-dev libc++abi-dev libncurses5 binutils
apt install -y ninja-build python3-dev libc++-dev libc++abi-dev libncurses5
# https://github.com/plv8/plv8/issues/475:
# v8 uses gold for linking and sets `--thread-count=4` which breaks
# gold version <= 1.35 (https://sourceware.org/bugzilla/show_bug.cgi?id=23607)
# Install newer gold version manually as debian-testing binutils version updates
# libc version, which in turn breaks other extension built against non-testing libc.
RUN wget https://ftp.gnu.org/gnu/binutils/binutils-2.38.tar.gz && \
tar xvzf binutils-2.38.tar.gz && \
cd binutils-2.38 && \
cd libiberty && ./configure && make -j $(getconf _NPROCESSORS_ONLN) && \
cd ../bfd && ./configure && make bfdver.h && \
cd ../gold && ./configure && make -j $(getconf _NPROCESSORS_ONLN) && make install && \
cp /usr/local/bin/ld.gold /usr/bin/gold
# https://github.com/plv8/plv8/issues/475
# Debian bullseye provides binutils 2.35 when >= 2.38 is necessary
RUN apt update && \
apt install -y --no-install-recommends -t testing binutils
# Sed is used to patch for https://github.com/plv8/plv8/issues/503
RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
@@ -87,25 +82,21 @@ RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
cd plv8-3.1.4 && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
sed -i 's/MemoryContextAlloc(/MemoryContextAllocZero(/' plv8.cc && \
make DOCKER=1 -j $(getconf _NPROCESSORS_ONLN) install && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
rm -rf /plv8-* && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/plv8.control
#########################################################################################
#
# Layer "h3-pg-build"
# Build h3_pg
#
#########################################################################################
FROM build-deps AS h3-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# packaged cmake is too old
RUN wget https://github.com/Kitware/CMake/releases/download/v3.24.2/cmake-3.24.2-linux-x86_64.sh \
-q -O /tmp/cmake-install.sh \
&& chmod u+x /tmp/cmake-install.sh \
&& /tmp/cmake-install.sh --skip-license --prefix=/usr/local/ \
&& rm /tmp/cmake-install.sh
RUN apt update && \
apt install -y --no-install-recommends -t testing cmake
RUN wget https://github.com/uber/h3/archive/refs/tags/v4.0.1.tar.gz -O h3.tgz && \
tar xvzf h3.tgz && \
@@ -124,15 +115,12 @@ RUN wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.0.1.tar.gz -O h3
export PATH="/usr/local/pgsql/bin:$PATH" && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/h3.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/h3_postgis.control
echo 'trusted = true' >> /usr/local/pgsql/share/extension/h3.control
#########################################################################################
#
# Layer "neon-pg-ext-build"
# compile neon extensions
#
#########################################################################################
FROM build-deps AS neon-pg-ext-build
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=plv8-build /usr/local/pgsql/ /usr/local/pgsql/
@@ -145,22 +133,16 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \
-C pgxn/neon \
-s install
#########################################################################################
#
# Compile and run the Neon-specific `compute_ctl` binary
#
#########################################################################################
FROM 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:$TAG AS compute-tools
USER nonroot
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
COPY --chown=nonroot . .
RUN cd compute_tools && cargo build --locked --profile release-line-debug-size-lto
#########################################################################################
#
# Clean up postgres folder before inclusion
#
#########################################################################################
FROM neon-pg-ext-build AS postgres-cleanup-layer
COPY --from=neon-pg-ext-build /usr/local/pgsql /usr/local/pgsql
@@ -178,12 +160,10 @@ RUN rm -r /usr/local/pgsql/lib/pgxs/src
# if they were to be used by other libraries.
RUN rm /usr/local/pgsql/lib/lib*.a
#########################################################################################
#
# Final layer
# Put it all together into the final image
#
#########################################################################################
FROM debian:bullseye-slim
# Add user postgres
RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
@@ -200,6 +180,8 @@ COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-deb
# libreadline8 for psql
# libossp-uuid16 for extension ossp-uuid
# libgeos, libgdal, libproj and libprotobuf-c1 for PostGIS
# GLIBC 2.34 for plv8.
# Debian bullseye provides GLIBC 2.31, so we install the library from testing
#
# Lastly, link compute_ctl into zenith_ctl while we're at it,
# so that we don't need to put this in another layer.
@@ -212,6 +194,12 @@ RUN apt update && \
libproj19 \
libprotobuf-c1 && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
echo "Installing GLIBC 2.34" && \
echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
apt update && \
apt install -y --no-install-recommends -t testing libc6 && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
ln /usr/local/bin/compute_ctl /usr/local/bin/zenith_ctl
USER postgres

View File

@@ -424,29 +424,8 @@ pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> {
db_client.simple_query(&alter_query)?;
// Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user.
// This is needed because since postgres 15 this privilege is removed by default.
let grant_query = "DO $$\n\
BEGIN\n\
IF EXISTS(\n\
SELECT nspname\n\
FROM pg_catalog.pg_namespace\n\
WHERE nspname = 'public'\n\
) AND\n\
current_setting('server_version_num')::int/10000 >= 15\n\
THEN\n\
IF EXISTS(\n\
SELECT rolname\n\
FROM pg_catalog.pg_roles\n\
WHERE rolname = 'web_access'\n\
)\n\
THEN\n\
GRANT CREATE ON SCHEMA public TO web_access;\n\
END IF;\n\
END IF;\n\
END\n\
$$;"
.to_string();
// This is needed since postgres 15, where this privilege is removed by default.
let grant_query: String = "GRANT CREATE ON SCHEMA public TO web_access".to_string();
info!("grant query for db {} : {}", &db.name, &grant_query);
db_client.simple_query(&grant_query)?;
}

View File

@@ -183,18 +183,18 @@ impl PostgresNode {
}
fn sync_safekeepers(&self, auth_token: &Option<String>, pg_version: u32) -> Result<Lsn> {
let pg_path = self.env.pg_bin_dir(pg_version)?.join("postgres");
let pg_path = self.env.pg_bin_dir(pg_version).join("postgres");
let mut cmd = Command::new(&pg_path);
cmd.arg("--sync-safekeepers")
.env_clear()
.env(
"LD_LIBRARY_PATH",
self.env.pg_lib_dir(pg_version)?.to_str().unwrap(),
self.env.pg_lib_dir(pg_version).to_str().unwrap(),
)
.env(
"DYLD_LIBRARY_PATH",
self.env.pg_lib_dir(pg_version)?.to_str().unwrap(),
self.env.pg_lib_dir(pg_version).to_str().unwrap(),
)
.env("PGDATA", self.pgdata().to_str().unwrap())
.stdout(Stdio::piped())
@@ -282,7 +282,9 @@ impl PostgresNode {
fn setup_pg_conf(&self, auth_type: AuthType) -> Result<()> {
let mut conf = PostgresConf::new();
conf.append("max_wal_senders", "10");
conf.append("wal_log_hints", "off");
// wal_log_hints is mandatory when running against pageserver (see gh issue#192)
// TODO: is it possible to check wal_log_hints at pageserver side via XLOG_PARAMETER_CHANGE?
conf.append("wal_log_hints", "on");
conf.append("max_replication_slots", "10");
conf.append("hot_standby", "on");
conf.append("shared_buffers", "1MB");
@@ -420,7 +422,7 @@ impl PostgresNode {
}
fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
let pg_ctl_path = self.env.pg_bin_dir(self.pg_version)?.join("pg_ctl");
let pg_ctl_path = self.env.pg_bin_dir(self.pg_version).join("pg_ctl");
let mut cmd = Command::new(pg_ctl_path);
cmd.args(
[
@@ -438,11 +440,11 @@ impl PostgresNode {
.env_clear()
.env(
"LD_LIBRARY_PATH",
self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
self.env.pg_lib_dir(self.pg_version).to_str().unwrap(),
)
.env(
"DYLD_LIBRARY_PATH",
self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
self.env.pg_lib_dir(self.pg_version).to_str().unwrap(),
);
if let Some(token) = auth_token {
cmd.env("ZENITH_AUTH_TOKEN", token);

View File

@@ -52,10 +52,6 @@ pub fn start_etcd_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
// size smaller. Our test etcd clusters are very small.
// See https://github.com/etcd-io/etcd/issues/7910
"--quota-backend-bytes=100000000".to_string(),
// etcd doesn't compact (vacuum) with default settings,
// enable it to prevent space exhaustion.
"--auto-compaction-mode=revision".to_string(),
"--auto-compaction-retention=1".to_string(),
])
.stdout(Stdio::from(etcd_stdout_file))
.stderr(Stdio::from(etcd_stderr_file))

View File

@@ -201,28 +201,28 @@ impl LocalEnv {
self.pg_distrib_dir.clone()
}
pub fn pg_distrib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
pub fn pg_distrib_dir(&self, pg_version: u32) -> PathBuf {
let path = self.pg_distrib_dir.clone();
match pg_version {
14 => Ok(path.join(format!("v{pg_version}"))),
15 => Ok(path.join(format!("v{pg_version}"))),
_ => bail!("Unsupported postgres version: {}", pg_version),
14 => path.join(format!("v{pg_version}")),
15 => path.join(format!("v{pg_version}")),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
pub fn pg_bin_dir(&self, pg_version: u32) -> PathBuf {
match pg_version {
14 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")),
15 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")),
_ => bail!("Unsupported postgres version: {}", pg_version),
14 => self.pg_distrib_dir(pg_version).join("bin"),
15 => self.pg_distrib_dir(pg_version).join("bin"),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
pub fn pg_lib_dir(&self, pg_version: u32) -> PathBuf {
match pg_version {
14 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")),
15 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")),
_ => bail!("Unsupported postgres version: {}", pg_version),
14 => self.pg_distrib_dir(pg_version).join("lib"),
15 => self.pg_distrib_dir(pg_version).join("lib"),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
@@ -422,10 +422,10 @@ impl LocalEnv {
"directory '{}' already exists. Perhaps already initialized?",
base_path.display()
);
if !self.pg_bin_dir(pg_version)?.join("postgres").exists() {
if !self.pg_bin_dir(pg_version).join("postgres").exists() {
bail!(
"Can't find postgres binary at {}",
self.pg_bin_dir(pg_version)?.display()
self.pg_bin_dir(pg_version).display()
);
}
for binary in ["pageserver", "safekeeper"] {

View File

@@ -123,6 +123,7 @@ impl SafekeeperNode {
.args(&["--id", self.id.to_string().as_ref()])
.args(&["--listen-pg", &listen_pg])
.args(&["--listen-http", &listen_http])
.args(&["--recall", "1 second"])
.arg("--daemonize"),
);
if !self.conf.sync {

View File

@@ -1,48 +0,0 @@
#!/bin/bash
set -eux
PG_VERSION=${PG_VERSION:-14}
SPEC_FILE_ORG=/var/db/postgres/specs/spec.json
SPEC_FILE=/tmp/spec.json
echo "Waiting pageserver become ready."
while ! nc -z pageserver 6400; do
sleep 1;
done
echo "Page server is ready."
echo "Create a tenant and timeline"
PARAMS=(
-sb
-X POST
-H "Content-Type: application/json"
-d "{}"
http://pageserver:9898/v1/tenant/
)
tenant_id=$(curl "${PARAMS[@]}" | sed 's/"//g')
PARAMS=(
-sb
-X POST
-H "Content-Type: application/json"
-d "{\"tenant_id\":\"${tenant_id}\", \"pg_version\": ${PG_VERSION}}"
"http://pageserver:9898/v1/tenant/${tenant_id}/timeline/"
)
result=$(curl "${PARAMS[@]}")
echo $result | jq .
echo "Overwrite tenant id and timeline id in spec file"
tenant_id=$(echo ${result} | jq -r .tenant_id)
timeline_id=$(echo ${result} | jq -r .timeline_id)
sed "s/TENANT_ID/${tenant_id}/" ${SPEC_FILE_ORG} > ${SPEC_FILE}
sed -i "s/TIMELINE_ID/${timeline_id}/" ${SPEC_FILE}
cat ${SPEC_FILE}
echo "Start compute node"
/usr/local/bin/compute_ctl --pgdata /var/db/postgres/compute \
-C "postgresql://cloud_admin@localhost:55433/postgres" \
-b /usr/local/bin/postgres \
-S ${SPEC_FILE}

View File

@@ -1,141 +0,0 @@
{
"format_version": 1.0,
"timestamp": "2022-10-12T18:00:00.000Z",
"operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8c",
"cluster": {
"cluster_id": "docker_compose",
"name": "docker_compose_test",
"state": "restarted",
"roles": [
{
"name": "cloud_admin",
"encrypted_password": "b093c0d3b281ba6da1eacc608620abd8",
"options": null
}
],
"databases": [
],
"settings": [
{
"name": "fsync",
"value": "off",
"vartype": "bool"
},
{
"name": "wal_level",
"value": "replica",
"vartype": "enum"
},
{
"name": "hot_standby",
"value": "on",
"vartype": "bool"
},
{
"name": "wal_log_hints",
"value": "on",
"vartype": "bool"
},
{
"name": "log_connections",
"value": "on",
"vartype": "bool"
},
{
"name": "port",
"value": "55433",
"vartype": "integer"
},
{
"name": "shared_buffers",
"value": "1MB",
"vartype": "string"
},
{
"name": "max_connections",
"value": "100",
"vartype": "integer"
},
{
"name": "listen_addresses",
"value": "0.0.0.0",
"vartype": "string"
},
{
"name": "max_wal_senders",
"value": "10",
"vartype": "integer"
},
{
"name": "max_replication_slots",
"value": "10",
"vartype": "integer"
},
{
"name": "wal_sender_timeout",
"value": "5s",
"vartype": "string"
},
{
"name": "wal_keep_size",
"value": "0",
"vartype": "integer"
},
{
"name": "password_encryption",
"value": "md5",
"vartype": "enum"
},
{
"name": "restart_after_crash",
"value": "off",
"vartype": "bool"
},
{
"name": "synchronous_standby_names",
"value": "walproposer",
"vartype": "string"
},
{
"name": "shared_preload_libraries",
"value": "neon",
"vartype": "string"
},
{
"name": "neon.safekeepers",
"value": "safekeeper1:5454,safekeeper2:5454,safekeeper3:5454",
"vartype": "string"
},
{
"name": "neon.timeline_id",
"value": "TIMELINE_ID",
"vartype": "string"
},
{
"name": "neon.tenant_id",
"value": "TENANT_ID",
"vartype": "string"
},
{
"name": "neon.pageserver_connstring",
"value": "host=pageserver port=6400",
"vartype": "string"
},
{
"name": "max_replication_write_lag",
"value": "500MB",
"vartype": "string"
},
{
"name": "max_replication_flush_lag",
"value": "10GB",
"vartype": "string"
}
]
},
"delta_operations": [
]
}

View File

@@ -1,200 +0,0 @@
version: '3'
services:
etcd:
image: quay.io/coreos/etcd:v3.5.4
ports:
- 2379:2379
- 2380:2380
environment:
# This signifficantly speeds up etcd and we anyway don't data persistency there.
ETCD_UNSAFE_NO_FSYNC: "1"
command:
- "etcd"
- "--auto-compaction-mode=revision"
- "--auto-compaction-retention=1"
- "--name=etcd-cluster"
- "--initial-cluster-state=new"
- "--initial-cluster-token=etcd-cluster-1"
- "--initial-cluster=etcd-cluster=http://etcd:2380"
- "--initial-advertise-peer-urls=http://etcd:2380"
- "--advertise-client-urls=http://etcd:2379"
- "--listen-client-urls=http://0.0.0.0:2379"
- "--listen-peer-urls=http://0.0.0.0:2380"
- "--quota-backend-bytes=134217728" # 128 MB
minio:
image: quay.io/minio/minio:RELEASE.2022-10-20T00-55-09Z
ports:
- 9000:9000
- 9001:9001
environment:
- MINIO_ROOT_USER=minio
- MINIO_ROOT_PASSWORD=password
command: server /data --address :9000 --console-address ":9001"
minio_create_buckets:
image: minio/mc
environment:
- MINIO_ROOT_USER=minio
- MINIO_ROOT_PASSWORD=password
entrypoint:
- "/bin/sh"
- "-c"
command:
- "until (/usr/bin/mc alias set minio http://minio:9000 $$MINIO_ROOT_USER $$MINIO_ROOT_PASSWORD) do
echo 'Waiting to start minio...' && sleep 1;
done;
/usr/bin/mc mb minio/neon --region=eu-north-1;
exit 0;"
depends_on:
- minio
pageserver:
image: neondatabase/neon:${TAG:-latest}
environment:
- BROKER_ENDPOINT='http://etcd:2379'
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=password
#- RUST_BACKTRACE=1
ports:
#- 6400:6400 # pg protocol handler
- 9898:9898 # http endpoints
entrypoint:
- "/bin/sh"
- "-c"
command:
- "/usr/local/bin/pageserver -D /data/.neon/
-c \"broker_endpoints=[$$BROKER_ENDPOINT]\"
-c \"listen_pg_addr='0.0.0.0:6400'\"
-c \"listen_http_addr='0.0.0.0:9898'\"
-c \"remote_storage={endpoint='http://minio:9000',
bucket_name='neon',
bucket_region='eu-north-1',
prefix_in_bucket='/pageserver/'}\""
depends_on:
- etcd
- minio_create_buckets
safekeeper1:
image: neondatabase/neon:${TAG:-latest}
environment:
- SAFEKEEPER_ADVERTISE_URL=safekeeper1:5454
- SAFEKEEPER_ID=1
- BROKER_ENDPOINT=http://etcd:2379
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=password
#- RUST_BACKTRACE=1
ports:
#- 5454:5454 # pg protocol handler
- 7676:7676 # http endpoints
entrypoint:
- "/bin/sh"
- "-c"
command:
- "safekeeper --listen-pg=$$SAFEKEEPER_ADVERTISE_URL
--listen-http='0.0.0.0:7676'
--id=$$SAFEKEEPER_ID
--broker-endpoints=$$BROKER_ENDPOINT
-D /data
--remote-storage=\"{endpoint='http://minio:9000',
bucket_name='neon',
bucket_region='eu-north-1',
prefix_in_bucket='/safekeeper/'}\""
depends_on:
- etcd
- minio_create_buckets
safekeeper2:
image: neondatabase/neon:${TAG:-latest}
environment:
- SAFEKEEPER_ADVERTISE_URL=safekeeper2:5454
- SAFEKEEPER_ID=2
- BROKER_ENDPOINT=http://etcd:2379
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=password
#- RUST_BACKTRACE=1
ports:
#- 5454:5454 # pg protocol handler
- 7677:7676 # http endpoints
entrypoint:
- "/bin/sh"
- "-c"
command:
- "safekeeper --listen-pg=$$SAFEKEEPER_ADVERTISE_URL
--listen-http='0.0.0.0:7676'
--id=$$SAFEKEEPER_ID
--broker-endpoints=$$BROKER_ENDPOINT
-D /data
--remote-storage=\"{endpoint='http://minio:9000',
bucket_name='neon',
bucket_region='eu-north-1',
prefix_in_bucket='/safekeeper/'}\""
depends_on:
- etcd
- minio_create_buckets
safekeeper3:
image: neondatabase/neon:${TAG:-latest}
environment:
- SAFEKEEPER_ADVERTISE_URL=safekeeper3:5454
- SAFEKEEPER_ID=3
- BROKER_ENDPOINT=http://etcd:2379
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=password
#- RUST_BACKTRACE=1
ports:
#- 5454:5454 # pg protocol handler
- 7678:7676 # http endpoints
entrypoint:
- "/bin/sh"
- "-c"
command:
- "safekeeper --listen-pg=$$SAFEKEEPER_ADVERTISE_URL
--listen-http='0.0.0.0:7676'
--id=$$SAFEKEEPER_ID
--broker-endpoints=$$BROKER_ENDPOINT
-D /data
--remote-storage=\"{endpoint='http://minio:9000',
bucket_name='neon',
bucket_region='eu-north-1',
prefix_in_bucket='/safekeeper/'}\""
depends_on:
- etcd
- minio_create_buckets
compute:
build:
context: ./image/compute
args:
- COMPUTE_IMAGE=compute-node-v${PG_VERSION:-14}:${TAG:-latest}
- http_proxy=$http_proxy
- https_proxy=$https_proxy
environment:
- PG_VERSION=${PG_VERSION:-14}
#- RUST_BACKTRACE=1
volumes:
- ./compute/var/db/postgres/specs/:/var/db/postgres/specs/
- ./compute/shell/:/shell/
ports:
- 55433:55433 # pg protocol handler
- 3080:3080 # http endpoints
entrypoint:
- "/shell/compute.sh"
depends_on:
- safekeeper1
- safekeeper2
- safekeeper3
- pageserver
compute_is_ready:
image: postgres:latest
entrypoint:
- "/bin/bash"
- "-c"
command:
- "until pg_isready -h compute -p 55433 ; do
echo 'Waiting to start compute...' && sleep 1;
done"
depends_on:
- compute

View File

@@ -1,10 +0,0 @@
ARG COMPUTE_IMAGE=compute-node-v14:latest
FROM neondatabase/${COMPUTE_IMAGE}
USER root
RUN apt-get update && \
apt-get install -y curl \
jq \
netcat
USER postgres

View File

@@ -18,67 +18,3 @@ We build all images after a successful `release` tests run and push automaticall
1. `neondatabase/compute-tools` and `neondatabase/compute-node`
2. `neondatabase/neon`
## Docker Compose example
You can see a [docker compose](https://docs.docker.com/compose/) example to create a neon cluster in [/docker-compose/docker-compose.yml](/docker-compose/docker-compose.yml). It creates the following conatainers.
- etcd x 1
- pageserver x 1
- safekeeper x 3
- compute x 1
- MinIO x 1 # This is Amazon S3 compatible object storage
### How to use
1. create containers
You can specify version of neon cluster using following environment values.
- PG_VERSION: postgres version for compute (default is 14)
- TAG: the tag version of [docker image](https://registry.hub.docker.com/r/neondatabase/neon/tags) (default is latest), which is tagged in [CI test](/.github/workflows/build_and_test.yml)
```
$ cd docker-compose/docker-compose.yml
$ docker-compose down # remove the conainers if exists
$ PG_VERSION=15 TAG=2221 docker-compose up --build -d # You can specify the postgres and image version
Creating network "dockercompose_default" with the default driver
Creating dockercompose_etcd3_1 ...
(...omit...)
```
2. connect compute node
```
$ echo "localhost:55433:postgres:cloud_admin:cloud_admin" >> ~/.pgpass
$ psql -h localhost -p 55433 -U cloud_admin
postgres=# CREATE TABLE t(key int primary key, value text);
CREATE TABLE
postgres=# insert into t values(1,1);
INSERT 0 1
postgres=# select * from t;
key | value
-----+-------
1 | 1
(1 row)
```
3. If you want to see the log, you can use `docker-compose logs` command.
```
# check the container name you want to see
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
d6968a5ae912 dockercompose_compute "/shell/compute.sh" 5 minutes ago Up 5 minutes 0.0.0.0:3080->3080/tcp, 0.0.0.0:55433->55433/tcp dockercompose_compute_1
(...omit...)
$ docker logs -f dockercompose_compute_1
2022-10-21 06:15:48.757 GMT [56] LOG: connection authorized: user=cloud_admin database=postgres application_name=psql
2022-10-21 06:17:00.307 GMT [56] LOG: [NEON_SMGR] libpagestore: connected to 'host=pageserver port=6400'
(...omit...)
```
4. If you want to see durable data in MinIO which is s3 compatible storage
Access http://localhost:9001 and sign in.
- Username: `minio`
- Password: `password`
You can see durable pages and WAL data in `neon` bucket.

View File

@@ -29,9 +29,6 @@ pub struct SkTimelineInfo {
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub peer_horizon_lsn: Option<Lsn>,
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub local_start_lsn: Option<Lsn>,
/// A connection string to use for WAL receiving.
#[serde(default)]
pub safekeeper_connstr: Option<String>,

View File

@@ -7,9 +7,6 @@ edition = "2021"
serde = { version = "1.0", features = ["derive"] }
serde_with = "2.0"
const_format = "0.2.21"
anyhow = { version = "1.0", features = ["backtrace"] }
bytes = "1.0.1"
utils = { path = "../utils" }
postgres_ffi = { path = "../postgres_ffi" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -2,7 +2,6 @@ use const_format::formatcp;
/// Public API types
pub mod models;
pub mod reltag;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");

View File

@@ -7,10 +7,6 @@ use utils::{
lsn::Lsn,
};
use crate::reltag::RelTag;
use anyhow::bail;
use bytes::{Buf, BufMut, Bytes, BytesMut};
/// A state of a tenant in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TenantState {
@@ -23,22 +19,6 @@ pub enum TenantState {
Broken,
}
/// A state of a timeline in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TimelineState {
/// Timeline is fully operational, its background jobs are running.
Active,
/// A timeline is recognized by pageserver, but not yet ready to operate.
/// The status indicates, that the timeline could eventually go back to Active automatically:
/// for example, if the owning tenant goes back to Active again.
Suspended,
/// A timeline is recognized by pageserver, but not yet ready to operate and not allowed to
/// automatically become Active after certain events: only a management call can change this status.
Paused,
/// A timeline is recognized by the pageserver, but no longer used for any operations, as failed to get activated.
Broken,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
@@ -180,8 +160,6 @@ pub struct TimelineInfo {
pub remote_consistent_lsn: Option<Lsn>,
pub awaits_download: bool,
pub state: TimelineState,
// Some of the above fields are duplicated in 'local' and 'remote', for backwards-
// compatility with older clients.
pub local: LocalTimelineInfo,
@@ -224,159 +202,12 @@ pub struct TimelineGcRequest {
pub gc_horizon: Option<u64>,
}
// Wrapped in libpq CopyData
pub enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),
Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest),
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantSetBackgroundActivityRequest {
pub run_backround_jobs: bool,
}
// Wrapped in libpq CopyData
pub enum PagestreamBeMessage {
Exists(PagestreamExistsResponse),
Nblocks(PagestreamNblocksResponse),
GetPage(PagestreamGetPageResponse),
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
}
#[derive(Debug)]
pub struct PagestreamExistsRequest {
pub latest: bool,
pub lsn: Lsn,
pub rel: RelTag,
}
#[derive(Debug)]
pub struct PagestreamNblocksRequest {
pub latest: bool,
pub lsn: Lsn,
pub rel: RelTag,
}
#[derive(Debug)]
pub struct PagestreamGetPageRequest {
pub latest: bool,
pub lsn: Lsn,
pub rel: RelTag,
pub blkno: u32,
}
#[derive(Debug)]
pub struct PagestreamDbSizeRequest {
pub latest: bool,
pub lsn: Lsn,
pub dbnode: u32,
}
#[derive(Debug)]
pub struct PagestreamExistsResponse {
pub exists: bool,
}
#[derive(Debug)]
pub struct PagestreamNblocksResponse {
pub n_blocks: u32,
}
#[derive(Debug)]
pub struct PagestreamGetPageResponse {
pub page: Bytes,
}
#[derive(Debug)]
pub struct PagestreamErrorResponse {
pub message: String,
}
#[derive(Debug)]
pub struct PagestreamDbSizeResponse {
pub db_size: i64,
}
impl PagestreamFeMessage {
pub fn parse(mut body: Bytes) -> anyhow::Result<PagestreamFeMessage> {
// TODO these gets can fail
// these correspond to the NeonMessageTag enum in pagestore_client.h
//
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
let msg_tag = body.get_u8();
match msg_tag {
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
})),
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
})),
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
blkno: body.get_u32(),
})),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
dbnode: body.get_u32(),
})),
_ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body),
}
}
}
impl PagestreamBeMessage {
pub fn serialize(&self) -> Bytes {
let mut bytes = BytesMut::new();
match self {
Self::Exists(resp) => {
bytes.put_u8(100); /* tag from pagestore_client.h */
bytes.put_u8(resp.exists as u8);
}
Self::Nblocks(resp) => {
bytes.put_u8(101); /* tag from pagestore_client.h */
bytes.put_u32(resp.n_blocks);
}
Self::GetPage(resp) => {
bytes.put_u8(102); /* tag from pagestore_client.h */
bytes.put(&resp.page[..]);
}
Self::Error(resp) => {
bytes.put_u8(103); /* tag from pagestore_client.h */
bytes.put(resp.message.as_bytes());
bytes.put_u8(0); // null terminator
}
Self::DbSize(resp) => {
bytes.put_u8(104); /* tag from pagestore_client.h */
bytes.put_i64(resp.db_size);
}
}
bytes.into()
}
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantSetBackgroundActivityResponse {
pub msg: String,
}

View File

@@ -37,22 +37,22 @@ pub static REQUIRED_POSTGRES_CONFIG: Lazy<Vec<&'static str>> = Lazy::new(|| {
});
impl Conf {
pub fn pg_distrib_dir(&self) -> anyhow::Result<PathBuf> {
pub fn pg_distrib_dir(&self) -> PathBuf {
let path = self.pg_distrib_dir.clone();
match self.pg_version {
14 => Ok(path.join(format!("v{}", self.pg_version))),
15 => Ok(path.join(format!("v{}", self.pg_version))),
_ => bail!("Unsupported postgres version: {}", self.pg_version),
14 => path.join(format!("v{}", self.pg_version)),
15 => path.join(format!("v{}", self.pg_version)),
_ => panic!("Unsupported postgres version: {}", self.pg_version),
}
}
fn pg_bin_dir(&self) -> anyhow::Result<PathBuf> {
Ok(self.pg_distrib_dir()?.join("bin"))
fn pg_bin_dir(&self) -> PathBuf {
self.pg_distrib_dir().join("bin")
}
fn pg_lib_dir(&self) -> anyhow::Result<PathBuf> {
Ok(self.pg_distrib_dir()?.join("lib"))
fn pg_lib_dir(&self) -> PathBuf {
self.pg_distrib_dir().join("lib")
}
pub fn wal_dir(&self) -> PathBuf {
@@ -60,12 +60,12 @@ impl Conf {
}
fn new_pg_command(&self, command: impl AsRef<Path>) -> Result<Command> {
let path = self.pg_bin_dir()?.join(command);
let path = self.pg_bin_dir().join(command);
ensure!(path.exists(), "Command {:?} does not exist", path);
let mut cmd = Command::new(path);
cmd.env_clear()
.env("LD_LIBRARY_PATH", self.pg_lib_dir()?)
.env("DYLD_LIBRARY_PATH", self.pg_lib_dir()?);
.env("LD_LIBRARY_PATH", self.pg_lib_dir())
.env("DYLD_LIBRARY_PATH", self.pg_lib_dir());
Ok(cmd)
}

View File

@@ -19,7 +19,7 @@ thiserror = "1.0"
tokio = { version = "1.17", features = ["macros"]}
tokio-rustls = "0.23"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
nix = "0.25"
signal-hook = "0.3.10"
rand = "0.8.3"
@@ -30,8 +30,6 @@ rustls-split = "0.3.0"
git-version = "0.3.5"
serde_with = "2.0"
once_cell = "1.13.0"
strum = "0.24"
strum_macros = "0.24"
metrics = { path = "../metrics" }

View File

@@ -75,12 +75,6 @@ impl From<[u8; 16]> for Id {
}
}
impl From<Id> for u128 {
fn from(id: Id) -> Self {
u128::from_le_bytes(id.0)
}
}
impl fmt::Display for Id {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.hex_encode())
@@ -142,12 +136,6 @@ macro_rules! id_newtype {
}
}
impl From<$t> for u128 {
fn from(id: $t) -> Self {
u128::from(id.0)
}
}
impl fmt::Display for $t {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)

View File

@@ -1,35 +1,11 @@
use std::{
fs::{File, OpenOptions},
path::Path,
str::FromStr,
};
use anyhow::{Context, Result};
use strum_macros::{EnumString, EnumVariantNames};
#[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)]
#[strum(serialize_all = "snake_case")]
pub enum LogFormat {
Plain,
Json,
}
impl LogFormat {
pub fn from_config(s: &str) -> anyhow::Result<LogFormat> {
use strum::VariantNames;
LogFormat::from_str(s).with_context(|| {
format!(
"Unrecognized log format. Please specify one of: {:?}",
LogFormat::VARIANTS
)
})
}
}
pub fn init(
log_filename: impl AsRef<Path>,
daemonize: bool,
log_format: LogFormat,
) -> Result<File> {
pub fn init(log_filename: impl AsRef<Path>, daemonize: bool) -> Result<File> {
// Don't open the same file for output multiple times;
// the different fds could overwrite each other's output.
let log_file = OpenOptions::new()
@@ -45,50 +21,22 @@ pub fn init(
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_filter_str));
let x: File = log_file.try_clone().unwrap();
let base_logger = tracing_subscriber::fmt()
.with_env_filter(env_filter)
.with_target(false)
.with_ansi(false)
.with_writer(move || -> Box<dyn std::io::Write> {
// we are cloning and returning log file in order to allow redirecting daemonized stdout and stderr to it
// if we do not use daemonization (e.g. in docker) it is better to log to stdout directly
// for example to be in line with docker log command which expects logs comimg from stdout
if daemonize {
Box::new(x.try_clone().unwrap())
} else {
Box::new(std::io::stdout())
}
});
.with_target(false) // don't include event targets
.with_ansi(false); // don't use colors in log file;
match log_format {
LogFormat::Json => base_logger.json().init(),
LogFormat::Plain => base_logger.init(),
// we are cloning and returning log file in order to allow redirecting daemonized stdout and stderr to it
// if we do not use daemonization (e.g. in docker) it is better to log to stdout directly
// for example to be in line with docker log command which expects logs comimg from stdout
if daemonize {
let x = log_file.try_clone().unwrap();
base_logger
.with_writer(move || x.try_clone().unwrap())
.init();
} else {
base_logger.init();
}
Ok(log_file)
}
// #[cfg(test)]
// Due to global logger, can't run tests in same process.
// So until there's a non-global one, the tests are in ../tests/ as separate files.
#[macro_export(local_inner_macros)]
macro_rules! test_init_file_logger {
($log_level:expr, $log_format:expr) => {{
use std::str::FromStr;
std::env::set_var("RUST_LOG", $log_level);
let tmp_dir = tempfile::TempDir::new().unwrap();
let log_file_path = tmp_dir.path().join("logfile");
let log_format = $crate::logging::LogFormat::from_str($log_format).unwrap();
let _log_file = $crate::logging::init(&log_file_path, true, log_format).unwrap();
let log_file = std::fs::OpenOptions::new()
.read(true)
.open(&log_file_path)
.unwrap();
log_file
}};
}

View File

@@ -1,36 +0,0 @@
// This could be in ../src/logging.rs but since the logger is global, these
// can't be run in threads of the same process
use std::fs::File;
use std::io::{BufRead, BufReader, Lines};
use tracing::*;
use utils::test_init_file_logger;
fn read_lines(file: File) -> Lines<BufReader<File>> {
BufReader::new(file).lines()
}
#[test]
fn test_json_format_has_message_and_custom_field() {
std::env::set_var("RUST_LOG", "info");
let log_file = test_init_file_logger!("info", "json");
let custom_field: &str = "hi";
trace!(custom = %custom_field, "test log message");
debug!(custom = %custom_field, "test log message");
info!(custom = %custom_field, "test log message");
warn!(custom = %custom_field, "test log message");
error!(custom = %custom_field, "test log message");
let lines = read_lines(log_file);
for line in lines {
let content = line.unwrap();
let json_object = serde_json::from_str::<serde_json::Value>(&content).unwrap();
assert_eq!(json_object["fields"]["custom"], "hi");
assert_eq!(json_object["fields"]["message"], "test log message");
assert_ne!(json_object["level"], "TRACE");
assert_ne!(json_object["level"], "DEBUG");
}
}

View File

@@ -1,36 +0,0 @@
// This could be in ../src/logging.rs but since the logger is global, these
// can't be run in threads of the same process
use std::fs::File;
use std::io::{BufRead, BufReader, Lines};
use tracing::*;
use utils::test_init_file_logger;
fn read_lines(file: File) -> Lines<BufReader<File>> {
BufReader::new(file).lines()
}
#[test]
fn test_plain_format_has_message_and_custom_field() {
std::env::set_var("RUST_LOG", "warn");
let log_file = test_init_file_logger!("warn", "plain");
let custom_field: &str = "hi";
trace!(custom = %custom_field, "test log message");
debug!(custom = %custom_field, "test log message");
info!(custom = %custom_field, "test log message");
warn!(custom = %custom_field, "test log message");
error!(custom = %custom_field, "test log message");
let lines = read_lines(log_file);
for line in lines {
let content = line.unwrap();
serde_json::from_str::<serde_json::Value>(&content).unwrap_err();
assert!(content.contains("custom=hi"));
assert!(content.contains("test log message"));
assert!(!content.contains("TRACE"));
assert!(!content.contains("DEBUG"));
assert!(!content.contains("INFO"));
}
}

View File

@@ -67,7 +67,6 @@ remote_storage = { path = "../libs/remote_storage" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
close_fds = "0.3.2"
walkdir = "2.3.2"
svg_fmt = "0.4.1"
[dev-dependencies]
criterion = "0.4"

View File

@@ -22,8 +22,8 @@ use std::time::SystemTime;
use tar::{Builder, EntryType, Header};
use tracing::*;
use crate::reltag::{RelTag, SlruKind};
use crate::tenant::Timeline;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
use postgres_ffi::pg_constants::{PGDATA_SPECIAL_FILES, PGDATA_SUBDIRS, PG_HBA};

View File

@@ -1,150 +0,0 @@
//! A tool for visualizing the arrangement of layerfiles within a timeline.
//!
//! It reads filenames from stdin and prints a svg on stdout. The image is a plot in
//! page-lsn space, where every delta layer is a rectangle and every image layer is a
//! thick line. Legend:
//! - The x axis (left to right) represents page index.
//! - The y axis represents LSN, growing upwards.
//!
//! Coordinates in both axis are compressed for better readability.
//! (see https://medium.com/algorithms-digest/coordinate-compression-2fff95326fb)
//!
//! Example use:
//! ```
//! $ cd test_output/test_pgbench\[neon-45-684\]/repo/tenants/$TENANT/timelines/$TIMELINE
//! $ ls | grep "__" | cargo run --release --bin draw_timeline_dir > out.svg
//! $ firefox out.svg
//! ```
//!
//! This API was chosen so that we can easily work with filenames extracted from ssh,
//! or from pageserver log files.
//!
//! TODO Consider shipping this as a grafana panel plugin:
//! https://grafana.com/tutorials/build-a-panel-plugin/
use anyhow::Result;
use pageserver::repository::Key;
use std::cmp::Ordering;
use std::io::{self, BufRead};
use std::{
collections::{BTreeMap, BTreeSet},
ops::Range,
};
use svg_fmt::{rectangle, rgb, BeginSvg, EndSvg, Fill, Stroke};
use utils::{lsn::Lsn, project_git_version};
project_git_version!(GIT_VERSION);
// Map values to their compressed coordinate - the index the value
// would have in a sorted and deduplicated list of all values.
fn build_coordinate_compression_map<T: Ord + Copy>(coords: Vec<T>) -> BTreeMap<T, usize> {
let set: BTreeSet<T> = coords.into_iter().collect();
let mut map: BTreeMap<T, usize> = BTreeMap::new();
for (i, e) in set.iter().enumerate() {
map.insert(*e, i);
}
map
}
fn parse_filename(name: &str) -> (Range<Key>, Range<Lsn>) {
let split: Vec<&str> = name.split("__").collect();
let keys: Vec<&str> = split[0].split('-').collect();
let mut lsns: Vec<&str> = split[1].split('-').collect();
if lsns.len() == 1 {
lsns.push(lsns[0]);
}
let keys = Key::from_hex(keys[0]).unwrap()..Key::from_hex(keys[1]).unwrap();
let lsns = Lsn::from_hex(lsns[0]).unwrap()..Lsn::from_hex(lsns[1]).unwrap();
(keys, lsns)
}
fn main() -> Result<()> {
// Parse layer filenames from stdin
let mut ranges: Vec<(Range<Key>, Range<Lsn>)> = vec![];
let stdin = io::stdin();
for line in stdin.lock().lines() {
let range = parse_filename(&line.unwrap());
ranges.push(range);
}
// Collect all coordinates
let mut keys: Vec<Key> = vec![];
let mut lsns: Vec<Lsn> = vec![];
for (keyr, lsnr) in &ranges {
keys.push(keyr.start);
keys.push(keyr.end);
lsns.push(lsnr.start);
lsns.push(lsnr.end);
}
// Analyze
let key_map = build_coordinate_compression_map(keys);
let lsn_map = build_coordinate_compression_map(lsns);
// Initialize stats
let mut num_deltas = 0;
let mut num_images = 0;
// Draw
let stretch = 3.0; // Stretch out vertically for better visibility
println!(
"{}",
BeginSvg {
w: key_map.len() as f32,
h: stretch * lsn_map.len() as f32
}
);
for (keyr, lsnr) in &ranges {
let key_start = *key_map.get(&keyr.start).unwrap();
let key_end = *key_map.get(&keyr.end).unwrap();
let key_diff = key_end - key_start;
let lsn_max = lsn_map.len();
if key_start >= key_end {
panic!("Invalid key range {}-{}", key_start, key_end);
}
let lsn_start = *lsn_map.get(&lsnr.start).unwrap();
let lsn_end = *lsn_map.get(&lsnr.end).unwrap();
let mut lsn_diff = (lsn_end - lsn_start) as f32;
let mut fill = Fill::None;
let mut margin = 0.05 * lsn_diff; // Height-dependent margin to disambiguate overlapping deltas
let mut lsn_offset = 0.0;
// Fill in and thicken rectangle if it's an
// image layer so that we can see it.
match lsn_start.cmp(&lsn_end) {
Ordering::Less => num_deltas += 1,
Ordering::Equal => {
num_images += 1;
lsn_diff = 0.3;
lsn_offset = -lsn_diff / 2.0;
margin = 0.05;
fill = Fill::Color(rgb(0, 0, 0));
}
Ordering::Greater => panic!("Invalid lsn range {}-{}", lsn_start, lsn_end),
}
println!(
" {}",
rectangle(
key_start as f32 + stretch * margin,
stretch * (lsn_max as f32 - (lsn_end as f32 - margin - lsn_offset)),
key_diff as f32 - stretch * 2.0 * margin,
stretch * (lsn_diff - 2.0 * margin)
)
.fill(fill)
.stroke(Stroke::Color(rgb(0, 0, 0), 0.1))
.border_radius(0.4)
);
}
println!("{}", EndSvg);
eprintln!("num_images: {}", num_images);
eprintln!("num_deltas: {}", num_deltas);
Ok(())
}

View File

@@ -14,7 +14,7 @@ use metrics::set_build_info_metric;
use pageserver::{
config::{defaults::*, PageServerConf},
http, page_cache, page_image_cache, page_service, profiling, task_mgr,
http, page_cache, page_service, profiling, task_mgr,
task_mgr::TaskKind,
task_mgr::{
BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME,
@@ -101,7 +101,6 @@ fn main() -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(conf.max_file_descriptors);
page_cache::init(conf.page_cache_size);
page_image_cache::init(64 * conf.page_cache_size); // temporary hack for benchmarking
start_pageserver(conf, daemonize).context("Failed to start pageserver")?;
@@ -200,7 +199,7 @@ fn initialize_config(
fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()> {
// Initialize logger
let log_file = logging::init(LOG_FILE_NAME, daemonize, conf.log_format)?;
let log_file = logging::init(LOG_FILE_NAME, daemonize)?;
info!("version: {}", version());

View File

@@ -17,7 +17,6 @@ use toml_edit::{Document, Item};
use url::Url;
use utils::{
id::{NodeId, TenantId, TimelineId},
logging::LogFormat,
postgres_backend::AuthType,
};
@@ -46,8 +45,6 @@ pub mod defaults {
pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192;
pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100;
pub const DEFAULT_LOG_FORMAT: &str = "plain";
///
/// Default built-in configuration file.
///
@@ -66,7 +63,6 @@ pub mod defaults {
# initial superuser role name to use when creating a new tenant
#initial_superuser_name = '{DEFAULT_SUPERUSER}'
#log_format = '{DEFAULT_LOG_FORMAT}'
# [tenant_config]
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
@@ -130,8 +126,6 @@ pub struct PageServerConf {
/// Etcd broker endpoints to connect to.
pub broker_endpoints: Vec<Url>,
pub log_format: LogFormat,
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -198,8 +192,6 @@ struct PageServerConfigBuilder {
profiling: BuilderValue<ProfilingConfig>,
broker_etcd_prefix: BuilderValue<String>,
broker_endpoints: BuilderValue<Vec<Url>>,
log_format: BuilderValue<LogFormat>,
}
impl Default for PageServerConfigBuilder {
@@ -227,7 +219,6 @@ impl Default for PageServerConfigBuilder {
profiling: Set(ProfilingConfig::Disabled),
broker_etcd_prefix: Set(etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string()),
broker_endpoints: Set(Vec::new()),
log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()),
}
}
}
@@ -300,10 +291,6 @@ impl PageServerConfigBuilder {
self.profiling = BuilderValue::Set(profiling)
}
pub fn log_format(&mut self, log_format: LogFormat) {
self.log_format = BuilderValue::Set(log_format)
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
let broker_endpoints = self
.broker_endpoints
@@ -348,7 +335,6 @@ impl PageServerConfigBuilder {
broker_etcd_prefix: self
.broker_etcd_prefix
.ok_or(anyhow!("missing broker_etcd_prefix"))?,
log_format: self.log_format.ok_or(anyhow!("missing log_format"))?,
})
}
}
@@ -401,28 +387,28 @@ impl PageServerConf {
//
// Postgres distribution paths
//
pub fn pg_distrib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
pub fn pg_distrib_dir(&self, pg_version: u32) -> PathBuf {
let path = self.pg_distrib_dir.clone();
match pg_version {
14 => Ok(path.join(format!("v{pg_version}"))),
15 => Ok(path.join(format!("v{pg_version}"))),
_ => bail!("Unsupported postgres version: {}", pg_version),
14 => path.join(format!("v{pg_version}")),
15 => path.join(format!("v{pg_version}")),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
pub fn pg_bin_dir(&self, pg_version: u32) -> PathBuf {
match pg_version {
14 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")),
15 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")),
_ => bail!("Unsupported postgres version: {}", pg_version),
14 => self.pg_distrib_dir(pg_version).join("bin"),
15 => self.pg_distrib_dir(pg_version).join("bin"),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
pub fn pg_lib_dir(&self, pg_version: u32) -> PathBuf {
match pg_version {
14 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")),
15 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")),
_ => bail!("Unsupported postgres version: {}", pg_version),
14 => self.pg_distrib_dir(pg_version).join("lib"),
15 => self.pg_distrib_dir(pg_version).join("lib"),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
@@ -473,9 +459,6 @@ impl PageServerConf {
})
.collect::<anyhow::Result<_>>()?,
),
"log_format" => builder.log_format(
LogFormat::from_config(&parse_toml_string(key, item)?)?
),
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -588,7 +571,6 @@ impl PageServerConf {
default_tenant_conf: TenantConf::dummy_conf(),
broker_endpoints: Vec::new(),
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
}
}
}
@@ -683,8 +665,6 @@ max_file_descriptors = 333
initial_superuser_name = 'zzzz'
id = 10
log_format = 'json'
"#;
#[test]
@@ -724,7 +704,6 @@ log_format = 'json'
.parse()
.expect("Failed to parse a valid broker endpoint URL")],
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
},
"Correct defaults should be used when no config values are provided"
);
@@ -769,7 +748,6 @@ log_format = 'json'
.parse()
.expect("Failed to parse a valid broker endpoint URL")],
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
log_format: LogFormat::Json,
},
"Should be able to parse all basic config values correctly"
);

View File

@@ -618,7 +618,6 @@ components:
- last_record_lsn
- disk_consistent_lsn
- awaits_download
- state
properties:
timeline_id:
type: string
@@ -661,8 +660,6 @@ components:
type: integer
awaits_download:
type: boolean
state:
type: string
# These 'local' and 'remote' fields just duplicate some of the fields
# above. They are kept for backwards-compatibility. They can be removed,

View File

@@ -3,6 +3,9 @@ use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use pageserver_api::models::{
TenantSetBackgroundActivityRequest, TenantSetBackgroundActivityResponse,
};
use remote_storage::GenericRemoteStorage;
use tokio::task::JoinError;
use tracing::*;
@@ -13,11 +16,12 @@ use super::models::{
TimelineCreateRequest,
};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::storage_sync;
use crate::storage_sync::index::{RemoteIndex, RemoteTimeline};
use crate::task_mgr::TaskKind;
use crate::tenant::{TenantState, Timeline};
use crate::tenant_config::TenantConfOpt;
use crate::{config::PageServerConf, tenant_mgr};
use crate::{storage_sync, task_mgr};
use utils::{
auth::JwtAuth,
http::{
@@ -129,7 +133,6 @@ async fn build_timeline_info(
}
};
let current_physical_size = Some(timeline.get_physical_size());
let state = timeline.current_state();
let info = TimelineInfo {
tenant_id: timeline.tenant_id,
@@ -159,7 +162,6 @@ async fn build_timeline_info(
remote_consistent_lsn,
awaits_download,
state,
// Duplicate some fields in 'local' and 'remote' fields, for backwards-compatility
// with the control plane.
@@ -296,7 +298,7 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
let timeline_info = async {
let timeline = tokio::task::spawn_blocking(move || {
tenant_mgr::get_tenant(tenant_id, true)?.get_timeline(timeline_id, false)
tenant_mgr::get_tenant(tenant_id, true)?.get_timeline(timeline_id)
})
.await
.map_err(|e: JoinError| ApiError::InternalServerError(e.into()))?;
@@ -333,13 +335,14 @@ async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Result<Response
let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
let timeline = tenant_mgr::get_tenant(tenant_id, true)
.and_then(|tenant| tenant.get_timeline(timeline_id, true))
.and_then(|tenant| tenant.get_timeline(timeline_id))
.with_context(|| format!("No timeline {timeline_id} in repository for tenant {tenant_id}"))
.map_err(ApiError::NotFound)?;
let result = match timeline
.find_lsn_for_timestamp(timestamp_pg)
.map_err(ApiError::InternalServerError)?
{
LsnForTimestamp::Present(lsn) => format!("{lsn}"),
LsnForTimestamp::Present(lsn) => format!("{}", lsn),
LsnForTimestamp::Future(_lsn) => "future".into(),
LsnForTimestamp::Past(_lsn) => "past".into(),
LsnForTimestamp::NoData(_lsn) => "nodata".into(),
@@ -571,6 +574,63 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
)
}
async fn tenant_set_background_activity(
mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let request: TenantSetBackgroundActivityRequest = json_request(&mut request).await?;
let tenant = tenant_mgr::get_tenant(tenant_id, false).map_err(ApiError::NotFound)?;
let modified = tenant.set_state_with(|old_state| {
let background_jobs_running = match old_state {
TenantState::Active {
background_jobs_running,
} => background_jobs_running,
_ => return None,
};
match (request.run_backround_jobs, background_jobs_running) {
(true, true) => None,
(false, false) => None,
(true, false) => Some(TenantState::Active {
background_jobs_running: true,
}),
(false, true) => {
// tasks will eventually shut down after that, but we need a guarantee
// that they've stopped so explicitly waiting for it
Some(TenantState::Active {
background_jobs_running: false,
})
}
}
});
if !modified {
return Ok(json_response(
StatusCode::NOT_MODIFIED,
TenantSetBackgroundActivityResponse { msg: "".to_owned() },
)?);
}
// state was modified and request values was set to false which means we changed state
// and now need to wait for tasks shutdown
// XXX can it be changed second time here? and modified flag be outdated now?
if modified && !request.run_backround_jobs {
task_mgr::shutdown_tasks(Some(TaskKind::Compaction), Some(tenant_id), None).await;
task_mgr::shutdown_tasks(Some(TaskKind::GarbageCollector), Some(tenant_id), None).await;
}
Ok(json_response(
StatusCode::OK,
TenantSetBackgroundActivityResponse {
msg: format!("run background jobs set to {}", request.run_backround_jobs),
},
)?)
}
// Helper function to standardize the error messages we produce on bad durations
//
// Intended to be used with anyhow's `with_context`, e.g.:
@@ -789,16 +849,16 @@ async fn timeline_gc_handler(mut request: Request<Body>) -> Result<Response<Body
check_permission(&request, Some(tenant_id))?;
// FIXME: currently this will return a 500 error on bad tenant id; it should be 4XX
let tenant = tenant_mgr::get_tenant(tenant_id, false).map_err(ApiError::NotFound)?;
let repo = tenant_mgr::get_tenant(tenant_id, false).map_err(ApiError::NotFound)?;
let gc_req: TimelineGcRequest = json_request(&mut request).await?;
let _span_guard =
info_span!("manual_gc", tenant = %tenant_id, timeline = %timeline_id).entered();
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| repo.get_gc_horizon());
// Use tenant's pitr setting
let pitr = tenant.get_pitr_interval();
let result = tenant
let pitr = repo.get_pitr_interval();
let result = repo
.gc_iteration(Some(timeline_id), gc_horizon, pitr, true)
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
// better once the types support it.
@@ -813,9 +873,10 @@ async fn timeline_compact_handler(request: Request<Body>) -> Result<Response<Bod
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
let tenant = tenant_mgr::get_tenant(tenant_id, true).map_err(ApiError::NotFound)?;
let timeline = tenant
.get_timeline(timeline_id, true)
let repo = tenant_mgr::get_tenant(tenant_id, true).map_err(ApiError::NotFound)?;
let timeline = repo
.get_timeline(timeline_id)
.with_context(|| format!("No timeline {timeline_id} in repository for tenant {tenant_id}"))
.map_err(ApiError::NotFound)?;
timeline.compact().map_err(ApiError::InternalServerError)?;
@@ -829,9 +890,10 @@ async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
let tenant = tenant_mgr::get_tenant(tenant_id, true).map_err(ApiError::NotFound)?;
let timeline = tenant
.get_timeline(timeline_id, true)
let repo = tenant_mgr::get_tenant(tenant_id, true).map_err(ApiError::NotFound)?;
let timeline = repo
.get_timeline(timeline_id)
.with_context(|| format!("No timeline {timeline_id} in repository for tenant {tenant_id}"))
.map_err(ApiError::NotFound)?;
timeline
.checkpoint(CheckpointConfig::Forced)
@@ -903,6 +965,10 @@ pub fn make_router(
.post("/v1/tenant/:tenant_id/timeline", timeline_create_handler)
.post("/v1/tenant/:tenant_id/attach", tenant_attach_handler)
.post("/v1/tenant/:tenant_id/detach", tenant_detach_handler)
.post(
"/v1/tenant/:tenant_id/set_background_activity",
tenant_set_background_activity,
)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_detail_handler,

View File

@@ -12,10 +12,10 @@ use tracing::*;
use walkdir::WalkDir;
use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind};
use crate::tenant::Timeline;
use crate::walingest::WalIngest;
use crate::walrecord::DecodedWALRecord;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::waldecoder::WalStreamDecoder;

View File

@@ -5,10 +5,10 @@ pub mod import_datadir;
pub mod keyspace;
pub mod metrics;
pub mod page_cache;
pub mod page_image_cache;
pub mod page_service;
pub mod pgdatadir_mapping;
pub mod profiling;
pub mod reltag;
pub mod repository;
pub mod storage_sync;
pub mod task_mgr;
@@ -46,8 +46,6 @@ pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
pub const LOG_FILE_NAME: &str = "pageserver.log";
static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
/// Config for the Repository checkpointer
#[derive(Debug, Clone, Copy)]
pub enum CheckpointConfig {

View File

@@ -108,10 +108,10 @@ enum CacheKey {
}
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct MaterializedPageHashKey {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub key: Key,
struct MaterializedPageHashKey {
tenant_id: TenantId,
timeline_id: TimelineId,
key: Key,
}
#[derive(Clone)]

View File

@@ -1,315 +0,0 @@
//!
//! Global page image cache
//!
//! Unlike page_cache it holds only most recent version of reconstructed page images.
//! And it uses invalidation mechanism to avoid layer ap lookups.
use crate::page_cache::MaterializedPageHashKey;
use crate::pgdatadir_mapping::{rel_block_to_key, BlockNumber};
use crate::repository::Key;
use crate::tenant::Timeline;
use anyhow::{bail, Result};
use bytes::Bytes;
use once_cell::sync::OnceCell;
use pageserver_api::reltag::RelTag;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Condvar, Mutex};
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
static PAGE_CACHE: OnceCell<Mutex<PageImageCache>> = OnceCell::new();
const TEST_PAGE_CACHE_SIZE: usize = 50;
enum PageImageState {
// entry is not used
Vacant,
// page is loaded or has failed
Loaded(Option<Bytes>),
// page in process of loading, Condvar is created on demand when some thread need to wait load completion
Loading(Option<Arc<Condvar>>),
}
impl std::fmt::Debug for PageImageState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Vacant => write!(f, "Vacant"),
Self::Loaded(arg0) => f
.debug_tuple("Loaded")
.field(&arg0.as_ref().map(|_| "<loaded>").unwrap_or("<failed>"))
.finish(),
Self::Loading(arg0) => f.debug_tuple("Loading").field(arg0).finish(),
}
}
}
struct CacheEntry {
key: MaterializedPageHashKey,
// next+prev are used for LRU L2-list and next is also used for L1 free pages list
next: usize,
prev: usize,
collision: usize, // L1 hash collision chain
state: PageImageState,
}
pub struct PageImageCache {
free_list: usize, // L1 list of free entries
pages: Vec<CacheEntry>,
hash_table: Vec<usize>, // indexes in pages array
}
///
/// Initialize the page cache. This must be called once at page server startup.
///
pub fn init(size: usize) {
if PAGE_CACHE
.set(Mutex::new(PageImageCache::new(size)))
.is_err()
{
panic!("page cache already initialized");
}
}
///
/// Get a handle to the page cache.
///
pub fn get() -> &'static Mutex<PageImageCache> {
//
// In unit tests, page server startup doesn't happen and no one calls
// page_image_cache::init(). Initialize it here with a tiny cache, so that the
// page cache is usable in unit tests.
//
if cfg!(test) {
PAGE_CACHE.get_or_init(|| Mutex::new(PageImageCache::new(TEST_PAGE_CACHE_SIZE)))
} else {
PAGE_CACHE.get().expect("page cache not initialized")
}
}
fn hash<T: Hash>(t: &T) -> usize {
let mut s = DefaultHasher::new();
t.hash(&mut s);
s.finish() as usize
}
impl PageImageCache {
fn new(size: usize) -> Self {
let mut pages: Vec<CacheEntry> = Vec::with_capacity(size + 1);
let hash_table = vec![0usize; size];
// Dummy key
let dummy_key = MaterializedPageHashKey {
key: Key::MIN,
tenant_id: TenantId::from([0u8; 16]),
timeline_id: TimelineId::from([0u8; 16]),
};
// LRU list head
pages.push(CacheEntry {
key: dummy_key.clone(),
next: 0,
prev: 0,
collision: 0,
state: PageImageState::Vacant,
});
// Construct L1 free page list
for i in 0..size {
pages.push(CacheEntry {
key: dummy_key.clone(),
next: i + 2, // build L1-list of free pages
prev: 0,
collision: 0,
state: PageImageState::Vacant,
});
}
pages[size - 1].next = 0; // en of free page list
PageImageCache {
free_list: 1,
pages,
hash_table,
}
}
// Unlink from L2-list
fn unlink(&mut self, index: usize) {
let next = self.pages[index].next;
let prev = self.pages[index].prev;
self.pages[next].prev = prev;
self.pages[prev].next = next;
}
// Link in L2-list after specified element
fn link_after(&mut self, after: usize, index: usize) {
let next = self.pages[after].next;
self.pages[index].prev = after;
self.pages[index].next = next;
self.pages[next].prev = index;
self.pages[after].next = index;
}
fn prune(&mut self, index: usize) {
self.pages[index].prev = index;
self.pages[index].next = index;
}
fn is_empty(&self, index: usize) -> bool {
self.pages[index].next == index
}
}
// Remove entry from cache: o page invalidation or drop relation
pub fn remove(key: Key, tenant_id: TenantId, timeline_id: TimelineId) {
let key = MaterializedPageHashKey {
key,
tenant_id,
timeline_id,
};
let this = get();
let mut cache = this.lock().unwrap();
let h = hash(&key) % cache.hash_table.len();
let mut index = cache.hash_table[h];
let mut prev = 0usize;
while index != 0 {
if cache.pages[index].key == key {
if !cache.is_empty(index) {
cache.pages[index].state = PageImageState::Vacant;
// Remove from LRU list
cache.unlink(index);
// Insert entry in free list
cache.pages[index].next = cache.free_list;
cache.free_list = index;
} else {
// Page is process of loading: we can not remove it righ now,
// so just mark for deletion
cache.pages[index].next = 0; // make is_empty == false
}
// Remove from hash table
if prev == 0 {
cache.hash_table[h] = cache.pages[index].collision;
} else {
cache.pages[prev].collision = cache.pages[index].collision;
}
break;
}
prev = index;
index = cache.pages[index].collision;
}
// It's Ok if image not found
}
// Find or load page image in the cache
pub fn lookup(timeline: &Timeline, rel: RelTag, blkno: BlockNumber, lsn: Lsn) -> Result<Bytes> {
let key = MaterializedPageHashKey {
key: rel_block_to_key(rel, blkno),
tenant_id: timeline.tenant_id,
timeline_id: timeline.timeline_id,
};
let this = get();
let mut cache = this.lock().unwrap();
let h = hash(&key) % cache.hash_table.len();
'lookup: loop {
let mut index = cache.hash_table[h];
while index != 0 {
if cache.pages[index].key == key {
// cache hit
match &cache.pages[index].state {
PageImageState::Loaded(cached_page) => {
// Move to the head of LRU list
let page = cached_page.clone();
cache.unlink(index);
cache.link_after(0, index);
return page.ok_or(anyhow::anyhow!("page loading failed earlier"));
}
PageImageState::Loading(event) => {
// Create event on which to sleep if not yet assigned
let cv = match event {
None => {
let cv = Arc::new(Condvar::new());
cache.pages[index].state =
PageImageState::Loading(Some(cv.clone()));
cv
}
Some(cv) => cv.clone(),
};
cache = cv.wait(cache).unwrap();
// Retry lookup
continue 'lookup;
}
PageImageState::Vacant => bail!("Vacant entry is not expected here"),
};
}
index = cache.pages[index].collision;
}
// Cache miss
index = cache.free_list;
if index == 0 {
// no free items
let victim = cache.pages[0].prev; // take least recently used element from the tail of LRU list
assert!(victim != 0);
// Remove victim from hash table
let h = hash(&cache.pages[victim].key) % cache.hash_table.len();
index = cache.hash_table[h];
let mut prev = 0usize;
while index != victim {
assert!(index != 0);
prev = index;
index = cache.pages[index].collision;
}
if prev == 0 {
cache.hash_table[h] = cache.pages[victim].collision;
} else {
cache.pages[prev].collision = cache.pages[victim].collision;
}
// and from LRU list
cache.unlink(victim);
index = victim;
} else {
// Use next free item
cache.free_list = cache.pages[index].next;
}
// Make is_empty(index) == true. If entry is removed in process of loaded,
// it will be updated so that !is_empty(index)
cache.prune(index);
// Insert in hash table
cache.pages[index].collision = cache.hash_table[h];
cache.hash_table[h] = index;
cache.pages[index].key = key;
cache.pages[index].state = PageImageState::Loading(None);
drop(cache); //release lock
// Load page
let res = timeline.get_rel_page_at_lsn(rel, blkno, lsn, true);
cache = this.lock().unwrap();
match &cache.pages[index].state {
PageImageState::Loading(Some(cv)) => cv.notify_all(),
PageImageState::Loading(None) => {}
other => bail!("expected Loading(_), found {other:?}"),
}
if cache.is_empty(index) {
// entry was not marked as deleted {
// Page is loaded
cache.pages[index].state = PageImageState::Loaded(res.as_ref().ok().cloned());
// Link the page to the head of LRU list
cache.link_after(0, index);
} else {
cache.pages[index].state = PageImageState::Vacant;
// Return page to free list
cache.pages[index].next = cache.free_list;
cache.free_list = index;
}
// only the first one gets the full error from `get_rel_page_at_lsn`
return res;
}
}

View File

@@ -10,15 +10,8 @@
//
use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::{Stream, StreamExt};
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
PagestreamNblocksRequest, PagestreamNblocksResponse,
};
use std::io;
use std::net::TcpListener;
use std::str;
@@ -41,8 +34,8 @@ use crate::basebackup;
use crate::config::{PageServerConf, ProfilingConfig};
use crate::import_datadir::import_wal_from_tar;
use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
use crate::page_image_cache;
use crate::profiling::profpoint_start;
use crate::reltag::RelTag;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
@@ -52,6 +45,163 @@ use crate::CheckpointConfig;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
// Wrapped in libpq CopyData
enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),
Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest),
}
// Wrapped in libpq CopyData
enum PagestreamBeMessage {
Exists(PagestreamExistsResponse),
Nblocks(PagestreamNblocksResponse),
GetPage(PagestreamGetPageResponse),
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
}
#[derive(Debug)]
struct PagestreamExistsRequest {
latest: bool,
lsn: Lsn,
rel: RelTag,
}
#[derive(Debug)]
struct PagestreamNblocksRequest {
latest: bool,
lsn: Lsn,
rel: RelTag,
}
#[derive(Debug)]
struct PagestreamGetPageRequest {
latest: bool,
lsn: Lsn,
rel: RelTag,
blkno: u32,
}
#[derive(Debug)]
struct PagestreamDbSizeRequest {
latest: bool,
lsn: Lsn,
dbnode: u32,
}
#[derive(Debug)]
struct PagestreamExistsResponse {
exists: bool,
}
#[derive(Debug)]
struct PagestreamNblocksResponse {
n_blocks: u32,
}
#[derive(Debug)]
struct PagestreamGetPageResponse {
page: Bytes,
}
#[derive(Debug)]
struct PagestreamErrorResponse {
message: String,
}
#[derive(Debug)]
struct PagestreamDbSizeResponse {
db_size: i64,
}
impl PagestreamFeMessage {
fn parse(mut body: Bytes) -> anyhow::Result<PagestreamFeMessage> {
// TODO these gets can fail
// these correspond to the NeonMessageTag enum in pagestore_client.h
//
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
let msg_tag = body.get_u8();
match msg_tag {
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
})),
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
})),
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
blkno: body.get_u32(),
})),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
dbnode: body.get_u32(),
})),
_ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body),
}
}
}
impl PagestreamBeMessage {
fn serialize(&self) -> Bytes {
let mut bytes = BytesMut::new();
match self {
Self::Exists(resp) => {
bytes.put_u8(100); /* tag from pagestore_client.h */
bytes.put_u8(resp.exists as u8);
}
Self::Nblocks(resp) => {
bytes.put_u8(101); /* tag from pagestore_client.h */
bytes.put_u32(resp.n_blocks);
}
Self::GetPage(resp) => {
bytes.put_u8(102); /* tag from pagestore_client.h */
bytes.put(&resp.page[..]);
}
Self::Error(resp) => {
bytes.put_u8(103); /* tag from pagestore_client.h */
bytes.put(resp.message.as_bytes());
bytes.put_u8(0); // null terminator
}
Self::DbSize(resp) => {
bytes.put_u8(104); /* tag from pagestore_client.h */
bytes.put_i64(resp.db_size);
}
}
bytes.into()
}
}
fn copyin_stream(pgb: &mut PostgresBackend) -> impl Stream<Item = io::Result<Bytes>> + '_ {
async_stream::try_stream! {
loop {
@@ -582,12 +732,8 @@ impl PageServerHandler {
// current profiling is based on a thread-local variable, so it doesn't work
// across awaits
let _profiling_guard = profpoint_start(self.conf, ProfilingConfig::PageRequests);
let page = timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)?;
let page = if req.latest {
page_image_cache::lookup(timeline, req.rel, req.blkno, lsn)
} else {
timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, false)
}?;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,
}))
@@ -914,8 +1060,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
}
fn get_local_timeline(tenant_id: TenantId, timeline_id: TimelineId) -> Result<Arc<Timeline>> {
tenant_mgr::get_tenant(tenant_id, true)
.and_then(|tenant| tenant.get_timeline(timeline_id, true))
tenant_mgr::get_tenant(tenant_id, true).and_then(|tenant| tenant.get_timeline(timeline_id))
}
///

View File

@@ -7,12 +7,12 @@
//! Clarify that)
//!
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::reltag::{RelTag, SlruKind};
use crate::repository::*;
use crate::tenant::Timeline;
use crate::walrecord::NeonWalRecord;
use anyhow::{bail, ensure, Result};
use bytes::{Buf, Bytes};
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
use postgres_ffi::{Oid, TimestampTz, TransactionId};
@@ -1179,7 +1179,7 @@ fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
}
}
pub fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
Key {
field1: 0x00,
field2: rel.spcnode,
@@ -1373,17 +1373,6 @@ fn is_rel_block_key(key: Key) -> bool {
key.field1 == 0x00 && key.field4 != 0
}
pub fn is_rel_fsm_block_key(key: Key) -> bool {
key.field1 == 0x00 && key.field4 != 0 && key.field5 == FSM_FORKNUM && key.field6 != 0xffffffff
}
pub fn is_rel_vm_block_key(key: Key) -> bool {
key.field1 == 0x00
&& key.field4 != 0
&& key.field5 == VISIBILITYMAP_FORKNUM
&& key.field6 != 0xffffffff
}
pub fn key_to_slru_block(key: Key) -> Result<(SlruKind, u32, BlockNumber)> {
Ok(match key.field1 {
0x01 => {

View File

@@ -11,8 +11,7 @@
//! parent timeline, and the last LSN that has been written to disk.
//!
use anyhow::{bail, Context};
use pageserver_api::models::TimelineState;
use anyhow::{bail, ensure, Context, Result};
use tokio::sync::watch;
use tracing::*;
use utils::crashsafe::path_with_suffix_extension;
@@ -26,6 +25,7 @@ use std::fs::File;
use std::fs::OpenOptions;
use std::io;
use std::io::Write;
use std::num::NonZeroU64;
use std::ops::Bound::Included;
use std::path::Path;
use std::path::PathBuf;
@@ -190,9 +190,10 @@ impl UninitializedTimeline<'_> {
"Failed to remove uninit mark file for timeline {tenant_id}/{timeline_id}"
)
})?;
new_timeline.set_state(TimelineState::Active);
v.insert(Arc::clone(&new_timeline));
new_timeline.launch_wal_receiver();
new_timeline.launch_wal_receiver().with_context(|| {
format!("Failed to launch walreceiver for timeline {tenant_id}/{timeline_id}")
})?;
}
}
@@ -293,7 +294,7 @@ impl TimelineUninitMark {
Ok(())
}
fn delete_mark_file_if_present(&mut self) -> anyhow::Result<()> {
fn delete_mark_file_if_present(&mut self) -> Result<(), anyhow::Error> {
let uninit_mark_file = &self.uninit_mark_path;
let uninit_mark_parent = uninit_mark_file
.parent()
@@ -340,26 +341,18 @@ impl Tenant {
/// Get Timeline handle for given Neon timeline ID.
/// This function is idempotent. It doesn't change internal state in any way.
pub fn get_timeline(
&self,
timeline_id: TimelineId,
active_only: bool,
) -> anyhow::Result<Arc<Timeline>> {
let timelines_accessor = self.timelines.lock().unwrap();
let timeline = timelines_accessor.get(&timeline_id).with_context(|| {
format!("Timeline {}/{} was not found", self.tenant_id, timeline_id)
})?;
if active_only && !timeline.is_active() {
anyhow::bail!(
"Timeline {}/{} is not active, state: {:?}",
self.tenant_id,
timeline_id,
timeline.current_state()
)
} else {
Ok(Arc::clone(timeline))
}
pub fn get_timeline(&self, timeline_id: TimelineId) -> anyhow::Result<Arc<Timeline>> {
self.timelines
.lock()
.unwrap()
.get(&timeline_id)
.with_context(|| {
format!(
"Timeline {} was not found for tenant {}",
timeline_id, self.tenant_id
)
})
.map(Arc::clone)
}
/// Lists timelines the tenant contains.
@@ -382,11 +375,6 @@ impl Tenant {
initdb_lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<UninitializedTimeline> {
anyhow::ensure!(
self.is_active(),
"Cannot create empty timelines on inactive tenant"
);
let timelines = self.timelines.lock().unwrap();
let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id, &timelines)?;
drop(timelines);
@@ -423,14 +411,9 @@ impl Tenant {
mut ancestor_start_lsn: Option<Lsn>,
pg_version: u32,
) -> anyhow::Result<Option<Arc<Timeline>>> {
anyhow::ensure!(
self.is_active(),
"Cannot create timelines on inactive tenant"
);
let new_timeline_id = new_timeline_id.unwrap_or_else(TimelineId::generate);
if self.get_timeline(new_timeline_id, false).is_ok() {
if self.get_timeline(new_timeline_id).is_ok() {
debug!("timeline {new_timeline_id} already exists");
return Ok(None);
}
@@ -438,7 +421,7 @@ impl Tenant {
let loaded_timeline = match ancestor_timeline_id {
Some(ancestor_timeline_id) => {
let ancestor_timeline = self
.get_timeline(ancestor_timeline_id, false)
.get_timeline(ancestor_timeline_id)
.context("Cannot branch off the timeline that's not present in pageserver")?;
if let Some(lsn) = ancestor_start_lsn.as_mut() {
@@ -489,12 +472,7 @@ impl Tenant {
horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool,
) -> anyhow::Result<GcResult> {
anyhow::ensure!(
self.is_active(),
"Cannot run GC iteration on inactive tenant"
);
) -> Result<GcResult> {
let timeline_str = target_timeline_id
.map(|x| x.to_string())
.unwrap_or_else(|| "-".to_string());
@@ -510,12 +488,7 @@ impl Tenant {
/// This function is periodically called by compactor task.
/// Also it can be explicitly requested per timeline through page server
/// api's 'compact' command.
pub fn compaction_iteration(&self) -> anyhow::Result<()> {
anyhow::ensure!(
self.is_active(),
"Cannot run compaction iteration on inactive tenant"
);
pub fn compaction_iteration(&self) -> Result<()> {
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// compactions. We don't want to block everything else while the
@@ -523,7 +496,6 @@ impl Tenant {
let timelines = self.timelines.lock().unwrap();
let timelines_to_compact = timelines
.iter()
.filter(|(_, timeline)| timeline.is_active())
.map(|(timeline_id, timeline)| (*timeline_id, timeline.clone()))
.collect::<Vec<_>>();
drop(timelines);
@@ -540,19 +512,19 @@ impl Tenant {
///
/// Used at graceful shutdown.
///
pub fn checkpoint(&self) -> anyhow::Result<()> {
pub fn checkpoint(&self) -> Result<()> {
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// checkpoints. We don't want to block everything else while the
// checkpoint runs.
let timelines = self.timelines.lock().unwrap();
let timelines_to_checkpoint = timelines
let timelines_to_compact = timelines
.iter()
.map(|(timeline_id, timeline)| (*timeline_id, Arc::clone(timeline)))
.collect::<Vec<_>>();
drop(timelines);
for (timeline_id, timeline) in &timelines_to_checkpoint {
for (timeline_id, timeline) in &timelines_to_compact {
let _entered =
info_span!("checkpoint", timeline = %timeline_id, tenant = %self.tenant_id)
.entered();
@@ -574,7 +546,7 @@ impl Tenant {
.iter()
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id));
anyhow::ensure!(
ensure!(
!children_exist,
"Cannot delete timeline which has child timelines"
);
@@ -583,10 +555,7 @@ impl Tenant {
Entry::Vacant(_) => bail!("timeline not found"),
};
let timeline = timeline_entry.get();
timeline.set_state(TimelineState::Paused);
let layer_removal_guard = timeline.layer_removal_guard()?;
let layer_removal_guard = timeline_entry.get().layer_removal_guard()?;
let local_timeline_directory = self.conf.timeline_path(&timeline_id, &self.tenant_id);
std::fs::remove_dir_all(&local_timeline_directory).with_context(|| {
@@ -603,6 +572,58 @@ impl Tenant {
Ok(())
}
pub fn init_attach_timelines(
&self,
timelines: HashMap<TimelineId, TimelineMetadata>,
) -> anyhow::Result<()> {
let sorted_timelines = if timelines.len() == 1 {
timelines.into_iter().collect()
} else if !timelines.is_empty() {
tree_sort_timelines(timelines)?
} else {
warn!("No timelines to attach received");
return Ok(());
};
let mut timelines_accessor = self.timelines.lock().unwrap();
for (timeline_id, metadata) in sorted_timelines {
info!(
"Attaching timeline {} pg_version {}",
timeline_id,
metadata.pg_version()
);
if timelines_accessor.contains_key(&timeline_id) {
warn!(
"Timeline {}/{} already exists in the tenant map, skipping its initialization",
self.tenant_id, timeline_id
);
continue;
} else {
let ancestor = metadata
.ancestor_timeline()
.and_then(|ancestor_timeline_id| timelines_accessor.get(&ancestor_timeline_id))
.cloned();
let timeline = UninitializedTimeline {
owning_tenant: self,
timeline_id,
raw_timeline: Some((
self.create_timeline_data(timeline_id, metadata, ancestor)
.with_context(|| {
format!("Failed to initialize timeline {timeline_id}")
})?,
TimelineUninitMark::dummy(),
)),
};
let initialized_timeline =
timeline.initialize_with_lock(&mut timelines_accessor, true)?;
timelines_accessor.insert(timeline_id, initialized_timeline);
}
}
Ok(())
}
/// Allows to retrieve remote timeline index from the tenant. Used in walreceiver to grab remote consistent lsn.
pub fn get_remote_index(&self) -> &RemoteIndex {
&self.remote_index
@@ -634,42 +655,42 @@ impl Tenant {
}
pub fn set_state(&self, new_state: TenantState) {
match (self.current_state(), new_state) {
(equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
debug!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
}
(TenantState::Broken, _) => {
error!("Ignoring state update {new_state:?} for broken tenant");
}
(_, new_state) => {
self.state.send_replace(new_state);
self.set_state_with(|_| Some(new_state));
}
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
match new_state {
TenantState::Active {
background_jobs_running,
} => {
if background_jobs_running {
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
crate::tenant_tasks::start_background_loops(self.tenant_id);
}
pub fn set_state_with<F>(&self, f: F) -> bool
where
F: FnOnce(&mut TenantState) -> Option<TenantState>,
{
let modify = |old_state: &mut TenantState| {
let new_state = match f(old_state) {
None => return false,
Some(new_state) => new_state,
};
for timeline in not_broken_timelines {
timeline.set_state(TimelineState::Active);
}
}
TenantState::Paused | TenantState::Broken => {
for timeline in not_broken_timelines {
timeline.set_state(TimelineState::Suspended);
}
}
match (old_state, new_state) {
(equal_state_1, equal_state_2) if equal_state_1 == &equal_state_2 => {
debug!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
false
}
(TenantState::Broken, _) => {
error!("Ignoring state update {new_state:?} for broken tenant");
false
}
(old_state, new_state) => {
*old_state = new_state;
true
}
}
};
let modified = self.state.send_if_modified(modify);
if modified && self.should_run_tasks() {
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
crate::tenant_tasks::start_background_loops(self.tenant_id);
}
modified
}
pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> {
@@ -682,7 +703,7 @@ impl Tenant {
/// before the children.
fn tree_sort_timelines(
timelines: HashMap<TimelineId, TimelineMetadata>,
) -> anyhow::Result<Vec<(TimelineId, TimelineMetadata)>> {
) -> Result<Vec<(TimelineId, TimelineMetadata)>> {
let mut result = Vec::with_capacity(timelines.len());
let mut now = Vec::with_capacity(timelines.len());
@@ -785,6 +806,27 @@ impl Tenant {
.unwrap_or(self.conf.default_tenant_conf.pitr_interval)
}
pub fn get_wal_receiver_connect_timeout(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.walreceiver_connect_timeout
.unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout)
}
pub fn get_lagging_wal_timeout(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.lagging_wal_timeout
.unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout)
}
pub fn get_max_lsn_wal_lag(&self) -> NonZeroU64 {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.max_lsn_wal_lag
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag)
}
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
self.tenant_conf.write().unwrap().update(&new_tenant_conf);
}
@@ -816,7 +858,7 @@ impl Tenant {
))
}
pub(super) fn new(
pub fn new(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
@@ -839,7 +881,7 @@ impl Tenant {
}
/// Locate and load config
pub(super) fn load_tenant_config(
pub fn load_tenant_config(
conf: &'static PageServerConf,
tenant_id: TenantId,
) -> anyhow::Result<TenantConfOpt> {
@@ -881,7 +923,7 @@ impl Tenant {
Ok(tenant_conf)
}
pub(super) fn persist_tenant_config(
pub fn persist_tenant_config(
target_config_path: &Path,
tenant_conf: TenantConfOpt,
first_save: bool,
@@ -974,7 +1016,7 @@ impl Tenant {
horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool,
) -> anyhow::Result<GcResult> {
) -> Result<GcResult> {
let mut totals: GcResult = Default::default();
let now = Instant::now();
@@ -995,7 +1037,6 @@ impl Tenant {
timelines
.iter()
.filter(|(_, timeline)| timeline.is_active())
.map(|(timeline_id, timeline_entry)| {
// This is unresolved question for now, how to do gc in presence of remote timelines
// especially when this is combined with branching.
@@ -1029,7 +1070,7 @@ impl Tenant {
for timeline_id in timeline_ids {
// Timeline is known to be local and loaded.
let timeline = self
.get_timeline(timeline_id, false)
.get_timeline(timeline_id)
.with_context(|| format!("Timeline {timeline_id} was not found"))?;
// If target_timeline is specified, ignore all other timelines
@@ -1114,7 +1155,7 @@ impl Tenant {
// Step 2 is to avoid initializing the new branch using data removed by past GC iterations
// or in-queue GC iterations.
let src_timeline = self.get_timeline(src, false).with_context(|| {
let src_timeline = self.get_timeline(src).with_context(|| {
format!(
"No ancestor {} found for timeline {}/{}",
src, self.tenant_id, dst
@@ -1384,68 +1425,6 @@ impl Tenant {
Ok(uninit_mark)
}
pub(super) fn init_attach_timelines(
&self,
timelines: HashMap<TimelineId, TimelineMetadata>,
) -> anyhow::Result<()> {
let sorted_timelines = if timelines.len() == 1 {
timelines.into_iter().collect()
} else if !timelines.is_empty() {
tree_sort_timelines(timelines)?
} else {
warn!("No timelines to attach received");
return Ok(());
};
let tenant_id = self.tenant_id;
let mut timelines_accessor = self.timelines.lock().unwrap();
for (timeline_id, metadata) in sorted_timelines {
info!(
"Attaching timeline {}/{} pg_version {}",
tenant_id,
timeline_id,
metadata.pg_version()
);
if timelines_accessor.contains_key(&timeline_id) {
warn!("Timeline {tenant_id}/{timeline_id} already exists in the tenant map, skipping its initialization");
continue;
}
let ancestor = metadata
.ancestor_timeline()
.and_then(|ancestor_timeline_id| timelines_accessor.get(&ancestor_timeline_id))
.cloned();
let dummy_timeline = self
.create_timeline_data(timeline_id, metadata.clone(), ancestor.clone())
.with_context(|| {
format!("Failed to crate dummy timeline data for {tenant_id}/{timeline_id}")
})?;
let timeline = UninitializedTimeline {
owning_tenant: self,
timeline_id,
raw_timeline: Some((dummy_timeline, TimelineUninitMark::dummy())),
};
match timeline.initialize_with_lock(&mut timelines_accessor, true) {
Ok(initialized_timeline) => {
timelines_accessor.insert(timeline_id, initialized_timeline);
}
Err(e) => {
error!("Failed to initialize timeline {tenant_id}/{timeline_id}: {e:?}");
let broken_timeline = self
.create_timeline_data(timeline_id, metadata, ancestor)
.with_context(|| {
format!("Failed to crate broken timeline data for {tenant_id}/{timeline_id}")
})?;
broken_timeline.set_state(TimelineState::Broken);
timelines_accessor.insert(timeline_id, Arc::new(broken_timeline));
}
}
}
Ok(())
}
}
/// Create the cluster temporarily in 'initdbpath' directory inside the repository
@@ -1454,9 +1433,9 @@ fn run_initdb(
conf: &'static PageServerConf,
initdb_target_dir: &Path,
pg_version: u32,
) -> anyhow::Result<()> {
let initdb_bin_path = conf.pg_bin_dir(pg_version)?.join("initdb");
let initdb_lib_dir = conf.pg_lib_dir(pg_version)?;
) -> Result<()> {
let initdb_bin_path = conf.pg_bin_dir(pg_version).join("initdb");
let initdb_lib_dir = conf.pg_lib_dir(pg_version);
info!(
"running {} in {}, libdir: {}",
initdb_bin_path.display(),
@@ -1500,7 +1479,7 @@ impl Drop for Tenant {
}
}
/// Dump contents of a layer file to stdout.
pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> anyhow::Result<()> {
pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> Result<()> {
use std::os::unix::fs::FileExt;
// All layer files start with a two-byte "magic" value, to identify the kind of
@@ -1605,13 +1584,13 @@ pub mod harness {
}
impl<'a> TenantHarness<'a> {
pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
pub fn create(test_name: &'static str) -> Result<Self> {
Self::create_internal(test_name, false)
}
pub fn create_exclusive(test_name: &'static str) -> anyhow::Result<Self> {
pub fn create_exclusive(test_name: &'static str) -> Result<Self> {
Self::create_internal(test_name, true)
}
fn create_internal(test_name: &'static str, exclusive: bool) -> anyhow::Result<Self> {
fn create_internal(test_name: &'static str, exclusive: bool) -> Result<Self> {
let lock_guard = if exclusive {
(None, Some(LOCK.write().unwrap()))
} else {
@@ -1645,7 +1624,7 @@ pub mod harness {
self.try_load().expect("failed to load test tenant")
}
pub fn try_load(&self) -> anyhow::Result<Tenant> {
pub fn try_load(&self) -> Result<Tenant> {
let walredo_mgr = Arc::new(TestRedoManager);
let tenant = Tenant::new(
@@ -1673,9 +1652,6 @@ pub mod harness {
timelines_to_load.insert(timeline_id, timeline_metadata);
}
tenant.init_attach_timelines(timelines_to_load)?;
tenant.set_state(TenantState::Active {
background_jobs_running: false,
});
Ok(tenant)
}
@@ -1728,7 +1704,7 @@ pub mod harness {
},
records.len()
);
println!("{s}");
println!("{}", s);
Ok(TEST_IMG(&s))
}
@@ -1752,7 +1728,7 @@ mod tests {
Lazy::new(|| Key::from_slice(&hex!("112222222233333333444444445500000001")));
#[test]
fn test_basic() -> anyhow::Result<()> {
fn test_basic() -> Result<()> {
let tenant = TenantHarness::create("test_basic")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -1776,7 +1752,7 @@ mod tests {
}
#[test]
fn no_duplicate_timelines() -> anyhow::Result<()> {
fn no_duplicate_timelines() -> Result<()> {
let tenant = TenantHarness::create("no_duplicate_timelines")?.load();
let _ = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -1807,7 +1783,7 @@ mod tests {
/// Test branch creation
///
#[test]
fn test_branch() -> anyhow::Result<()> {
fn test_branch() -> Result<()> {
let tenant = TenantHarness::create("test_branch")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -1835,7 +1811,7 @@ mod tests {
// Branch the history, modify relation differently on the new timeline
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x30)))?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
.get_timeline(NEW_TIMELINE_ID)
.expect("Should have a local timeline");
let new_writer = newtline.writer();
new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?;
@@ -1860,7 +1836,7 @@ mod tests {
Ok(())
}
fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> anyhow::Result<()> {
fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> Result<()> {
let mut lsn = start_lsn;
#[allow(non_snake_case)]
{
@@ -1902,7 +1878,7 @@ mod tests {
}
#[test]
fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
fn test_prohibit_branch_creation_on_garbage_collected_data() -> Result<()> {
let tenant =
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
.load();
@@ -1934,7 +1910,7 @@ mod tests {
}
#[test]
fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> Result<()> {
let tenant =
TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?.load();
@@ -1961,7 +1937,7 @@ mod tests {
// FIXME: This currently fails to error out. Calling GC doesn't currently
// remove the old value, we'd need to work a little harder
#[test]
fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
fn test_prohibit_get_for_garbage_collected_data() -> Result<()> {
let repo =
RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
.load();
@@ -1981,7 +1957,7 @@ mod tests {
*/
#[test]
fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
fn test_retain_data_in_parent_which_is_needed_for_child() -> Result<()> {
let tenant =
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?.load();
let tline = tenant
@@ -1991,7 +1967,7 @@ mod tests {
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
.get_timeline(NEW_TIMELINE_ID)
.expect("Should have a local timeline");
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
@@ -2000,7 +1976,7 @@ mod tests {
Ok(())
}
#[test]
fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
fn test_parent_keeps_data_forever_after_branching() -> Result<()> {
let tenant =
TenantHarness::create("test_parent_keeps_data_forever_after_branching")?.load();
let tline = tenant
@@ -2010,7 +1986,7 @@ mod tests {
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
.get_timeline(NEW_TIMELINE_ID)
.expect("Should have a local timeline");
make_some_layers(newtline.as_ref(), Lsn(0x60))?;
@@ -2028,7 +2004,7 @@ mod tests {
}
#[test]
fn timeline_load() -> anyhow::Result<()> {
fn timeline_load() -> Result<()> {
const TEST_NAME: &str = "timeline_load";
let harness = TenantHarness::create(TEST_NAME)?;
{
@@ -2042,14 +2018,14 @@ mod tests {
let tenant = harness.load();
tenant
.get_timeline(TIMELINE_ID, true)
.get_timeline(TIMELINE_ID)
.expect("cannot load timeline");
Ok(())
}
#[test]
fn timeline_load_with_ancestor() -> anyhow::Result<()> {
fn timeline_load_with_ancestor() -> Result<()> {
const TEST_NAME: &str = "timeline_load_with_ancestor";
let harness = TenantHarness::create(TEST_NAME)?;
// create two timelines
@@ -2065,7 +2041,7 @@ mod tests {
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
.get_timeline(NEW_TIMELINE_ID)
.expect("Should have a local timeline");
make_some_layers(newtline.as_ref(), Lsn(0x60))?;
@@ -2077,18 +2053,18 @@ mod tests {
// check that both, child and ancestor are loaded
let _child_tline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
.get_timeline(NEW_TIMELINE_ID)
.expect("cannot get child timeline loaded");
let _ancestor_tline = tenant
.get_timeline(TIMELINE_ID, true)
.get_timeline(TIMELINE_ID)
.expect("cannot get ancestor timeline loaded");
Ok(())
}
#[test]
fn corrupt_metadata() -> anyhow::Result<()> {
fn corrupt_metadata() -> Result<()> {
const TEST_NAME: &str = "corrupt_metadata";
let harness = TenantHarness::create(TEST_NAME)?;
let tenant = harness.load();
@@ -2130,7 +2106,7 @@ mod tests {
}
#[test]
fn test_images() -> anyhow::Result<()> {
fn test_images() -> Result<()> {
let tenant = TenantHarness::create("test_images")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2182,7 +2158,7 @@ mod tests {
// repeat 50 times.
//
#[test]
fn test_bulk_insert() -> anyhow::Result<()> {
fn test_bulk_insert() -> Result<()> {
let tenant = TenantHarness::create("test_bulk_insert")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2224,7 +2200,7 @@ mod tests {
}
#[test]
fn test_random_updates() -> anyhow::Result<()> {
fn test_random_updates() -> Result<()> {
let tenant = TenantHarness::create("test_random_updates")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2296,7 +2272,7 @@ mod tests {
}
#[test]
fn test_traverse_branches() -> anyhow::Result<()> {
fn test_traverse_branches() -> Result<()> {
let tenant = TenantHarness::create("test_traverse_branches")?.load();
let mut tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2335,7 +2311,7 @@ mod tests {
let new_tline_id = TimelineId::generate();
tenant.branch_timeline(tline_id, new_tline_id, Some(lsn))?;
tline = tenant
.get_timeline(new_tline_id, true)
.get_timeline(new_tline_id)
.expect("Should have the branched timeline");
tline_id = new_tline_id;
@@ -2377,7 +2353,7 @@ mod tests {
}
#[test]
fn test_traverse_ancestors() -> anyhow::Result<()> {
fn test_traverse_ancestors() -> Result<()> {
let tenant = TenantHarness::create("test_traverse_ancestors")?.load();
let mut tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2398,7 +2374,7 @@ mod tests {
let new_tline_id = TimelineId::generate();
tenant.branch_timeline(tline_id, new_tline_id, Some(lsn))?;
tline = tenant
.get_timeline(new_tline_id, true)
.get_timeline(new_tline_id)
.expect("Should have the branched timeline");
tline_id = new_tline_id;

View File

@@ -1,12 +1,10 @@
//!
use anyhow::{anyhow, bail, ensure, Context};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::Bytes;
use fail::fail_point;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::models::TimelineState;
use tokio::sync::watch;
use tokio::task::spawn_blocking;
use tracing::*;
@@ -34,12 +32,10 @@ use crate::tenant::{
use crate::config::{PageServerConf, METADATA_FILE_NAME};
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::metrics::TimelineMetrics;
use crate::page_image_cache;
use crate::pgdatadir_mapping::BlockNumber;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
use crate::reltag::RelTag;
use crate::tenant_config::TenantConfOpt;
use pageserver_api::reltag::RelTag;
use postgres_ffi::to_pg_timestamp;
use utils::{
@@ -56,7 +52,6 @@ use crate::task_mgr::TaskKind;
use crate::walreceiver::{is_etcd_client_initialized, spawn_connection_manager_task};
use crate::walredo::WalRedoManager;
use crate::CheckpointConfig;
use crate::ZERO_PAGE;
use crate::{
page_cache,
storage_sync::{self, index::LayerFileMetadata},
@@ -163,8 +158,6 @@ pub struct Timeline {
/// Relation size cache
pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
state: watch::Sender<TimelineState>,
}
/// Internal structure to hold all data needed for logical size calculation.
@@ -312,6 +305,10 @@ pub struct GcInfo {
/// Public interface functions
impl Timeline {
//------------------------------------------------------------------------------
// Public GET functions
//------------------------------------------------------------------------------
/// Get the LSN where this branch was created
pub fn get_ancestor_lsn(&self) -> Lsn {
self.ancestor_lsn
@@ -421,11 +418,9 @@ impl Timeline {
/// those functions with an LSN that has been processed yet is an error.
///
pub async fn wait_lsn(&self, lsn: Lsn) -> anyhow::Result<()> {
anyhow::ensure!(self.is_active(), "Cannot wait for Lsn on inactive timeline");
// This should never be called from the WAL receiver, because that could lead
// to a deadlock.
anyhow::ensure!(
ensure!(
task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnection),
"wait_lsn cannot be called in WAL receiver"
);
@@ -448,7 +443,7 @@ impl Timeline {
&self,
lsn: Lsn,
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
) -> anyhow::Result<()> {
) -> Result<()> {
ensure!(
lsn >= **latest_gc_cutoff_lsn,
"LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
@@ -458,6 +453,12 @@ impl Timeline {
Ok(())
}
//------------------------------------------------------------------------------
// Public PUT functions, to update the repository with new page versions.
//
// These are called by the WAL receiver to digest WAL records.
//------------------------------------------------------------------------------
/// Flush to disk all data that was written with the put_* functions
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
@@ -476,91 +477,6 @@ impl Timeline {
}
}
pub fn compact(&self) -> anyhow::Result<()> {
let last_record_lsn = self.get_last_record_lsn();
// Last record Lsn could be zero in case the timelie was just created
if !last_record_lsn.is_valid() {
warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
return Ok(());
}
//
// High level strategy for compaction / image creation:
//
// 1. First, calculate the desired "partitioning" of the
// currently in-use key space. The goal is to partition the
// key space into roughly fixed-size chunks, but also take into
// account any existing image layers, and try to align the
// chunk boundaries with the existing image layers to avoid
// too much churn. Also try to align chunk boundaries with
// relation boundaries. In principle, we don't know about
// relation boundaries here, we just deal with key-value
// pairs, and the code in pgdatadir_mapping.rs knows how to
// map relations into key-value pairs. But in practice we know
// that 'field6' is the block number, and the fields 1-5
// identify a relation. This is just an optimization,
// though.
//
// 2. Once we know the partitioning, for each partition,
// decide if it's time to create a new image layer. The
// criteria is: there has been too much "churn" since the last
// image layer? The "churn" is fuzzy concept, it's a
// combination of too many delta files, or too much WAL in
// total in the delta file. Or perhaps: if creating an image
// file would allow to delete some older files.
//
// 3. After that, we compact all level0 delta files if there
// are too many of them. While compacting, we also garbage
// collect any page versions that are no longer needed because
// of the new image layers we created in step 2.
//
// TODO: This high level strategy hasn't been implemented yet.
// Below are functions compact_level0() and create_image_layers()
// but they are a bit ad hoc and don't quite work like it's explained
// above. Rewrite it.
let _layer_removal_cs = self.layer_removal_cs.lock().unwrap();
let target_file_size = self.get_checkpoint_distance();
// Define partitioning schema if needed
match self.repartition(
self.get_last_record_lsn(),
self.get_compaction_target_size(),
) {
Ok((partitioning, lsn)) => {
// 2. Create new image layers for partitions that have been modified
// "enough".
let layer_paths_to_upload = self.create_image_layers(&partitioning, lsn, false)?;
if !layer_paths_to_upload.is_empty()
&& self.upload_layers.load(atomic::Ordering::Relaxed)
{
storage_sync::schedule_layer_upload(
self.tenant_id,
self.timeline_id,
layer_paths_to_upload,
None,
);
}
// 3. Compact
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(target_file_size)?;
timer.stop_and_record();
}
Err(err) => {
// no partitioning? This is normal, if the timeline was just created
// as an empty timeline. Also in unit tests, when we use the timeline
// as a simple key-value store, ignoring the datadir layout. Log the
// error but continue.
error!("could not compact, repartitioning keyspace failed: {err:?}");
}
};
Ok(())
}
/// Mutate the timeline with a [`TimelineWriter`].
pub fn writer(&self) -> TimelineWriter<'_> {
TimelineWriter {
@@ -568,109 +484,6 @@ impl Timeline {
_write_guard: self.write_lock.lock().unwrap(),
}
}
/// Retrieve current logical size of the timeline.
///
/// The size could be lagging behind the actual number, in case
/// the initial size calculation has not been run (gets triggered on the first size access).
pub fn get_current_logical_size(self: &Arc<Self>) -> anyhow::Result<u64> {
let current_size = self.current_logical_size.current_size()?;
debug!("Current size: {current_size:?}");
let size = current_size.size();
if let (CurrentLogicalSize::Approximate(_), Some(init_lsn)) =
(current_size, self.current_logical_size.initial_part_end)
{
self.try_spawn_size_init_task(init_lsn);
}
Ok(size)
}
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
/// the in-memory layer, and initiate flushing it if so.
///
/// Also flush after a period of time without new data -- it helps
/// safekeepers to regard pageserver as caught up and suspend activity.
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
let last_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap();
if let Some(open_layer) = &layers.open_layer {
let open_layer_size = open_layer.size()?;
drop(layers);
let last_freeze_at = self.last_freeze_at.load();
let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
let distance = last_lsn.widening_sub(last_freeze_at);
// Checkpointing the open layer can be triggered by layer size or LSN range.
// S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
// we want to stay below that with a big margin. The LSN distance determines how
// much WAL the safekeepers need to store.
if distance >= self.get_checkpoint_distance().into()
|| open_layer_size > self.get_checkpoint_distance()
|| (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
{
info!(
"check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
distance,
open_layer_size,
last_freeze_ts.elapsed()
);
self.freeze_inmem_layer(true);
self.last_freeze_at.store(last_lsn);
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
// Launch a task to flush the frozen layer to disk, unless
// a task was already running. (If the task was running
// at the time that we froze the layer, it must've seen the
// the layer we just froze before it exited; see comments
// in flush_frozen_layers())
if let Ok(guard) = self.layer_flush_lock.try_lock() {
drop(guard);
let self_clone = Arc::clone(self);
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::LayerFlushTask,
Some(self.tenant_id),
Some(self.timeline_id),
"layer flush task",
false,
async move { self_clone.flush_frozen_layers(false) },
);
}
}
}
Ok(())
}
pub fn set_state(&self, new_state: TimelineState) {
match (self.current_state(), new_state) {
(equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
debug!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
}
(TimelineState::Broken, _) => {
error!("Ignoring state update {new_state:?} for broken tenant");
}
(TimelineState::Paused, TimelineState::Active) => {
debug!("Not activating a paused timeline");
}
(_, new_state) => {
self.state.send_replace(new_state);
}
}
}
pub fn current_state(&self) -> TimelineState {
*self.state.borrow()
}
pub fn is_active(&self) -> bool {
self.current_state() == TimelineState::Active
}
pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
self.state.subscribe()
}
}
// Private functions
@@ -714,7 +527,7 @@ impl Timeline {
///
/// Loads the metadata for the timeline into memory, but not the layer map.
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
pub fn new(
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
metadata: TimelineMetadata,
@@ -724,9 +537,8 @@ impl Timeline {
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
upload_layers: bool,
pg_version: u32,
) -> Self {
) -> Timeline {
let disk_consistent_lsn = metadata.disk_consistent_lsn();
let (state, _) = watch::channel(TimelineState::Suspended);
let mut result = Timeline {
conf,
@@ -783,17 +595,16 @@ impl Timeline {
last_received_wal: Mutex::new(None),
rel_size_cache: RwLock::new(HashMap::new()),
state,
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result
}
pub(super) fn launch_wal_receiver(self: &Arc<Self>) {
pub fn launch_wal_receiver(self: &Arc<Self>) -> anyhow::Result<()> {
if !is_etcd_client_initialized() {
if cfg!(test) {
info!("not launching WAL receiver because etcd client hasn't been initialized");
return;
return Ok(());
} else {
panic!("etcd client not initialized");
}
@@ -821,14 +632,16 @@ impl Timeline {
walreceiver_connect_timeout,
lagging_wal_timeout,
max_lsn_wal_lag,
);
)?;
Ok(())
}
///
/// Scan the timeline directory to populate the layer map.
/// Returns all timeline-related files that were found and loaded.
///
pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
pub fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
let mut layers = self.layers.write().unwrap();
let mut num_layers = 0;
@@ -914,13 +727,33 @@ impl Timeline {
Ok(())
}
pub(super) fn layer_removal_guard(&self) -> anyhow::Result<MutexGuard<()>> {
pub fn layer_removal_guard(&self) -> anyhow::Result<MutexGuard<()>> {
self.layer_removal_cs
.try_lock()
.map_err(|e| anyhow!("cannot lock compaction critical section {e}"))
}
/// Retrieve current logical size of the timeline.
///
/// The size could be lagging behind the actual number, in case
/// the initial size calculation has not been run (gets triggered on the first size access).
pub fn get_current_logical_size(self: &Arc<Self>) -> anyhow::Result<u64> {
let current_size = self.current_logical_size.current_size()?;
debug!("Current size: {current_size:?}");
let size = current_size.size();
if let (CurrentLogicalSize::Approximate(_), Some(init_lsn)) =
(current_size, self.current_logical_size.initial_part_end)
{
self.try_spawn_size_init_task(init_lsn);
}
Ok(size)
}
fn try_spawn_size_init_task(self: &Arc<Self>, init_lsn: Lsn) {
let timeline_id = self.timeline_id;
// Atomically check if the timeline size calculation had already started.
// If the flag was not already set, this sets it.
if !self
@@ -937,42 +770,17 @@ impl Timeline {
"initial size calculation",
false,
async move {
let mut timeline_state_updates = self_clone.subscribe_for_state_updates();
let self_calculation = Arc::clone(&self_clone);
tokio::select! {
calculation_result = spawn_blocking(move || self_calculation.calculate_logical_size(init_lsn)) => {
let calculated_size = calculation_result
.context("Failed to spawn calculation result task")?
.context("Failed to calculate logical size")?;
match self_clone.current_logical_size.initial_logical_size.set(calculated_size) {
Ok(()) => info!("Successfully calculated initial logical size"),
Err(existing_size) => error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing"),
}
Ok(())
},
new_event = async {
loop {
match timeline_state_updates.changed().await {
Ok(()) => {
let new_state = *timeline_state_updates.borrow();
match new_state {
// we're running this job for active timelines only
TimelineState::Active => continue,
TimelineState::Broken | TimelineState::Paused | TimelineState::Suspended => return Some(new_state),
}
}
Err(_sender_dropped_error) => return None,
}
}
} => {
match new_event {
Some(new_state) => info!("Timeline became inactive (new state: {new_state:?}), dropping current connections until it reactivates"),
None => info!("Timeline dropped state updates sender, stopping init size calculation"),
}
Ok(())
},
let calculated_size = self_clone.calculate_logical_size(init_lsn)?;
let result = spawn_blocking(move || {
self_clone.current_logical_size.initial_logical_size.set(calculated_size)
}).await?;
match result {
Ok(()) => info!("Successfully calculated initial logical size"),
Err(existing_size) => error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing"),
}
}.instrument(info_span!("initial_logical_size_calculation", tenant = %self.tenant_id, timeline = %self.timeline_id)),
Ok(())
}
.instrument(info_span!("initial_logical_size_calculation", timeline = %timeline_id))
);
}
}
@@ -1163,7 +971,7 @@ impl Timeline {
Some((lsn, img))
}
fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
fn get_ancestor_timeline(&self) -> Result<Arc<Timeline>> {
let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
format!(
"Ancestor is missing. Timeline id: {} Ancestor id {:?}",
@@ -1222,14 +1030,14 @@ impl Timeline {
Ok(layer)
}
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> {
//info!("PUT: key {} at {}", key, lsn);
let layer = self.get_layer_for_write(lsn)?;
layer.put_value(key, lsn, val)?;
Ok(())
}
fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> Result<()> {
let layer = self.get_layer_for_write(lsn)?;
layer.put_tombstone(key_range, lsn)?;
@@ -1268,6 +1076,64 @@ impl Timeline {
drop(layers);
}
///
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
/// the in-memory layer, and initiate flushing it if so.
///
/// Also flush after a period of time without new data -- it helps
/// safekeepers to regard pageserver as caught up and suspend activity.
///
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> Result<()> {
let last_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap();
if let Some(open_layer) = &layers.open_layer {
let open_layer_size = open_layer.size()?;
drop(layers);
let last_freeze_at = self.last_freeze_at.load();
let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
let distance = last_lsn.widening_sub(last_freeze_at);
// Checkpointing the open layer can be triggered by layer size or LSN range.
// S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
// we want to stay below that with a big margin. The LSN distance determines how
// much WAL the safekeepers need to store.
if distance >= self.get_checkpoint_distance().into()
|| open_layer_size > self.get_checkpoint_distance()
|| (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
{
info!(
"check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
distance,
open_layer_size,
last_freeze_ts.elapsed()
);
self.freeze_inmem_layer(true);
self.last_freeze_at.store(last_lsn);
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
// Launch a task to flush the frozen layer to disk, unless
// a task was already running. (If the task was running
// at the time that we froze the layer, it must've seen the
// the layer we just froze before it exited; see comments
// in flush_frozen_layers())
if let Ok(guard) = self.layer_flush_lock.try_lock() {
drop(guard);
let self_clone = Arc::clone(self);
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::LayerFlushTask,
Some(self.tenant_id),
Some(self.timeline_id),
"layer flush task",
false,
async move { self_clone.flush_frozen_layers(false) },
);
}
}
}
Ok(())
}
/// Flush all frozen layers to disk.
///
/// Only one task at a time can be doing layer-flushing for a
@@ -1275,7 +1141,7 @@ impl Timeline {
/// currently doing the flushing, this function will wait for it
/// to finish. If 'wait' is false, this function will return
/// immediately instead.
fn flush_frozen_layers(&self, wait: bool) -> anyhow::Result<()> {
fn flush_frozen_layers(&self, wait: bool) -> Result<()> {
let flush_lock_guard = if wait {
self.layer_flush_lock.lock().unwrap()
} else {
@@ -1314,7 +1180,7 @@ impl Timeline {
}
/// Flush one frozen in-memory layer to disk, as a new delta layer.
fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> anyhow::Result<()> {
fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> Result<()> {
// As a special case, when we have just imported an image into the repository,
// instead of writing out a L0 delta layer, we directly write out image layer
// files instead. This is possible as long as *all* the data imported into the
@@ -1372,7 +1238,7 @@ impl Timeline {
&self,
disk_consistent_lsn: Lsn,
layer_paths_to_upload: HashMap<PathBuf, LayerFileMetadata>,
) -> anyhow::Result<()> {
) -> Result<()> {
// We can only save a valid 'prev_record_lsn' value on disk if we
// flushed *all* in-memory changes to disk. We only track
// 'prev_record_lsn' in memory for the latest processed record, so we
@@ -1417,7 +1283,7 @@ impl Timeline {
false,
)?;
if self.can_upload_layers() {
if self.upload_layers.load(atomic::Ordering::Relaxed) {
storage_sync::schedule_layer_upload(
self.tenant_id,
self.timeline_id,
@@ -1433,7 +1299,7 @@ impl Timeline {
fn create_delta_layer(
&self,
frozen_layer: &InMemoryLayer,
) -> anyhow::Result<(PathBuf, LayerFileMetadata)> {
) -> Result<(PathBuf, LayerFileMetadata)> {
// Write it out
let new_delta = frozen_layer.write_to_disk()?;
let new_delta_path = new_delta.path();
@@ -1468,7 +1334,92 @@ impl Timeline {
Ok((new_delta_path, LayerFileMetadata::new(sz)))
}
fn repartition(&self, lsn: Lsn, partition_size: u64) -> anyhow::Result<(KeyPartitioning, Lsn)> {
pub fn compact(&self) -> anyhow::Result<()> {
let last_record_lsn = self.get_last_record_lsn();
// Last record Lsn could be zero in case the timelie was just created
if !last_record_lsn.is_valid() {
warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
return Ok(());
}
//
// High level strategy for compaction / image creation:
//
// 1. First, calculate the desired "partitioning" of the
// currently in-use key space. The goal is to partition the
// key space into roughly fixed-size chunks, but also take into
// account any existing image layers, and try to align the
// chunk boundaries with the existing image layers to avoid
// too much churn. Also try to align chunk boundaries with
// relation boundaries. In principle, we don't know about
// relation boundaries here, we just deal with key-value
// pairs, and the code in pgdatadir_mapping.rs knows how to
// map relations into key-value pairs. But in practice we know
// that 'field6' is the block number, and the fields 1-5
// identify a relation. This is just an optimization,
// though.
//
// 2. Once we know the partitioning, for each partition,
// decide if it's time to create a new image layer. The
// criteria is: there has been too much "churn" since the last
// image layer? The "churn" is fuzzy concept, it's a
// combination of too many delta files, or too much WAL in
// total in the delta file. Or perhaps: if creating an image
// file would allow to delete some older files.
//
// 3. After that, we compact all level0 delta files if there
// are too many of them. While compacting, we also garbage
// collect any page versions that are no longer needed because
// of the new image layers we created in step 2.
//
// TODO: This high level strategy hasn't been implemented yet.
// Below are functions compact_level0() and create_image_layers()
// but they are a bit ad hoc and don't quite work like it's explained
// above. Rewrite it.
let _layer_removal_cs = self.layer_removal_cs.lock().unwrap();
let target_file_size = self.get_checkpoint_distance();
// Define partitioning schema if needed
match self.repartition(
self.get_last_record_lsn(),
self.get_compaction_target_size(),
) {
Ok((partitioning, lsn)) => {
// 2. Create new image layers for partitions that have been modified
// "enough".
let layer_paths_to_upload = self.create_image_layers(&partitioning, lsn, false)?;
if !layer_paths_to_upload.is_empty()
&& self.upload_layers.load(atomic::Ordering::Relaxed)
{
storage_sync::schedule_layer_upload(
self.tenant_id,
self.timeline_id,
layer_paths_to_upload,
None,
);
}
// 3. Compact
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(target_file_size)?;
timer.stop_and_record();
}
Err(err) => {
// no partitioning? This is normal, if the timeline was just created
// as an empty timeline. Also in unit tests, when we use the timeline
// as a simple key-value store, ignoring the datadir layout. Log the
// error but continue.
error!("could not compact, repartitioning keyspace failed: {err:?}");
}
};
Ok(())
}
fn repartition(&self, lsn: Lsn, partition_size: u64) -> Result<(KeyPartitioning, Lsn)> {
let mut partitioning_guard = self.partitioning.lock().unwrap();
if partitioning_guard.1 == Lsn(0)
|| lsn.0 - partitioning_guard.1 .0 > self.repartition_threshold
@@ -1482,7 +1433,7 @@ impl Timeline {
}
// Is it time to create a new image layer for the given partition?
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result<bool> {
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> Result<bool> {
let layers = self.layers.read().unwrap();
for part_range in &partition.ranges {
@@ -1527,7 +1478,7 @@ impl Timeline {
partitioning: &KeyPartitioning,
lsn: Lsn,
force: bool,
) -> anyhow::Result<HashMap<PathBuf, LayerFileMetadata>> {
) -> Result<HashMap<PathBuf, LayerFileMetadata>> {
let timer = self.metrics.create_images_time_histo.start_timer();
let mut image_layers: Vec<ImageLayer> = Vec::new();
for partition in partitioning.parts.iter() {
@@ -1545,32 +1496,7 @@ impl Timeline {
for range in &partition.ranges {
let mut key = range.start;
while key < range.end {
let img = match self.get(key, lsn) {
Ok(img) => img,
Err(err) => {
// If we fail to reconstruct a VM or FSM page, we can zero the
// page without losing any actual user data. That seems better
// than failing repeatedly and getting stuck.
//
// We had a bug at one point, where we truncated the FSM and VM
// in the pageserver, but the Postgres didn't know about that
// and continued to generate incremental WAL records for pages
// that didn't exist in the pageserver. Trying to replay those
// WAL records failed to find the previous image of the page.
// This special case allows us to recover from that situation.
// See https://github.com/neondatabase/neon/issues/2601.
//
// Unfortunately we cannot do this for the main fork, or for
// any metadata keys, keys, as that would lead to actual data
// loss.
if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
ZERO_PAGE.clone()
} else {
return Err(err);
}
}
};
let img = self.get(key, lsn)?;
image_layer_writer.put_image(key, &img)?;
key = key.next();
}
@@ -1620,7 +1546,7 @@ impl Timeline {
/// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
/// as Level 1 files.
///
fn compact_level0(&self, target_file_size: u64) -> anyhow::Result<()> {
fn compact_level0(&self, target_file_size: u64) -> Result<()> {
let layers = self.layers.read().unwrap();
let mut level0_deltas = layers.get_level0_deltas()?;
drop(layers);
@@ -1887,7 +1813,7 @@ impl Timeline {
}
drop(layers);
if self.can_upload_layers() {
if self.upload_layers.load(atomic::Ordering::Relaxed) {
storage_sync::schedule_layer_upload(
self.tenant_id,
self.timeline_id,
@@ -1930,12 +1856,12 @@ impl Timeline {
///
/// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
/// whether a record is needed for PITR.
pub(super) fn update_gc_info(
pub fn update_gc_info(
&self,
retain_lsns: Vec<Lsn>,
cutoff_horizon: Lsn,
pitr: Duration,
) -> anyhow::Result<()> {
) -> Result<()> {
let mut gc_info = self.gc_info.write().unwrap();
gc_info.horizon_cutoff = cutoff_horizon;
@@ -1990,8 +1916,8 @@ impl Timeline {
/// within a layer file. We can only remove the whole file if it's fully
/// obsolete.
///
pub(super) fn gc(&self) -> anyhow::Result<GcResult> {
let mut result: GcResult = GcResult::default();
pub fn gc(&self) -> Result<GcResult> {
let mut result: GcResult = Default::default();
let now = SystemTime::now();
fail_point!("before-timeline-gc");
@@ -2033,10 +1959,10 @@ impl Timeline {
new_gc_cutoff
);
write_guard.store_and_unlock(new_gc_cutoff).wait();
// Persist metadata file
self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())?;
}
// Persist the new GC cutoff value in the metadata file, before
// we actually remove anything.
self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())?;
info!("GC starting");
@@ -2163,15 +2089,18 @@ impl Timeline {
}
info!(
"GC completed removing {} layers, cutoff {}",
"GC completed removing {} layers, cuttof {}",
result.layers_removed, new_gc_cutoff
);
if result.layers_removed != 0 {
fail_point!("after-timeline-gc-removed-layers");
fail_point!("gc-before-save-metadata", |_| {
info!("Abnormaly terinate pageserver at gc-before-save-metadata fail point");
std::process::abort();
});
return Ok(result);
}
if self.can_upload_layers() {
if self.upload_layers.load(atomic::Ordering::Relaxed) {
storage_sync::schedule_layer_delete(
self.tenant_id,
self.timeline_id,
@@ -2260,11 +2189,6 @@ impl Timeline {
}
}
}
fn can_upload_layers(&self) -> bool {
self.upload_layers.load(atomic::Ordering::Relaxed)
&& self.current_state() != TimelineState::Broken
}
}
/// Helper function for get_reconstruct_data() to add the path of layers traversed
@@ -2315,12 +2239,11 @@ impl<'a> TimelineWriter<'a> {
///
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
page_image_cache::remove(key, self.tenant_id, self.timeline_id);
pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> Result<()> {
self.tl.put_value(key, lsn, value)
}
pub fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
pub fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> Result<()> {
self.tl.put_tombstone(key_range, lsn)
}

View File

@@ -175,7 +175,7 @@ async fn wait_for_active_tenant(
}
state => {
debug!("Not running the task loop, tenant is not active with background jobs enabled: {state:?}");
continue;
tokio::time::sleep(wait).await;
}
}
}

View File

@@ -31,10 +31,9 @@ use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind};
use crate::tenant::Timeline;
use crate::walrecord::*;
use crate::ZERO_PAGE;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
@@ -44,6 +43,8 @@ use postgres_ffi::TransactionId;
use postgres_ffi::BLCKSZ;
use utils::lsn::Lsn;
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
pub struct WalIngest<'a> {
timeline: &'a Timeline,

View File

@@ -12,7 +12,6 @@
use std::{
collections::{hash_map, HashMap},
num::NonZeroU64,
ops::ControlFlow,
sync::Arc,
time::Duration,
};
@@ -27,8 +26,7 @@ use etcd_broker::{
subscription_key::SubscriptionKey, subscription_value::SkTimelineInfo, BrokerSubscription,
BrokerUpdate, Client,
};
use pageserver_api::models::TimelineState;
use tokio::{select, sync::watch};
use tokio::select;
use tracing::*;
use crate::{
@@ -49,7 +47,7 @@ pub fn spawn_connection_manager_task(
wal_connect_timeout: Duration,
lagging_wal_timeout: Duration,
max_lsn_wal_lag: NonZeroU64,
) {
) -> anyhow::Result<()> {
let mut etcd_client = get_etcd_client().clone();
let tenant_id = timeline.tenant_id;
@@ -60,7 +58,10 @@ pub fn spawn_connection_manager_task(
TaskKind::WalReceiverManager,
Some(tenant_id),
Some(timeline_id),
&format!("walreceiver for timeline {tenant_id}/{timeline_id}"),
&format!(
"walreceiver for tenant {} timeline {}",
timeline.tenant_id, timeline.timeline_id
),
false,
async move {
info!("WAL receiver broker started, connecting to etcd");
@@ -74,21 +75,19 @@ pub fn spawn_connection_manager_task(
select! {
_ = task_mgr::shutdown_watcher() => {
info!("WAL receiver shutdown requested, shutting down");
walreceiver_state.shutdown().await;
// Kill current connection, if any
if let Some(wal_connection) = walreceiver_state.wal_connection.take()
{
wal_connection.connection_task.shutdown().await;
}
return Ok(());
},
loop_step_result = connection_manager_loop_step(
_ = connection_manager_loop_step(
&broker_loop_prefix,
&mut etcd_client,
&mut walreceiver_state,
) => match loop_step_result {
ControlFlow::Continue(()) => continue,
ControlFlow::Break(()) => {
info!("Connection manager loop ended, shutting down");
walreceiver_state.shutdown().await;
return Ok(());
}
},
) => {},
}
}
}
@@ -96,6 +95,7 @@ pub fn spawn_connection_manager_task(
info_span!("wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id),
),
);
Ok(())
}
/// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
@@ -105,17 +105,7 @@ async fn connection_manager_loop_step(
broker_prefix: &str,
etcd_client: &mut Client,
walreceiver_state: &mut WalreceiverState,
) -> ControlFlow<(), ()> {
let mut timeline_state_updates = walreceiver_state.timeline.subscribe_for_state_updates();
match wait_for_active_timeline(&mut timeline_state_updates).await {
ControlFlow::Continue(()) => {}
ControlFlow::Break(()) => {
info!("Timeline dropped state updates sender before becoming active, stopping wal connection manager loop");
return ControlFlow::Break(());
}
}
) {
let id = TenantTimelineId {
tenant_id: walreceiver_state.timeline.tenant_id,
timeline_id: walreceiver_state.timeline.timeline_id,
@@ -140,12 +130,10 @@ async fn connection_manager_loop_step(
// - change connection if the rules decide so, or if the current connection dies
// - receive updates from broker
// - this might change the current desired connection
// - timeline state changes to something that does not allow walreceiver to run concurrently
select! {
broker_connection_result = &mut broker_subscription.watcher_handle => {
info!("Broker connection was closed from the other side, ending current broker loop step");
cleanup_broker_connection(broker_connection_result, walreceiver_state);
return ControlFlow::Continue(());
return;
},
Some(wal_connection_update) = async {
@@ -198,36 +186,11 @@ async fn connection_manager_loop_step(
(&mut broker_subscription.watcher_handle).await,
walreceiver_state,
);
return ControlFlow::Continue(());
return;
}
}
},
new_event = async {
loop {
match timeline_state_updates.changed().await {
Ok(()) => {
let new_state = walreceiver_state.timeline.current_state();
match new_state {
// we're already active as walreceiver, no need to reactivate
TimelineState::Active => continue,
TimelineState::Broken | TimelineState::Paused | TimelineState::Suspended => return ControlFlow::Continue(new_state),
}
}
Err(_sender_dropped_error) => return ControlFlow::Break(()),
}
}
} => match new_event {
ControlFlow::Continue(new_state) => {
info!("Timeline became inactive (new state: {new_state:?}), dropping current connections until it reactivates");
return ControlFlow::Continue(());
}
ControlFlow::Break(()) => {
info!("Timeline dropped state updates sender, stopping wal connection manager loop");
return ControlFlow::Break(());
}
},
_ = async { tokio::time::sleep(time_until_next_retry.unwrap()).await }, if time_until_next_retry.is_some() => {}
}
@@ -254,34 +217,6 @@ async fn connection_manager_loop_step(
}
}
async fn wait_for_active_timeline(
timeline_state_updates: &mut watch::Receiver<TimelineState>,
) -> ControlFlow<(), ()> {
let current_state = *timeline_state_updates.borrow();
if current_state == TimelineState::Active {
return ControlFlow::Continue(());
}
loop {
match timeline_state_updates.changed().await {
Ok(()) => {
let new_state = *timeline_state_updates.borrow();
match new_state {
TimelineState::Active => {
debug!("Timeline state changed to active, continuing the walreceiver connection manager");
return ControlFlow::Continue(());
}
state => {
debug!("Not running the walreceiver connection manager, timeline is not active: {state:?}");
continue;
}
}
}
Err(_sender_dropped_error) => return ControlFlow::Break(()),
}
}
}
fn cleanup_broker_connection(
broker_connection_result: Result<Result<(), etcd_broker::BrokerError>, tokio::task::JoinError>,
walreceiver_state: &mut WalreceiverState,
@@ -789,12 +724,6 @@ impl WalreceiverState {
self.wal_connection_retries.remove(&node_id);
}
}
async fn shutdown(mut self) {
if let Some(wal_connection) = self.wal_connection.take() {
wal_connection.connection_task.shutdown().await;
}
}
}
#[derive(Debug, PartialEq, Eq)]
@@ -873,7 +802,6 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: None,
},
etcd_version: 0,
@@ -890,8 +818,6 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("no commit_lsn".to_string()),
},
etcd_version: 0,
@@ -908,7 +834,6 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("no commit_lsn".to_string()),
},
etcd_version: 0,
@@ -925,7 +850,6 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: None,
},
etcd_version: 0,
@@ -985,8 +909,6 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1003,8 +925,6 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("not advanced Lsn".to_string()),
},
etcd_version: 0,
@@ -1021,8 +941,6 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("not enough advanced Lsn".to_string()),
},
etcd_version: 0,
@@ -1057,8 +975,6 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1091,8 +1007,6 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("smaller commit_lsn".to_string()),
},
etcd_version: 0,
@@ -1109,8 +1023,6 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1127,8 +1039,6 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: None,
},
etcd_version: 0,
@@ -1174,8 +1084,6 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1192,8 +1100,6 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1263,8 +1169,6 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1281,8 +1185,6 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("advanced by Lsn safekeeper".to_string()),
},
etcd_version: 0,
@@ -1354,8 +1256,6 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1427,8 +1327,6 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,

View File

@@ -43,10 +43,10 @@ use crate::metrics::{
WAL_REDO_WAIT_TIME,
};
use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block};
use crate::reltag::{RelTag, SlruKind};
use crate::repository::Key;
use crate::walrecord::NeonWalRecord;
use crate::{config::PageServerConf, TEMP_FILE_SUFFIX};
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
use postgres_ffi::v14::nonrelfile_utils::{
@@ -610,26 +610,13 @@ impl PostgresRedoProcess {
);
fs::remove_dir_all(&datadir)?;
}
let pg_bin_dir_path = conf.pg_bin_dir(pg_version).map_err(|e| {
Error::new(
ErrorKind::Other,
format!("incorrect pg_bin_dir path: {}", e),
)
})?;
let pg_lib_dir_path = conf.pg_lib_dir(pg_version).map_err(|e| {
Error::new(
ErrorKind::Other,
format!("incorrect pg_lib_dir path: {}", e),
)
})?;
info!("running initdb in {}", datadir.display());
let initdb = Command::new(pg_bin_dir_path.join("initdb"))
let initdb = Command::new(conf.pg_bin_dir(pg_version).join("initdb"))
.args(&["-D", &datadir.to_string_lossy()])
.arg("-N")
.env_clear()
.env("LD_LIBRARY_PATH", &pg_lib_dir_path)
.env("DYLD_LIBRARY_PATH", &pg_lib_dir_path) // macOS
.env("LD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.close_fds()
.output()
.map_err(|e| Error::new(e.kind(), format!("failed to execute initdb: {e}")))?;
@@ -655,14 +642,14 @@ impl PostgresRedoProcess {
}
// Start postgres itself
let mut child = Command::new(pg_bin_dir_path.join("postgres"))
let mut child = Command::new(conf.pg_bin_dir(pg_version).join("postgres"))
.arg("--wal-redo")
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.env_clear()
.env("LD_LIBRARY_PATH", &pg_lib_dir_path)
.env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
.env("LD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("PGDATA", &datadir)
// The redo process is not trusted, so it runs in seccomp mode
// (see seccomp in zenith_wal_redo.c). We have to make sure it doesn't

64
poetry.lock generated
View File

@@ -11,7 +11,7 @@ async-timeout = ">=3.0,<5.0"
psycopg2-binary = ">=2.8.4"
[package.extras]
sa = ["sqlalchemy[postgresql-psycopg2binary] (>=1.3,<1.5)"]
sa = ["sqlalchemy[postgresql_psycopg2binary] (>=1.3,<1.5)"]
[[package]]
name = "allure-pytest"
@@ -80,7 +80,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
dev = ["cloudpickle", "coverage[toml] (>=5.0.2)", "furo", "hypothesis", "mypy", "pre-commit", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "six", "sphinx", "sphinx-notfound-page", "zope.interface"]
docs = ["furo", "sphinx", "sphinx-notfound-page", "zope.interface"]
tests = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "six", "zope.interface"]
tests-no-zope = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "six"]
tests_no_zope = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "six"]
[[package]]
name = "aws-sam-translator"
@@ -514,6 +514,14 @@ python-versions = ">=3.7"
[package.dependencies]
typing-extensions = ">=4.1.0"
[[package]]
name = "cached-property"
version = "1.5.2"
description = "A decorator for caching properties in classes."
category = "main"
optional = false
python-versions = "*"
[[package]]
name = "certifi"
version = "2022.6.15"
@@ -560,7 +568,7 @@ optional = false
python-versions = ">=3.6.0"
[package.extras]
unicode-backport = ["unicodedata2"]
unicode_backport = ["unicodedata2"]
[[package]]
name = "click"
@@ -593,7 +601,7 @@ python-versions = ">=3.6"
cffi = ">=1.12"
[package.extras]
docs = ["sphinx (>=1.6.5,!=1.8.0,!=3.1.0,!=3.1.1)", "sphinx_rtd_theme"]
docs = ["sphinx (>=1.6.5,!=1.8.0,!=3.1.0,!=3.1.1)", "sphinx-rtd-theme"]
docstest = ["pyenchant (>=1.6.11)", "sphinxcontrib-spelling (>=4.0.1)", "twine (>=1.12.0)"]
pep8test = ["black", "flake8", "flake8-import-order", "pep8-naming"]
sdist = ["setuptools_rust (>=0.11.4)"]
@@ -738,9 +746,9 @@ python-versions = ">=3.6.1,<4.0"
[package.extras]
colors = ["colorama (>=0.4.3,<0.5.0)"]
pipfile-deprecated-finder = ["pipreqs", "requirementslib"]
pipfile_deprecated_finder = ["pipreqs", "requirementslib"]
plugins = ["setuptools"]
requirements-deprecated-finder = ["pip-api", "pipreqs"]
requirements_deprecated_finder = ["pip-api", "pipreqs"]
[[package]]
name = "itsdangerous"
@@ -815,7 +823,7 @@ python-versions = ">=2.7"
[package.extras]
docs = ["jaraco.packaging (>=3.2)", "rst.linker (>=1.9)", "sphinx"]
testing = ["ecdsa", "enum34", "feedparser", "jsonlib", "numpy", "pandas", "pymongo", "pytest (>=3.5,!=3.7.3)", "pytest-black-multipy", "pytest-checkdocs (>=1.2.3)", "pytest-cov", "pytest-flake8 (<1.1.0)", "pytest-flake8 (>=1.1.1)", "scikit-learn", "sqlalchemy"]
testing-libs = ["simplejson", "ujson", "yajl"]
"testing.libs" = ["simplejson", "ujson", "yajl"]
[[package]]
name = "jsonpointer"
@@ -836,12 +844,11 @@ python-versions = "*"
[package.dependencies]
attrs = ">=17.4.0"
pyrsistent = ">=0.14.0"
setuptools = "*"
six = ">=1.11.0"
[package.extras]
format = ["idna", "jsonpointer (>1.13)", "rfc3987", "strict-rfc3339", "webcolors"]
format-nongpl = ["idna", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3986-validator (>0.1.0)", "webcolors"]
format_nongpl = ["idna", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3986-validator (>0.1.0)", "webcolors"]
[[package]]
name = "junit-xml"
@@ -901,7 +908,6 @@ pytz = "*"
PyYAML = {version = ">=5.1", optional = true, markers = "extra == \"server\""}
requests = ">=2.5"
responses = ">=0.9.0"
setuptools = {version = "*", optional = true, markers = "extra == \"server\""}
sshpubkeys = {version = ">=3.1.0", optional = true, markers = "extra == \"server\""}
werkzeug = ">=0.5,<2.2.0"
xmltodict = "*"
@@ -1010,7 +1016,6 @@ python-versions = ">=3.7.0,<4.0.0"
jsonschema = ">=3.2.0,<5.0.0"
openapi-schema-validator = ">=0.2.0,<0.3.0"
PyYAML = ">=5.1"
setuptools = "*"
[package.extras]
requests = ["requests"]
@@ -1343,7 +1348,7 @@ urllib3 = ">=1.21.1,<1.27"
[package.extras]
socks = ["PySocks (>=1.5.6,!=1.5.7)"]
use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
use_chardet_on_py3 = ["chardet (>=3.0.2,<6)"]
[[package]]
name = "responses"
@@ -1397,19 +1402,6 @@ python-versions = ">= 2.7"
attrs = "*"
pbr = "*"
[[package]]
name = "setuptools"
version = "65.5.0"
description = "Easily download, build, install, upgrade, and uninstall Python packages"
category = "main"
optional = false
python-versions = ">=3.7"
[package.extras]
docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-hoverxref (<2)", "sphinx-inline-tabs", "sphinx-notfound-page (==0.8.3)", "sphinx-reredirects", "sphinxcontrib-towncrier"]
testing = ["build[virtualenv]", "filelock (>=3.4.0)", "flake8 (<5)", "flake8-2020", "ini2toml[lite] (>=0.9)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "mock", "pip (>=19.1)", "pip-run (>=8.8)", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-flake8", "pytest-mypy (>=0.9.1)", "pytest-perf", "pytest-xdist", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
testing-integration = ["build[virtualenv]", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"]
[[package]]
name = "six"
version = "1.16.0"
@@ -1476,14 +1468,6 @@ category = "main"
optional = false
python-versions = ">=3.7,<4.0"
[[package]]
name = "types-toml"
version = "0.10.8"
description = "Typing stubs for toml"
category = "dev"
optional = false
python-versions = "*"
[[package]]
name = "types-urllib3"
version = "1.26.17"
@@ -1568,7 +1552,7 @@ testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>=
[metadata]
lock-version = "1.1"
python-versions = "^3.9"
content-hash = "17cdbfe90f1b06dffaf24c3e076384ec08dd4a2dce5a05e50565f7364932eb2d"
content-hash = "ead1495454ee6d880bb240447025db93a25ebe263c2709de5f144cc2d85dc975"
[metadata.files]
aiopg = [
@@ -1663,6 +1647,10 @@ botocore-stubs = [
{file = "botocore-stubs-1.27.38.tar.gz", hash = "sha256:408e8b86b5d171b58f81c74ca9d3b5317a5a8e2d3bc2073aa841ac13b8939e56"},
{file = "botocore_stubs-1.27.38-py3-none-any.whl", hash = "sha256:7add7641e9a479a9c8366893bb522fd9ca3d58714201e43662a200a148a1bc38"},
]
cached-property = [
{file = "cached-property-1.5.2.tar.gz", hash = "sha256:9fa5755838eecbb2d234c3aa390bd80fbd3ac6b6869109bfc1b499f7bd89a130"},
{file = "cached_property-1.5.2-py2.py3-none-any.whl", hash = "sha256:df4f613cf7ad9a588cc381aaf4a512d26265ecebd5eb9e1ba12f1319eb85a6a0"},
]
certifi = [
{file = "certifi-2022.6.15-py3-none-any.whl", hash = "sha256:fe86415d55e84719d75f8b69414f6438ac3547d2078ab91b67e779ef69378412"},
{file = "certifi-2022.6.15.tar.gz", hash = "sha256:84c85a9078b11105f04f3036a9482ae10e4621616db313fe045dd24743a0820d"},
@@ -2206,10 +2194,6 @@ sarif-om = [
{file = "sarif_om-1.0.4-py3-none-any.whl", hash = "sha256:539ef47a662329b1c8502388ad92457425e95dc0aaaf995fe46f4984c4771911"},
{file = "sarif_om-1.0.4.tar.gz", hash = "sha256:cd5f416b3083e00d402a92e449a7ff67af46f11241073eea0461802a3b5aef98"},
]
setuptools = [
{file = "setuptools-65.5.0-py3-none-any.whl", hash = "sha256:f62ea9da9ed6289bfe868cd6845968a2c854d1427f8548d52cae02a42b4f0356"},
{file = "setuptools-65.5.0.tar.gz", hash = "sha256:512e5536220e38146176efb833d4a62aa726b7bbff82cfbc8ba9eaa3996e0b17"},
]
six = [
{file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"},
{file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"},
@@ -2238,10 +2222,6 @@ types-s3transfer = [
{file = "types-s3transfer-0.6.0.post3.tar.gz", hash = "sha256:92c3704e5d041202bfb5ddb79d083fd1a02de2c5dfec6a91576823e6b5c93993"},
{file = "types_s3transfer-0.6.0.post3-py3-none-any.whl", hash = "sha256:eedc5117275565b3c83662c0ccc81662a34da5dda8bd502b89d296b6d5cb091d"},
]
types-toml = [
{file = "types-toml-0.10.8.tar.gz", hash = "sha256:b7e7ea572308b1030dc86c3ba825c5210814c2825612ec679eb7814f8dd9295a"},
{file = "types_toml-0.10.8-py3-none-any.whl", hash = "sha256:8300fd093e5829eb9c1fba69cee38130347d4b74ddf32d0a7df650ae55c2b599"},
]
types-urllib3 = [
{file = "types-urllib3-1.26.17.tar.gz", hash = "sha256:73fd274524c3fc7cd8cd9ceb0cb67ed99b45f9cb2831013e46d50c1451044800"},
{file = "types_urllib3-1.26.17-py3-none-any.whl", hash = "sha256:0d027fcd27dbb3cb532453b4d977e05bc1e13aefd70519866af211b3003d895d"},

View File

@@ -14,6 +14,7 @@ requests = "^2.26.0"
pytest-xdist = "^2.3.0"
asyncpg = "^0.24.0"
aiopg = "^1.3.1"
cached-property = "^1.5.2"
Jinja2 = "^3.0.2"
types-requests = "^2.28.5"
types-psycopg2 = "^2.9.18"
@@ -28,14 +29,12 @@ Werkzeug = "2.1.2"
pytest-order = "^1.0.1"
allure-pytest = "^2.10.0"
pytest-asyncio = "^0.19.0"
toml = "^0.10.2"
[tool.poetry.dev-dependencies]
flake8 = "^5.0.4"
mypy = "==0.971"
black = "^22.6.0"
isort = "^5.10.1"
types-toml = "^0.10.8"
[build-system]
requires = ["poetry-core>=1.0.0"]
@@ -75,6 +74,7 @@ strict = true
[[tool.mypy.overrides]]
module = [
"asyncpg.*",
"cached_property.*",
"pg8000.*",
]
ignore_missing_imports = true

View File

@@ -21,8 +21,7 @@ use metrics::set_build_info_metric;
use safekeeper::broker;
use safekeeper::control_file;
use safekeeper::defaults::{
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
};
use safekeeper::http;
use safekeeper::remove_wal;
@@ -32,12 +31,8 @@ use safekeeper::GlobalTimelines;
use safekeeper::SafeKeeperConf;
use utils::auth::JwtAuth;
use utils::{
http::endpoint,
id::NodeId,
logging::{self, LogFormat},
project_git_version,
shutdown::exit_now,
signals, tcp_listener,
http::endpoint, id::NodeId, logging, project_git_version, shutdown::exit_now, signals,
tcp_listener,
};
const LOCK_FILE_NAME: &str = "safekeeper.lock";
@@ -77,6 +72,10 @@ fn main() -> anyhow::Result<()> {
conf.listen_http_addr = addr.to_string();
}
if let Some(recall) = arg_matches.get_one::<String>("recall") {
conf.recall_period = humantime::parse_duration(recall)?;
}
let mut given_id = None;
if let Some(given_id_str) = arg_matches.get_one::<String>("id") {
given_id = Some(NodeId(
@@ -94,16 +93,6 @@ fn main() -> anyhow::Result<()> {
conf.broker_etcd_prefix = prefix.to_string();
}
if let Some(heartbeat_timeout_str) = arg_matches.get_one::<String>("heartbeat-timeout") {
conf.heartbeat_timeout =
humantime::parse_duration(heartbeat_timeout_str).with_context(|| {
format!(
"failed to parse heartbeat-timeout {}",
heartbeat_timeout_str
)
})?;
}
if let Some(backup_threads) = arg_matches.get_one::<String>("wal-backup-threads") {
conf.backup_runtime_threads = backup_threads
.parse()
@@ -116,14 +105,6 @@ fn main() -> anyhow::Result<()> {
let (_, storage_conf_parsed_toml) = parsed_toml.iter().next().unwrap(); // and strip key off again
conf.remote_storage = Some(RemoteStorageConfig::from_toml(storage_conf_parsed_toml)?);
}
if let Some(max_offloader_lag_str) = arg_matches.get_one::<String>("max-offloader-lag") {
conf.max_offloader_lag_bytes = max_offloader_lag_str.parse().with_context(|| {
format!(
"failed to parse max offloader lag {}",
max_offloader_lag_str
)
})?;
}
// Seems like there is no better way to accept bool values explicitly in clap.
conf.wal_backup_enabled = arg_matches
.get_one::<String>("enable-wal-backup")
@@ -135,15 +116,11 @@ fn main() -> anyhow::Result<()> {
.get_one::<String>("auth-validation-public-key-path")
.map(PathBuf::from);
if let Some(log_format) = arg_matches.get_one::<String>("log-format") {
conf.log_format = LogFormat::from_config(log_format)?;
}
start_safekeeper(conf, given_id, arg_matches.get_flag("init"))
}
fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bool) -> Result<()> {
let log_file = logging::init("safekeeper.log", conf.daemonize, conf.log_format)?;
let log_file = logging::init("safekeeper.log", conf.daemonize)?;
info!("version: {GIT_VERSION}");
@@ -384,6 +361,11 @@ fn cli() -> Command {
.short('p')
.long("pageserver"),
)
.arg(
Arg::new("recall")
.long("recall")
.help("Period for requestion pageserver to call for replication"),
)
.arg(
Arg::new("daemonize")
.short('d')
@@ -415,11 +397,6 @@ fn cli() -> Command {
.long("broker-etcd-prefix")
.help("a prefix to always use when polling/pusing data in etcd from this safekeeper"),
)
.arg(
Arg::new("heartbeat-timeout")
.long("heartbeat-timeout")
.help(formatcp!("Peer is considered dead after not receiving heartbeats from it during this period (default {}s), passed as a human readable duration.", DEFAULT_HEARTBEAT_TIMEOUT.as_secs()))
)
.arg(
Arg::new("wal-backup-threads").long("backup-threads").help(formatcp!("number of threads for wal backup (default {DEFAULT_WAL_BACKUP_RUNTIME_THREADS}")),
).arg(
@@ -427,11 +404,6 @@ fn cli() -> Command {
.long("remote-storage")
.help("Remote storage configuration for WAL backup (offloading to s3) as TOML inline table, e.g. {\"max_concurrent_syncs\" = 17, \"max_sync_errors\": 13, \"bucket_name\": \"<BUCKETNAME>\", \"bucket_region\":\"<REGION>\", \"concurrency_limit\": 119}.\nSafekeeper offloads WAL to [prefix_in_bucket/]<tenant_id>/<timeline_id>/<segment_file>, mirroring structure on the file system.")
)
.arg(
Arg::new("max-offloader-lag")
.long("max-offloader-lag")
.help(formatcp!("Safekeeper won't be elected for WAL offloading if it is lagging for more than this value (default {}MB) in bytes", DEFAULT_MAX_OFFLOADER_LAG_BYTES / (1 << 20)))
)
.arg(
Arg::new("enable-wal-backup")
.long("enable-wal-backup")
@@ -444,11 +416,6 @@ fn cli() -> Command {
.long("auth-validation-public-key-path")
.help("Path to an RSA .pem public key which is used to check JWT tokens")
)
.arg(
Arg::new("log-format")
.long("log-format")
.help("Format for logging, either 'plain' or 'json'")
)
}
#[test]

View File

@@ -1,5 +1,6 @@
//! Communication with etcd, providing safekeeper peers and pageserver coordination.
use anyhow::anyhow;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
@@ -11,9 +12,11 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::HashSet;
use std::time::Duration;
use tokio::spawn;
use tokio::task::JoinHandle;
use tokio::{runtime, time::sleep};
use tracing::*;
use url::Url;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;
@@ -53,6 +56,113 @@ fn timeline_safekeeper_path(
)
}
pub struct Election {
pub election_name: String,
pub candidate_name: String,
pub broker_endpoints: Vec<Url>,
}
impl Election {
pub fn new(election_name: String, candidate_name: String, broker_endpoints: Vec<Url>) -> Self {
Self {
election_name,
candidate_name,
broker_endpoints,
}
}
}
pub struct ElectionLeader {
client: Client,
keep_alive: JoinHandle<Result<()>>,
}
impl ElectionLeader {
pub async fn check_am_i(
&mut self,
election_name: String,
candidate_name: String,
) -> Result<bool> {
let resp = self.client.leader(election_name).await?;
let kv = resp
.kv()
.ok_or_else(|| anyhow!("failed to get leader response"))?;
let leader = kv.value_str()?;
Ok(leader == candidate_name)
}
pub async fn give_up(self) {
self.keep_alive.abort();
// TODO: it'll be wise to resign here but it'll happen after lease expiration anyway
// should we await for keep alive termination?
let _ = self.keep_alive.await;
}
}
pub async fn get_leader(req: &Election, leader: &mut Option<ElectionLeader>) -> Result<()> {
let mut client = Client::connect(req.broker_endpoints.clone(), None)
.await
.context("Could not connect to etcd")?;
let lease = client
.lease_grant(LEASE_TTL_SEC, None)
.await
.context("Could not acquire a lease");
let lease_id = lease.map(|l| l.id()).unwrap();
// kill previous keepalive, if any
if let Some(l) = leader.take() {
l.give_up().await;
}
let keep_alive = spawn::<_>(lease_keep_alive(client.clone(), lease_id));
// immediately save handle to kill task if we get canceled below
*leader = Some(ElectionLeader {
client: client.clone(),
keep_alive,
});
client
.campaign(
req.election_name.clone(),
req.candidate_name.clone(),
lease_id,
)
.await?;
Ok(())
}
async fn lease_keep_alive(mut client: Client, lease_id: i64) -> Result<()> {
let (mut keeper, mut ka_stream) = client
.lease_keep_alive(lease_id)
.await
.context("failed to create keepalive stream")?;
loop {
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
keeper
.keep_alive()
.await
.context("failed to send LeaseKeepAliveRequest")?;
ka_stream
.message()
.await
.context("failed to receive LeaseKeepAliveResponse")?;
sleep(push_interval).await;
}
}
pub fn get_candiate_name(system_id: NodeId) -> String {
format!("id_{system_id}")
}
async fn push_sk_info(
ttid: TenantTimelineId,
mut client: Client,
@@ -126,7 +236,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
let handles = active_tlis
.iter()
.map(|tli| {
let sk_info = tli.get_safekeeper_info(&conf);
let sk_info = tli.get_public_info(&conf);
let key =
timeline_safekeeper_path(conf.broker_etcd_prefix.clone(), tli.ttid, conf.my_id);
let lease = leases.remove(&tli.ttid).unwrap();
@@ -172,9 +282,6 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
Some(new_info) => {
// note: there are blocking operations below, but it's considered fine for now
if let Ok(tli) = GlobalTimelines::get(new_info.key.id) {
// Note that we also receive *our own* info. That's
// important, as it is used as an indication of live
// connection to the broker.
tli.record_safekeeper_info(&new_info.value, new_info.key.node_id)
.await?
}

View File

@@ -1,7 +1,6 @@
//! Code to deal with safekeeper control file upgrades
use crate::safekeeper::{
AcceptorState, PersistedPeers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory,
TermSwitchEntry,
AcceptorState, Peers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermSwitchEntry,
};
use anyhow::{bail, Result};
use serde::{Deserialize, Serialize};
@@ -135,7 +134,7 @@ pub struct SafeKeeperStateV4 {
// fundamental; but state is saved here only for informational purposes and
// obviously can be stale. (Currently not saved at all, but let's provision
// place to have less file version upgrades).
pub peers: PersistedPeers,
pub peers: Peers,
}
pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> {
@@ -166,7 +165,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
backup_lsn: Lsn(0),
peer_horizon_lsn: oldstate.truncate_lsn,
remote_consistent_lsn: Lsn(0),
peers: PersistedPeers(vec![]),
peers: Peers(vec![]),
});
// migrate to hexing some ids
} else if version == 2 {
@@ -189,7 +188,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
backup_lsn: Lsn(0),
peer_horizon_lsn: oldstate.truncate_lsn,
remote_consistent_lsn: Lsn(0),
peers: PersistedPeers(vec![]),
peers: Peers(vec![]),
});
// migrate to moving tenant_id/timeline_id to the top and adding some lsns
} else if version == 3 {
@@ -212,7 +211,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
backup_lsn: Lsn(0),
peer_horizon_lsn: oldstate.truncate_lsn,
remote_consistent_lsn: Lsn(0),
peers: PersistedPeers(vec![]),
peers: Peers(vec![]),
});
// migrate to having timeline_start_lsn
} else if version == 4 {
@@ -235,7 +234,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
backup_lsn: Lsn::INVALID,
peer_horizon_lsn: oldstate.peer_horizon_lsn,
remote_consistent_lsn: Lsn(0),
peers: PersistedPeers(vec![]),
peers: Peers(vec![]),
});
} else if version == 5 {
info!("reading safekeeper control file version {}", version);

View File

@@ -1,16 +1,11 @@
use defaults::{
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
};
use defaults::DEFAULT_WAL_BACKUP_RUNTIME_THREADS;
//
use remote_storage::RemoteStorageConfig;
use std::path::PathBuf;
use std::time::Duration;
use url::Url;
use utils::{
id::{NodeId, TenantId, TenantTimelineId},
logging::LogFormat,
};
use utils::id::{NodeId, TenantId, TenantTimelineId};
pub mod broker;
pub mod control_file;
@@ -39,9 +34,8 @@ pub mod defaults {
DEFAULT_PG_LISTEN_PORT,
};
pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8;
pub const DEFAULT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
}
#[derive(Debug, Clone)]
@@ -58,6 +52,7 @@ pub struct SafeKeeperConf {
pub no_sync: bool,
pub listen_pg_addr: String,
pub listen_http_addr: String,
pub recall_period: Duration,
pub remote_storage: Option<RemoteStorageConfig>,
pub backup_runtime_threads: usize,
pub wal_backup_enabled: bool,
@@ -65,9 +60,6 @@ pub struct SafeKeeperConf {
pub broker_endpoints: Vec<Url>,
pub broker_etcd_prefix: String,
pub auth_validation_public_key_path: Option<PathBuf>,
pub heartbeat_timeout: Duration,
pub max_offloader_lag_bytes: u64,
pub log_format: LogFormat,
}
impl SafeKeeperConf {
@@ -93,15 +85,13 @@ impl Default for SafeKeeperConf {
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
remote_storage: None,
recall_period: defaults::DEFAULT_RECALL_PERIOD,
my_id: NodeId(0),
broker_endpoints: Vec::new(),
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
backup_runtime_threads: DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
wal_backup_enabled: true,
auth_validation_public_key_path: None,
heartbeat_timeout: DEFAULT_HEARTBEAT_TIMEOUT,
max_offloader_lag_bytes: DEFAULT_MAX_OFFLOADER_LAG_BYTES,
log_format: LogFormat::Plain,
}
}
}

View File

@@ -11,7 +11,6 @@ use std::cmp::max;
use std::cmp::min;
use std::fmt;
use std::io::Read;
use tracing::*;
use crate::control_file;
@@ -133,8 +132,9 @@ pub struct ServerInfo {
pub wal_seg_size: u32,
}
/// Data published by safekeeper to the peers
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistedPeerInfo {
pub struct PeerInfo {
/// LSN up to which safekeeper offloaded WAL to s3.
backup_lsn: Lsn,
/// Term of the last entry.
@@ -145,7 +145,7 @@ pub struct PersistedPeerInfo {
commit_lsn: Lsn,
}
impl PersistedPeerInfo {
impl PeerInfo {
fn new() -> Self {
Self {
backup_lsn: Lsn::INVALID,
@@ -156,8 +156,10 @@ impl PersistedPeerInfo {
}
}
// vector-based node id -> peer state map with very limited functionality we
// need/
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
pub struct Peers(pub Vec<(NodeId, PeerInfo)>);
/// Persistent information stored on safekeeper node
/// On disk data is prefixed by magic and format version and followed by checksum.
@@ -201,7 +203,7 @@ pub struct SafeKeeperState {
// fundamental; but state is saved here only for informational purposes and
// obviously can be stale. (Currently not saved at all, but let's provision
// place to have less file version upgrades).
pub peers: PersistedPeers,
pub peers: Peers,
}
#[derive(Debug, Clone)]
@@ -238,12 +240,7 @@ impl SafeKeeperState {
backup_lsn: local_start_lsn,
peer_horizon_lsn: local_start_lsn,
remote_consistent_lsn: Lsn(0),
peers: PersistedPeers(
peers
.iter()
.map(|p| (*p, PersistedPeerInfo::new()))
.collect(),
),
peers: Peers(peers.iter().map(|p| (*p, PeerInfo::new())).collect()),
}
}

View File

@@ -7,7 +7,7 @@ use etcd_broker::subscription_value::SkTimelineInfo;
use postgres_ffi::XLogSegNo;
use tokio::{sync::watch, time::Instant};
use tokio::sync::watch;
use std::cmp::{max, min};
@@ -26,7 +26,7 @@ use utils::{
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
SafekeeperMemState, ServerInfo, Term,
SafekeeperMemState, ServerInfo,
};
use crate::send_wal::HotStandbyFeedback;
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
@@ -36,53 +36,6 @@ use crate::wal_storage;
use crate::wal_storage::Storage as wal_storage_iface;
use crate::SafeKeeperConf;
/// Things safekeeper should know about timeline state on peers.
#[derive(Debug, Clone)]
pub struct PeerInfo {
pub sk_id: NodeId,
/// Term of the last entry.
_last_log_term: Term,
/// LSN of the last record.
_flush_lsn: Lsn,
pub commit_lsn: Lsn,
/// Since which LSN safekeeper has WAL. TODO: remove this once we fill new
/// sk since backup_lsn.
pub local_start_lsn: Lsn,
/// When info was received.
ts: Instant,
}
impl PeerInfo {
fn from_sk_info(sk_id: NodeId, sk_info: &SkTimelineInfo, ts: Instant) -> PeerInfo {
PeerInfo {
sk_id,
_last_log_term: sk_info.last_log_term.unwrap_or(0),
_flush_lsn: sk_info.flush_lsn.unwrap_or(Lsn::INVALID),
commit_lsn: sk_info.commit_lsn.unwrap_or(Lsn::INVALID),
local_start_lsn: sk_info.local_start_lsn.unwrap_or(Lsn::INVALID),
ts,
}
}
}
// vector-based node id -> peer state map with very limited functionality we
// need.
#[derive(Debug, Clone, Default)]
pub struct PeersInfo(pub Vec<PeerInfo>);
impl PeersInfo {
fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
self.0.iter_mut().find(|p| p.sk_id == id)
}
fn upsert(&mut self, p: &PeerInfo) {
match self.get(p.sk_id) {
Some(rp) => *rp = p.clone(),
None => self.0.push(p.clone()),
}
}
}
/// Replica status update + hot standby feedback
#[derive(Debug, Clone, Copy)]
pub struct ReplicaState {
@@ -121,8 +74,6 @@ impl ReplicaState {
pub struct SharedState {
/// Safekeeper object
sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
/// In memory list containing state of peers sent in latest messages from them.
peers_info: PeersInfo,
/// State of replicas
replicas: Vec<Option<ReplicaState>>,
/// True when WAL backup launcher oversees the timeline, making sure WAL is
@@ -172,8 +123,7 @@ impl SharedState {
Ok(Self {
sk,
peers_info: PeersInfo(vec![]),
replicas: vec![],
replicas: Vec::new(),
wal_backup_active: false,
active: false,
num_computes: 0,
@@ -192,7 +142,6 @@ impl SharedState {
Ok(Self {
sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
peers_info: PeersInfo(vec![]),
replicas: Vec::new(),
wal_backup_active: false,
active: false,
@@ -252,6 +201,12 @@ impl SharedState {
self.wal_backup_active
}
// Can this safekeeper offload to s3? Recently joined safekeepers might not
// have necessary WAL.
fn can_wal_backup(&self) -> bool {
self.sk.state.local_start_lsn <= self.sk.inmem.backup_lsn
}
fn get_wal_seg_size(&self) -> usize {
self.sk.state.server.wal_seg_size as usize
}
@@ -313,24 +268,6 @@ impl SharedState {
self.replicas.push(Some(state));
pos
}
fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo {
SkTimelineInfo {
last_log_term: Some(self.sk.get_epoch()),
flush_lsn: Some(self.sk.wal_store.flush_lsn()),
// note: this value is not flushed to control file yet and can be lost
commit_lsn: Some(self.sk.inmem.commit_lsn),
// TODO: rework feedbacks to avoid max here
remote_consistent_lsn: Some(max(
self.get_replicas_state().remote_consistent_lsn,
self.sk.inmem.remote_consistent_lsn,
)),
peer_horizon_lsn: Some(self.sk.inmem.peer_horizon_lsn),
safekeeper_connstr: Some(conf.listen_pg_addr.clone()),
backup_lsn: Some(self.sk.inmem.backup_lsn),
local_start_lsn: Some(self.sk.state.local_start_lsn),
}
}
}
#[derive(Debug, thiserror::Error)]
@@ -580,6 +517,17 @@ impl Timeline {
self.write_shared_state().wal_backup_attend()
}
/// Can this safekeeper offload to s3? Recently joined safekeepers might not
/// have necessary WAL.
pub fn can_wal_backup(&self) -> bool {
if self.is_cancelled() {
return false;
}
let shared_state = self.write_shared_state();
shared_state.can_wal_backup()
}
/// Returns full timeline info, required for the metrics. If the timeline is
/// not active, returns None instead.
pub fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
@@ -684,25 +632,36 @@ impl Timeline {
Ok(())
}
/// Get safekeeper info for broadcasting to broker and other peers.
pub fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo {
/// Return public safekeeper info for broadcasting to broker and other peers.
pub fn get_public_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo {
let shared_state = self.write_shared_state();
shared_state.get_safekeeper_info(conf)
SkTimelineInfo {
last_log_term: Some(shared_state.sk.get_epoch()),
flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()),
// note: this value is not flushed to control file yet and can be lost
commit_lsn: Some(shared_state.sk.inmem.commit_lsn),
// TODO: rework feedbacks to avoid max here
remote_consistent_lsn: Some(max(
shared_state.get_replicas_state().remote_consistent_lsn,
shared_state.sk.inmem.remote_consistent_lsn,
)),
peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
safekeeper_connstr: Some(conf.listen_pg_addr.clone()),
backup_lsn: Some(shared_state.sk.inmem.backup_lsn),
}
}
/// Update timeline state with peer safekeeper data.
pub async fn record_safekeeper_info(
&self,
sk_info: &SkTimelineInfo,
sk_id: NodeId,
_sk_id: NodeId,
) -> Result<()> {
let is_wal_backup_action_pending: bool;
let commit_lsn: Lsn;
{
let mut shared_state = self.write_shared_state();
shared_state.sk.record_safekeeper_info(sk_info)?;
let peer_info = PeerInfo::from_sk_info(sk_id, sk_info, Instant::now());
shared_state.peers_info.upsert(&peer_info);
is_wal_backup_action_pending = shared_state.update_status(self.ttid);
commit_lsn = shared_state.sk.inmem.commit_lsn;
}
@@ -714,22 +673,6 @@ impl Timeline {
Ok(())
}
/// Get our latest view of alive peers status on the timeline.
/// We pass our own info through the broker as well, so when we don't have connection
/// to the broker returned vec is empty.
pub fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
let shared_state = self.write_shared_state();
let now = Instant::now();
shared_state
.peers_info
.0
.iter()
// Regard peer as absent if we haven't heard from it within heartbeat_timeout.
.filter(|p| now.duration_since(p.ts) <= conf.heartbeat_timeout)
.cloned()
.collect()
}
/// Add send_wal replica to the in-memory vector of replicas.
pub fn add_replica(&self, state: ReplicaState) -> usize {
self.write_shared_state().add_replica(state)

View File

@@ -1,7 +1,8 @@
use anyhow::{Context, Result};
use etcd_broker::subscription_key::{
NodeKind, OperationKind, SkOperationKind, SubscriptionKey, SubscriptionKind,
};
use tokio::task::JoinHandle;
use utils::id::NodeId;
use std::cmp::min;
use std::collections::HashMap;
@@ -25,11 +26,14 @@ use tracing::*;
use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::timeline::{PeerInfo, Timeline};
use crate::{GlobalTimelines, SafeKeeperConf};
use crate::broker::{Election, ElectionLeader};
use crate::timeline::Timeline;
use crate::{broker, GlobalTimelines, SafeKeeperConf};
use once_cell::sync::OnceCell;
const BROKER_CONNECTION_RETRY_DELAY_MS: u64 = 1000;
const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
@@ -66,104 +70,47 @@ struct WalBackupTimelineEntry {
handle: Option<WalBackupTaskHandle>,
}
async fn shut_down_task(ttid: TenantTimelineId, entry: &mut WalBackupTimelineEntry) {
if let Some(wb_handle) = entry.handle.take() {
// Tell the task to shutdown. Error means task exited earlier, that's ok.
let _ = wb_handle.shutdown_tx.send(()).await;
// Await the task itself. TODO: restart panicked tasks earlier.
if let Err(e) = wb_handle.handle.await {
warn!("WAL backup task for {} panicked: {}", ttid, e);
}
}
}
/// The goal is to ensure that normally only one safekeepers offloads. However,
/// it is fine (and inevitable, as s3 doesn't provide CAS) that for some short
/// time we have several ones as they PUT the same files. Also,
/// - frequently changing the offloader would be bad;
/// - electing seriously lagging safekeeper is undesirable;
/// So we deterministically choose among the reasonably caught up candidates.
/// TODO: take into account failed attempts to deal with hypothetical situation
/// where s3 is unreachable only for some sks.
fn determine_offloader(
alive_peers: &[PeerInfo],
wal_backup_lsn: Lsn,
ttid: TenantTimelineId,
conf: &SafeKeeperConf,
) -> (Option<NodeId>, String) {
// TODO: remove this once we fill newly joined safekeepers since backup_lsn.
let capable_peers = alive_peers
.iter()
.filter(|p| p.local_start_lsn <= wal_backup_lsn);
match capable_peers.clone().map(|p| p.commit_lsn).max() {
None => (None, "no connected peers to elect from".to_string()),
Some(max_commit_lsn) => {
let threshold = max_commit_lsn
.checked_sub(conf.max_offloader_lag_bytes)
.unwrap_or(Lsn(0));
let mut caughtup_peers = capable_peers
.clone()
.filter(|p| p.commit_lsn >= threshold)
.collect::<Vec<_>>();
caughtup_peers.sort_by(|p1, p2| p1.sk_id.cmp(&p2.sk_id));
// To distribute the load, shift by timeline_id.
let offloader = caughtup_peers
[(u128::from(ttid.timeline_id) % caughtup_peers.len() as u128) as usize]
.sk_id;
let mut capable_peers_dbg = capable_peers
.map(|p| (p.sk_id, p.commit_lsn))
.collect::<Vec<_>>();
capable_peers_dbg.sort_by(|p1, p2| p1.0.cmp(&p2.0));
(
Some(offloader),
format!(
"elected {} among {:?} peers, with {} of them being caughtup",
offloader,
capable_peers_dbg,
caughtup_peers.len()
),
)
}
}
}
/// Based on peer information determine which safekeeper should offload; if it
/// is me, run (per timeline) task, if not yet. OTOH, if it is not me and task
/// is running, kill it.
async fn update_task(
/// Start per timeline task, if it makes sense for this safekeeper to offload.
fn consider_start_task(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
entry: &mut WalBackupTimelineEntry,
task: &mut WalBackupTimelineEntry,
) {
let alive_peers = entry.timeline.get_peers(conf);
let wal_backup_lsn = entry.timeline.get_wal_backup_lsn();
let (offloader, election_dbg_str) =
determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf);
let elected_me = Some(conf.my_id) == offloader;
if elected_me != (entry.handle.is_some()) {
if elected_me {
info!("elected for backup {}: {}", ttid, election_dbg_str);
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let timeline_dir = conf.timeline_dir(&ttid);
let handle = tokio::spawn(
backup_task_main(ttid, timeline_dir, shutdown_rx)
.instrument(info_span!("WAL backup task", ttid = %ttid)),
);
entry.handle = Some(WalBackupTaskHandle {
shutdown_tx,
handle,
});
} else {
info!("stepping down from backup {}: {}", ttid, election_dbg_str);
shut_down_task(ttid, entry).await;
}
if !task.timeline.can_wal_backup() {
return;
}
info!("starting WAL backup task for {}", ttid);
// TODO: decide who should offload right here by simply checking current
// state instead of running elections in offloading task.
let election_name = SubscriptionKey {
cluster_prefix: conf.broker_etcd_prefix.clone(),
kind: SubscriptionKind::Operation(
ttid,
NodeKind::Safekeeper,
OperationKind::Safekeeper(SkOperationKind::WalBackup),
),
}
.watch_key();
let my_candidate_name = broker::get_candiate_name(conf.my_id);
let election = broker::Election::new(
election_name,
my_candidate_name,
conf.broker_endpoints.clone(),
);
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let timeline_dir = conf.timeline_dir(&ttid);
let handle = tokio::spawn(
backup_task_main(ttid, timeline_dir, shutdown_rx, election)
.instrument(info_span!("WAL backup task", ttid = %ttid)),
);
task.handle = Some(WalBackupTaskHandle {
shutdown_tx,
handle,
});
}
const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;
@@ -211,20 +158,27 @@ async fn wal_backup_launcher_main_loop(
timeline,
handle: None,
});
update_task(&conf, ttid, entry).await;
consider_start_task(&conf, ttid, entry);
} else {
// need to stop the task
info!("stopping WAL backup task for {}", ttid);
let mut entry = tasks.remove(&ttid).unwrap();
shut_down_task(ttid, &mut entry).await;
let entry = tasks.remove(&ttid).unwrap();
if let Some(wb_handle) = entry.handle {
// Tell the task to shutdown. Error means task exited earlier, that's ok.
let _ = wb_handle.shutdown_tx.send(()).await;
// Await the task itself. TODO: restart panicked tasks earlier.
if let Err(e) = wb_handle.handle.await {
warn!("WAL backup task for {} panicked: {}", ttid, e);
}
}
}
}
}
// For each timeline needing offloading, check if this safekeeper
// should do the job and start/stop the task accordingly.
// Start known tasks, if needed and possible.
_ = ticker.tick() => {
for (ttid, entry) in tasks.iter_mut() {
update_task(&conf, *ttid, entry).await;
for (ttid, entry) in tasks.iter_mut().filter(|(_, entry)| entry.handle.is_none()) {
consider_start_task(&conf, *ttid, entry);
}
}
}
@@ -236,13 +190,17 @@ struct WalBackupTask {
timeline_dir: PathBuf,
wal_seg_size: usize,
commit_lsn_watch_rx: watch::Receiver<Lsn>,
leader: Option<ElectionLeader>,
election: Election,
}
/// Offload single timeline.
/// Offload single timeline. Called only after we checked that backup
/// is required (wal_backup_attend) and possible (can_wal_backup).
async fn backup_task_main(
ttid: TenantTimelineId,
timeline_dir: PathBuf,
mut shutdown_rx: Receiver<()>,
election: Election,
) {
info!("started");
let res = GlobalTimelines::get(ttid);
@@ -257,6 +215,8 @@ async fn backup_task_main(
commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
timeline: tli,
timeline_dir,
leader: None,
election,
};
// task is spinned up only when wal_seg_size already initialized
@@ -269,6 +229,9 @@ async fn backup_task_main(
canceled = true;
}
}
if let Some(l) = wb.leader {
l.give_up().await;
}
info!("task {}", if canceled { "canceled" } else { "terminated" });
}
@@ -276,71 +239,106 @@ impl WalBackupTask {
async fn run(&mut self) {
let mut backup_lsn = Lsn(0);
let mut retry_attempt = 0u32;
// offload loop
// election loop
loop {
if retry_attempt == 0 {
// wait for new WAL to arrive
if let Err(e) = self.commit_lsn_watch_rx.changed().await {
// should never happen, as we hold Arc to timeline.
error!("commit_lsn watch shut down: {:?}", e);
return;
}
} else {
// or just sleep if we errored previously
let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
if let Some(backoff_delay) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
{
retry_delay = min(retry_delay, backoff_delay);
}
sleep(Duration::from_millis(retry_delay)).await;
}
let mut retry_attempt = 0u32;
let commit_lsn = *self.commit_lsn_watch_rx.borrow();
// Note that backup_lsn can be higher than commit_lsn if we
// don't have much local WAL and others already uploaded
// segments we don't even have.
if backup_lsn.segment_number(self.wal_seg_size)
>= commit_lsn.segment_number(self.wal_seg_size)
{
retry_attempt = 0;
continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
}
// Perhaps peers advanced the position, check shmem value.
backup_lsn = self.timeline.get_wal_backup_lsn();
if backup_lsn.segment_number(self.wal_seg_size)
>= commit_lsn.segment_number(self.wal_seg_size)
{
retry_attempt = 0;
info!("acquiring leadership");
if let Err(e) = broker::get_leader(&self.election, &mut self.leader).await {
error!("error during leader election {:?}", e);
sleep(Duration::from_millis(BROKER_CONNECTION_RETRY_DELAY_MS)).await;
continue;
}
info!("acquired leadership");
match backup_lsn_range(
backup_lsn,
commit_lsn,
self.wal_seg_size,
&self.timeline_dir,
)
.await
{
Ok(backup_lsn_result) => {
backup_lsn = backup_lsn_result;
let res = self.timeline.set_wal_backup_lsn(backup_lsn_result);
if let Err(e) = res {
error!("failed to set wal_backup_lsn: {}", e);
// offload loop
loop {
if retry_attempt == 0 {
// wait for new WAL to arrive
if let Err(e) = self.commit_lsn_watch_rx.changed().await {
// should never happen, as we hold Arc to timeline.
error!("commit_lsn watch shut down: {:?}", e);
return;
}
retry_attempt = 0;
} else {
// or just sleep if we errored previously
let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
if let Some(backoff_delay) =
UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
{
retry_delay = min(retry_delay, backoff_delay);
}
sleep(Duration::from_millis(retry_delay)).await;
}
Err(e) => {
error!(
"failed while offloading range {}-{}: {:?}",
backup_lsn, commit_lsn, e
);
if retry_attempt < u32::MAX {
retry_attempt += 1;
let commit_lsn = *self.commit_lsn_watch_rx.borrow();
// Note that backup_lsn can be higher than commit_lsn if we
// don't have much local WAL and others already uploaded
// segments we don't even have.
if backup_lsn.segment_number(self.wal_seg_size)
>= commit_lsn.segment_number(self.wal_seg_size)
{
continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
}
// Perhaps peers advanced the position, check shmem value.
backup_lsn = self.timeline.get_wal_backup_lsn();
if backup_lsn.segment_number(self.wal_seg_size)
>= commit_lsn.segment_number(self.wal_seg_size)
{
continue;
}
if let Some(l) = self.leader.as_mut() {
// Optimization idea for later:
// Avoid checking election leader every time by returning current lease grant expiration time
// Re-check leadership only after expiration time,
// such approach would reduce overhead on write-intensive workloads
match l
.check_am_i(
self.election.election_name.clone(),
self.election.candidate_name.clone(),
)
.await
{
Ok(leader) => {
if !leader {
info!("lost leadership");
break;
}
}
Err(e) => {
warn!("error validating leader, {:?}", e);
break;
}
}
}
match backup_lsn_range(
backup_lsn,
commit_lsn,
self.wal_seg_size,
&self.timeline_dir,
)
.await
{
Ok(backup_lsn_result) => {
backup_lsn = backup_lsn_result;
let res = self.timeline.set_wal_backup_lsn(backup_lsn_result);
if let Err(e) = res {
error!("backup error: {}", e);
return;
}
retry_attempt = 0;
}
Err(e) => {
error!(
"failed while offloading range {}-{}: {:?}",
backup_lsn, commit_lsn, e
);
retry_attempt = min(retry_attempt + 1, u32::MAX);
}
}
}

View File

@@ -1,51 +0,0 @@
#!/bin/bash
# this is a shortcut script to avoid duplication in CI
set -eux -o pipefail
SCRIPT_DIR="$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
COMPOSE_FILE=$SCRIPT_DIR/../docker-compose/docker-compose.yml
COMPUTE_CONTAINER_NAME=dockercompose_compute_1
SQL="CREATE TABLE t(key int primary key, value text); insert into t values(1,1); select * from t;"
PSQL_OPTION="-h localhost -U cloud_admin -p 55433 -c '$SQL' postgres"
cleanup() {
echo "show container information"
docker ps
docker-compose -f $COMPOSE_FILE logs
echo "stop containers..."
docker-compose -f $COMPOSE_FILE down
}
echo "clean up containers if exists"
cleanup
for pg_version in 14 15; do
echo "start containers (pg_version=$pg_version)."
PG_VERSION=$pg_version TAG=latest docker-compose -f $COMPOSE_FILE up --build -d
echo "wait until the compute is ready. timeout after 60s. "
cnt=0
while sleep 1; do
# check timeout
cnt=`expr $cnt + 1`
if [ $cnt -gt 60 ]; then
echo "timeout before the compute is ready."
cleanup
exit 1
fi
# check if the compute is ready
set +o pipefail
result=`docker-compose -f $COMPOSE_FILE logs "compute_is_ready" | grep "accepting connections" | wc -l`
set -o pipefail
if [ $result -eq 1 ]; then
echo "OK. The compute is ready to connect."
echo "execute simple queries."
docker exec -it $COMPUTE_CONTAINER_NAME /bin/bash -c "psql $PSQL_OPTION"
cleanup
break
fi
done
done

View File

@@ -17,7 +17,6 @@ import uuid
from contextlib import closing, contextmanager
from dataclasses import dataclass, field
from enum import Flag, auto
from functools import cached_property
from pathlib import Path
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, TypeVar, Union, cast
@@ -28,6 +27,7 @@ import jwt
import psycopg2
import pytest
import requests
from cached_property import cached_property
from fixtures.log_helper import log
from fixtures.types import Lsn, TenantId, TimelineId
@@ -970,7 +970,7 @@ class NeonPageserverApiException(Exception):
class NeonPageserverHttpClient(requests.Session):
def __init__(self, port: int, is_testing_enabled_or_skip: Fn, auth_token: Optional[str] = None):
def __init__(self, port: int, is_testing_enabled_or_skip, auth_token: Optional[str] = None):
super().__init__()
self.port = port
self.auth_token = auth_token

View File

@@ -70,14 +70,18 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
# But all others are broken
# First timeline would not get loaded into pageserver due to corrupt metadata file
with pytest.raises(Exception, match=f"Timeline {tenant1}/{timeline1} was not found") as err:
with pytest.raises(
Exception, match=f"Timeline {timeline1} was not found for tenant {tenant1}"
) as err:
pg1.start()
log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}")
# Second timeline has no ancestors, only the metadata file and no layer files
# We don't have the remote storage enabled, which means timeline is in an incorrect state,
# it's not loaded at all
with pytest.raises(Exception, match=f"Timeline {tenant2}/{timeline2} was not found") as err:
with pytest.raises(
Exception, match=f"Timeline {timeline2} was not found for tenant {tenant2}"
) as err:
pg2.start()
log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}")

View File

@@ -1,10 +1,10 @@
import os.path
import shutil
import subprocess
import threading
import time
from contextlib import closing
from cached_property import threading
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv

View File

@@ -1,267 +0,0 @@
import os
import re
import shutil
import subprocess
from pathlib import Path
from typing import Any, Dict, Union
import pytest
import toml
from fixtures.neon_fixtures import (
NeonCli,
NeonEnvBuilder,
NeonPageserverHttpClient,
PgBin,
PortDistributor,
wait_for_last_record_lsn,
wait_for_upload,
)
from fixtures.types import Lsn
from pytest import FixtureRequest
def dump_differs(first: Path, second: Path, output: Path) -> bool:
"""
Runs diff(1) command on two SQL dumps and write the output to the given output file.
Returns True if the dumps differ, False otherwise.
"""
with output.open("w") as stdout:
rv = subprocess.run(
[
"diff",
"--unified", # Make diff output more readable
"--ignore-matching-lines=^--", # Ignore changes in comments
"--ignore-blank-lines",
str(first),
str(second),
],
stdout=stdout,
)
return rv.returncode != 0
class PortReplacer(object):
"""
Class-helper for replacing ports in config files.
"""
def __init__(self, port_distributor: PortDistributor):
self.port_distributor = port_distributor
self.port_map: Dict[int, int] = {}
def replace_port(self, value: Union[int, str]) -> Union[int, str]:
if isinstance(value, int):
if (known_port := self.port_map.get(value)) is not None:
return known_port
self.port_map[value] = self.port_distributor.get_port()
return self.port_map[value]
if isinstance(value, str):
# Use regex to find port in a string
# urllib.parse.urlparse produces inconvenient results for cases without scheme like "localhost:5432"
# See https://bugs.python.org/issue27657
ports = re.findall(r":(\d+)(?:/|$)", value)
assert len(ports) == 1, f"can't find port in {value}"
port_int = int(ports[0])
if (known_port := self.port_map.get(port_int)) is not None:
return value.replace(f":{port_int}", f":{known_port}")
self.port_map[port_int] = self.port_distributor.get_port()
return value.replace(f":{port_int}", f":{self.port_map[port_int]}")
raise TypeError(f"unsupported type {type(value)} of {value=}")
def test_backward_compatibility(
pg_bin: PgBin, port_distributor: PortDistributor, test_output_dir: Path, request: FixtureRequest
):
compatibility_snapshot_dir_env = os.environ.get("COMPATIBILITY_SNAPSHOT_DIR")
assert (
compatibility_snapshot_dir_env is not None
), "COMPATIBILITY_SNAPSHOT_DIR is not set. It should be set to `compatibility_snapshot_pg14` path generateted by test_prepare_snapshot"
compatibility_snapshot_dir = Path(compatibility_snapshot_dir_env).resolve()
# Make compatibility snapshot artifacts pickupable by Allure
# by copying the snapshot directory to the curent test output directory.
repo_dir = test_output_dir / "compatibility_snapshot" / "repo"
shutil.copytree(compatibility_snapshot_dir / "repo", repo_dir)
# Remove old logs to avoid confusion in test artifacts
for logfile in repo_dir.glob("**/*.log"):
logfile.unlink()
# Remove tenants data for computes
for tenant in (repo_dir / "pgdatadirs" / "tenants").glob("*"):
shutil.rmtree(tenant)
# Remove wal-redo temp directory
for tenant in (repo_dir / "tenants").glob("*"):
shutil.rmtree(tenant / "wal-redo-datadir.___temp")
# Update paths and ports in config files
pr = PortReplacer(port_distributor)
pageserver_toml = repo_dir / "pageserver.toml"
pageserver_config = toml.load(pageserver_toml)
new_local_path = pageserver_config["remote_storage"]["local_path"].replace(
"/test_prepare_snapshot/",
"/test_backward_compatibility/compatibility_snapshot/",
)
pageserver_config["remote_storage"]["local_path"] = new_local_path
pageserver_config["listen_http_addr"] = pr.replace_port(pageserver_config["listen_http_addr"])
pageserver_config["listen_pg_addr"] = pr.replace_port(pageserver_config["listen_pg_addr"])
pageserver_config["broker_endpoints"] = [
pr.replace_port(ep) for ep in pageserver_config["broker_endpoints"]
]
with pageserver_toml.open("w") as f:
toml.dump(pageserver_config, f)
snapshot_config_toml = repo_dir / "config"
snapshot_config = toml.load(snapshot_config_toml)
snapshot_config["etcd_broker"]["broker_endpoints"] = [
pr.replace_port(ep) for ep in snapshot_config["etcd_broker"]["broker_endpoints"]
]
snapshot_config["pageserver"]["listen_http_addr"] = pr.replace_port(
snapshot_config["pageserver"]["listen_http_addr"]
)
snapshot_config["pageserver"]["listen_pg_addr"] = pr.replace_port(
snapshot_config["pageserver"]["listen_pg_addr"]
)
for sk in snapshot_config["safekeepers"]:
sk["http_port"] = pr.replace_port(sk["http_port"])
sk["pg_port"] = pr.replace_port(sk["pg_port"])
with (snapshot_config_toml).open("w") as f:
toml.dump(snapshot_config, f)
# Ensure that snapshot doesn't contain references to the original path
rv = subprocess.run(
[
"grep",
"--recursive",
"--binary-file=without-match",
"--files-with-matches",
"test_prepare_snapshot/repo",
str(repo_dir),
],
capture_output=True,
text=True,
)
assert (
rv.returncode != 0
), f"there're files referencing `test_prepare_snapshot/repo`, this path should be replaced with {repo_dir}:\n{rv.stdout}"
# NeonEnv stub to make NeonCli happy
config: Any = type("NeonEnvStub", (object,), {})
config.rust_log_override = None
config.repo_dir = repo_dir
config.pg_version = "14" # Note: `pg_dumpall` (from pg_bin) version is set by DEFAULT_PG_VERSION_DEFAULT and can be overriden by DEFAULT_PG_VERSION env var
config.initial_tenant = snapshot_config["default_tenant_id"]
# Check that we can start the project
cli = NeonCli(config)
try:
cli.raw_cli(["start"])
request.addfinalizer(lambda: cli.raw_cli(["stop"]))
result = cli.pg_start("main")
request.addfinalizer(lambda: cli.pg_stop("main"))
except Exception:
breaking_changes_allowed = (
os.environ.get("ALLOW_BREAKING_CHANGES", "false").lower() == "true"
)
if breaking_changes_allowed:
pytest.xfail("Breaking changes are allowed by ALLOW_BREAKING_CHANGES env var")
else:
raise
connstr_all = re.findall(r"Starting postgres node at '([^']+)'", result.stdout)
assert len(connstr_all) == 1, f"can't parse connstr from {result.stdout}"
connstr = connstr_all[0]
# Check that the project produces the same dump as the previous version.
# The assert itself deferred to the end of the test
# to allow us to perform checks that change data before failing
pg_bin.run(["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump.sql'}"])
initial_dump_differs = dump_differs(
compatibility_snapshot_dir / "dump.sql",
test_output_dir / "dump.sql",
test_output_dir / "dump.filediff",
)
# Check that project can be recovered from WAL
# loosely based on https://github.com/neondatabase/cloud/wiki/Recovery-from-WAL
tenant_id = snapshot_config["default_tenant_id"]
timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]
pageserver_port = snapshot_config["pageserver"]["listen_http_addr"].split(":")[-1]
auth_token = snapshot_config["pageserver"]["auth_token"]
pageserver_http = NeonPageserverHttpClient(
port=pageserver_port,
is_testing_enabled_or_skip=lambda: True, # TODO: check if testing really enabled
auth_token=auth_token,
)
shutil.rmtree(repo_dir / "local_fs_remote_storage")
pageserver_http.timeline_delete(tenant_id, timeline_id)
pageserver_http.timeline_create(tenant_id, timeline_id)
pg_bin.run(
["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump-from-wal.sql'}"]
)
# The assert itself deferred to the end of the test
# to allow us to perform checks that change data before failing
dump_from_wal_differs = dump_differs(
test_output_dir / "dump.sql",
test_output_dir / "dump-from-wal.sql",
test_output_dir / "dump-from-wal.filediff",
)
# Check that we can interract with the data
pg_bin.run(["pgbench", "--time=10", "--progress=2", connstr])
assert not dump_from_wal_differs, "dump from WAL differs"
assert not initial_dump_differs, "initial dump differs"
@pytest.mark.order(after="test_backward_compatibility")
# Note: if renaming this test, don't forget to update a reference to it in a workflow file:
# "Upload compatibility snapshot" step in .github/actions/run-python-test-set/action.yml
def test_prepare_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_output_dir: Path):
# The test doesn't really test anything
# it creates a new snapshot for releases after we tested the current version against the previous snapshot in `test_backward_compatibility`.
#
# There's no cleanup here, it allows to adjust the data in `test_backward_compatibility` itself without re-collecting it.
neon_env_builder.pg_version = "14"
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_local_fs_remote_storage()
env = neon_env_builder.init_start()
pg = env.postgres.create_start("main")
pg_bin.run(["pgbench", "--initialize", "--scale=10", pg.connstr()])
pg_bin.run(["pgbench", "--time=60", "--progress=2", pg.connstr()])
pg_bin.run(["pg_dumpall", f"--dbname={pg.connstr()}", f"--file={test_output_dir / 'dump.sql'}"])
snapshot_config = toml.load(test_output_dir / "repo" / "config")
tenant_id = snapshot_config["default_tenant_id"]
timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]
pageserver_http = env.pageserver.http_client()
lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, lsn)
wait_for_upload(pageserver_http, tenant_id, timeline_id, lsn)
env.postgres.stop_all()
for sk in env.safekeepers:
sk.stop()
env.pageserver.stop()
shutil.copytree(test_output_dir, test_output_dir / "compatibility_snapshot_pg14")
# Directory `test_output_dir / "compatibility_snapshot_pg14"` is uploaded to S3 in a workflow, keep the name in sync with it

View File

@@ -1,13 +1,14 @@
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
from performance.test_perf_pgbench import get_scales_matrix
# Test gc_cutoff
# Test gc_cuttoff
#
# This test sets fail point at the end of GC, and checks that pageserver
# normally restarts after it. Also, there should be GC ERRORs in the log,
# but the fixture checks the log for any unexpected ERRORs after every
# test anyway, so it doesn't need any special attention here.
def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
# This test set fail point after at the end of GC and checks
# that pageserver normally restarts after it
@pytest.mark.parametrize("scale", get_scales_matrix(10))
def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, scale: int):
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
@@ -17,23 +18,21 @@ def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
"gc_period": "10 s",
"gc_horizon": f"{1024 ** 2}",
"checkpoint_distance": f"{1024 ** 2}",
"compaction_period": "5 s",
"compaction_target_size": f"{1024 ** 2}",
# set PITR interval to be small, so we can do GC
"pitr_interval": "1 s",
"compaction_threshold": "3",
"image_creation_threshold": "2",
}
)
pg = env.postgres.create_start("main", tenant_id=tenant_id)
connstr = pg.connstr(options="-csynchronous_commit=off")
pg_bin.run_capture(["pgbench", "-i", "-s10", connstr])
connstr = pg.connstr()
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
pageserver_http.configure_failpoints(("after-timeline-gc-removed-layers", "exit"))
pageserver_http.configure_failpoints(("gc-before-save-metadata", "return"))
for i in range(5):
try:
pg_bin.run_capture(["pgbench", "-N", "-c5", "-T100", "-Mprepared", connstr])
pg_bin.run_capture(["pgbench", "-T100", connstr])
except Exception:
env.pageserver.stop()
env.pageserver.start()
pageserver_http.configure_failpoints(("after-timeline-gc-removed-layers", "exit"))
pageserver_http.configure_failpoints(("gc-before-save-metadata", "return"))

View File

@@ -65,7 +65,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
# check 404
with pytest.raises(
NeonPageserverApiException,
match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} was not found",
match=f"Timeline {leaf_timeline_id} was not found for tenant {env.initial_tenant}",
):
ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)