Merge branch 'main' into tenant-tasks-test

This commit is contained in:
Bojan Serafimov
2022-06-29 14:52:33 -04:00
31 changed files with 1886 additions and 1424 deletions

View File

@@ -1,6 +1,7 @@
[pageservers]
#zenith-us-stage-ps-1 console_region_id=27
zenith-us-stage-ps-2 console_region_id=27
zenith-us-stage-ps-3 console_region_id=27
[safekeepers]
zenith-us-stage-sk-4 console_region_id=27

View File

@@ -100,10 +100,8 @@ jobs:
name: Rust build << parameters.build_type >>
command: |
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
CARGO_FLAGS=
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
CARGO_FLAGS="--release --features profiling"
fi
@@ -112,7 +110,7 @@ jobs:
export RUSTC_WRAPPER=cachepot
export AWS_ACCESS_KEY_ID="${CACHEPOT_AWS_ACCESS_KEY_ID}"
export AWS_SECRET_ACCESS_KEY="${CACHEPOT_AWS_SECRET_ACCESS_KEY}"
"${cov_prefix[@]}" mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests
mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests
cachepot -s
- save_cache:
@@ -128,32 +126,24 @@ jobs:
name: cargo test
command: |
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
CARGO_FLAGS=
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
CARGO_FLAGS=--release
fi
"${cov_prefix[@]}" cargo test $CARGO_FLAGS
cargo test $CARGO_FLAGS
# Install the rust binaries, for use by test jobs
- run:
name: Install rust binaries
command: |
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
fi
binaries=$(
"${cov_prefix[@]}" cargo metadata --format-version=1 --no-deps |
cargo metadata --format-version=1 --no-deps |
jq -r '.packages[].targets[] | select(.kind | index("bin")) | .name'
)
test_exe_paths=$(
"${cov_prefix[@]}" cargo test --message-format=json --no-run |
cargo test --message-format=json --no-run |
jq -r '.executable | select(. != null)'
)
@@ -166,34 +156,15 @@ jobs:
SRC=target/$BUILD_TYPE/$bin
DST=/tmp/zenith/bin/$bin
cp $SRC $DST
echo $DST >> /tmp/zenith/etc/binaries.list
done
# Install test executables (for code coverage)
if [[ $BUILD_TYPE == "debug" ]]; then
for bin in $test_exe_paths; do
SRC=$bin
DST=/tmp/zenith/test_bin/$(basename $bin)
cp $SRC $DST
echo $DST >> /tmp/zenith/etc/binaries.list
done
fi
# Install the postgres binaries, for use by test jobs
- run:
name: Install postgres binaries
command: |
cp -a tmp_install /tmp/zenith/pg_install
- run:
name: Merge coverage data
command: |
# This will speed up workspace uploads
if [[ $BUILD_TYPE == "debug" ]]; then
scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage merge
fi
# Save the rust binaries and coverage data for other jobs in this workflow.
# Save rust binaries for other jobs in the workflow
- persist_to_workspace:
root: /tmp/zenith
paths:
@@ -286,7 +257,7 @@ jobs:
# no_output_timeout, specified here.
no_output_timeout: 10m
environment:
- ZENITH_BIN: /tmp/zenith/bin
- NEON_BIN: /tmp/zenith/bin
- POSTGRES_DISTRIB_DIR: /tmp/zenith/pg_install
- TEST_OUTPUT: /tmp/test_output
# this variable will be embedded in perf test report
@@ -314,12 +285,6 @@ jobs:
export GITHUB_SHA=$CIRCLE_SHA1
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
fi
# Run the tests.
#
# The junit.xml file allows CircleCI to display more fine-grained test information
@@ -330,7 +295,7 @@ jobs:
# -n4 uses four processes to run tests via pytest-xdist
# -s is not used to prevent pytest from capturing output, because tests are running
# in parallel and logs are mixed between different tests
"${cov_prefix[@]}" ./scripts/pytest \
./scripts/pytest \
--junitxml=$TEST_OUTPUT/junit.xml \
--tb=short \
--verbose \
@@ -359,67 +324,12 @@ jobs:
# The store_test_results step tells CircleCI where to find the junit.xml file.
- store_test_results:
path: /tmp/test_output
- run:
name: Merge coverage data
command: |
# This will speed up workspace uploads
if [[ $BUILD_TYPE == "debug" ]]; then
scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage merge
fi
# Save coverage data (if any)
# Save data (if any)
- persist_to_workspace:
root: /tmp/zenith
paths:
- "*"
coverage-report:
executor: neon-xlarge-executor
steps:
- attach_workspace:
at: /tmp/zenith
- checkout
- restore_cache:
name: Restore rust cache
keys:
# Require an exact match. While an out of date cache might speed up the build,
# there's no way to clean out old packages, so the cache grows every time something
# changes.
- v04-rust-cache-deps-debug-{{ checksum "Cargo.lock" }}
- run:
name: Build coverage report
command: |
COMMIT_URL=https://github.com/neondatabase/neon/commit/$CIRCLE_SHA1
scripts/coverage \
--dir=/tmp/zenith/coverage report \
--input-objects=/tmp/zenith/etc/binaries.list \
--commit-url=$COMMIT_URL \
--format=github
- run:
name: Upload coverage report
command: |
LOCAL_REPO=$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME
REPORT_URL=https://neondatabase.github.io/zenith-coverage-data/$CIRCLE_SHA1
COMMIT_URL=https://github.com/neondatabase/neon/commit/$CIRCLE_SHA1
scripts/git-upload \
--repo=https://$VIP_VAP_ACCESS_TOKEN@github.com/neondatabase/zenith-coverage-data.git \
--message="Add code coverage for $COMMIT_URL" \
copy /tmp/zenith/coverage/report $CIRCLE_SHA1 # COPY FROM TO_RELATIVE
# Add link to the coverage report to the commit
curl -f -X POST \
https://api.github.com/repos/$LOCAL_REPO/statuses/$CIRCLE_SHA1 \
-H "Accept: application/vnd.github.v3+json" \
--user "$CI_ACCESS_TOKEN" \
--data \
"{
\"state\": \"success\",
\"context\": \"zenith-coverage\",
\"description\": \"Coverage report is ready\",
\"target_url\": \"$REPORT_URL\"
}"
# Build neondatabase/neon:latest image and push it to Docker hub
docker-image:
docker:
@@ -688,50 +598,6 @@ jobs:
helm upgrade neon-proxy neondatabase/neon-proxy --install -f .circleci/helm-values/production.proxy.yaml --set image.tag=${DOCKER_TAG} --wait
helm upgrade neon-proxy-scram neondatabase/neon-proxy --install -f .circleci/helm-values/production.proxy-scram.yaml --set image.tag=${DOCKER_TAG} --wait
# Trigger a new remote CI job
remote-ci-trigger:
docker:
- image: cimg/base:2021.04
parameters:
remote_repo:
type: string
environment:
REMOTE_REPO: << parameters.remote_repo >>
steps:
- run:
name: Set PR's status to pending
command: |
LOCAL_REPO=$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME
curl -f -X POST \
https://api.github.com/repos/$LOCAL_REPO/statuses/$CIRCLE_SHA1 \
-H "Accept: application/vnd.github.v3+json" \
--user "$CI_ACCESS_TOKEN" \
--data \
"{
\"state\": \"pending\",
\"context\": \"neon-cloud-e2e\",
\"description\": \"[$REMOTE_REPO] Remote CI job is about to start\"
}"
- run:
name: Request a remote CI test
command: |
LOCAL_REPO=$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME
curl -f -X POST \
https://api.github.com/repos/$REMOTE_REPO/actions/workflows/testing.yml/dispatches \
-H "Accept: application/vnd.github.v3+json" \
--user "$CI_ACCESS_TOKEN" \
--data \
"{
\"ref\": \"main\",
\"inputs\": {
\"ci_job_name\": \"neon-cloud-e2e\",
\"commit_hash\": \"$CIRCLE_SHA1\",
\"remote_repo\": \"$LOCAL_REPO\"
}
}"
workflows:
build_and_test:
jobs:
@@ -774,12 +640,6 @@ workflows:
save_perf_report: true
requires:
- build-neon-release
- coverage-report:
# Context passes credentials for gh api
context: CI_ACCESS_TOKEN
requires:
# TODO: consider adding more
- other-tests-debug
- docker-image:
# Context gives an ability to login
context: Docker Hub
@@ -880,14 +740,3 @@ workflows:
- release
requires:
- docker-image-release
- remote-ci-trigger:
# Context passes credentials for gh api
context: CI_ACCESS_TOKEN
remote_repo: "neondatabase/cloud"
requires:
# XXX: Successful build doesn't mean everything is OK, but
# the job to be triggered takes so much time to complete (~22 min)
# that it's better not to wait for the commented-out steps
- build-neon-release
# - pg_regress-tests-release
# - other-tests-release

View File

@@ -2,25 +2,29 @@ name: 'Run python test'
description: 'Runs a Neon python test set, performing all the required preparations before'
inputs:
# Select the type of Rust build. Must be "release" or "debug".
build_type:
description: 'Type of Rust (neon) and C (postgres) builds. Must be "release" or "debug".'
required: true
rust_toolchain:
description: 'Rust toolchain version to fetch the caches'
required: true
# This parameter is required, to prevent the mistake of running all tests in one job.
test_selection:
description: 'A python test suite to run'
required: true
# Arbitrary parameters to pytest. For example "-s" to prevent capturing stdout/stderr
extra_params:
description: 'Arbitrary parameters to pytest. For example "-s" to prevent capturing stdout/stderr'
required: false
default: ''
needs_postgres_source:
description: 'Set to true if the test suite requires postgres source checked out'
required: false
default: 'false'
run_in_parallel:
description: 'Whether to run tests in parallel'
required: false
default: 'true'
save_perf_report:
description: 'Whether to upload the performance report'
required: false
default: 'false'
@@ -60,7 +64,7 @@ runs:
- name: Run pytest
env:
ZENITH_BIN: /tmp/neon/bin
NEON_BIN: /tmp/neon/bin
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
TEST_OUTPUT: /tmp/test_output
# this variable will be embedded in perf test report
@@ -88,7 +92,7 @@ runs:
fi
if [[ "${{ inputs.build_type }}" == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run)
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
elif [[ "${{ inputs.build_type }}" == "release" ]]; then
cov_prefix=()
fi
@@ -117,3 +121,20 @@ runs:
scripts/generate_and_push_perf_report.sh
fi
fi
- name: Delete all data but logs
shell: bash -ex {0}
if: always()
run: |
du -sh /tmp/test_output/*
find /tmp/test_output -type f ! -name "*.log" ! -name "regression.diffs" ! -name "junit.xml" ! -name "*.filediff" ! -name "*.stdout" ! -name "*.stderr" ! -name "flamegraph.svg" ! -name "*.metrics" -delete
du -sh /tmp/test_output/*
- name: Upload python test logs
if: always()
uses: actions/upload-artifact@v3
with:
retention-days: 7
if-no-files-found: error
name: python-test-${{ inputs.test_selection }}-${{ runner.os }}-${{ inputs.build_type }}-${{ inputs.rust_toolchain }}-logs
path: /tmp/test_output/

View File

@@ -0,0 +1,17 @@
name: 'Merge and upload coverage data'
description: 'Compresses and uploads the coverage data as an artifact'
runs:
using: "composite"
steps:
- name: Merge coverage data
shell: bash -ex {0}
run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge
- name: Upload coverage data
uses: actions/upload-artifact@v3
with:
retention-days: 7
if-no-files-found: error
name: coverage-data-artifact
path: /tmp/coverage/

View File

@@ -1,13 +1,28 @@
name: build_and_test
on: [ push ]
name: Test
on:
push:
branches:
- main
pull_request:
defaults:
run:
shell: bash -ex {0}
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
env:
RUST_BACKTRACE: 1
COPT: '-Werror'
jobs:
build-postgres:
runs-on: [ self-hosted, Linux, k8s-runner ]
strategy:
fail-fast: false
matrix:
build_type: [ debug, release ]
rust_toolchain: [ 1.58 ]
@@ -34,7 +49,7 @@ jobs:
- name: Build postgres
if: steps.cache_pg.outputs.cache-hit != 'true'
run: COPT='-Werror' mold -run make postgres -j$(nproc)
run: mold -run make postgres -j$(nproc)
# actions/cache@v3 does not allow concurrently using the same cache across job steps, so use a separate cache
- name: Prepare postgres artifact
@@ -52,6 +67,7 @@ jobs:
runs-on: [ self-hosted, Linux, k8s-runner ]
needs: [ build-postgres ]
strategy:
fail-fast: false
matrix:
build_type: [ debug, release ]
rust_toolchain: [ 1.58 ]
@@ -85,44 +101,39 @@ jobs:
~/.cargo/registry/
~/.cargo/git/
target/
key: v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
# Fall back to older versions of the key, if no cache for current Cargo.lock was found
key: |
v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-
- name: Run cargo build
run: |
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run)
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
CARGO_FLAGS=
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
CARGO_FLAGS="--release --features profiling"
fi
export CACHEPOT_BUCKET=zenith-rust-cachepot
export RUSTC_WRAPPER=cachepot
export AWS_ACCESS_KEY_ID="${{ secrets.AWS_ACCESS_KEY_ID }}"
export AWS_SECRET_ACCESS_KEY="${{ secrets.AWS_SECRET_ACCESS_KEY }}"
export HOME=/home/runner
"${cov_prefix[@]}" mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests
cachepot -s
- name: Run cargo test
run: |
export HOME=/home/runner
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run)
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
CARGO_FLAGS=
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
CARGO_FLAGS=--release
fi
"${cov_prefix[@]}" cargo test $CARGO_FLAGS
- name: Install rust binaries
run: |
export HOME=/home/runner
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run)
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
fi
@@ -137,39 +148,36 @@ jobs:
jq -r '.executable | select(. != null)'
)
mkdir -p /tmp/neon/bin
mkdir -p /tmp/neon/test_bin
mkdir -p /tmp/neon/etc
mkdir -p /tmp/neon/bin/
mkdir -p /tmp/neon/test_bin/
mkdir -p /tmp/neon/etc/
# Keep bloated coverage data files away from the rest of the artifact
mkdir -p /tmp/coverage/
# Install target binaries
for bin in $binaries; do
SRC=target/$BUILD_TYPE/$bin
DST=/tmp/neon/bin/$bin
cp $SRC $DST
echo $DST >> /tmp/neon/etc/binaries.list
cp "$SRC" "$DST"
done
# Install test executables (for code coverage)
# Install test executables and write list of all binaries (for code coverage)
if [[ $BUILD_TYPE == "debug" ]]; then
for bin in $binaries; do
echo "/tmp/neon/bin/$bin" >> /tmp/coverage/binaries.list
done
for bin in $test_exe_paths; do
SRC=$bin
DST=/tmp/neon/test_bin/$(basename $bin)
cp $SRC $DST
echo $DST >> /tmp/neon/etc/binaries.list
cp "$SRC" "$DST"
echo "$DST" >> /tmp/coverage/binaries.list
done
fi
- name: Install postgres binaries
run: cp -a tmp_install /tmp/neon/pg_install
- name: Merge coverage data
run: |
export HOME=/home/runner
# This will speed up workspace uploads
if [[ $BUILD_TYPE == "debug" ]]; then
scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage merge
fi
- name: Prepare neon artifact
run: tar -C /tmp/neon/ -czf ./neon.tgz .
@@ -181,38 +189,17 @@ jobs:
name: neon-${{ runner.os }}-${{ matrix.build_type }}-${{ matrix.rust_toolchain }}-artifact
path: ./neon.tgz
check-codestyle-python:
runs-on: [ self-hosted, Linux, k8s-runner ]
strategy:
matrix:
rust_toolchain: [ 1.58 ]
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 1
# XXX: keep this after the binaries.list is formed, so the coverage can properly work later
- name: Merge and upload coverage data
if: matrix.build_type == 'debug'
uses: ./.github/actions/save-coverage-data
- name: Cache poetry deps
id: cache_poetry
uses: actions/cache@v3
with:
path: ~/.cache/pypoetry/virtualenvs
key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
run: ./scripts/pysync
- name: Run yapf to ensure code format
run: poetry run yapf --recursive --diff .
- name: Run mypy to check types
run: poetry run mypy .
pg_regress-tests:
runs-on: [ self-hosted, Linux, k8s-runner ]
needs: [ build-neon ]
strategy:
fail-fast: false
matrix:
build_type: [ debug, release ]
rust_toolchain: [ 1.58 ]
@@ -231,10 +218,15 @@ jobs:
test_selection: batch_pg_regress
needs_postgres_source: true
- name: Merge and upload coverage data
if: matrix.build_type == 'debug'
uses: ./.github/actions/save-coverage-data
other-tests:
runs-on: [ self-hosted, Linux, k8s-runner ]
needs: [ build-neon ]
strategy:
fail-fast: false
matrix:
build_type: [ debug, release ]
rust_toolchain: [ 1.58 ]
@@ -252,10 +244,15 @@ jobs:
rust_toolchain: ${{ matrix.rust_toolchain }}
test_selection: batch_others
- name: Merge and upload coverage data
if: matrix.build_type == 'debug'
uses: ./.github/actions/save-coverage-data
benchmarks:
runs-on: [ self-hosted, Linux, k8s-runner ]
needs: [ build-neon ]
strategy:
fail-fast: false
matrix:
build_type: [ release ]
rust_toolchain: [ 1.58 ]
@@ -273,4 +270,120 @@ jobs:
rust_toolchain: ${{ matrix.rust_toolchain }}
test_selection: performance
run_in_parallel: false
# save_perf_report: true
save_perf_report: true
# XXX: no coverage data handling here, since benchmarks are run on release builds,
# while coverage is currently collected for the debug ones
coverage-report:
runs-on: [ self-hosted, Linux, k8s-runner ]
needs: [ other-tests, pg_regress-tests ]
strategy:
fail-fast: false
matrix:
build_type: [ debug ]
rust_toolchain: [ 1.58 ]
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 1
- name: Restore cargo deps cache
id: cache_cargo
uses: actions/cache@v3
with:
path: |
~/.cargo/registry/
~/.cargo/git/
target/
key: v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
- name: Get Neon artifact for restoration
uses: actions/download-artifact@v3
with:
name: neon-${{ runner.os }}-${{ matrix.build_type }}-${{ matrix.rust_toolchain }}-artifact
path: ./neon-artifact/
- name: Extract Neon artifact
run: |
mkdir -p /tmp/neon/
tar -xf ./neon-artifact/neon.tgz -C /tmp/neon/
rm -rf ./neon-artifact/
- name: Restore coverage data
uses: actions/download-artifact@v3
with:
name: coverage-data-artifact
path: /tmp/coverage/
- name: Merge coverage data
run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge
- name: Build and upload coverage report
run: |
COMMIT_SHA=${{ github.event.pull_request.head.sha }}
COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}}
COMMIT_URL=https://github.com/${{ github.repository }}/commit/$COMMIT_SHA
scripts/coverage \
--dir=/tmp/coverage report \
--input-objects=/tmp/coverage/binaries.list \
--commit-url=$COMMIT_URL \
--format=github
REPORT_URL=https://${{ github.repository_owner }}.github.io/zenith-coverage-data/$COMMIT_SHA
scripts/git-upload \
--repo=https://${{ secrets.VIP_VAP_ACCESS_TOKEN }}@github.com/${{ github.repository_owner }}/zenith-coverage-data.git \
--message="Add code coverage for $COMMIT_URL" \
copy /tmp/coverage/report $COMMIT_SHA # COPY FROM TO_RELATIVE
# Add link to the coverage report to the commit
curl -f -X POST \
https://api.github.com/repos/${{ github.repository }}/statuses/$COMMIT_SHA \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"state\": \"success\",
\"context\": \"neon-coverage\",
\"description\": \"Coverage report is ready\",
\"target_url\": \"$REPORT_URL\"
}"
trigger-e2e-tests:
runs-on: [ self-hosted, Linux, k8s-runner ]
needs: [ build-neon ]
steps:
- name: Set PR's status to pending and request a remote CI test
run: |
COMMIT_SHA=${{ github.event.pull_request.head.sha }}
COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}}
REMOTE_REPO="${{ github.repository_owner }}/cloud"
curl -f -X POST \
https://api.github.com/repos/${{ github.repository }}/statuses/$COMMIT_SHA \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"state\": \"pending\",
\"context\": \"neon-cloud-e2e\",
\"description\": \"[$REMOTE_REPO] Remote CI job is about to start\"
}"
curl -f -X POST \
https://api.github.com/repos/$REMOTE_REPO/actions/workflows/testing.yml/dispatches \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"ref\": \"main\",
\"inputs\": {
\"ci_job_name\": \"neon-cloud-e2e\",
\"commit_hash\": \"$COMMIT_SHA\",
\"remote_repo\": \"${{ github.repository }}\"
}
}"

View File

@@ -1,4 +1,4 @@
name: Build and Test
name: Check code style and build
on:
push:
@@ -6,15 +6,27 @@ on:
- main
pull_request:
defaults:
run:
shell: bash -ex {0}
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
env:
RUST_BACKTRACE: 1
jobs:
regression-check:
check-codestyle-rust:
strategy:
fail-fast: false
matrix:
# If we want to duplicate this job for different
# Rust toolchains (e.g. nightly or 1.37.0), add them here.
rust_toolchain: [1.58]
os: [ubuntu-latest, macos-latest]
timeout-minutes: 30
timeout-minutes: 50
name: run regression test suite
runs-on: ${{ matrix.os }}
@@ -92,5 +104,30 @@ jobs:
- name: Run cargo clippy
run: ./run_clippy.sh
- name: Run cargo test
run: cargo test --all --all-targets
- name: Ensure all project builds
run: cargo build --all --all-targets
check-codestyle-python:
runs-on: [ self-hosted, Linux, k8s-runner ]
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: false
fetch-depth: 1
- name: Cache poetry deps
id: cache_poetry
uses: actions/cache@v3
with:
path: ~/.cache/pypoetry/virtualenvs
key: v1-codestyle-python-deps-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
run: ./scripts/pysync
- name: Run yapf to ensure code format
run: poetry run yapf --recursive --diff .
- name: Run mypy to check types
run: poetry run mypy .

7
Cargo.lock generated
View File

@@ -461,6 +461,7 @@ dependencies = [
"tar",
"tokio",
"tokio-postgres",
"urlencoding",
"workspace_hack",
]
@@ -3684,6 +3685,12 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "urlencoding"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68b90931029ab9b034b300b797048cf23723400aa757e8a2bfb9d748102f9821"
[[package]]
name = "utils"
version = "0.1.0"

View File

@@ -1,5 +1,5 @@
# Build Postgres
FROM zimg/rust:1.58 AS pg-build
FROM neondatabase/rust:1.58 AS pg-build
WORKDIR /pg
USER root
@@ -14,7 +14,7 @@ RUN set -e \
&& tar -C tmp_install -czf /postgres_install.tar.gz .
# Build zenith binaries
FROM zimg/rust:1.58 AS build
FROM neondatabase/rust:1.58 AS build
ARG GIT_VERSION=local
ARG CACHEPOT_BUCKET=zenith-rust-cachepot
@@ -46,9 +46,9 @@ RUN set -e \
&& useradd -d /data zenith \
&& chown -R zenith:zenith /data
COPY --from=build --chown=zenith:zenith /home/circleci/project/target/release/pageserver /usr/local/bin
COPY --from=build --chown=zenith:zenith /home/circleci/project/target/release/safekeeper /usr/local/bin
COPY --from=build --chown=zenith:zenith /home/circleci/project/target/release/proxy /usr/local/bin
COPY --from=build --chown=zenith:zenith /home/runner/target/release/pageserver /usr/local/bin
COPY --from=build --chown=zenith:zenith /home/runner/target/release/safekeeper /usr/local/bin
COPY --from=build --chown=zenith:zenith /home/runner/target/release/proxy /usr/local/bin
COPY --from=pg-build /pg/tmp_install/ /usr/local/
COPY --from=pg-build /postgres_install.tar.gz /data/

View File

@@ -1,6 +1,6 @@
# First transient image to build compute_tools binaries
# NB: keep in sync with rust image version in .circle/config.yml
FROM zimg/rust:1.58 AS rust-build
FROM neondatabase/rust:1.58 AS rust-build
ARG CACHEPOT_BUCKET=zenith-rust-cachepot
ARG AWS_ACCESS_KEY_ID
@@ -15,4 +15,4 @@ RUN set -e \
# Final image that only has one binary
FROM debian:buster-slim
COPY --from=rust-build /home/circleci/project/target/release/compute_ctl /usr/local/bin/compute_ctl
COPY --from=rust-build /home/runner/target/release/compute_ctl /usr/local/bin/compute_ctl

View File

@@ -53,7 +53,7 @@ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
1. Install XCode and dependencies
```
xcode-select --install
brew install protobuf etcd
brew install protobuf etcd openssl
```
2. [Install Rust](https://www.rust-lang.org/tools/install)

View File

@@ -18,4 +18,5 @@ serde_json = "1"
tar = "0.4"
tokio = { version = "1.17", features = ["macros", "rt", "rt-multi-thread"] }
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
urlencoding = "2.1.0"
workspace_hack = { version = "0.1", path = "../workspace_hack" }

View File

@@ -289,6 +289,7 @@ impl ComputeNode {
handle_roles(&self.spec, &mut client)?;
handle_databases(&self.spec, &mut client)?;
handle_role_deletions(self, &mut client)?;
handle_grants(&self.spec, &mut client)?;
create_writablity_check_data(&mut client)?;

View File

@@ -2,9 +2,11 @@ use std::path::Path;
use anyhow::Result;
use log::{info, log_enabled, warn, Level};
use postgres::Client;
use postgres::{Client, NoTls};
use serde::Deserialize;
use urlencoding::encode;
use crate::compute::ComputeNode;
use crate::config;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;
@@ -97,18 +99,13 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
// Process delta operations first
if let Some(ops) = &spec.delta_operations {
info!("processing delta operations on roles");
info!("processing role renames");
for op in ops {
match op.action.as_ref() {
// We do not check either role exists or not,
// Postgres will take care of it for us
"delete_role" => {
let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.quote());
warn!("deleting role '{}'", &op.name);
xact.execute(query.as_str(), &[])?;
// no-op now, roles will be deleted at the end of configuration
}
// Renaming role drops its password, since tole name is
// Renaming role drops its password, since role name is
// used as a salt there. It is important that this role
// is recorded with a new `name` in the `roles` list.
// Follow up roles update will set the new password.
@@ -182,7 +179,7 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
xact.execute(query.as_str(), &[])?;
let grant_query = format!(
"grant pg_read_all_data, pg_write_all_data to {}",
"GRANT pg_read_all_data, pg_write_all_data TO {}",
name.quote()
);
xact.execute(grant_query.as_str(), &[])?;
@@ -197,6 +194,68 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
Ok(())
}
/// Reassign all dependent objects and delete requested roles.
pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<()> {
let spec = &node.spec;
// First, reassign all dependent objects to db owners.
if let Some(ops) = &spec.delta_operations {
info!("reassigning dependent objects of to-be-deleted roles");
for op in ops {
if op.action == "delete_role" {
reassign_owned_objects(node, &op.name)?;
}
}
}
// Second, proceed with role deletions.
let mut xact = client.transaction()?;
if let Some(ops) = &spec.delta_operations {
info!("processing role deletions");
for op in ops {
// We do not check either role exists or not,
// Postgres will take care of it for us
if op.action == "delete_role" {
let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.quote());
warn!("deleting role '{}'", &op.name);
xact.execute(query.as_str(), &[])?;
}
}
}
Ok(())
}
// Reassign all owned objects in all databases to the owner of the database.
fn reassign_owned_objects(node: &ComputeNode, role_name: &PgIdent) -> Result<()> {
for db in &node.spec.cluster.databases {
if db.owner != *role_name {
let db_name_encoded = format!("/{}", encode(&db.name));
let db_connstr = node.connstr.replacen("/postgres", &db_name_encoded, 1);
let mut client = Client::connect(&db_connstr, NoTls)?;
// This will reassign all dependent objects to the db owner
let reassign_query = format!(
"REASSIGN OWNED BY {} TO {}",
role_name.quote(),
db.owner.quote()
);
info!(
"reassigning objects owned by '{}' in db '{}' to '{}'",
role_name, &db.name, &db.owner
);
client.simple_query(&reassign_query)?;
// This now will only drop privileges of the role
let drop_query = format!("DROP OWNED BY {}", role_name.quote());
client.simple_query(&drop_query)?;
}
}
Ok(())
}
/// It follows mostly the same logic as `handle_roles()` excepting that we
/// does not use an explicit transactions block, since major database operations
/// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
@@ -294,13 +353,26 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
pub fn handle_grants(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
info!("cluster spec grants:");
// We now have a separate `web_access` role to connect to the database
// via the web interface and proxy link auth. And also we grant a
// read / write all data privilege to every role. So also grant
// create to everyone.
// XXX: later we should stop messing with Postgres ACL in such horrible
// ways.
let roles = spec
.cluster
.roles
.iter()
.map(|r| r.name.quote())
.collect::<Vec<_>>();
for db in &spec.cluster.databases {
let dbname = &db.name;
let query: String = format!(
"GRANT CREATE ON DATABASE {} TO {}",
dbname.quote(),
db.owner.quote()
roles.join(", ")
);
info!("grant query {}", &query);

View File

@@ -6,17 +6,13 @@ pub mod subscription_key;
/// All broker values, possible to use when dealing with etcd.
pub mod subscription_value;
use std::{
collections::{hash_map, HashMap},
str::FromStr,
};
use std::str::FromStr;
use serde::de::DeserializeOwned;
use subscription_key::SubscriptionKey;
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::*;
use utils::zid::{NodeId, ZTenantTimelineId};
use crate::subscription_key::SubscriptionFullKey;
@@ -28,18 +24,17 @@ pub const DEFAULT_NEON_BROKER_ETCD_PREFIX: &str = "neon";
/// A way to control the data retrieval from a certain subscription.
pub struct BrokerSubscription<V> {
value_updates: mpsc::UnboundedReceiver<HashMap<ZTenantTimelineId, HashMap<NodeId, V>>>,
/// An unbounded channel to fetch the relevant etcd updates from.
pub value_updates: mpsc::UnboundedReceiver<BrokerUpdate<V>>,
key: SubscriptionKey,
watcher_handle: JoinHandle<Result<(), BrokerError>>,
/// A subscription task handle, to allow waiting on it for the task to complete.
/// Both the updates channel and the handle require `&mut`, so it's better to keep
/// both `pub` to allow using both in the same structures without borrow checker complaining.
pub watcher_handle: JoinHandle<Result<(), BrokerError>>,
watcher: Watcher,
}
impl<V> BrokerSubscription<V> {
/// Asynchronously polls for more data from the subscription, suspending the current future if there's no data sent yet.
pub async fn fetch_data(&mut self) -> Option<HashMap<ZTenantTimelineId, HashMap<NodeId, V>>> {
self.value_updates.recv().await
}
/// Cancels the subscription, stopping the data poller and waiting for it to shut down.
pub async fn cancel(mut self) -> Result<(), BrokerError> {
self.watcher.cancel().await.map_err(|e| {
@@ -48,15 +43,41 @@ impl<V> BrokerSubscription<V> {
format!("Failed to cancel broker subscription, kind: {:?}", self.key),
)
})?;
self.watcher_handle.await.map_err(|e| {
BrokerError::InternalError(format!(
"Failed to join the broker value updates task, kind: {:?}, error: {e}",
self.key
))
})?
match (&mut self.watcher_handle).await {
Ok(res) => res,
Err(e) => {
if e.is_cancelled() {
// don't error on the tasks that are cancelled already
Ok(())
} else {
Err(BrokerError::InternalError(format!(
"Panicked during broker subscription task, kind: {:?}, error: {e}",
self.key
)))
}
}
}
}
}
impl<V> Drop for BrokerSubscription<V> {
fn drop(&mut self) {
// we poll data from etcd into the channel in the same struct, so if the whole struct gets dropped,
// no more data is used by the receiver and it's safe to cancel and drop the whole etcd subscription task.
self.watcher_handle.abort();
}
}
/// An update from the etcd broker.
pub struct BrokerUpdate<V> {
/// Etcd generation version, the bigger the more actual the data is.
pub etcd_version: i64,
/// Etcd key for the corresponding value, parsed from the broker KV.
pub key: SubscriptionFullKey,
/// Current etcd value, parsed from the broker KV.
pub value: V,
}
#[derive(Debug, thiserror::Error)]
pub enum BrokerError {
#[error("Etcd client error: {0}. Context: {1}")]
@@ -124,41 +145,21 @@ where
break;
}
let mut value_updates: HashMap<ZTenantTimelineId, HashMap<NodeId, V>> = HashMap::new();
// Keep track that the timeline data updates from etcd arrive in the right order.
// https://etcd.io/docs/v3.5/learning/api_guarantees/#isolation-level-and-consistency-of-replicas
// > etcd does not ensure linearizability for watch operations. Users are expected to verify the revision of watch responses to ensure correct ordering.
let mut value_etcd_versions: HashMap<ZTenantTimelineId, i64> = HashMap::new();
let events = resp.events();
debug!("Processing {} events", events.len());
for event in events {
if EventType::Put == event.event_type() {
if let Some(new_etcd_kv) = event.kv() {
let new_kv_version = new_etcd_kv.version();
match parse_etcd_kv(new_etcd_kv, &value_parser, &key.cluster_prefix) {
Ok(Some((key, value))) => match value_updates
.entry(key.id)
.or_default()
.entry(key.node_id)
{
hash_map::Entry::Occupied(mut o) => {
let old_etcd_kv_version = value_etcd_versions.get(&key.id).copied().unwrap_or(i64::MIN);
if old_etcd_kv_version < new_kv_version {
o.insert(value);
value_etcd_versions.insert(key.id,new_kv_version);
} else {
debug!("Skipping etcd timeline update due to older version compared to one that's already stored");
}
}
hash_map::Entry::Vacant(v) => {
v.insert(value);
value_etcd_versions.insert(key.id,new_kv_version);
}
},
Ok(Some((key, value))) => if let Err(e) = value_updates_sender.send(BrokerUpdate {
etcd_version: new_etcd_kv.version(),
key,
value,
}) {
info!("Broker value updates for key {key:?} sender got dropped, exiting: {e}");
break;
},
Ok(None) => debug!("Ignoring key {key:?} : no value was returned by the parser"),
Err(BrokerError::KeyNotParsed(e)) => debug!("Unexpected key {key:?} for timeline update: {e}"),
Err(e) => error!("Failed to represent etcd KV {new_etcd_kv:?}: {e}"),
@@ -166,13 +167,6 @@ where
}
}
}
if !value_updates.is_empty() {
if let Err(e) = value_updates_sender.send(value_updates) {
info!("Broker value updates for key {key:?} sender got dropped, exiting: {e}");
break;
}
}
}
Ok(())

View File

@@ -733,17 +733,10 @@ impl PageServerHandler {
let latest_gc_cutoff_lsn = timeline.tline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
let all_rels = timeline.list_rels(pg_constants::DEFAULTTABLESPACE_OID, req.dbnode, lsn)?;
let mut total_blocks: i64 = 0;
let total_blocks =
timeline.get_db_size(pg_constants::DEFAULTTABLESPACE_OID, req.dbnode, lsn)?;
for rel in all_rels {
if rel.forknum == 0 {
let n_blocks = timeline.get_rel_size(rel, lsn).unwrap_or(0);
total_blocks += n_blocks as i64;
}
}
let db_size = total_blocks * pg_constants::BLCKSZ as i64;
let db_size = total_blocks as i64 * pg_constants::BLCKSZ as i64;
Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
db_size,

View File

@@ -123,6 +123,19 @@ impl<R: Repository> DatadirTimeline<R> {
self.tline.get(key, lsn)
}
// Get size of a database in blocks
pub fn get_db_size(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<usize> {
let mut total_blocks = 0;
let rels = self.list_rels(spcnode, dbnode, lsn)?;
for rel in rels {
let n_blocks = self.get_rel_size(rel, lsn)?;
total_blocks += n_blocks as usize;
}
Ok(total_blocks)
}
/// Get size of a relation file
pub fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<BlockNumber> {
ensure!(tag.relnode != 0, "invalid relnode");
@@ -667,6 +680,10 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
}
pub fn drop_dbdir(&mut self, spcnode: Oid, dbnode: Oid) -> Result<()> {
let req_lsn = self.tline.get_last_record_lsn();
let total_blocks = self.tline.get_db_size(spcnode, dbnode, req_lsn)?;
// Remove entry from dbdir
let buf = self.get(DBDIR_KEY)?;
let mut dir = DbDirectory::des(&buf)?;
@@ -680,7 +697,8 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
);
}
// FIXME: update pending_nblocks
// Update logical database size.
self.pending_nblocks -= total_blocks as isize;
// Delete all relations and metadata files for the spcnode/dnode
self.delete(dbdir_key_range(spcnode, dbnode));

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,5 @@
//! Actual Postgres connection handler to stream WAL to the server.
//! Runs as a separate, cancellable Tokio task.
use std::{
str::FromStr,
sync::Arc,
@@ -10,113 +10,29 @@ use anyhow::{bail, ensure, Context};
use bytes::BytesMut;
use fail::fail_point;
use postgres::{SimpleQueryMessage, SimpleQueryRow};
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use tokio::{pin, select, sync::watch, time};
use tokio_postgres::{replication::ReplicationStream, Client};
use tokio_stream::StreamExt;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use utils::{
lsn::Lsn,
pq_proto::ReplicationFeedback,
zid::{NodeId, ZTenantTimelineId},
};
use super::TaskEvent;
use crate::{
http::models::WalReceiverEntry,
repository::{Repository, Timeline},
tenant_mgr,
walingest::WalIngest,
};
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId};
#[derive(Debug, Clone)]
pub enum WalConnectionEvent {
Started,
NewWal(ReplicationFeedback),
End(Result<(), String>),
}
/// A wrapper around standalone Tokio task, to poll its updates or cancel the task.
#[derive(Debug)]
pub struct WalReceiverConnection {
handle: tokio::task::JoinHandle<()>,
cancellation: watch::Sender<()>,
events_receiver: watch::Receiver<WalConnectionEvent>,
}
impl WalReceiverConnection {
/// Initializes the connection task, returning a set of handles on top of it.
/// The task is started immediately after the creation, fails if no connection is established during the timeout given.
pub fn open(
id: ZTenantTimelineId,
safekeeper_id: NodeId,
wal_producer_connstr: String,
connect_timeout: Duration,
) -> Self {
let (cancellation, mut cancellation_receiver) = watch::channel(());
let (events_sender, events_receiver) = watch::channel(WalConnectionEvent::Started);
let handle = tokio::spawn(
async move {
let connection_result = handle_walreceiver_connection(
id,
&wal_producer_connstr,
&events_sender,
&mut cancellation_receiver,
connect_timeout,
)
.await
.map_err(|e| {
format!("Walreceiver connection for id {id} failed with error: {e:#}")
});
match &connection_result {
Ok(()) => {
debug!("Walreceiver connection for id {id} ended successfully")
}
Err(e) => warn!("{e}"),
}
events_sender
.send(WalConnectionEvent::End(connection_result))
.ok();
}
.instrument(info_span!("safekeeper_handle", sk = %safekeeper_id)),
);
Self {
handle,
cancellation,
events_receiver,
}
}
/// Polls for the next WAL receiver event, if there's any available since the last check.
/// Blocks if there's no new event available, returns `None` if no new events will ever occur.
/// Only the last event is returned, all events received between observatins are lost.
pub async fn next_event(&mut self) -> Option<WalConnectionEvent> {
match self.events_receiver.changed().await {
Ok(()) => Some(self.events_receiver.borrow().clone()),
Err(_cancellation_error) => None,
}
}
/// Gracefully aborts current WAL streaming task, waiting for the current WAL streamed.
pub async fn shutdown(&mut self) -> anyhow::Result<()> {
self.cancellation.send(()).ok();
let handle = &mut self.handle;
handle
.await
.context("Failed to join on a walreceiver connection task")?;
Ok(())
}
}
async fn handle_walreceiver_connection(
/// Opens a conneciton to the given wal producer and streams the WAL, sending progress messages during streaming.
pub async fn handle_walreceiver_connection(
id: ZTenantTimelineId,
wal_producer_connstr: &str,
events_sender: &watch::Sender<WalConnectionEvent>,
cancellation: &mut watch::Receiver<()>,
events_sender: &watch::Sender<TaskEvent<ReplicationFeedback>>,
mut cancellation: watch::Receiver<()>,
connect_timeout: Duration,
) -> anyhow::Result<()> {
// Connect to the database in replication mode.
@@ -214,8 +130,6 @@ async fn handle_walreceiver_connection(
while let Some(replication_message) = {
select! {
// check for shutdown first
biased;
_ = cancellation.changed() => {
info!("walreceiver interrupted");
None
@@ -344,7 +258,7 @@ async fn handle_walreceiver_connection(
.as_mut()
.zenith_status_update(data.len() as u64, &data)
.await?;
if let Err(e) = events_sender.send(WalConnectionEvent::NewWal(zenith_status_update)) {
if let Err(e) = events_sender.send(TaskEvent::NewEvent(zenith_status_update)) {
warn!("Wal connection event listener dropped, aborting the connection: {e}");
return Ok(());
}

View File

@@ -49,6 +49,12 @@ impl UserFacingError for ConsoleAuthError {
}
}
impl From<&auth::credentials::ClientCredsParseError> for ConsoleAuthError {
fn from(e: &auth::credentials::ClientCredsParseError) -> Self {
ConsoleAuthError::BadProjectName(e.clone())
}
}
// TODO: convert into an enum with "error"
#[derive(Serialize, Deserialize, Debug)]
struct GetRoleSecretResponse {
@@ -94,7 +100,7 @@ impl<'a> Api<'a> {
let mut url = self.endpoint.clone();
url.path_segments_mut().push("proxy_get_role_secret");
url.query_pairs_mut()
.append_pair("project", &self.creds.project_name)
.append_pair("project", self.creds.project_name.as_ref()?)
.append_pair("role", &self.creds.user);
// TODO: use a proper logger
@@ -117,8 +123,8 @@ impl<'a> Api<'a> {
async fn wake_compute(&self) -> Result<DatabaseInfo> {
let mut url = self.endpoint.clone();
url.path_segments_mut().push("proxy_wake_compute");
url.query_pairs_mut()
.append_pair("project", &self.creds.project_name);
let project_name = self.creds.project_name.as_ref()?;
url.query_pairs_mut().append_pair("project", project_name);
// TODO: use a proper logger
println!("cplane request: {url}");

View File

@@ -8,7 +8,7 @@ use std::collections::HashMap;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
#[derive(Debug, Error, PartialEq)]
#[derive(Debug, Error, PartialEq, Eq, Clone)]
pub enum ClientCredsParseError {
#[error("Parameter `{0}` is missing in startup packet.")]
MissingKey(&'static str),
@@ -44,7 +44,7 @@ impl UserFacingError for ClientCredsParseError {}
pub struct ClientCredentials {
pub user: String,
pub dbname: String,
pub project_name: String,
pub project_name: Result<String, ClientCredsParseError>,
}
impl ClientCredentials {
@@ -67,7 +67,7 @@ impl ClientCredentials {
let user = get_param("user")?;
let dbname = get_param("database")?;
let project_name = get_param("project").ok();
let project_name = get_project_name(sni_data, common_name, project_name.as_deref())?;
let project_name = get_project_name(sni_data, common_name, project_name.as_deref());
Ok(Self {
user,

View File

@@ -5,6 +5,11 @@ use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use etcd_broker::subscription_value::SkTimelineInfo;
use etcd_broker::LeaseKeepAliveStream;
use etcd_broker::LeaseKeeper;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::time::Duration;
use tokio::spawn;
use tokio::task::JoinHandle;
@@ -21,7 +26,7 @@ use utils::zid::{NodeId, ZTenantTimelineId};
const RETRY_INTERVAL_MSEC: u64 = 1000;
const PUSH_INTERVAL_MSEC: u64 = 1000;
const LEASE_TTL_SEC: i64 = 5;
const LEASE_TTL_SEC: i64 = 10;
pub fn thread_main(conf: SafeKeeperConf) {
let runtime = runtime::Builder::new_current_thread()
@@ -154,13 +159,48 @@ pub fn get_candiate_name(system_id: NodeId) -> String {
format!("id_{system_id}")
}
async fn push_sk_info(
zttid: ZTenantTimelineId,
mut client: Client,
key: String,
sk_info: SkTimelineInfo,
mut lease: Lease,
) -> anyhow::Result<(ZTenantTimelineId, Lease)> {
let put_opts = PutOptions::new().with_lease(lease.id);
client
.put(
key.clone(),
serde_json::to_string(&sk_info)?,
Some(put_opts),
)
.await
.with_context(|| format!("failed to push safekeeper info to {}", key))?;
// revive the lease
lease
.keeper
.keep_alive()
.await
.context("failed to send LeaseKeepAliveRequest")?;
lease
.ka_stream
.message()
.await
.context("failed to receive LeaseKeepAliveResponse")?;
Ok((zttid, lease))
}
struct Lease {
id: i64,
keeper: LeaseKeeper,
ka_stream: LeaseKeepAliveStream,
}
/// Push once in a while data about all active timelines to the broker.
async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
let mut client = Client::connect(&conf.broker_endpoints, None).await?;
// Get and maintain lease to automatically delete obsolete data
let lease = client.lease_grant(LEASE_TTL_SEC, None).await?;
let (mut keeper, mut ka_stream) = client.lease_keep_alive(lease.id()).await?;
let mut leases: HashMap<ZTenantTimelineId, Lease> = HashMap::new();
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
loop {
@@ -168,33 +208,46 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
// is under plain mutex. That's ok, all this code is not performance
// sensitive and there is no risk of deadlock as we don't await while
// lock is held.
for zttid in GlobalTimelines::get_active_timelines() {
if let Some(tli) = GlobalTimelines::get_loaded(zttid) {
let sk_info = tli.get_public_info(&conf)?;
let put_opts = PutOptions::new().with_lease(lease.id());
client
.put(
timeline_safekeeper_path(
conf.broker_etcd_prefix.clone(),
zttid,
conf.my_id,
),
serde_json::to_string(&sk_info)?,
Some(put_opts),
)
.await
.context("failed to push safekeeper info")?;
let active_tlis = GlobalTimelines::get_active_timelines();
// // Get and maintain (if not yet) per timeline lease to automatically delete obsolete data.
for zttid in active_tlis.iter() {
if let Entry::Vacant(v) = leases.entry(*zttid) {
let lease = client.lease_grant(LEASE_TTL_SEC, None).await?;
let (keeper, ka_stream) = client.lease_keep_alive(lease.id()).await?;
v.insert(Lease {
id: lease.id(),
keeper,
ka_stream,
});
}
}
// revive the lease
keeper
.keep_alive()
.await
.context("failed to send LeaseKeepAliveRequest")?;
ka_stream
.message()
.await
.context("failed to receive LeaseKeepAliveResponse")?;
leases.retain(|zttid, _| active_tlis.contains(zttid));
// Push data concurrently to not suffer from latency, with many timelines it can be slow.
let handles = active_tlis
.iter()
.filter_map(|zttid| GlobalTimelines::get_loaded(*zttid))
.map(|tli| {
let sk_info = tli.get_public_info(&conf);
let key = timeline_safekeeper_path(
conf.broker_etcd_prefix.clone(),
tli.zttid,
conf.my_id,
);
let lease = leases.remove(&tli.zttid).unwrap();
tokio::spawn(push_sk_info(tli.zttid, client.clone(), key, sk_info, lease))
})
.collect::<Vec<_>>();
for h in handles {
let (zttid, lease) = h.await??;
// It is ugly to pull leases from hash and then put it back, but
// otherwise we have to resort to long living per tli tasks (which
// would generate a lot of errors when etcd is down) as task wants to
// have 'static objects, we can't borrow to it.
leases.insert(zttid, lease);
}
sleep(push_interval).await;
}
}
@@ -221,15 +274,12 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
.await
.context("failed to subscribe for safekeeper info")?;
loop {
match subscription.fetch_data().await {
match subscription.value_updates.recv().await {
Some(new_info) => {
for (zttid, sk_info) in new_info {
// note: there are blocking operations below, but it's considered fine for now
if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) {
for (safekeeper_id, info) in sk_info {
tli.record_safekeeper_info(&info, safekeeper_id).await?
}
}
// note: there are blocking operations below, but it's considered fine for now
if let Ok(tli) = GlobalTimelines::get(&conf, new_info.key.id, false) {
tli.record_safekeeper_info(&new_info.value, new_info.key.node_id)
.await?
}
}
None => {

View File

@@ -239,6 +239,19 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
remote_consistent_lsn: Lsn(0),
peers: Peers(vec![]),
});
} else if version == 5 {
info!("reading safekeeper control file version {}", version);
let mut oldstate = SafeKeeperState::des(&buf[..buf.len()])?;
if oldstate.timeline_start_lsn != Lsn(0) {
return Ok(oldstate);
}
// set special timeline_start_lsn because we don't know the real one
info!("setting timeline_start_lsn and local_start_lsn to Lsn(1)");
oldstate.timeline_start_lsn = Lsn(1);
oldstate.local_start_lsn = Lsn(1);
return Ok(oldstate);
}
bail!("unsupported safekeeper control file version {}", version)
}

View File

@@ -28,7 +28,7 @@ use utils::{
};
pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 5;
pub const SK_FORMAT_VERSION: u32 = 6;
const SK_PROTOCOL_VERSION: u32 = 2;
const UNKNOWN_SERVER_VERSION: u32 = 0;

View File

@@ -11,7 +11,7 @@ use serde::Serialize;
use tokio::sync::watch;
use std::cmp::{max, min};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fs::{self};
use std::sync::{Arc, Mutex, MutexGuard};
@@ -445,9 +445,9 @@ impl Timeline {
}
/// Prepare public safekeeper info for reporting.
pub fn get_public_info(&self, conf: &SafeKeeperConf) -> anyhow::Result<SkTimelineInfo> {
pub fn get_public_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo {
let shared_state = self.mutex.lock().unwrap();
Ok(SkTimelineInfo {
SkTimelineInfo {
last_log_term: Some(shared_state.sk.get_epoch()),
flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()),
// note: this value is not flushed to control file yet and can be lost
@@ -460,7 +460,7 @@ impl Timeline {
peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
safekeeper_connstr: Some(conf.listen_pg_addr.clone()),
backup_lsn: Some(shared_state.sk.inmem.backup_lsn),
})
}
}
/// Update timeline state with peer safekeeper data.
@@ -625,6 +625,8 @@ impl GlobalTimelines {
zttid: ZTenantTimelineId,
create: bool,
) -> Result<Arc<Timeline>> {
let _enter = info_span!("", timeline = %zttid.tenant_id).entered();
let mut state = TIMELINES_STATE.lock().unwrap();
match state.timelines.get(&zttid) {
@@ -667,7 +669,7 @@ impl GlobalTimelines {
}
/// Get ZTenantTimelineIDs of all active timelines.
pub fn get_active_timelines() -> Vec<ZTenantTimelineId> {
pub fn get_active_timelines() -> HashSet<ZTenantTimelineId> {
let state = TIMELINES_STATE.lock().unwrap();
state
.timelines

View File

@@ -45,7 +45,7 @@ If you want to run all tests that have the string "bench" in their names:
Useful environment variables:
`ZENITH_BIN`: The directory where zenith binaries can be found.
`NEON_BIN`: The directory where neon binaries can be found.
`POSTGRES_DISTRIB_DIR`: The directory where postgres distribution can be found.
`TEST_OUTPUT`: Set the directory where test state and test output files
should go.

View File

@@ -35,9 +35,14 @@ def test_createdb(neon_simple_env: NeonEnv):
with closing(db.connect(dbname='foodb')) as conn:
with conn.cursor() as cur:
# Check database size in both branches
cur.execute(
'select pg_size_pretty(pg_database_size(%s)), pg_size_pretty(sum(pg_relation_size(oid))) from pg_class where relisshared is false;',
('foodb', ))
cur.execute("""
select pg_size_pretty(pg_database_size('foodb')),
pg_size_pretty(
sum(pg_relation_size(oid, 'main'))
+sum(pg_relation_size(oid, 'vm'))
+sum(pg_relation_size(oid, 'fsm'))
) FROM pg_class where relisshared is false
""")
res = cur.fetchone()
# check that dbsize equals sum of all relation sizes, excluding shared ones
# This is how we define dbsize in neon for now

View File

@@ -1,5 +1,5 @@
# It's possible to run any regular test with the local fs remote storage via
# env ZENITH_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/zenith_zzz/'}" poetry ......
# env ZENITH_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ......
import shutil, os
from contextlib import closing

View File

@@ -8,7 +8,6 @@ import time
def test_timeline_size(neon_simple_env: NeonEnv):
env = neon_simple_env
# Branch at the point where only 100 rows were inserted
new_timeline_id = env.neon_cli.create_branch('test_timeline_size', 'empty')
client = env.pageserver.http_client()
@@ -23,7 +22,6 @@ def test_timeline_size(neon_simple_env: NeonEnv):
with conn.cursor() as cur:
cur.execute("SHOW neon.timeline_id")
# Create table, and insert the first 100 rows
cur.execute("CREATE TABLE foo (t text)")
cur.execute("""
INSERT INTO foo
@@ -43,6 +41,51 @@ def test_timeline_size(neon_simple_env: NeonEnv):
"current_logical_size_non_incremental"]
def test_timeline_size_createdropdb(neon_simple_env: NeonEnv):
env = neon_simple_env
new_timeline_id = env.neon_cli.create_branch('test_timeline_size', 'empty')
client = env.pageserver.http_client()
timeline_details = assert_local(client, env.initial_tenant, new_timeline_id)
assert timeline_details['local']['current_logical_size'] == timeline_details['local'][
'current_logical_size_non_incremental']
pgmain = env.postgres.create_start("test_timeline_size")
log.info("postgres is running on 'test_timeline_size' branch")
with closing(pgmain.connect()) as conn:
with conn.cursor() as cur:
cur.execute("SHOW neon.timeline_id")
res = assert_local(client, env.initial_tenant, new_timeline_id)
local_details = res['local']
assert local_details["current_logical_size"] == local_details[
"current_logical_size_non_incremental"]
cur.execute('CREATE DATABASE foodb')
with closing(pgmain.connect(dbname='foodb')) as conn:
with conn.cursor() as cur2:
cur2.execute("CREATE TABLE foo (t text)")
cur2.execute("""
INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 10) g
""")
res = assert_local(client, env.initial_tenant, new_timeline_id)
local_details = res['local']
assert local_details["current_logical_size"] == local_details[
"current_logical_size_non_incremental"]
cur.execute('DROP DATABASE foodb')
res = assert_local(client, env.initial_tenant, new_timeline_id)
local_details = res['local']
assert local_details["current_logical_size"] == local_details[
"current_logical_size_non_incremental"]
# wait until received_lsn_lag is 0
def wait_for_pageserver_catchup(pgmain: Postgres, polling_interval=1, timeout=60):
started_at = time.time()

View File

@@ -50,7 +50,7 @@ A fixture is created with the decorator @pytest.fixture decorator.
See docs: https://docs.pytest.org/en/6.2.x/fixture.html
There are several environment variables that can control the running of tests:
ZENITH_BIN, POSTGRES_DISTRIB_DIR, etc. See README.md for more information.
NEON_BIN, POSTGRES_DISTRIB_DIR, etc. See README.md for more information.
There's no need to import this file to use it. It should be declared as a plugin
inside conftest.py, and that makes it available to all tests.
@@ -151,7 +151,7 @@ def pytest_configure(config):
return
# Find the neon binaries.
global neon_binpath
env_neon_bin = os.environ.get('ZENITH_BIN')
env_neon_bin = os.environ.get('NEON_BIN')
if env_neon_bin:
neon_binpath = env_neon_bin
else:

View File

@@ -80,6 +80,7 @@ def start_heavy_write_workload(env: PgCompare, n_tables: int, scale: int, num_it
thread.join()
@pytest.mark.timeout(1000)
@pytest.mark.parametrize("n_tables", [5])
@pytest.mark.parametrize("scale", get_scales_matrix(5))
@pytest.mark.parametrize("num_iters", [10])
@@ -121,6 +122,7 @@ def start_pgbench_simple_update_workload(env: PgCompare, duration: int):
env.flush()
@pytest.mark.timeout(1000)
@pytest.mark.parametrize("scale", get_scales_matrix(100))
@pytest.mark.parametrize("duration", get_durations_matrix())
def test_pgbench_simple_update_workload(pg_compare: PgCompare, scale: int, duration: int):
@@ -158,6 +160,7 @@ def start_pgbench_intensive_initialization(env: PgCompare, scale: int):
])
@pytest.mark.timeout(1000)
@pytest.mark.parametrize("scale", get_scales_matrix(1000))
def test_pgbench_intensive_init_workload(pg_compare: PgCompare, scale: int):
env = pg_compare