mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-25 00:50:36 +00:00
Compare commits
30 Commits
rc/proxy/2
...
bayandin/1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9e7905979d | ||
|
|
0b3aedb830 | ||
|
|
93d3d98d2a | ||
|
|
944cac950d | ||
|
|
e1c032fb3c | ||
|
|
c861d71eeb | ||
|
|
6e46204712 | ||
|
|
5c6d78d469 | ||
|
|
3fd77eb0d4 | ||
|
|
3114be034a | ||
|
|
8dc7dc79dd | ||
|
|
fad9be4598 | ||
|
|
20d0939b00 | ||
|
|
ea0d35f3ca | ||
|
|
e34059cd18 | ||
|
|
d999c46692 | ||
|
|
82853cc1d1 | ||
|
|
1efaa16260 | ||
|
|
4dbb74b559 | ||
|
|
5ab10d051d | ||
|
|
f8bdce1015 | ||
|
|
7ba50708e3 | ||
|
|
e9e77ee744 | ||
|
|
ee93700a0f | ||
|
|
502b69b33b | ||
|
|
76ab57f33f | ||
|
|
5984edaecd | ||
|
|
3eb83a0ebb | ||
|
|
4d426f6fbe | ||
|
|
d04af08567 |
479
.github/workflows/benchmarking.yml
vendored
479
.github/workflows/benchmarking.yml
vendored
@@ -49,76 +49,6 @@ concurrency:
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
bench:
|
||||
env:
|
||||
TEST_PG_BENCH_DURATIONS_MATRIX: "300"
|
||||
TEST_PG_BENCH_SCALES_MATRIX: "10,100"
|
||||
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
|
||||
DEFAULT_PG_VERSION: 14
|
||||
TEST_OUTPUT: /tmp/test_output
|
||||
BUILD_TYPE: remote
|
||||
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
|
||||
PLATFORM: "neon-staging"
|
||||
|
||||
runs-on: [ self-hosted, us-east-2, x64 ]
|
||||
container:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:pinned
|
||||
options: --init
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Download Neon artifact
|
||||
uses: ./.github/actions/download
|
||||
with:
|
||||
name: neon-${{ runner.os }}-release-artifact
|
||||
path: /tmp/neon/
|
||||
prefix: latest
|
||||
|
||||
- name: Create Neon Project
|
||||
id: create-neon-project
|
||||
uses: ./.github/actions/neon-project-create
|
||||
with:
|
||||
region_id: ${{ github.event.inputs.region_id || 'aws-us-east-2' }}
|
||||
postgres_version: ${{ env.DEFAULT_PG_VERSION }}
|
||||
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
|
||||
|
||||
- name: Run benchmark
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
with:
|
||||
build_type: ${{ env.BUILD_TYPE }}
|
||||
test_selection: performance
|
||||
run_in_parallel: false
|
||||
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
|
||||
# Set --sparse-ordering option of pytest-order plugin
|
||||
# to ensure tests are running in order of appears in the file.
|
||||
# It's important for test_perf_pgbench.py::test_pgbench_remote_* tests
|
||||
extra_params: -m remote_cluster --sparse-ordering --timeout 5400 --ignore test_runner/performance/test_perf_olap.py
|
||||
env:
|
||||
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
|
||||
- name: Delete Neon Project
|
||||
if: ${{ always() }}
|
||||
uses: ./.github/actions/neon-project-delete
|
||||
with:
|
||||
project_id: ${{ steps.create-neon-project.outputs.project_id }}
|
||||
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
|
||||
|
||||
- name: Create Allure report
|
||||
if: ${{ !cancelled() }}
|
||||
uses: ./.github/actions/allure-report-generate
|
||||
|
||||
- name: Post to a Slack channel
|
||||
if: ${{ github.event.schedule && failure() }}
|
||||
uses: slackapi/slack-github-action@v1
|
||||
with:
|
||||
channel-id: "C033QLM5P7D" # dev-staging-stream
|
||||
slack-message: "Periodic perf testing: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
|
||||
env:
|
||||
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
|
||||
generate-matrices:
|
||||
# Create matrices for the benchmarking jobs, so we run benchmarks on rds only once a week (on Saturday)
|
||||
#
|
||||
@@ -135,72 +65,31 @@ jobs:
|
||||
pgbench-compare-matrix: ${{ steps.pgbench-compare-matrix.outputs.matrix }}
|
||||
olap-compare-matrix: ${{ steps.olap-compare-matrix.outputs.matrix }}
|
||||
tpch-compare-matrix: ${{ steps.tpch-compare-matrix.outputs.matrix }}
|
||||
pgbench-compare-big-db-matrix: ${{ steps.pgbench-compare-big-db-matrix.outputs.matrix }}
|
||||
|
||||
steps:
|
||||
- name: Generate matrix for pgbench benchmark
|
||||
id: pgbench-compare-matrix
|
||||
- name: Generate matrix for pgbench benchmark with big databases
|
||||
id: pgbench-compare-big-db-matrix
|
||||
run: |
|
||||
matrix='{
|
||||
"platform": [
|
||||
"neon-captest-new",
|
||||
"neon-captest-reuse",
|
||||
"neonvm-captest-new"
|
||||
],
|
||||
"db_size": [ "10gb" ],
|
||||
"include": [{ "platform": "neon-captest-freetier", "db_size": "3gb" },
|
||||
{ "platform": "neon-captest-new", "db_size": "50gb" },
|
||||
{ "platform": "neonvm-captest-freetier", "db_size": "3gb" },
|
||||
{ "platform": "neonvm-captest-new", "db_size": "50gb" }]
|
||||
}'
|
||||
# There's also `neonvm-captest-reuse` platform, but we don't want to use it
|
||||
# because DevRel team used it for some demos, they might need it again
|
||||
|
||||
if [ "$(date +%A)" = "Saturday" ]; then
|
||||
matrix=$(echo "$matrix" | jq '.include += [{ "platform": "rds-postgres", "db_size": "10gb"},
|
||||
{ "platform": "rds-aurora", "db_size": "50gb"}]')
|
||||
fi
|
||||
|
||||
echo "matrix=$(echo "$matrix" | jq --compact-output '.')" >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Generate matrix for OLAP benchmarks
|
||||
id: olap-compare-matrix
|
||||
run: |
|
||||
matrix='{
|
||||
"platform": [
|
||||
"neon-captest-reuse"
|
||||
]
|
||||
}'
|
||||
|
||||
if [ "$(date +%A)" = "Saturday" ] || [ ${RUN_AWS_RDS_AND_AURORA} = "true" ]; then
|
||||
matrix=$(echo "$matrix" | jq '.include += [{ "platform": "rds-postgres" },
|
||||
{ "platform": "rds-aurora" }]')
|
||||
fi
|
||||
|
||||
echo "matrix=$(echo "$matrix" | jq --compact-output '.')" >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Generate matrix for TPC-H benchmarks
|
||||
id: tpch-compare-matrix
|
||||
run: |
|
||||
matrix='{
|
||||
"platform": [
|
||||
"neon-captest-reuse"
|
||||
],
|
||||
"scale": [
|
||||
"10"
|
||||
]
|
||||
"db_size": [ "1tb" ]
|
||||
}'
|
||||
|
||||
if [ "$(date +%A)" = "Saturday" ] || [ ${RUN_AWS_RDS_AND_AURORA} = "true" ]; then
|
||||
matrix=$(echo "$matrix" | jq '.include += [{ "platform": "rds-postgres", "scale": "10" },
|
||||
{ "platform": "rds-aurora", "scale": "10" }]')
|
||||
fi
|
||||
echo "matrix=$(echo $matrix | jq --compact-output '.')" >> $GITHUB_OUTPUT
|
||||
|
||||
echo "matrix=$(echo "$matrix" | jq --compact-output '.')" >> $GITHUB_OUTPUT
|
||||
|
||||
pgbench-compare:
|
||||
pgbench-compare-big-db:
|
||||
if: ${{ !cancelled() }}
|
||||
needs: [ generate-matrices ]
|
||||
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix: ${{fromJson(needs.generate-matrices.outputs.pgbench-compare-matrix)}}
|
||||
matrix: ${{fromJson(needs.generate-matrices.outputs.pgbench-compare-big-db-matrix)}}
|
||||
|
||||
env:
|
||||
TEST_PG_BENCH_DURATIONS_MATRIX: "60m"
|
||||
@@ -217,9 +106,6 @@ jobs:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:pinned
|
||||
options: --init
|
||||
|
||||
# Increase timeout to 8h, default timeout is 6h
|
||||
timeout-minutes: 480
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
@@ -235,32 +121,15 @@ jobs:
|
||||
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
|
||||
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
|
||||
|
||||
- name: Create Neon Project
|
||||
if: contains(fromJson('["neon-captest-new", "neon-captest-freetier", "neonvm-captest-new", "neonvm-captest-freetier"]'), matrix.platform)
|
||||
id: create-neon-project
|
||||
uses: ./.github/actions/neon-project-create
|
||||
with:
|
||||
region_id: ${{ github.event.inputs.region_id || 'aws-us-east-2' }}
|
||||
postgres_version: ${{ env.DEFAULT_PG_VERSION }}
|
||||
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
|
||||
compute_units: ${{ (matrix.platform == 'neon-captest-freetier' && '[0.25, 0.25]') || '[1, 1]' }}
|
||||
provisioner: ${{ (contains(matrix.platform, 'neonvm-') && 'k8s-neonvm') || 'k8s-pod' }}
|
||||
|
||||
- name: Set up Connection String
|
||||
id: set-up-connstr
|
||||
run: |
|
||||
case "${PLATFORM}" in
|
||||
neon-captest-reuse)
|
||||
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CONNSTR }}
|
||||
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_PGBENCH_1TB_CONNSTR }}
|
||||
;;
|
||||
neon-captest-new | neon-captest-freetier | neonvm-captest-new | neonvm-captest-freetier)
|
||||
CONNSTR=${{ steps.create-neon-project.outputs.dsn }}
|
||||
;;
|
||||
rds-aurora)
|
||||
CONNSTR=${{ secrets.BENCHMARK_RDS_AURORA_CONNSTR }}
|
||||
;;
|
||||
rds-postgres)
|
||||
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_CONNSTR }}
|
||||
neonvm-captest-reuse)
|
||||
CONNSTR=${{ secrets.BENCHMARK_NEONVM_PGBENCH_1TB_CONNSTR }}
|
||||
;;
|
||||
*)
|
||||
echo >&2 "Unknown PLATFORM=${PLATFORM}"
|
||||
@@ -270,24 +139,7 @@ jobs:
|
||||
|
||||
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
|
||||
|
||||
QUERY="SELECT version();"
|
||||
if [[ "${PLATFORM}" = "neon"* ]]; then
|
||||
QUERY="${QUERY} SHOW neon.tenant_id; SHOW neon.timeline_id;"
|
||||
fi
|
||||
psql ${CONNSTR} -c "${QUERY}"
|
||||
|
||||
- name: Benchmark init
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
with:
|
||||
build_type: ${{ env.BUILD_TYPE }}
|
||||
test_selection: performance
|
||||
run_in_parallel: false
|
||||
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
|
||||
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_init
|
||||
env:
|
||||
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
psql ${CONNSTR} -c "SELECT version();"
|
||||
|
||||
- name: Benchmark simple-update
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
@@ -315,13 +167,6 @@ jobs:
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
|
||||
- name: Delete Neon Project
|
||||
if: ${{ steps.create-neon-project.outputs.project_id && always() }}
|
||||
uses: ./.github/actions/neon-project-delete
|
||||
with:
|
||||
project_id: ${{ steps.create-neon-project.outputs.project_id }}
|
||||
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
|
||||
|
||||
- name: Create Allure report
|
||||
if: ${{ !cancelled() }}
|
||||
uses: ./.github/actions/allure-report-generate
|
||||
@@ -331,300 +176,6 @@ jobs:
|
||||
uses: slackapi/slack-github-action@v1
|
||||
with:
|
||||
channel-id: "C033QLM5P7D" # dev-staging-stream
|
||||
slack-message: "Periodic perf testing ${{ matrix.platform }}: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
|
||||
env:
|
||||
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
|
||||
clickbench-compare:
|
||||
# ClichBench DB for rds-aurora and rds-Postgres deployed to the same clusters
|
||||
# we use for performance testing in pgbench-compare.
|
||||
# Run this job only when pgbench-compare is finished to avoid the intersection.
|
||||
# We might change it after https://github.com/neondatabase/neon/issues/2900.
|
||||
#
|
||||
# *_CLICKBENCH_CONNSTR: Genuine ClickBench DB with ~100M rows
|
||||
# *_CLICKBENCH_10M_CONNSTR: DB with the first 10M rows of ClickBench DB
|
||||
if: ${{ !cancelled() }}
|
||||
needs: [ generate-matrices, pgbench-compare ]
|
||||
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix: ${{ fromJson(needs.generate-matrices.outputs.olap-compare-matrix) }}
|
||||
|
||||
env:
|
||||
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
|
||||
DEFAULT_PG_VERSION: 14
|
||||
TEST_OUTPUT: /tmp/test_output
|
||||
TEST_OLAP_COLLECT_EXPLAIN: ${{ github.event.inputs.collect_olap_explain }}
|
||||
TEST_OLAP_COLLECT_PG_STAT_STATEMENTS: ${{ github.event.inputs.collect_pg_stat_statements }}
|
||||
BUILD_TYPE: remote
|
||||
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
|
||||
PLATFORM: ${{ matrix.platform }}
|
||||
|
||||
runs-on: [ self-hosted, us-east-2, x64 ]
|
||||
container:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:pinned
|
||||
options: --init
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Download Neon artifact
|
||||
uses: ./.github/actions/download
|
||||
with:
|
||||
name: neon-${{ runner.os }}-release-artifact
|
||||
path: /tmp/neon/
|
||||
prefix: latest
|
||||
|
||||
- name: Add Postgres binaries to PATH
|
||||
run: |
|
||||
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
|
||||
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
|
||||
|
||||
- name: Set up Connection String
|
||||
id: set-up-connstr
|
||||
run: |
|
||||
case "${PLATFORM}" in
|
||||
neon-captest-reuse)
|
||||
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CLICKBENCH_10M_CONNSTR }}
|
||||
;;
|
||||
rds-aurora)
|
||||
CONNSTR=${{ secrets.BENCHMARK_RDS_AURORA_CLICKBENCH_10M_CONNSTR }}
|
||||
;;
|
||||
rds-postgres)
|
||||
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_CLICKBENCH_10M_CONNSTR }}
|
||||
;;
|
||||
*)
|
||||
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
|
||||
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
|
||||
|
||||
QUERY="SELECT version();"
|
||||
if [[ "${PLATFORM}" = "neon"* ]]; then
|
||||
QUERY="${QUERY} SHOW neon.tenant_id; SHOW neon.timeline_id;"
|
||||
fi
|
||||
psql ${CONNSTR} -c "${QUERY}"
|
||||
|
||||
- name: ClickBench benchmark
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
with:
|
||||
build_type: ${{ env.BUILD_TYPE }}
|
||||
test_selection: performance/test_perf_olap.py
|
||||
run_in_parallel: false
|
||||
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
|
||||
extra_params: -m remote_cluster --timeout 21600 -k test_clickbench
|
||||
env:
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
TEST_OLAP_COLLECT_EXPLAIN: ${{ github.event.inputs.collect_olap_explain || 'false' }}
|
||||
TEST_OLAP_COLLECT_PG_STAT_STATEMENTS: ${{ github.event.inputs.collect_pg_stat_statements || 'false' }}
|
||||
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
|
||||
TEST_OLAP_SCALE: 10
|
||||
|
||||
- name: Create Allure report
|
||||
if: ${{ !cancelled() }}
|
||||
uses: ./.github/actions/allure-report-generate
|
||||
|
||||
- name: Post to a Slack channel
|
||||
if: ${{ github.event.schedule && failure() }}
|
||||
uses: slackapi/slack-github-action@v1
|
||||
with:
|
||||
channel-id: "C033QLM5P7D" # dev-staging-stream
|
||||
slack-message: "Periodic OLAP perf testing ${{ matrix.platform }}: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
|
||||
env:
|
||||
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
|
||||
tpch-compare:
|
||||
# TCP-H DB for rds-aurora and rds-Postgres deployed to the same clusters
|
||||
# we use for performance testing in pgbench-compare & clickbench-compare.
|
||||
# Run this job only when clickbench-compare is finished to avoid the intersection.
|
||||
# We might change it after https://github.com/neondatabase/neon/issues/2900.
|
||||
#
|
||||
# *_TPCH_S10_CONNSTR: DB generated with scale factor 10 (~10 GB)
|
||||
if: ${{ !cancelled() }}
|
||||
needs: [ generate-matrices, clickbench-compare ]
|
||||
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix: ${{ fromJson(needs.generate-matrices.outputs.tpch-compare-matrix) }}
|
||||
|
||||
env:
|
||||
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
|
||||
DEFAULT_PG_VERSION: 14
|
||||
TEST_OUTPUT: /tmp/test_output
|
||||
BUILD_TYPE: remote
|
||||
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
|
||||
PLATFORM: ${{ matrix.platform }}
|
||||
TEST_OLAP_SCALE: ${{ matrix.scale }}
|
||||
|
||||
runs-on: [ self-hosted, us-east-2, x64 ]
|
||||
container:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:pinned
|
||||
options: --init
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Download Neon artifact
|
||||
uses: ./.github/actions/download
|
||||
with:
|
||||
name: neon-${{ runner.os }}-release-artifact
|
||||
path: /tmp/neon/
|
||||
prefix: latest
|
||||
|
||||
- name: Add Postgres binaries to PATH
|
||||
run: |
|
||||
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
|
||||
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
|
||||
|
||||
- name: Get Connstring Secret Name
|
||||
run: |
|
||||
case "${PLATFORM}" in
|
||||
neon-captest-reuse)
|
||||
ENV_PLATFORM=CAPTEST_TPCH
|
||||
;;
|
||||
rds-aurora)
|
||||
ENV_PLATFORM=RDS_AURORA_TPCH
|
||||
;;
|
||||
rds-postgres)
|
||||
ENV_PLATFORM=RDS_AURORA_TPCH
|
||||
;;
|
||||
*)
|
||||
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
|
||||
CONNSTR_SECRET_NAME="BENCHMARK_${ENV_PLATFORM}_S${TEST_OLAP_SCALE}_CONNSTR"
|
||||
echo "CONNSTR_SECRET_NAME=${CONNSTR_SECRET_NAME}" >> $GITHUB_ENV
|
||||
|
||||
- name: Set up Connection String
|
||||
id: set-up-connstr
|
||||
run: |
|
||||
CONNSTR=${{ secrets[env.CONNSTR_SECRET_NAME] }}
|
||||
|
||||
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
|
||||
|
||||
QUERY="SELECT version();"
|
||||
if [[ "${PLATFORM}" = "neon"* ]]; then
|
||||
QUERY="${QUERY} SHOW neon.tenant_id; SHOW neon.timeline_id;"
|
||||
fi
|
||||
psql ${CONNSTR} -c "${QUERY}"
|
||||
|
||||
- name: Run TPC-H benchmark
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
with:
|
||||
build_type: ${{ env.BUILD_TYPE }}
|
||||
test_selection: performance/test_perf_olap.py
|
||||
run_in_parallel: false
|
||||
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
|
||||
extra_params: -m remote_cluster --timeout 21600 -k test_tpch
|
||||
env:
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
|
||||
TEST_OLAP_SCALE: ${{ matrix.scale }}
|
||||
|
||||
- name: Create Allure report
|
||||
if: ${{ !cancelled() }}
|
||||
uses: ./.github/actions/allure-report-generate
|
||||
|
||||
- name: Post to a Slack channel
|
||||
if: ${{ github.event.schedule && failure() }}
|
||||
uses: slackapi/slack-github-action@v1
|
||||
with:
|
||||
channel-id: "C033QLM5P7D" # dev-staging-stream
|
||||
slack-message: "Periodic TPC-H perf testing ${{ matrix.platform }}: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
|
||||
env:
|
||||
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
|
||||
user-examples-compare:
|
||||
if: ${{ !cancelled() }}
|
||||
needs: [ generate-matrices, tpch-compare ]
|
||||
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix: ${{ fromJson(needs.generate-matrices.outputs.olap-compare-matrix) }}
|
||||
|
||||
env:
|
||||
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
|
||||
DEFAULT_PG_VERSION: 14
|
||||
TEST_OUTPUT: /tmp/test_output
|
||||
BUILD_TYPE: remote
|
||||
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
|
||||
PLATFORM: ${{ matrix.platform }}
|
||||
|
||||
runs-on: [ self-hosted, us-east-2, x64 ]
|
||||
container:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:pinned
|
||||
options: --init
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Download Neon artifact
|
||||
uses: ./.github/actions/download
|
||||
with:
|
||||
name: neon-${{ runner.os }}-release-artifact
|
||||
path: /tmp/neon/
|
||||
prefix: latest
|
||||
|
||||
- name: Add Postgres binaries to PATH
|
||||
run: |
|
||||
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
|
||||
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
|
||||
|
||||
- name: Set up Connection String
|
||||
id: set-up-connstr
|
||||
run: |
|
||||
case "${PLATFORM}" in
|
||||
neon-captest-reuse)
|
||||
CONNSTR=${{ secrets.BENCHMARK_USER_EXAMPLE_CAPTEST_CONNSTR }}
|
||||
;;
|
||||
rds-aurora)
|
||||
CONNSTR=${{ secrets.BENCHMARK_USER_EXAMPLE_RDS_AURORA_CONNSTR }}
|
||||
;;
|
||||
rds-postgres)
|
||||
CONNSTR=${{ secrets.BENCHMARK_USER_EXAMPLE_RDS_POSTGRES_CONNSTR }}
|
||||
;;
|
||||
*)
|
||||
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
|
||||
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
|
||||
|
||||
QUERY="SELECT version();"
|
||||
if [[ "${PLATFORM}" = "neon"* ]]; then
|
||||
QUERY="${QUERY} SHOW neon.tenant_id; SHOW neon.timeline_id;"
|
||||
fi
|
||||
psql ${CONNSTR} -c "${QUERY}"
|
||||
|
||||
- name: Run user examples
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
with:
|
||||
build_type: ${{ env.BUILD_TYPE }}
|
||||
test_selection: performance/test_perf_olap.py
|
||||
run_in_parallel: false
|
||||
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
|
||||
extra_params: -m remote_cluster --timeout 21600 -k test_user_examples
|
||||
env:
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
|
||||
|
||||
- name: Create Allure report
|
||||
if: ${{ !cancelled() }}
|
||||
uses: ./.github/actions/allure-report-generate
|
||||
|
||||
- name: Post to a Slack channel
|
||||
if: ${{ github.event.schedule && failure() }}
|
||||
uses: slackapi/slack-github-action@v1
|
||||
with:
|
||||
channel-id: "C033QLM5P7D" # dev-staging-stream
|
||||
slack-message: "Periodic User example perf testing ${{ matrix.platform }}: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
|
||||
slack-message: "Periodic perf testing ${{ matrix.platform }} (${{ matrix.db_size }}): ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
|
||||
env:
|
||||
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
|
||||
4
.github/workflows/build_and_test.yml
vendored
4
.github/workflows/build_and_test.yml
vendored
@@ -1132,11 +1132,9 @@ jobs:
|
||||
-f branch=main \
|
||||
-f dockerTag=${{needs.tag.outputs.build-tag}}
|
||||
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
|
||||
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main \
|
||||
gh workflow --repo neondatabase/aws run deploy-proxy-prod.yml --ref main \
|
||||
-f deployPgSniRouter=true \
|
||||
-f deployProxy=true \
|
||||
-f deployStorage=false \
|
||||
-f deployStorageBroker=false \
|
||||
-f branch=main \
|
||||
-f dockerTag=${{needs.tag.outputs.build-tag}}
|
||||
else
|
||||
|
||||
2
.github/workflows/release.yml
vendored
2
.github/workflows/release.yml
vendored
@@ -97,7 +97,7 @@ jobs:
|
||||
**Please merge this Pull Request using 'Create a merge commit' button**
|
||||
EOF
|
||||
|
||||
gh pr create --title "Proxy release ${RELEASE_DATE}}" \
|
||||
gh pr create --title "Proxy release ${RELEASE_DATE}" \
|
||||
--body-file "body.md" \
|
||||
--head "${RELEASE_BRANCH}" \
|
||||
--base "release-proxy"
|
||||
|
||||
8
Cargo.lock
generated
8
Cargo.lock
generated
@@ -25,9 +25,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
|
||||
|
||||
[[package]]
|
||||
name = "ahash"
|
||||
version = "0.8.5"
|
||||
version = "0.8.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cd7d5a2cecb58716e47d67d5703a249964b14c7be1ec3cad3affc295b2d1c35d"
|
||||
checksum = "d713b3834d76b85304d4d525563c1276e2e30dc97cc67bfb4585a4a29fc2c89f"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"const-random",
|
||||
@@ -1389,9 +1389,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "crc32c"
|
||||
version = "0.6.3"
|
||||
version = "0.6.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3dfea2db42e9927a3845fb268a10a72faed6d416065f77873f05e411457c363e"
|
||||
checksum = "89254598aa9b9fa608de44b3ae54c810f0f06d755e24c50177f1f8f31ff50ce2"
|
||||
dependencies = [
|
||||
"rustc_version",
|
||||
]
|
||||
|
||||
@@ -230,6 +230,8 @@ postgres=# select * from t;
|
||||
> cargo neon stop
|
||||
```
|
||||
|
||||
More advanced usages can be found at [Control Plane and Neon Local](./control_plane/README.md).
|
||||
|
||||
#### Handling build failures
|
||||
|
||||
If you encounter errors during setting up the initial tenant, it's best to stop everything (`cargo neon stop`) and remove the `.neon` directory. Then fix the problems, and start the setup again.
|
||||
|
||||
@@ -18,8 +18,6 @@ use futures::future::join_all;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
use postgres::{Client, NoTls};
|
||||
use tokio;
|
||||
use tokio_postgres;
|
||||
use tracing::{debug, error, info, instrument, warn};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -71,7 +71,7 @@ More specifically, here is an example ext_index.json
|
||||
}
|
||||
}
|
||||
*/
|
||||
use anyhow::{self, Result};
|
||||
use anyhow::Result;
|
||||
use anyhow::{bail, Context};
|
||||
use bytes::Bytes;
|
||||
use compute_api::spec::RemoteExtSpec;
|
||||
|
||||
@@ -13,8 +13,6 @@ use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIErr
|
||||
use anyhow::Result;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
||||
use num_cpus;
|
||||
use serde_json;
|
||||
use tokio::task;
|
||||
use tracing::{error, info, warn};
|
||||
use tracing_utils::http::OtelName;
|
||||
|
||||
26
control_plane/README.md
Normal file
26
control_plane/README.md
Normal file
@@ -0,0 +1,26 @@
|
||||
# Control Plane and Neon Local
|
||||
|
||||
This crate contains tools to start a Neon development environment locally. This utility can be used with the `cargo neon` command.
|
||||
|
||||
## Example: Start with Postgres 16
|
||||
|
||||
To create and start a local development environment with Postgres 16, you will need to provide `--pg-version` flag to 3 of the start-up commands.
|
||||
|
||||
```shell
|
||||
cargo neon init --pg-version 16
|
||||
cargo neon start
|
||||
cargo neon tenant create --set-default --pg-version 16
|
||||
cargo neon endpoint create main --pg-version 16
|
||||
cargo neon endpoint start main
|
||||
```
|
||||
|
||||
## Example: Create Test User and Database
|
||||
|
||||
By default, `cargo neon` starts an endpoint with `cloud_admin` and `postgres` database. If you want to have a role and a database similar to what we have on the cloud service, you can do it with the following commands when starting an endpoint.
|
||||
|
||||
```shell
|
||||
cargo neon endpoint create main --pg-version 16 --update-catalog true
|
||||
cargo neon endpoint start main --create-test-user true
|
||||
```
|
||||
|
||||
The first command creates `neon_superuser` and necessary roles. The second command creates `test` user and `neondb` database. You will see a connection string that connects you to the test user after running the second command.
|
||||
@@ -0,0 +1,2 @@
|
||||
ALTER TABLE tenant_shards ALTER generation SET NOT NULL;
|
||||
ALTER TABLE tenant_shards ALTER generation_pageserver SET NOT NULL;
|
||||
@@ -0,0 +1,4 @@
|
||||
|
||||
|
||||
ALTER TABLE tenant_shards ALTER generation DROP NOT NULL;
|
||||
ALTER TABLE tenant_shards ALTER generation_pageserver DROP NOT NULL;
|
||||
@@ -1,9 +1,10 @@
|
||||
use crate::reconciler::ReconcileError;
|
||||
use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT};
|
||||
use crate::PlacementPolicy;
|
||||
use hyper::{Body, Request, Response};
|
||||
use hyper::{StatusCode, Uri};
|
||||
use pageserver_api::models::{
|
||||
TenantCreateRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
|
||||
TenantConfigRequest, TenantCreateRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
|
||||
TenantTimeTravelRequest, TimelineCreateRequest,
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -117,9 +118,14 @@ async fn handle_tenant_create(
|
||||
check_permissions(&req, Scope::PageServerApi)?;
|
||||
|
||||
let create_req = json_request::<TenantCreateRequest>(&mut req).await?;
|
||||
|
||||
// TODO: enable specifying this. Using Single as a default helps legacy tests to work (they
|
||||
// have no expectation of HA).
|
||||
let placement_policy = PlacementPolicy::Single;
|
||||
|
||||
json_response(
|
||||
StatusCode::CREATED,
|
||||
service.tenant_create(create_req).await?,
|
||||
service.tenant_create(create_req, placement_policy).await?,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -185,6 +191,27 @@ async fn handle_tenant_location_config(
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_tenant_config_set(
|
||||
service: Arc<Service>,
|
||||
mut req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::PageServerApi)?;
|
||||
|
||||
let config_req = json_request::<TenantConfigRequest>(&mut req).await?;
|
||||
|
||||
json_response(StatusCode::OK, service.tenant_config_set(config_req).await?)
|
||||
}
|
||||
|
||||
async fn handle_tenant_config_get(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
||||
check_permissions(&req, Scope::PageServerApi)?;
|
||||
|
||||
json_response(StatusCode::OK, service.tenant_config_get(tenant_id)?)
|
||||
}
|
||||
|
||||
async fn handle_tenant_time_travel_remote_storage(
|
||||
service: Arc<Service>,
|
||||
mut req: Request<Body>,
|
||||
@@ -216,7 +243,15 @@ async fn handle_tenant_time_travel_remote_storage(
|
||||
done_if_after_raw,
|
||||
)
|
||||
.await?;
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn handle_tenant_secondary_download(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
||||
service.tenant_secondary_download(tenant_id).await?;
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
@@ -551,12 +586,21 @@ pub fn make_router(
|
||||
.delete("/v1/tenant/:tenant_id", |r| {
|
||||
tenant_service_handler(r, handle_tenant_delete)
|
||||
})
|
||||
.put("/v1/tenant/config", |r| {
|
||||
tenant_service_handler(r, handle_tenant_config_set)
|
||||
})
|
||||
.get("/v1/tenant/:tenant_id/config", |r| {
|
||||
tenant_service_handler(r, handle_tenant_config_get)
|
||||
})
|
||||
.put("/v1/tenant/:tenant_id/location_config", |r| {
|
||||
tenant_service_handler(r, handle_tenant_location_config)
|
||||
})
|
||||
.put("/v1/tenant/:tenant_id/time_travel_remote_storage", |r| {
|
||||
tenant_service_handler(r, handle_tenant_time_travel_remote_storage)
|
||||
})
|
||||
.post("/v1/tenant/:tenant_id/secondary/download", |r| {
|
||||
tenant_service_handler(r, handle_tenant_secondary_download)
|
||||
})
|
||||
// Timeline operations
|
||||
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
|
||||
tenant_service_handler(r, handle_tenant_timeline_delete)
|
||||
|
||||
@@ -13,14 +13,20 @@ mod schema;
|
||||
pub mod service;
|
||||
mod tenant_state;
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize, Debug)]
|
||||
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
|
||||
enum PlacementPolicy {
|
||||
/// Cheapest way to attach a tenant: just one pageserver, no secondary
|
||||
Single,
|
||||
/// Production-ready way to attach a tenant: one attached pageserver and
|
||||
/// some number of secondaries.
|
||||
Double(usize),
|
||||
/// Do not attach to any pageservers
|
||||
/// Create one secondary mode locations. This is useful when onboarding
|
||||
/// a tenant, or for an idle tenant that we might want to bring online quickly.
|
||||
Secondary,
|
||||
|
||||
/// Do not attach to any pageservers. This is appropriate for tenants that
|
||||
/// have been idle for a long time, where we do not mind some delay in making
|
||||
/// them available in future.
|
||||
Detached,
|
||||
}
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ use attachment_service::http::make_router;
|
||||
use attachment_service::metrics::preinitialize_metrics;
|
||||
use attachment_service::persistence::Persistence;
|
||||
use attachment_service::service::{Config, Service};
|
||||
use aws_config::{self, BehaviorVersion, Region};
|
||||
use aws_config::{BehaviorVersion, Region};
|
||||
use camino::Utf8PathBuf;
|
||||
use clap::Parser;
|
||||
use diesel::Connection;
|
||||
@@ -79,13 +79,38 @@ impl Secrets {
|
||||
"neon-storage-controller-control-plane-jwt-token";
|
||||
const PUBLIC_KEY_SECRET: &'static str = "neon-storage-controller-public-key";
|
||||
|
||||
const DATABASE_URL_ENV: &'static str = "DATABASE_URL";
|
||||
const PAGESERVER_JWT_TOKEN_ENV: &'static str = "PAGESERVER_JWT_TOKEN";
|
||||
const CONTROL_PLANE_JWT_TOKEN_ENV: &'static str = "CONTROL_PLANE_JWT_TOKEN";
|
||||
const PUBLIC_KEY_ENV: &'static str = "PUBLIC_KEY";
|
||||
|
||||
/// Load secrets from, in order of preference:
|
||||
/// - CLI args if database URL is provided on the CLI
|
||||
/// - Environment variables if DATABASE_URL is set.
|
||||
/// - AWS Secrets Manager secrets
|
||||
async fn load(args: &Cli) -> anyhow::Result<Self> {
|
||||
match &args.database_url {
|
||||
Some(url) => Self::load_cli(url, args),
|
||||
None => Self::load_aws_sm().await,
|
||||
None => match std::env::var(Self::DATABASE_URL_ENV) {
|
||||
Ok(database_url) => Self::load_env(database_url),
|
||||
Err(_) => Self::load_aws_sm().await,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn load_env(database_url: String) -> anyhow::Result<Self> {
|
||||
let public_key = match std::env::var(Self::PUBLIC_KEY_ENV) {
|
||||
Ok(public_key) => Some(JwtAuth::from_key(public_key).context("Loading public key")?),
|
||||
Err(_) => None,
|
||||
};
|
||||
Ok(Self {
|
||||
database_url,
|
||||
public_key,
|
||||
jwt_token: std::env::var(Self::PAGESERVER_JWT_TOKEN_ENV).ok(),
|
||||
control_plane_jwt_token: std::env::var(Self::CONTROL_PLANE_JWT_TOKEN_ENV).ok(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn load_aws_sm() -> anyhow::Result<Self> {
|
||||
let Ok(region) = std::env::var("AWS_REGION") else {
|
||||
anyhow::bail!("AWS_REGION is not set, cannot load secrets automatically: either set this, or use CLI args to supply secrets");
|
||||
|
||||
@@ -7,8 +7,10 @@ use self::split_state::SplitState;
|
||||
use camino::Utf8Path;
|
||||
use camino::Utf8PathBuf;
|
||||
use diesel::pg::PgConnection;
|
||||
use diesel::prelude::*;
|
||||
use diesel::Connection;
|
||||
use diesel::{
|
||||
Connection, ExpressionMethods, Insertable, QueryDsl, QueryResult, Queryable, RunQueryDsl,
|
||||
Selectable, SelectableHelper,
|
||||
};
|
||||
use pageserver_api::controller_api::NodeSchedulingPolicy;
|
||||
use pageserver_api::models::TenantConfig;
|
||||
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
|
||||
@@ -331,7 +333,15 @@ impl Persistence {
|
||||
shard_number: ShardNumber(tsp.shard_number as u8),
|
||||
shard_count: ShardCount::new(tsp.shard_count as u8),
|
||||
};
|
||||
result.insert(tenant_shard_id, Generation::new(tsp.generation as u32));
|
||||
|
||||
let Some(g) = tsp.generation else {
|
||||
// If the generation_pageserver column was non-NULL, then the generation column should also be non-NULL:
|
||||
// we only set generation_pageserver when setting generation.
|
||||
return Err(DatabaseError::Logical(
|
||||
"Generation should always be set after incrementing".to_string(),
|
||||
));
|
||||
};
|
||||
result.insert(tenant_shard_id, Generation::new(g as u32));
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
@@ -364,7 +374,85 @@ impl Persistence {
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(Generation::new(updated.generation as u32))
|
||||
// Generation is always non-null in the rseult: if the generation column had been NULL, then we
|
||||
// should have experienced an SQL Confilict error while executing a query that tries to increment it.
|
||||
debug_assert!(updated.generation.is_some());
|
||||
let Some(g) = updated.generation else {
|
||||
return Err(DatabaseError::Logical(
|
||||
"Generation should always be set after incrementing".to_string(),
|
||||
)
|
||||
.into());
|
||||
};
|
||||
|
||||
Ok(Generation::new(g as u32))
|
||||
}
|
||||
|
||||
/// For use when updating a persistent property of a tenant, such as its config or placement_policy.
|
||||
///
|
||||
/// Do not use this for settting generation, unless in the special onboarding code path (/location_config)
|
||||
/// API: use [`Self::increment_generation`] instead. Setting the generation via this route is a one-time thing
|
||||
/// that we only do the first time a tenant is set to an attached policy via /location_config.
|
||||
pub(crate) async fn update_tenant_shard(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
input_placement_policy: PlacementPolicy,
|
||||
input_config: TenantConfig,
|
||||
input_generation: Option<Generation>,
|
||||
) -> DatabaseResult<()> {
|
||||
use crate::schema::tenant_shards::dsl::*;
|
||||
|
||||
self.with_conn(move |conn| {
|
||||
let query = diesel::update(tenant_shards)
|
||||
.filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
|
||||
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
|
||||
.filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32));
|
||||
|
||||
if let Some(input_generation) = input_generation {
|
||||
// Update includes generation column
|
||||
query
|
||||
.set((
|
||||
generation.eq(Some(input_generation.into().unwrap() as i32)),
|
||||
placement_policy
|
||||
.eq(serde_json::to_string(&input_placement_policy).unwrap()),
|
||||
config.eq(serde_json::to_string(&input_config).unwrap()),
|
||||
))
|
||||
.execute(conn)?;
|
||||
} else {
|
||||
// Update does not include generation column
|
||||
query
|
||||
.set((
|
||||
placement_policy
|
||||
.eq(serde_json::to_string(&input_placement_policy).unwrap()),
|
||||
config.eq(serde_json::to_string(&input_config).unwrap()),
|
||||
))
|
||||
.execute(conn)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn update_tenant_config(
|
||||
&self,
|
||||
input_tenant_id: TenantId,
|
||||
input_config: TenantConfig,
|
||||
) -> DatabaseResult<()> {
|
||||
use crate::schema::tenant_shards::dsl::*;
|
||||
|
||||
self.with_conn(move |conn| {
|
||||
diesel::update(tenant_shards)
|
||||
.filter(tenant_id.eq(input_tenant_id.to_string()))
|
||||
.set((config.eq(serde_json::to_string(&input_config).unwrap()),))
|
||||
.execute(conn)?;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
|
||||
@@ -375,7 +463,7 @@ impl Persistence {
|
||||
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
|
||||
.filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
|
||||
.set((
|
||||
generation_pageserver.eq(i64::MAX),
|
||||
generation_pageserver.eq(Option::<i64>::None),
|
||||
placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
|
||||
))
|
||||
.execute(conn)?;
|
||||
@@ -501,12 +589,15 @@ pub(crate) struct TenantShardPersistence {
|
||||
pub(crate) shard_stripe_size: i32,
|
||||
|
||||
// Latest generation number: next time we attach, increment this
|
||||
// and use the incremented number when attaching
|
||||
pub(crate) generation: i32,
|
||||
// and use the incremented number when attaching.
|
||||
//
|
||||
// Generation is only None when first onboarding a tenant, where it may
|
||||
// be in PlacementPolicy::Secondary and therefore have no valid generation state.
|
||||
pub(crate) generation: Option<i32>,
|
||||
|
||||
// Currently attached pageserver
|
||||
#[serde(rename = "pageserver")]
|
||||
pub(crate) generation_pageserver: i64,
|
||||
pub(crate) generation_pageserver: Option<i64>,
|
||||
|
||||
#[serde(default)]
|
||||
pub(crate) placement_policy: String,
|
||||
|
||||
@@ -26,7 +26,7 @@ pub(super) struct Reconciler {
|
||||
/// of a tenant's state from when we spawned a reconcile task.
|
||||
pub(super) tenant_shard_id: TenantShardId,
|
||||
pub(crate) shard: ShardIdentity,
|
||||
pub(crate) generation: Generation,
|
||||
pub(crate) generation: Option<Generation>,
|
||||
pub(crate) intent: TargetState,
|
||||
pub(crate) config: TenantConfig,
|
||||
pub(crate) observed: ObservedState,
|
||||
@@ -312,7 +312,7 @@ impl Reconciler {
|
||||
&self.shard,
|
||||
&self.config,
|
||||
LocationConfigMode::AttachedStale,
|
||||
Some(self.generation),
|
||||
self.generation,
|
||||
None,
|
||||
);
|
||||
self.location_config(origin_ps_id, stale_conf, Some(Duration::from_secs(10)))
|
||||
@@ -335,16 +335,17 @@ impl Reconciler {
|
||||
}
|
||||
|
||||
// Increment generation before attaching to new pageserver
|
||||
self.generation = self
|
||||
.persistence
|
||||
.increment_generation(self.tenant_shard_id, dest_ps_id)
|
||||
.await?;
|
||||
self.generation = Some(
|
||||
self.persistence
|
||||
.increment_generation(self.tenant_shard_id, dest_ps_id)
|
||||
.await?,
|
||||
);
|
||||
|
||||
let dest_conf = build_location_config(
|
||||
&self.shard,
|
||||
&self.config,
|
||||
LocationConfigMode::AttachedMulti,
|
||||
Some(self.generation),
|
||||
self.generation,
|
||||
None,
|
||||
);
|
||||
|
||||
@@ -401,7 +402,7 @@ impl Reconciler {
|
||||
&self.shard,
|
||||
&self.config,
|
||||
LocationConfigMode::AttachedSingle,
|
||||
Some(self.generation),
|
||||
self.generation,
|
||||
None,
|
||||
);
|
||||
self.location_config(dest_ps_id, dest_final_conf.clone(), None)
|
||||
@@ -433,22 +434,62 @@ impl Reconciler {
|
||||
|
||||
// If the attached pageserver is not attached, do so now.
|
||||
if let Some(node_id) = self.intent.attached {
|
||||
let mut wanted_conf =
|
||||
attached_location_conf(self.generation, &self.shard, &self.config);
|
||||
// If we are in an attached policy, then generation must have been set (null generations
|
||||
// are only present when a tenant is initially loaded with a secondary policy)
|
||||
debug_assert!(self.generation.is_some());
|
||||
let Some(generation) = self.generation else {
|
||||
return Err(ReconcileError::Other(anyhow::anyhow!(
|
||||
"Attempted to attach with NULL generation"
|
||||
)));
|
||||
};
|
||||
|
||||
let mut wanted_conf = attached_location_conf(generation, &self.shard, &self.config);
|
||||
match self.observed.locations.get(&node_id) {
|
||||
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
|
||||
// Nothing to do
|
||||
tracing::info!(%node_id, "Observed configuration already correct.")
|
||||
}
|
||||
_ => {
|
||||
observed => {
|
||||
// In all cases other than a matching observed configuration, we will
|
||||
// reconcile this location. This includes locations with different configurations, as well
|
||||
// as locations with unknown (None) observed state.
|
||||
self.generation = self
|
||||
.persistence
|
||||
.increment_generation(self.tenant_shard_id, node_id)
|
||||
.await?;
|
||||
wanted_conf.generation = self.generation.into();
|
||||
|
||||
// The general case is to increment the generation. However, there are cases
|
||||
// where this is not necessary:
|
||||
// - if we are only updating the TenantConf part of the location
|
||||
// - if we are only changing the attachment mode (e.g. going to attachedmulti or attachedstale)
|
||||
// and the location was already in the correct generation
|
||||
let increment_generation = match observed {
|
||||
None => true,
|
||||
Some(ObservedStateLocation { conf: None }) => true,
|
||||
Some(ObservedStateLocation {
|
||||
conf: Some(observed),
|
||||
}) => {
|
||||
let generations_match = observed.generation == wanted_conf.generation;
|
||||
|
||||
use LocationConfigMode::*;
|
||||
let mode_transition_requires_gen_inc =
|
||||
match (observed.mode, wanted_conf.mode) {
|
||||
// Usually the short-lived attachment modes (multi and stale) are only used
|
||||
// in the case of [`Self::live_migrate`], but it is simple to handle them correctly
|
||||
// here too. Locations are allowed to go Single->Stale and Multi->Single within the same generation.
|
||||
(AttachedSingle, AttachedStale) => false,
|
||||
(AttachedMulti, AttachedSingle) => false,
|
||||
(lhs, rhs) => lhs != rhs,
|
||||
};
|
||||
|
||||
!generations_match || mode_transition_requires_gen_inc
|
||||
}
|
||||
};
|
||||
|
||||
if increment_generation {
|
||||
let generation = self
|
||||
.persistence
|
||||
.increment_generation(self.tenant_shard_id, node_id)
|
||||
.await?;
|
||||
self.generation = Some(generation);
|
||||
wanted_conf.generation = generation.into();
|
||||
}
|
||||
tracing::info!(%node_id, "Observed configuration requires update.");
|
||||
self.location_config(node_id, wanted_conf, None).await?;
|
||||
self.compute_notify().await?;
|
||||
|
||||
@@ -284,7 +284,6 @@ pub(crate) mod test_utils {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use utils::id::NodeId;
|
||||
|
||||
use crate::tenant_state::IntentState;
|
||||
#[test]
|
||||
|
||||
@@ -17,8 +17,8 @@ diesel::table! {
|
||||
shard_number -> Int4,
|
||||
shard_count -> Int4,
|
||||
shard_stripe_size -> Int4,
|
||||
generation -> Int4,
|
||||
generation_pageserver -> Int8,
|
||||
generation -> Nullable<Int4>,
|
||||
generation_pageserver -> Nullable<Int8>,
|
||||
placement_policy -> Varchar,
|
||||
splitting -> Int2,
|
||||
config -> Text,
|
||||
|
||||
@@ -14,10 +14,13 @@ use control_plane::attachment_service::{
|
||||
use diesel::result::DatabaseErrorKind;
|
||||
use futures::{stream::FuturesUnordered, StreamExt};
|
||||
use hyper::StatusCode;
|
||||
use pageserver_api::controller_api::{
|
||||
NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy,
|
||||
TenantCreateResponse, TenantCreateResponseShard, TenantLocateResponse,
|
||||
TenantLocateResponseShard, TenantShardMigrateRequest, TenantShardMigrateResponse,
|
||||
use pageserver_api::{
|
||||
controller_api::{
|
||||
NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy,
|
||||
TenantCreateResponse, TenantCreateResponseShard, TenantLocateResponse,
|
||||
TenantLocateResponseShard, TenantShardMigrateRequest, TenantShardMigrateResponse,
|
||||
},
|
||||
models::TenantConfigRequest,
|
||||
};
|
||||
use pageserver_api::{
|
||||
models::{
|
||||
@@ -65,6 +68,11 @@ const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
// some data in it.
|
||||
const RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
|
||||
// If we receive a call using Secondary mode initially, it will omit generation. We will initialize
|
||||
// tenant shards into this generation, and as long as it remains in this generation, we will accept
|
||||
// input generation from future requests as authoritative.
|
||||
const INITIAL_GENERATION: Generation = Generation::new(0);
|
||||
|
||||
/// How long [`Service::startup_reconcile`] is allowed to take before it should give
|
||||
/// up on unresponsive pageservers and proceed.
|
||||
pub(crate) const STARTUP_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
@@ -167,6 +175,21 @@ impl From<ReconcileWaitError> for ApiError {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
enum TenantCreateOrUpdate {
|
||||
Create((TenantCreateRequest, PlacementPolicy)),
|
||||
Update(Vec<ShardUpdate>),
|
||||
}
|
||||
|
||||
struct ShardUpdate {
|
||||
tenant_shard_id: TenantShardId,
|
||||
placement_policy: PlacementPolicy,
|
||||
tenant_config: TenantConfig,
|
||||
|
||||
/// If this is None, generation is not updated.
|
||||
generation: Option<Generation>,
|
||||
}
|
||||
|
||||
impl Service {
|
||||
pub fn get_config(&self) -> &Config {
|
||||
&self.config
|
||||
@@ -571,6 +594,9 @@ impl Service {
|
||||
// the shard so that a future [`TenantState::maybe_reconcile`] will try again.
|
||||
tenant.pending_compute_notification = result.pending_compute_notification;
|
||||
|
||||
// Let the TenantState know it is idle.
|
||||
tenant.reconcile_complete(result.sequence);
|
||||
|
||||
match result.result {
|
||||
Ok(()) => {
|
||||
for (node_id, loc) in &result.observed.locations {
|
||||
@@ -661,8 +687,8 @@ impl Service {
|
||||
// after when pageservers start up and register.
|
||||
let mut node_ids = HashSet::new();
|
||||
for tsp in &tenant_shard_persistence {
|
||||
if tsp.generation_pageserver != i64::MAX {
|
||||
node_ids.insert(tsp.generation_pageserver);
|
||||
if let Some(node_id) = tsp.generation_pageserver {
|
||||
node_ids.insert(node_id);
|
||||
}
|
||||
}
|
||||
for node_id in node_ids {
|
||||
@@ -699,18 +725,15 @@ impl Service {
|
||||
// We will populate intent properly later in [`Self::startup_reconcile`], initially populate
|
||||
// it with what we can infer: the node for which a generation was most recently issued.
|
||||
let mut intent = IntentState::new();
|
||||
if tsp.generation_pageserver != i64::MAX {
|
||||
intent.set_attached(
|
||||
&mut scheduler,
|
||||
Some(NodeId(tsp.generation_pageserver as u64)),
|
||||
);
|
||||
if let Some(generation_pageserver) = tsp.generation_pageserver {
|
||||
intent.set_attached(&mut scheduler, Some(NodeId(generation_pageserver as u64)));
|
||||
}
|
||||
|
||||
let new_tenant = TenantState {
|
||||
tenant_shard_id,
|
||||
shard: shard_identity,
|
||||
sequence: Sequence::initial(),
|
||||
generation: Generation::new(tsp.generation as u32),
|
||||
generation: tsp.generation.map(|g| Generation::new(g as u32)),
|
||||
policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
|
||||
intent,
|
||||
observed: ObservedState::new(),
|
||||
@@ -790,8 +813,8 @@ impl Service {
|
||||
shard_number: attach_req.tenant_shard_id.shard_number.0 as i32,
|
||||
shard_count: attach_req.tenant_shard_id.shard_count.literal() as i32,
|
||||
shard_stripe_size: 0,
|
||||
generation: 0,
|
||||
generation_pageserver: i64::MAX,
|
||||
generation: Some(0),
|
||||
generation_pageserver: None,
|
||||
placement_policy: serde_json::to_string(&PlacementPolicy::default()).unwrap(),
|
||||
config: serde_json::to_string(&TenantConfig::default()).unwrap(),
|
||||
splitting: SplitState::default(),
|
||||
@@ -846,7 +869,7 @@ impl Service {
|
||||
.expect("Checked for existence above");
|
||||
|
||||
if let Some(new_generation) = new_generation {
|
||||
tenant_state.generation = new_generation;
|
||||
tenant_state.generation = Some(new_generation);
|
||||
} else {
|
||||
// This is a detach notification. We must update placement policy to avoid re-attaching
|
||||
// during background scheduling/reconciliation, or during attachment service restart.
|
||||
@@ -896,7 +919,7 @@ impl Service {
|
||||
node_id,
|
||||
ObservedStateLocation {
|
||||
conf: Some(attached_location_conf(
|
||||
tenant_state.generation,
|
||||
tenant_state.generation.unwrap(),
|
||||
&tenant_state.shard,
|
||||
&tenant_state.config,
|
||||
)),
|
||||
@@ -910,7 +933,7 @@ impl Service {
|
||||
Ok(AttachHookResponse {
|
||||
gen: attach_req
|
||||
.node_id
|
||||
.map(|_| tenant_state.generation.into().unwrap()),
|
||||
.map(|_| tenant_state.generation.expect("Test hook, not used on tenants that are mid-onboarding with a NULL generation").into().unwrap()),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -923,7 +946,7 @@ impl Service {
|
||||
attachment: tenant_state.and_then(|s| {
|
||||
s.intent
|
||||
.get_attached()
|
||||
.map(|ps| (s.generation.into().unwrap(), ps))
|
||||
.map(|ps| (s.generation.expect("Test hook, not used on tenants that are mid-onboarding with a NULL generation").into().unwrap(), ps))
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -973,7 +996,17 @@ impl Service {
|
||||
continue;
|
||||
};
|
||||
|
||||
shard_state.generation = std::cmp::max(shard_state.generation, new_gen);
|
||||
// If [`Persistence::re_attach`] selected this shard, it must have alread
|
||||
// had a generation set.
|
||||
debug_assert!(shard_state.generation.is_some());
|
||||
let Some(old_gen) = shard_state.generation else {
|
||||
// Should never happen: would only return incremented generation
|
||||
// for a tenant that already had a non-null generation.
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Generation must be set while re-attaching"
|
||||
)));
|
||||
};
|
||||
shard_state.generation = Some(std::cmp::max(old_gen, new_gen));
|
||||
if let Some(observed) = shard_state
|
||||
.observed
|
||||
.locations
|
||||
@@ -1003,7 +1036,7 @@ impl Service {
|
||||
|
||||
for req_tenant in validate_req.tenants {
|
||||
if let Some(tenant_state) = locked.tenants.get(&req_tenant.id) {
|
||||
let valid = tenant_state.generation == Generation::new(req_tenant.gen);
|
||||
let valid = tenant_state.generation == Some(Generation::new(req_tenant.gen));
|
||||
tracing::info!(
|
||||
"handle_validate: {}(gen {}): valid={valid} (latest {:?})",
|
||||
req_tenant.id,
|
||||
@@ -1030,8 +1063,9 @@ impl Service {
|
||||
pub(crate) async fn tenant_create(
|
||||
&self,
|
||||
create_req: TenantCreateRequest,
|
||||
placement_policy: PlacementPolicy,
|
||||
) -> Result<TenantCreateResponse, ApiError> {
|
||||
let (response, waiters) = self.do_tenant_create(create_req).await?;
|
||||
let (response, waiters) = self.do_tenant_create(create_req, placement_policy).await?;
|
||||
|
||||
self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await?;
|
||||
Ok(response)
|
||||
@@ -1040,6 +1074,7 @@ impl Service {
|
||||
pub(crate) async fn do_tenant_create(
|
||||
&self,
|
||||
create_req: TenantCreateRequest,
|
||||
placement_policy: PlacementPolicy,
|
||||
) -> Result<(TenantCreateResponse, Vec<ReconcilerWaiter>), ApiError> {
|
||||
// This service expects to handle sharding itself: it is an error to try and directly create
|
||||
// a particular shard here.
|
||||
@@ -1065,9 +1100,27 @@ impl Service {
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// TODO: enable specifying this. Using Single as a default helps legacy tests to work (they
|
||||
// have no expectation of HA).
|
||||
let placement_policy: PlacementPolicy = PlacementPolicy::Single;
|
||||
// If the caller specifies a None generation, it means "start from default". This is different
|
||||
// to [`Self::tenant_location_config`], where a None generation is used to represent
|
||||
// an incompletely-onboarded tenant.
|
||||
let initial_generation = if matches!(placement_policy, PlacementPolicy::Secondary) {
|
||||
tracing::info!(
|
||||
"tenant_create: secondary mode, generation is_some={}",
|
||||
create_req.generation.is_some()
|
||||
);
|
||||
create_req.generation.map(Generation::new)
|
||||
} else {
|
||||
tracing::info!(
|
||||
"tenant_create: not secondary mode, generation is_some={}",
|
||||
create_req.generation.is_some()
|
||||
);
|
||||
Some(
|
||||
create_req
|
||||
.generation
|
||||
.map(Generation::new)
|
||||
.unwrap_or(INITIAL_GENERATION),
|
||||
)
|
||||
};
|
||||
|
||||
// Ordering: we persist tenant shards before creating them on the pageserver. This enables a caller
|
||||
// to clean up after themselves by issuing a tenant deletion if something goes wrong and we restart
|
||||
@@ -1079,8 +1132,10 @@ impl Service {
|
||||
shard_number: tenant_shard_id.shard_number.0 as i32,
|
||||
shard_count: tenant_shard_id.shard_count.literal() as i32,
|
||||
shard_stripe_size: create_req.shard_parameters.stripe_size.0 as i32,
|
||||
generation: create_req.generation.map(|g| g as i32).unwrap_or(0),
|
||||
generation_pageserver: i64::MAX,
|
||||
generation: initial_generation.map(|g| g.into().unwrap() as i32),
|
||||
// The pageserver is not known until scheduling happens: we will set this column when
|
||||
// incrementing the generation the first time we attach to a pageserver.
|
||||
generation_pageserver: None,
|
||||
placement_policy: serde_json::to_string(&placement_policy).unwrap(),
|
||||
config: serde_json::to_string(&create_req.config).unwrap(),
|
||||
splitting: SplitState::default(),
|
||||
@@ -1120,15 +1175,17 @@ impl Service {
|
||||
))
|
||||
})?;
|
||||
|
||||
response_shards.push(TenantCreateResponseShard {
|
||||
shard_id: tenant_shard_id,
|
||||
node_id: entry
|
||||
if let Some(node_id) = entry.get().intent.get_attached() {
|
||||
let generation = entry
|
||||
.get()
|
||||
.intent
|
||||
.get_attached()
|
||||
.expect("We just set pageserver if it was None"),
|
||||
generation: entry.get().generation.into().unwrap(),
|
||||
});
|
||||
.generation
|
||||
.expect("Generation is set when in attached mode");
|
||||
response_shards.push(TenantCreateResponseShard {
|
||||
shard_id: tenant_shard_id,
|
||||
node_id: *node_id,
|
||||
generation: generation.into().unwrap(),
|
||||
});
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
@@ -1142,9 +1199,7 @@ impl Service {
|
||||
placement_policy.clone(),
|
||||
);
|
||||
|
||||
if let Some(create_gen) = create_req.generation {
|
||||
state.generation = Generation::new(create_gen);
|
||||
}
|
||||
state.generation = initial_generation;
|
||||
state.config = create_req.config.clone();
|
||||
|
||||
state.schedule(scheduler).map_err(|e| {
|
||||
@@ -1153,14 +1208,18 @@ impl Service {
|
||||
))
|
||||
})?;
|
||||
|
||||
response_shards.push(TenantCreateResponseShard {
|
||||
shard_id: tenant_shard_id,
|
||||
node_id: state
|
||||
.intent
|
||||
.get_attached()
|
||||
.expect("We just set pageserver if it was None"),
|
||||
generation: state.generation.into().unwrap(),
|
||||
});
|
||||
// Only include shards in result if we are attaching: the purpose
|
||||
// of the response is to tell the caller where the shards are attached.
|
||||
if let Some(node_id) = state.intent.get_attached() {
|
||||
let generation = state
|
||||
.generation
|
||||
.expect("Generation is set when in attached mode");
|
||||
response_shards.push(TenantCreateResponseShard {
|
||||
shard_id: tenant_shard_id,
|
||||
node_id: *node_id,
|
||||
generation: generation.into().unwrap(),
|
||||
});
|
||||
}
|
||||
entry.insert(state)
|
||||
}
|
||||
};
|
||||
@@ -1214,12 +1273,114 @@ impl Service {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// This API is used by the cloud control plane to do coarse-grained control of tenants:
|
||||
/// - Call with mode Attached* to upsert the tenant.
|
||||
/// - Call with mode Detached to switch to PolicyMode::Detached
|
||||
/// Part of [`Self::tenant_location_config`]: dissect an incoming location config request,
|
||||
/// and transform it into either a tenant creation of a series of shard updates.
|
||||
fn tenant_location_config_prepare(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
req: TenantLocationConfigRequest,
|
||||
) -> TenantCreateOrUpdate {
|
||||
let mut updates = Vec::new();
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, _scheduler) = locked.parts_mut();
|
||||
|
||||
// Use location config mode as an indicator of policy.
|
||||
let placement_policy = match req.config.mode {
|
||||
LocationConfigMode::Detached => PlacementPolicy::Detached,
|
||||
LocationConfigMode::Secondary => PlacementPolicy::Secondary,
|
||||
LocationConfigMode::AttachedMulti
|
||||
| LocationConfigMode::AttachedSingle
|
||||
| LocationConfigMode::AttachedStale => {
|
||||
if nodes.len() > 1 {
|
||||
PlacementPolicy::Double(1)
|
||||
} else {
|
||||
// Convenience for dev/test: if we just have one pageserver, import
|
||||
// tenants into Single mode so that scheduling will succeed.
|
||||
PlacementPolicy::Single
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let mut create = true;
|
||||
for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
|
||||
// Saw an existing shard: this is not a creation
|
||||
create = false;
|
||||
|
||||
// Shards may have initially been created by a Secondary request, where we
|
||||
// would have left generation as None.
|
||||
//
|
||||
// We only update generation the first time we see an attached-mode request,
|
||||
// and if there is no existing generation set. The caller is responsible for
|
||||
// ensuring that no non-storage-controller pageserver ever uses a higher
|
||||
// generation than they passed in here.
|
||||
use LocationConfigMode::*;
|
||||
let set_generation = match req.config.mode {
|
||||
AttachedMulti | AttachedSingle | AttachedStale if shard.generation.is_none() => {
|
||||
req.config.generation.map(Generation::new)
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
|
||||
if shard.policy != placement_policy
|
||||
|| shard.config != req.config.tenant_conf
|
||||
|| set_generation.is_some()
|
||||
{
|
||||
updates.push(ShardUpdate {
|
||||
tenant_shard_id: *shard_id,
|
||||
placement_policy: placement_policy.clone(),
|
||||
tenant_config: req.config.tenant_conf.clone(),
|
||||
generation: set_generation,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if create {
|
||||
use LocationConfigMode::*;
|
||||
let generation = match req.config.mode {
|
||||
AttachedMulti | AttachedSingle | AttachedStale => req.config.generation,
|
||||
// If a caller provided a generation in a non-attached request, ignore it
|
||||
// and leave our generation as None: this enables a subsequent update to set
|
||||
// the generation when setting an attached mode for the first time.
|
||||
_ => None,
|
||||
};
|
||||
|
||||
TenantCreateOrUpdate::Create(
|
||||
// Synthesize a creation request
|
||||
(
|
||||
TenantCreateRequest {
|
||||
new_tenant_id: TenantShardId::unsharded(tenant_id),
|
||||
generation,
|
||||
shard_parameters: ShardParameters {
|
||||
// Must preserve the incoming shard_count do distinguish unsharded (0)
|
||||
// from single-sharded (1): this distinction appears in the S3 keys of the tenant.
|
||||
count: req.tenant_id.shard_count,
|
||||
// We only import un-sharded or single-sharded tenants, so stripe
|
||||
// size can be made up arbitrarily here.
|
||||
stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE,
|
||||
},
|
||||
config: req.config.tenant_conf,
|
||||
},
|
||||
placement_policy,
|
||||
),
|
||||
)
|
||||
} else {
|
||||
TenantCreateOrUpdate::Update(updates)
|
||||
}
|
||||
}
|
||||
|
||||
/// This API is used by the cloud control plane to migrate unsharded tenants that it created
|
||||
/// directly with pageservers into this service.
|
||||
///
|
||||
/// In future, calling with mode Secondary may switch to a detach-lite mode in which a tenant only has
|
||||
/// secondary locations.
|
||||
/// Cloud control plane MUST NOT continue issuing GENERATION NUMBERS for this tenant once it
|
||||
/// has attempted to call this API. Failure to oblige to this rule may lead to S3 corruption.
|
||||
/// Think of the first attempt to call this API as a transfer of absolute authority over the
|
||||
/// tenant's source of generation numbers.
|
||||
///
|
||||
/// The mode in this request coarse-grained control of tenants:
|
||||
/// - Call with mode Attached* to upsert the tenant.
|
||||
/// - Call with mode Secondary to either onboard a tenant without attaching it, or
|
||||
/// to set an existing tenant to PolicyMode::Secondary
|
||||
/// - Call with mode Detached to switch to PolicyMode::Detached
|
||||
pub(crate) async fn tenant_location_config(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
@@ -1231,131 +1392,96 @@ impl Service {
|
||||
)));
|
||||
}
|
||||
|
||||
let mut waiters = Vec::new();
|
||||
// First check if this is a creation or an update
|
||||
let create_or_update = self.tenant_location_config_prepare(tenant_id, req);
|
||||
|
||||
let mut result = TenantLocationConfigResponse { shards: Vec::new() };
|
||||
let maybe_create = {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let result_tx = locked.result_tx.clone();
|
||||
let compute_hook = locked.compute_hook.clone();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
let waiters = match create_or_update {
|
||||
TenantCreateOrUpdate::Create((create_req, placement_policy)) => {
|
||||
let (create_resp, waiters) =
|
||||
self.do_tenant_create(create_req, placement_policy).await?;
|
||||
result.shards = create_resp
|
||||
.shards
|
||||
.into_iter()
|
||||
.map(|s| TenantShardLocation {
|
||||
node_id: s.node_id,
|
||||
shard_id: s.shard_id,
|
||||
})
|
||||
.collect();
|
||||
waiters
|
||||
}
|
||||
TenantCreateOrUpdate::Update(updates) => {
|
||||
// Persist updates
|
||||
// Ordering: write to the database before applying changes in-memory, so that
|
||||
// we will not appear time-travel backwards on a restart.
|
||||
for ShardUpdate {
|
||||
tenant_shard_id,
|
||||
placement_policy,
|
||||
tenant_config,
|
||||
generation,
|
||||
} in &updates
|
||||
{
|
||||
self.persistence
|
||||
.update_tenant_shard(
|
||||
*tenant_shard_id,
|
||||
placement_policy.clone(),
|
||||
tenant_config.clone(),
|
||||
*generation,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Maybe we have existing shards
|
||||
let mut create = true;
|
||||
for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
|
||||
// Saw an existing shard: this is not a creation
|
||||
create = false;
|
||||
// Apply updates in-memory
|
||||
let mut waiters = Vec::new();
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let result_tx = locked.result_tx.clone();
|
||||
let compute_hook = locked.compute_hook.clone();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
// Note that for existing tenants we do _not_ respect the generation in the request: this is likely
|
||||
// to be stale. Once a tenant is created in this service, our view of generation is authoritative, and
|
||||
// callers' generations may be ignored. This represents a one-way migration of tenants from the outer
|
||||
// cloud control plane into this service.
|
||||
for ShardUpdate {
|
||||
tenant_shard_id,
|
||||
placement_policy,
|
||||
tenant_config,
|
||||
generation: update_generation,
|
||||
} in updates
|
||||
{
|
||||
let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
|
||||
tracing::warn!("Shard {tenant_shard_id} removed while updating");
|
||||
continue;
|
||||
};
|
||||
|
||||
// Use location config mode as an indicator of policy: if they ask for
|
||||
// attached we go to default HA attached mode. If they ask for secondary
|
||||
// we go to secondary-only mode. If they ask for detached we detach.
|
||||
match req.config.mode {
|
||||
LocationConfigMode::Detached => {
|
||||
shard.policy = PlacementPolicy::Detached;
|
||||
}
|
||||
LocationConfigMode::Secondary => {
|
||||
// TODO: implement secondary-only mode.
|
||||
todo!();
|
||||
}
|
||||
LocationConfigMode::AttachedMulti
|
||||
| LocationConfigMode::AttachedSingle
|
||||
| LocationConfigMode::AttachedStale => {
|
||||
// TODO: persistence for changes in policy
|
||||
if nodes.len() > 1 {
|
||||
shard.policy = PlacementPolicy::Double(1)
|
||||
} else {
|
||||
// Convenience for dev/test: if we just have one pageserver, import
|
||||
// tenants into Single mode so that scheduling will succeed.
|
||||
shard.policy = PlacementPolicy::Single
|
||||
shard.policy = placement_policy;
|
||||
shard.config = tenant_config;
|
||||
if let Some(generation) = update_generation {
|
||||
shard.generation = Some(generation);
|
||||
}
|
||||
|
||||
shard.schedule(scheduler)?;
|
||||
|
||||
let maybe_waiter = shard.maybe_reconcile(
|
||||
result_tx.clone(),
|
||||
nodes,
|
||||
&compute_hook,
|
||||
&self.config,
|
||||
&self.persistence,
|
||||
&self.gate,
|
||||
&self.cancel,
|
||||
);
|
||||
if let Some(waiter) = maybe_waiter {
|
||||
waiters.push(waiter);
|
||||
}
|
||||
|
||||
if let Some(node_id) = shard.intent.get_attached() {
|
||||
result.shards.push(TenantShardLocation {
|
||||
shard_id: tenant_shard_id,
|
||||
node_id: *node_id,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
shard.schedule(scheduler)?;
|
||||
|
||||
let maybe_waiter = shard.maybe_reconcile(
|
||||
result_tx.clone(),
|
||||
nodes,
|
||||
&compute_hook,
|
||||
&self.config,
|
||||
&self.persistence,
|
||||
&self.gate,
|
||||
&self.cancel,
|
||||
);
|
||||
if let Some(waiter) = maybe_waiter {
|
||||
waiters.push(waiter);
|
||||
}
|
||||
|
||||
if let Some(node_id) = shard.intent.get_attached() {
|
||||
result.shards.push(TenantShardLocation {
|
||||
shard_id: *shard_id,
|
||||
node_id: *node_id,
|
||||
})
|
||||
}
|
||||
waiters
|
||||
}
|
||||
|
||||
if create {
|
||||
// Validate request mode
|
||||
match req.config.mode {
|
||||
LocationConfigMode::Detached | LocationConfigMode::Secondary => {
|
||||
// When using this API to onboard an existing tenant to this service, it must start in
|
||||
// an attached state, because we need the request to come with a generation
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"Imported tenant must be in attached mode"
|
||||
)));
|
||||
}
|
||||
|
||||
LocationConfigMode::AttachedMulti
|
||||
| LocationConfigMode::AttachedSingle
|
||||
| LocationConfigMode::AttachedStale => {
|
||||
// Pass
|
||||
}
|
||||
}
|
||||
|
||||
// Validate request generation
|
||||
let Some(generation) = req.config.generation else {
|
||||
// We can only import attached tenants, because we need the request to come with a generation
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"Generation is mandatory when importing tenant"
|
||||
)));
|
||||
};
|
||||
|
||||
// Synthesize a creation request
|
||||
Some(TenantCreateRequest {
|
||||
new_tenant_id: TenantShardId::unsharded(tenant_id),
|
||||
generation: Some(generation),
|
||||
shard_parameters: ShardParameters {
|
||||
// Must preserve the incoming shard_count do distinguish unsharded (0)
|
||||
// from single-sharded (1): this distinction appears in the S3 keys of the tenant.
|
||||
count: req.tenant_id.shard_count,
|
||||
// We only import un-sharded or single-sharded tenants, so stripe
|
||||
// size can be made up arbitrarily here.
|
||||
stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE,
|
||||
},
|
||||
config: req.config.tenant_conf,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
let waiters = if let Some(create_req) = maybe_create {
|
||||
let (create_resp, waiters) = self.do_tenant_create(create_req).await?;
|
||||
result.shards = create_resp
|
||||
.shards
|
||||
.into_iter()
|
||||
.map(|s| TenantShardLocation {
|
||||
node_id: s.node_id,
|
||||
shard_id: s.shard_id,
|
||||
})
|
||||
.collect();
|
||||
waiters
|
||||
} else {
|
||||
waiters
|
||||
};
|
||||
|
||||
if let Err(e) = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await {
|
||||
@@ -1375,6 +1501,91 @@ impl Service {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_config_set(&self, req: TenantConfigRequest) -> Result<(), ApiError> {
|
||||
let tenant_id = req.tenant_id;
|
||||
let config = req.config;
|
||||
|
||||
self.persistence
|
||||
.update_tenant_config(req.tenant_id, config.clone())
|
||||
.await?;
|
||||
|
||||
let waiters = {
|
||||
let mut waiters = Vec::new();
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let result_tx = locked.result_tx.clone();
|
||||
let compute_hook = locked.compute_hook.clone();
|
||||
let (nodes, tenants, _scheduler) = locked.parts_mut();
|
||||
for (_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
|
||||
shard.config = config.clone();
|
||||
if let Some(waiter) = shard.maybe_reconcile(
|
||||
result_tx.clone(),
|
||||
nodes,
|
||||
&compute_hook,
|
||||
&self.config,
|
||||
&self.persistence,
|
||||
&self.gate,
|
||||
&self.cancel,
|
||||
) {
|
||||
waiters.push(waiter);
|
||||
}
|
||||
}
|
||||
waiters
|
||||
};
|
||||
|
||||
if let Err(e) = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await {
|
||||
// Treat this as success because we have stored the configuration. If e.g.
|
||||
// a node was unavailable at this time, it should not stop us accepting a
|
||||
// configuration change.
|
||||
tracing::warn!(%tenant_id, "Accepted configuration update but reconciliation failed: {e}");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn tenant_config_get(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<HashMap<&str, serde_json::Value>, ApiError> {
|
||||
let config = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
|
||||
match locked
|
||||
.tenants
|
||||
.range(TenantShardId::tenant_range(tenant_id))
|
||||
.next()
|
||||
{
|
||||
Some((_tenant_shard_id, shard)) => shard.config.clone(),
|
||||
None => {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant not found").into(),
|
||||
))
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Unlike the pageserver, we do not have a set of global defaults: the config is
|
||||
// entirely per-tenant. Therefore the distinction between `tenant_specific_overrides`
|
||||
// and `effective_config` in the response is meaningless, but we retain that syntax
|
||||
// in order to remain compatible with the pageserver API.
|
||||
|
||||
let response = HashMap::from([
|
||||
(
|
||||
"tenant_specific_overrides",
|
||||
serde_json::to_value(&config)
|
||||
.context("serializing tenant specific overrides")
|
||||
.map_err(ApiError::InternalServerError)?,
|
||||
),
|
||||
(
|
||||
"effective_config",
|
||||
serde_json::to_value(&config)
|
||||
.context("serializing effective config")
|
||||
.map_err(ApiError::InternalServerError)?,
|
||||
),
|
||||
]);
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_time_travel_remote_storage(
|
||||
&self,
|
||||
time_travel_req: &TenantTimeTravelRequest,
|
||||
@@ -1460,6 +1671,60 @@ impl Service {
|
||||
})?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_secondary_download(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<(), ApiError> {
|
||||
// Acquire lock and yield the collection of shard-node tuples which we will send requests onward to
|
||||
let targets = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let mut targets = Vec::new();
|
||||
|
||||
for (tenant_shard_id, shard) in
|
||||
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
|
||||
{
|
||||
for node_id in shard.intent.get_secondary() {
|
||||
let node = locked
|
||||
.nodes
|
||||
.get(node_id)
|
||||
.expect("Pageservers may not be deleted while referenced");
|
||||
|
||||
targets.push((*tenant_shard_id, node.clone()));
|
||||
}
|
||||
}
|
||||
targets
|
||||
};
|
||||
|
||||
// TODO: this API, and the underlying pageserver API, should take a timeout argument so that for long running
|
||||
// downloads, they can return a clean 202 response instead of the HTTP client timing out.
|
||||
|
||||
// Issue concurrent requests to all shards' locations
|
||||
let mut futs = FuturesUnordered::new();
|
||||
for (tenant_shard_id, node) in targets {
|
||||
let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
|
||||
futs.push(async move {
|
||||
let result = client.tenant_secondary_download(tenant_shard_id).await;
|
||||
(result, node)
|
||||
})
|
||||
}
|
||||
|
||||
// Handle any errors returned by pageservers. This includes cases like this request racing with
|
||||
// a scheduling operation, such that the tenant shard we're calling doesn't exist on that pageserver any more, as
|
||||
// well as more general cases like 503s, 500s, or timeouts.
|
||||
while let Some((result, node)) = futs.next().await {
|
||||
let Err(e) = result else { continue };
|
||||
|
||||
// Secondary downloads are always advisory: if something fails, we nevertheless report success, so that whoever
|
||||
// is calling us will proceed with whatever migration they're doing, albeit with a slightly less warm cache
|
||||
// than they had hoped for.
|
||||
tracing::warn!(
|
||||
"Ignoring tenant secondary download error from pageserver {}: {e}",
|
||||
node.id,
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -2039,8 +2304,8 @@ impl Service {
|
||||
// Note: this generation is a placeholder, [`Persistence::begin_shard_split`] will
|
||||
// populate the correct generation as part of its transaction, to protect us
|
||||
// against racing with changes in the state of the parent.
|
||||
generation: 0,
|
||||
generation_pageserver: target.node.id.0 as i64,
|
||||
generation: None,
|
||||
generation_pageserver: Some(target.node.id.0 as i64),
|
||||
placement_policy: serde_json::to_string(&policy).unwrap(),
|
||||
// TODO: get the config out of the map
|
||||
config: serde_json::to_string(&TenantConfig::default()).unwrap(),
|
||||
@@ -2161,7 +2426,8 @@ impl Service {
|
||||
.expect("It was present, we just split it");
|
||||
let old_attached = old_state.intent.get_attached().unwrap();
|
||||
old_state.intent.clear(scheduler);
|
||||
(old_attached, old_state.generation, old_state.config.clone())
|
||||
let generation = old_state.generation.expect("Shard must have been attached");
|
||||
(old_attached, generation, old_state.config.clone())
|
||||
};
|
||||
|
||||
for child in child_ids {
|
||||
@@ -2182,7 +2448,7 @@ impl Service {
|
||||
child_state.observed = ObservedState {
|
||||
locations: child_observed,
|
||||
};
|
||||
child_state.generation = generation;
|
||||
child_state.generation = Some(generation);
|
||||
child_state.config = config.clone();
|
||||
|
||||
// The child's TenantState::splitting is intentionally left at the default value of Idle,
|
||||
@@ -2247,6 +2513,7 @@ impl Service {
|
||||
match shard.policy {
|
||||
PlacementPolicy::Single => {
|
||||
shard.intent.clear_secondary(scheduler);
|
||||
shard.intent.set_attached(scheduler, Some(migrate_req.node_id));
|
||||
}
|
||||
PlacementPolicy::Double(_n) => {
|
||||
// If our new attached node was a secondary, it no longer should be.
|
||||
@@ -2256,6 +2523,12 @@ impl Service {
|
||||
if let Some(old_attached) = old_attached {
|
||||
shard.intent.push_secondary(scheduler, old_attached);
|
||||
}
|
||||
|
||||
shard.intent.set_attached(scheduler, Some(migrate_req.node_id));
|
||||
}
|
||||
PlacementPolicy::Secondary => {
|
||||
shard.intent.clear(scheduler);
|
||||
shard.intent.push_secondary(scheduler, migrate_req.node_id);
|
||||
}
|
||||
PlacementPolicy::Detached => {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
@@ -2263,9 +2536,6 @@ impl Service {
|
||||
)))
|
||||
}
|
||||
}
|
||||
shard
|
||||
.intent
|
||||
.set_attached(scheduler, Some(migrate_req.node_id));
|
||||
|
||||
tracing::info!("Migrating: new intent {:?}", shard.intent);
|
||||
shard.sequence = shard.sequence.next();
|
||||
@@ -2593,7 +2863,7 @@ impl Service {
|
||||
observed_loc.conf = None;
|
||||
}
|
||||
|
||||
if tenant_state.intent.notify_offline(config_req.node_id) {
|
||||
if tenant_state.intent.demote_attached(config_req.node_id) {
|
||||
tenant_state.sequence = tenant_state.sequence.next();
|
||||
match tenant_state.schedule(scheduler) {
|
||||
Err(e) => {
|
||||
@@ -2660,6 +2930,9 @@ impl Service {
|
||||
/// Helper for methods that will try and call pageserver APIs for
|
||||
/// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
|
||||
/// is attached somewhere.
|
||||
///
|
||||
/// TODO: this doesn't actually ensure attached unless the PlacementPolicy is
|
||||
/// an attached policy. We should error out if it isn't.
|
||||
fn ensure_attached_schedule(
|
||||
&self,
|
||||
mut locked: std::sync::RwLockWriteGuard<'_, ServiceState>,
|
||||
|
||||
@@ -53,8 +53,11 @@ pub(crate) struct TenantState {
|
||||
pub(crate) sequence: Sequence,
|
||||
|
||||
// Latest generation number: next time we attach, increment this
|
||||
// and use the incremented number when attaching
|
||||
pub(crate) generation: Generation,
|
||||
// and use the incremented number when attaching.
|
||||
//
|
||||
// None represents an incompletely onboarded tenant via the [`Service::location_config`]
|
||||
// API, where this tenant may only run in PlacementPolicy::Secondary.
|
||||
pub(crate) generation: Option<Generation>,
|
||||
|
||||
// High level description of how the tenant should be set up. Provided
|
||||
// externally.
|
||||
@@ -181,6 +184,13 @@ impl IntentState {
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove the last secondary node from the list of secondaries
|
||||
pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
|
||||
if let Some(node_id) = self.secondary.pop() {
|
||||
scheduler.node_dec_ref(node_id);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
|
||||
if let Some(old_attached) = self.attached.take() {
|
||||
scheduler.node_dec_ref(old_attached);
|
||||
@@ -208,11 +218,13 @@ impl IntentState {
|
||||
&self.secondary
|
||||
}
|
||||
|
||||
/// When a node goes offline, we update intents to avoid using it
|
||||
/// as their attached pageserver.
|
||||
/// If the node is in use as the attached location, demote it into
|
||||
/// the list of secondary locations. This is used when a node goes offline,
|
||||
/// and we want to use a different node for attachment, but not permanently
|
||||
/// forget the location on the offline node.
|
||||
///
|
||||
/// Returns true if a change was made
|
||||
pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool {
|
||||
pub(crate) fn demote_attached(&mut self, node_id: NodeId) -> bool {
|
||||
if self.attached == Some(node_id) {
|
||||
// TODO: when scheduler starts tracking attached + secondary counts separately, we will
|
||||
// need to call into it here.
|
||||
@@ -315,7 +327,7 @@ pub(crate) struct ReconcileResult {
|
||||
pub(crate) result: Result<(), ReconcileError>,
|
||||
|
||||
pub(crate) tenant_shard_id: TenantShardId,
|
||||
pub(crate) generation: Generation,
|
||||
pub(crate) generation: Option<Generation>,
|
||||
pub(crate) observed: ObservedState,
|
||||
|
||||
/// Set [`TenantState::pending_compute_notification`] from this flag
|
||||
@@ -340,7 +352,7 @@ impl TenantState {
|
||||
tenant_shard_id,
|
||||
policy,
|
||||
intent: IntentState::default(),
|
||||
generation: Generation::new(0),
|
||||
generation: Some(Generation::new(0)),
|
||||
shard,
|
||||
observed: ObservedState::default(),
|
||||
config: TenantConfig::default(),
|
||||
@@ -438,10 +450,16 @@ impl TenantState {
|
||||
// more work on the same pageservers we're already using.
|
||||
let mut modified = false;
|
||||
|
||||
// Add/remove nodes to fulfil policy
|
||||
use PlacementPolicy::*;
|
||||
match self.policy {
|
||||
Single => {
|
||||
// Should have exactly one attached, and zero secondaries
|
||||
if !self.intent.secondary.is_empty() {
|
||||
self.intent.clear_secondary(scheduler);
|
||||
modified = true;
|
||||
}
|
||||
|
||||
let (modified_attached, _attached_node_id) = self.schedule_attached(scheduler)?;
|
||||
modified |= modified_attached;
|
||||
|
||||
@@ -451,6 +469,23 @@ impl TenantState {
|
||||
}
|
||||
}
|
||||
Double(secondary_count) => {
|
||||
let retain_secondaries = if self.intent.attached.is_none()
|
||||
&& scheduler.node_preferred(&self.intent.secondary).is_some()
|
||||
{
|
||||
// If we have no attached, and one of the secondaries is elegible to be promoted, retain
|
||||
// one more secondary than we usually would, as one of them will become attached futher down this function.
|
||||
secondary_count + 1
|
||||
} else {
|
||||
secondary_count
|
||||
};
|
||||
|
||||
while self.intent.secondary.len() > retain_secondaries {
|
||||
// We have no particular preference for one secondary location over another: just
|
||||
// arbitrarily drop from the end
|
||||
self.intent.pop_secondary(scheduler);
|
||||
modified = true;
|
||||
}
|
||||
|
||||
// Should have exactly one attached, and N secondaries
|
||||
let (modified_attached, attached_node_id) = self.schedule_attached(scheduler)?;
|
||||
modified |= modified_attached;
|
||||
@@ -463,15 +498,28 @@ impl TenantState {
|
||||
modified = true;
|
||||
}
|
||||
}
|
||||
Detached => {
|
||||
// Should have no attached or secondary pageservers
|
||||
if self.intent.attached.is_some() {
|
||||
self.intent.set_attached(scheduler, None);
|
||||
Secondary => {
|
||||
if let Some(node_id) = self.intent.get_attached() {
|
||||
// Populate secondary by demoting the attached node
|
||||
self.intent.demote_attached(*node_id);
|
||||
modified = true;
|
||||
} else if self.intent.secondary.is_empty() {
|
||||
// Populate secondary by scheduling a fresh node
|
||||
let node_id = scheduler.schedule_shard(&[])?;
|
||||
self.intent.push_secondary(scheduler, node_id);
|
||||
modified = true;
|
||||
}
|
||||
|
||||
if !self.intent.secondary.is_empty() {
|
||||
self.intent.clear_secondary(scheduler);
|
||||
while self.intent.secondary.len() > 1 {
|
||||
// We have no particular preference for one secondary location over another: just
|
||||
// arbitrarily drop from the end
|
||||
self.intent.pop_secondary(scheduler);
|
||||
modified = true;
|
||||
}
|
||||
}
|
||||
Detached => {
|
||||
// Never add locations in this mode
|
||||
if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
|
||||
self.intent.clear(scheduler);
|
||||
modified = true;
|
||||
}
|
||||
}
|
||||
@@ -518,7 +566,12 @@ impl TenantState {
|
||||
|
||||
fn dirty(&self) -> bool {
|
||||
if let Some(node_id) = self.intent.attached {
|
||||
let wanted_conf = attached_location_conf(self.generation, &self.shard, &self.config);
|
||||
// Maybe panic: it is a severe bug if we try to attach while generation is null.
|
||||
let generation = self
|
||||
.generation
|
||||
.expect("Attempted to enter attached state without a generation");
|
||||
|
||||
let wanted_conf = attached_location_conf(generation, &self.shard, &self.config);
|
||||
match self.observed.locations.get(&node_id) {
|
||||
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
|
||||
Some(_) | None => {
|
||||
@@ -596,6 +649,10 @@ impl TenantState {
|
||||
// Reconcile already in flight for the current sequence?
|
||||
if let Some(handle) = &self.reconciler {
|
||||
if handle.sequence == self.sequence {
|
||||
tracing::info!(
|
||||
"Reconciliation already in progress for sequence {:?}",
|
||||
self.sequence,
|
||||
);
|
||||
return Some(ReconcilerWaiter {
|
||||
tenant_shard_id: self.tenant_shard_id,
|
||||
seq_wait: self.waiter.clone(),
|
||||
@@ -615,6 +672,10 @@ impl TenantState {
|
||||
return None;
|
||||
};
|
||||
|
||||
// Advance the sequence before spawning a reconciler, so that sequence waiters
|
||||
// can distinguish between before+after the reconcile completes.
|
||||
self.sequence = self.sequence.next();
|
||||
|
||||
let reconciler_cancel = cancel.child_token();
|
||||
let mut reconciler = Reconciler {
|
||||
tenant_shard_id: self.tenant_shard_id,
|
||||
@@ -716,6 +777,17 @@ impl TenantState {
|
||||
})
|
||||
}
|
||||
|
||||
/// Called when a ReconcileResult has been emitted and the service is updating
|
||||
/// our state: if the result is from a sequence >= my ReconcileHandle, then drop
|
||||
/// the handle to indicate there is no longer a reconciliation in progress.
|
||||
pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
|
||||
if let Some(reconcile_handle) = &self.reconciler {
|
||||
if reconcile_handle.sequence <= sequence {
|
||||
self.reconciler = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we had any state at all referring to this node ID, drop it. Does not
|
||||
// attempt to reschedule.
|
||||
pub(crate) fn deref_node(&mut self, node_id: NodeId) {
|
||||
@@ -736,13 +808,8 @@ impl TenantState {
|
||||
shard_number: self.tenant_shard_id.shard_number.0 as i32,
|
||||
shard_count: self.tenant_shard_id.shard_count.literal() as i32,
|
||||
shard_stripe_size: self.shard.stripe_size.0 as i32,
|
||||
generation: self.generation.into().unwrap_or(0) as i32,
|
||||
generation_pageserver: self
|
||||
.intent
|
||||
.get_attached()
|
||||
.map(|n| n.0 as i64)
|
||||
.unwrap_or(i64::MAX),
|
||||
|
||||
generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
|
||||
generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
|
||||
placement_policy: serde_json::to_string(&self.policy).unwrap(),
|
||||
config: serde_json::to_string(&self.config).unwrap(),
|
||||
splitting: SplitState::default(),
|
||||
@@ -805,8 +872,10 @@ pub(crate) mod tests {
|
||||
assert_ne!(attached_node_id, secondary_node_id);
|
||||
|
||||
// Notifying the attached node is offline should demote it to a secondary
|
||||
let changed = tenant_state.intent.notify_offline(attached_node_id);
|
||||
let changed = tenant_state.intent.demote_attached(attached_node_id);
|
||||
assert!(changed);
|
||||
assert!(tenant_state.intent.attached.is_none());
|
||||
assert_eq!(tenant_state.intent.secondary.len(), 2);
|
||||
|
||||
// Update the scheduler state to indicate the node is offline
|
||||
nodes.get_mut(&attached_node_id).unwrap().availability = NodeAvailability::Offline;
|
||||
|
||||
@@ -200,7 +200,7 @@ impl AttachmentService {
|
||||
"localhost",
|
||||
"-p",
|
||||
&format!("{}", self.postgres_port),
|
||||
&DB_NAME,
|
||||
DB_NAME,
|
||||
])
|
||||
.output()
|
||||
.await
|
||||
|
||||
@@ -605,7 +605,7 @@ impl Endpoint {
|
||||
let conn_str = self.connstr("cloud_admin", "postgres");
|
||||
println!("Starting postgres node at '{}'", conn_str);
|
||||
if create_test_user {
|
||||
let conn_str = self.connstr("user", "neondb");
|
||||
let conn_str = self.connstr("test", "neondb");
|
||||
println!("Also at '{}'", conn_str);
|
||||
}
|
||||
let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
# Zenith storage node — alternative
|
||||
# Neon storage node — alternative
|
||||
|
||||
## **Design considerations**
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Command line interface (end-user)
|
||||
|
||||
Zenith CLI as it is described here mostly resides on the same conceptual level as pg_ctl/initdb/pg_recvxlog/etc and replaces some of them in an opinionated way. I would also suggest bundling our patched postgres inside zenith distribution at least at the start.
|
||||
Neon CLI as it is described here mostly resides on the same conceptual level as pg_ctl/initdb/pg_recvxlog/etc and replaces some of them in an opinionated way. I would also suggest bundling our patched postgres inside neon distribution at least at the start.
|
||||
|
||||
This proposal is focused on managing local installations. For cluster operations, different tooling would be needed. The point of integration between the two is storage URL: no matter how complex cluster setup is it may provide an endpoint where the user may push snapshots.
|
||||
|
||||
@@ -8,40 +8,40 @@ The most important concept here is a snapshot, which can be created/pushed/pulle
|
||||
|
||||
# Possible usage scenarios
|
||||
|
||||
## Install zenith, run a postgres
|
||||
## Install neon, run a postgres
|
||||
|
||||
```
|
||||
> brew install pg-zenith
|
||||
> zenith pg create # creates pgdata with default pattern pgdata$i
|
||||
> zenith pg list
|
||||
> brew install pg-neon
|
||||
> neon pg create # creates pgdata with default pattern pgdata$i
|
||||
> neon pg list
|
||||
ID PGDATA USED STORAGE ENDPOINT
|
||||
primary1 pgdata1 0G zenith-local localhost:5432
|
||||
primary1 pgdata1 0G neon-local localhost:5432
|
||||
```
|
||||
|
||||
## Import standalone postgres to zenith
|
||||
## Import standalone postgres to neon
|
||||
|
||||
```
|
||||
> zenith snapshot import --from=basebackup://replication@localhost:5432/ oldpg
|
||||
> neon snapshot import --from=basebackup://replication@localhost:5432/ oldpg
|
||||
[====================------------] 60% | 20MB/s
|
||||
> zenith snapshot list
|
||||
> neon snapshot list
|
||||
ID SIZE PARENT
|
||||
oldpg 5G -
|
||||
|
||||
> zenith pg create --snapshot oldpg
|
||||
> neon pg create --snapshot oldpg
|
||||
Started postgres on localhost:5432
|
||||
|
||||
> zenith pg list
|
||||
> neon pg list
|
||||
ID PGDATA USED STORAGE ENDPOINT
|
||||
primary1 pgdata1 5G zenith-local localhost:5432
|
||||
primary1 pgdata1 5G neon-local localhost:5432
|
||||
|
||||
> zenith snapshot destroy oldpg
|
||||
> neon snapshot destroy oldpg
|
||||
Ok
|
||||
```
|
||||
|
||||
Also, we may start snapshot import implicitly by looking at snapshot schema
|
||||
|
||||
```
|
||||
> zenith pg create --snapshot basebackup://replication@localhost:5432/
|
||||
> neon pg create --snapshot basebackup://replication@localhost:5432/
|
||||
Downloading snapshot... Done.
|
||||
Started postgres on localhost:5432
|
||||
Destroying snapshot... Done.
|
||||
@@ -52,39 +52,39 @@ Destroying snapshot... Done.
|
||||
Since we may export the whole snapshot as one big file (tar of basebackup, maybe with some manifest) it may be shared over conventional means: http, ssh, [git+lfs](https://docs.github.com/en/github/managing-large-files/about-git-large-file-storage).
|
||||
|
||||
```
|
||||
> zenith pg create --snapshot http://learn-postgres.com/movies_db.zenith movies
|
||||
> neon pg create --snapshot http://learn-postgres.com/movies_db.neon movies
|
||||
```
|
||||
|
||||
## Create snapshot and push it to the cloud
|
||||
|
||||
```
|
||||
> zenith snapshot create pgdata1@snap1
|
||||
> zenith snapshot push --to ssh://stas@zenith.tech pgdata1@snap1
|
||||
> neon snapshot create pgdata1@snap1
|
||||
> neon snapshot push --to ssh://stas@neon.tech pgdata1@snap1
|
||||
```
|
||||
|
||||
## Rollback database to the snapshot
|
||||
|
||||
One way to rollback the database is just to init a new database from the snapshot and destroy the old one. But creating a new database from a snapshot would require a copy of that snapshot which is time consuming operation. Another option that would be cool to support is the ability to create the copy-on-write database from the snapshot without copying data, and store updated pages in a separate location, however that way would have performance implications. So to properly rollback the database to the older state we have `zenith pg checkout`.
|
||||
One way to rollback the database is just to init a new database from the snapshot and destroy the old one. But creating a new database from a snapshot would require a copy of that snapshot which is time consuming operation. Another option that would be cool to support is the ability to create the copy-on-write database from the snapshot without copying data, and store updated pages in a separate location, however that way would have performance implications. So to properly rollback the database to the older state we have `neon pg checkout`.
|
||||
|
||||
```
|
||||
> zenith pg list
|
||||
> neon pg list
|
||||
ID PGDATA USED STORAGE ENDPOINT
|
||||
primary1 pgdata1 5G zenith-local localhost:5432
|
||||
primary1 pgdata1 5G neon-local localhost:5432
|
||||
|
||||
> zenith snapshot create pgdata1@snap1
|
||||
> neon snapshot create pgdata1@snap1
|
||||
|
||||
> zenith snapshot list
|
||||
> neon snapshot list
|
||||
ID SIZE PARENT
|
||||
oldpg 5G -
|
||||
pgdata1@snap1 6G -
|
||||
pgdata1@CURRENT 6G -
|
||||
|
||||
> zenith pg checkout pgdata1@snap1
|
||||
> neon pg checkout pgdata1@snap1
|
||||
Stopping postgres on pgdata1.
|
||||
Rolling back pgdata1@CURRENT to pgdata1@snap1.
|
||||
Starting postgres on pgdata1.
|
||||
|
||||
> zenith snapshot list
|
||||
> neon snapshot list
|
||||
ID SIZE PARENT
|
||||
oldpg 5G -
|
||||
pgdata1@snap1 6G -
|
||||
@@ -99,7 +99,7 @@ Some notes: pgdata1@CURRENT -- implicit snapshot representing the current state
|
||||
PITR area acts like a continuous snapshot where you can reset the database to any point in time within this area (by area I mean some TTL period or some size limit, both possibly infinite).
|
||||
|
||||
```
|
||||
> zenith pitr create --storage s3tank --ttl 30d --name pitr_last_month
|
||||
> neon pitr create --storage s3tank --ttl 30d --name pitr_last_month
|
||||
```
|
||||
|
||||
Resetting the database to some state in past would require creating a snapshot on some lsn / time in this pirt area.
|
||||
@@ -108,29 +108,29 @@ Resetting the database to some state in past would require creating a snapshot o
|
||||
|
||||
## storage
|
||||
|
||||
Storage is either zenith pagestore or s3. Users may create a database in a pagestore and create/move *snapshots* and *pitr regions* in both pagestore and s3. Storage is a concept similar to `git remote`. After installation, I imagine one local storage is available by default.
|
||||
Storage is either neon pagestore or s3. Users may create a database in a pagestore and create/move *snapshots* and *pitr regions* in both pagestore and s3. Storage is a concept similar to `git remote`. After installation, I imagine one local storage is available by default.
|
||||
|
||||
**zenith storage attach** -t [native|s3] -c key=value -n name
|
||||
**neon storage attach** -t [native|s3] -c key=value -n name
|
||||
|
||||
Attaches/initializes storage. For --type=s3, user credentials and path should be provided. For --type=native we may support --path=/local/path and --url=zenith.tech/stas/mystore. Other possible term for native is 'zstore'.
|
||||
Attaches/initializes storage. For --type=s3, user credentials and path should be provided. For --type=native we may support --path=/local/path and --url=neon.tech/stas/mystore. Other possible term for native is 'zstore'.
|
||||
|
||||
|
||||
**zenith storage list**
|
||||
**neon storage list**
|
||||
|
||||
Show currently attached storages. For example:
|
||||
|
||||
```
|
||||
> zenith storage list
|
||||
> neon storage list
|
||||
NAME USED TYPE OPTIONS PATH
|
||||
local 5.1G zenith-local /opt/zenith/store/local
|
||||
local.compr 20.4G zenith-local compression=on /opt/zenith/store/local.compr
|
||||
zcloud 60G zenith-remote zenith.tech/stas/mystore
|
||||
local 5.1G neon-local /opt/neon/store/local
|
||||
local.compr 20.4G neon-local compression=on /opt/neon/store/local.compr
|
||||
zcloud 60G neon-remote neon.tech/stas/mystore
|
||||
s3tank 80G S3
|
||||
```
|
||||
|
||||
**zenith storage detach**
|
||||
**neon storage detach**
|
||||
|
||||
**zenith storage show**
|
||||
**neon storage show**
|
||||
|
||||
|
||||
|
||||
@@ -140,29 +140,29 @@ Manages postgres data directories and can start postgres instances with proper c
|
||||
|
||||
Pg is a term for a single postgres running on some data. I'm trying to avoid separation of datadir management and postgres instance management -- both that concepts bundled here together.
|
||||
|
||||
**zenith pg create** [--no-start --snapshot --cow] -s storage-name -n pgdata
|
||||
**neon pg create** [--no-start --snapshot --cow] -s storage-name -n pgdata
|
||||
|
||||
Creates (initializes) new data directory in given storage and starts postgres. I imagine that storage for this operation may be only local and data movement to remote location happens through snapshots/pitr.
|
||||
|
||||
--no-start: just init datadir without creating
|
||||
|
||||
--snapshot snap: init from the snapshot. Snap is a name or URL (zenith.tech/stas/mystore/snap1)
|
||||
--snapshot snap: init from the snapshot. Snap is a name or URL (neon.tech/stas/mystore/snap1)
|
||||
|
||||
--cow: initialize Copy-on-Write data directory on top of some snapshot (makes sense if it is a snapshot of currently running a database)
|
||||
|
||||
**zenith pg destroy**
|
||||
**neon pg destroy**
|
||||
|
||||
**zenith pg start** [--replica] pgdata
|
||||
**neon pg start** [--replica] pgdata
|
||||
|
||||
Start postgres with proper extensions preloaded/installed.
|
||||
|
||||
**zenith pg checkout**
|
||||
**neon pg checkout**
|
||||
|
||||
Rollback data directory to some previous snapshot.
|
||||
|
||||
**zenith pg stop** pg_id
|
||||
**neon pg stop** pg_id
|
||||
|
||||
**zenith pg list**
|
||||
**neon pg list**
|
||||
|
||||
```
|
||||
ROLE PGDATA USED STORAGE ENDPOINT
|
||||
@@ -173,7 +173,7 @@ primary my_pg2 3.2G local.compr localhost:5435
|
||||
- my_pg3 9.2G local.compr -
|
||||
```
|
||||
|
||||
**zenith pg show**
|
||||
**neon pg show**
|
||||
|
||||
```
|
||||
my_pg:
|
||||
@@ -194,7 +194,7 @@ my_pg:
|
||||
|
||||
```
|
||||
|
||||
**zenith pg start-rest/graphql** pgdata
|
||||
**neon pg start-rest/graphql** pgdata
|
||||
|
||||
Starts REST/GraphQL proxy on top of postgres master. Not sure we should do that, just an idea.
|
||||
|
||||
@@ -203,35 +203,35 @@ Starts REST/GraphQL proxy on top of postgres master. Not sure we should do that,
|
||||
|
||||
Snapshot creation is cheap -- no actual data is copied, we just start retaining old pages. Snapshot size means the amount of retained data, not all data. Snapshot name looks like pgdata_name@tag_name. tag_name is set by the user during snapshot creation. There are some reserved tag names: CURRENT represents the current state of the data directory; HEAD{i} represents the data directory state that resided in the database before i-th checkout.
|
||||
|
||||
**zenith snapshot create** pgdata_name@snap_name
|
||||
**neon snapshot create** pgdata_name@snap_name
|
||||
|
||||
Creates a new snapshot in the same storage where pgdata_name exists.
|
||||
|
||||
**zenith snapshot push** --to url pgdata_name@snap_name
|
||||
**neon snapshot push** --to url pgdata_name@snap_name
|
||||
|
||||
Produces binary stream of a given snapshot. Under the hood starts temp read-only postgres over this snapshot and sends basebackup stream. Receiving side should start `zenith snapshot recv` before push happens. If url has some special schema like zenith:// receiving side may require auth start `zenith snapshot recv` on the go.
|
||||
Produces binary stream of a given snapshot. Under the hood starts temp read-only postgres over this snapshot and sends basebackup stream. Receiving side should start `neon snapshot recv` before push happens. If url has some special schema like neon:// receiving side may require auth start `neon snapshot recv` on the go.
|
||||
|
||||
**zenith snapshot recv**
|
||||
**neon snapshot recv**
|
||||
|
||||
Starts a port listening for a basebackup stream, prints connection info to stdout (so that user may use that in push command), and expects data on that socket.
|
||||
|
||||
**zenith snapshot pull** --from url or path
|
||||
**neon snapshot pull** --from url or path
|
||||
|
||||
Connects to a remote zenith/s3/file and pulls snapshot. The remote site should be zenith service or files in our format.
|
||||
Connects to a remote neon/s3/file and pulls snapshot. The remote site should be neon service or files in our format.
|
||||
|
||||
**zenith snapshot import** --from basebackup://<...> or path
|
||||
**neon snapshot import** --from basebackup://<...> or path
|
||||
|
||||
Creates a new snapshot out of running postgres via basebackup protocol or basebackup files.
|
||||
|
||||
**zenith snapshot export**
|
||||
**neon snapshot export**
|
||||
|
||||
Starts read-only postgres over this snapshot and exports data in some format (pg_dump, or COPY TO on some/all tables). One of the options may be zenith own format which is handy for us (but I think just tar of basebackup would be okay).
|
||||
Starts read-only postgres over this snapshot and exports data in some format (pg_dump, or COPY TO on some/all tables). One of the options may be neon own format which is handy for us (but I think just tar of basebackup would be okay).
|
||||
|
||||
**zenith snapshot diff** snap1 snap2
|
||||
**neon snapshot diff** snap1 snap2
|
||||
|
||||
Shows size of data changed between two snapshots. We also may provide options to diff schema/data in tables. To do that start temp read-only postgreses.
|
||||
|
||||
**zenith snapshot destroy**
|
||||
**neon snapshot destroy**
|
||||
|
||||
## pitr
|
||||
|
||||
@@ -239,7 +239,7 @@ Pitr represents wal stream and ttl policy for that stream
|
||||
|
||||
XXX: any suggestions on a better name?
|
||||
|
||||
**zenith pitr create** name
|
||||
**neon pitr create** name
|
||||
|
||||
--ttl = inf | period
|
||||
|
||||
@@ -247,21 +247,21 @@ XXX: any suggestions on a better name?
|
||||
|
||||
--storage = storage_name
|
||||
|
||||
**zenith pitr extract-snapshot** pitr_name --lsn xxx
|
||||
**neon pitr extract-snapshot** pitr_name --lsn xxx
|
||||
|
||||
Creates a snapshot out of some lsn in PITR area. The obtained snapshot may be managed with snapshot routines (move/send/export)
|
||||
|
||||
**zenith pitr gc** pitr_name
|
||||
**neon pitr gc** pitr_name
|
||||
|
||||
Force garbage collection on some PITR area.
|
||||
|
||||
**zenith pitr list**
|
||||
**neon pitr list**
|
||||
|
||||
**zenith pitr destroy**
|
||||
**neon pitr destroy**
|
||||
|
||||
|
||||
## console
|
||||
|
||||
**zenith console**
|
||||
**neon console**
|
||||
|
||||
Opens browser targeted at web console with the more or less same functionality as described here.
|
||||
|
||||
@@ -6,7 +6,7 @@ When do we consider the WAL record as durable, so that we can
|
||||
acknowledge the commit to the client and be reasonably certain that we
|
||||
will not lose the transaction?
|
||||
|
||||
Zenith uses a group of WAL safekeeper nodes to hold the generated WAL.
|
||||
Neon uses a group of WAL safekeeper nodes to hold the generated WAL.
|
||||
A WAL record is considered durable, when it has been written to a
|
||||
majority of WAL safekeeper nodes. In this document, I use 5
|
||||
safekeepers, because I have five fingers. A WAL record is durable,
|
||||
|
||||
@@ -1,23 +1,23 @@
|
||||
# Zenith local
|
||||
# Neon local
|
||||
|
||||
Here I list some objectives to keep in mind when discussing zenith-local design and a proposal that brings all components together. Your comments on both parts are very welcome.
|
||||
Here I list some objectives to keep in mind when discussing neon-local design and a proposal that brings all components together. Your comments on both parts are very welcome.
|
||||
|
||||
#### Why do we need it?
|
||||
- For distribution - this easy to use binary will help us to build adoption among developers.
|
||||
- For internal use - to test all components together.
|
||||
|
||||
In my understanding, we consider it to be just a mock-up version of zenith-cloud.
|
||||
In my understanding, we consider it to be just a mock-up version of neon-cloud.
|
||||
> Question: How much should we care about durability and security issues for a local setup?
|
||||
|
||||
|
||||
#### Why is it better than a simple local postgres?
|
||||
|
||||
- Easy one-line setup. As simple as `cargo install zenith && zenith start`
|
||||
- Easy one-line setup. As simple as `cargo install neon && neon start`
|
||||
|
||||
- Quick and cheap creation of compute nodes over the same storage.
|
||||
> Question: How can we describe a use-case for this feature?
|
||||
|
||||
- Zenith-local can work with S3 directly.
|
||||
- Neon-local can work with S3 directly.
|
||||
|
||||
- Push and pull images (snapshots) to remote S3 to exchange data with other users.
|
||||
|
||||
@@ -31,50 +31,50 @@ Ideally, just one binary that incorporates all elements we need.
|
||||
|
||||
#### Components:
|
||||
|
||||
- **zenith-CLI** - interface for end-users. Turns commands to REST requests and handles responses to show them in a user-friendly way.
|
||||
CLI proposal is here https://github.com/libzenith/rfcs/blob/003-laptop-cli.md/003-laptop-cli.md
|
||||
WIP code is here: https://github.com/libzenith/postgres/tree/main/pageserver/src/bin/cli
|
||||
- **neon-CLI** - interface for end-users. Turns commands to REST requests and handles responses to show them in a user-friendly way.
|
||||
CLI proposal is here https://github.com/neondatabase/rfcs/blob/003-laptop-cli.md/003-laptop-cli.md
|
||||
WIP code is here: https://github.com/neondatabase/postgres/tree/main/pageserver/src/bin/cli
|
||||
|
||||
- **zenith-console** - WEB UI with same functionality as CLI.
|
||||
- **neon-console** - WEB UI with same functionality as CLI.
|
||||
>Note: not for the first release.
|
||||
|
||||
- **zenith-local** - entrypoint. Service that starts all other components and handles REST API requests. See REST API proposal below.
|
||||
> Idea: spawn all other components as child processes, so that we could shutdown everything by stopping zenith-local.
|
||||
- **neon-local** - entrypoint. Service that starts all other components and handles REST API requests. See REST API proposal below.
|
||||
> Idea: spawn all other components as child processes, so that we could shutdown everything by stopping neon-local.
|
||||
|
||||
- **zenith-pageserver** - consists of a storage and WAL-replaying service (modified PG in current implementation).
|
||||
- **neon-pageserver** - consists of a storage and WAL-replaying service (modified PG in current implementation).
|
||||
> Question: Probably, for local setup we should be able to bypass page-storage and interact directly with S3 to avoid double caching in shared buffers and page-server?
|
||||
|
||||
WIP code is here: https://github.com/libzenith/postgres/tree/main/pageserver/src
|
||||
WIP code is here: https://github.com/neondatabase/postgres/tree/main/pageserver/src
|
||||
|
||||
- **zenith-S3** - stores base images of the database and WAL in S3 object storage. Import and export images from/to zenith.
|
||||
- **neon-S3** - stores base images of the database and WAL in S3 object storage. Import and export images from/to neon.
|
||||
> Question: How should it operate in a local setup? Will we manage it ourselves or ask user to provide credentials for existing S3 object storage (i.e. minio)?
|
||||
> Question: Do we use it together with local page store or they are interchangeable?
|
||||
|
||||
WIP code is ???
|
||||
|
||||
- **zenith-safekeeper** - receives WAL from postgres, stores it durably, answers to Postgres that "sync" is succeed.
|
||||
- **neon-safekeeper** - receives WAL from postgres, stores it durably, answers to Postgres that "sync" is succeed.
|
||||
> Question: How should it operate in a local setup? In my understanding it should push WAL directly to S3 (if we use it) or store all data locally (if we use local page storage). The latter option seems meaningless (extra overhead and no gain), but it is still good to test the system.
|
||||
|
||||
WIP code is here: https://github.com/libzenith/postgres/tree/main/src/bin/safekeeper
|
||||
WIP code is here: https://github.com/neondatabase/postgres/tree/main/src/bin/safekeeper
|
||||
|
||||
- **zenith-computenode** - bottomless PostgreSQL, ideally upstream, but for a start - our modified version. User can quickly create and destroy them and work with it as a regular postgres database.
|
||||
- **neon-computenode** - bottomless PostgreSQL, ideally upstream, but for a start - our modified version. User can quickly create and destroy them and work with it as a regular postgres database.
|
||||
|
||||
WIP code is in main branch and here: https://github.com/libzenith/postgres/commits/compute_node
|
||||
WIP code is in main branch and here: https://github.com/neondatabase/postgres/commits/compute_node
|
||||
|
||||
#### REST API:
|
||||
|
||||
Service endpoint: `http://localhost:3000`
|
||||
|
||||
Resources:
|
||||
- /storages - Where data lives: zenith-pageserver or zenith-s3
|
||||
- /pgs - Postgres - zenith-computenode
|
||||
- /storages - Where data lives: neon-pageserver or neon-s3
|
||||
- /pgs - Postgres - neon-computenode
|
||||
- /snapshots - snapshots **TODO**
|
||||
|
||||
>Question: Do we want to extend this API to manage zenith components? I.e. start page-server, manage safekeepers and so on? Or they will be hardcoded to just start once and for all?
|
||||
>Question: Do we want to extend this API to manage neon components? I.e. start page-server, manage safekeepers and so on? Or they will be hardcoded to just start once and for all?
|
||||
|
||||
Methods and their mapping to CLI:
|
||||
|
||||
- /storages - zenith-pageserver or zenith-s3
|
||||
- /storages - neon-pageserver or neon-s3
|
||||
|
||||
CLI | REST API
|
||||
------------- | -------------
|
||||
@@ -84,7 +84,7 @@ storage list | GET /storages
|
||||
storage show -n name | GET /storages/:storage_name
|
||||
|
||||
|
||||
- /pgs - zenith-computenode
|
||||
- /pgs - neon-computenode
|
||||
|
||||
CLI | REST API
|
||||
------------- | -------------
|
||||
|
||||
@@ -1,45 +1,45 @@
|
||||
Zenith CLI allows you to operate database clusters (catalog clusters) and their commit history locally and in the cloud. Since ANSI calls them catalog clusters and cluster is a loaded term in the modern infrastructure we will call it "catalog".
|
||||
Neon CLI allows you to operate database clusters (catalog clusters) and their commit history locally and in the cloud. Since ANSI calls them catalog clusters and cluster is a loaded term in the modern infrastructure we will call it "catalog".
|
||||
|
||||
# CLI v2 (after chatting with Carl)
|
||||
|
||||
Zenith introduces the notion of a repository.
|
||||
Neon introduces the notion of a repository.
|
||||
|
||||
```bash
|
||||
zenith init
|
||||
zenith clone zenith://zenith.tech/piedpiper/northwind -- clones a repo to the northwind directory
|
||||
neon init
|
||||
neon clone neon://neon.tech/piedpiper/northwind -- clones a repo to the northwind directory
|
||||
```
|
||||
|
||||
Once you have a cluster catalog you can explore it
|
||||
|
||||
```bash
|
||||
zenith log -- returns a list of commits
|
||||
zenith status -- returns if there are changes in the catalog that can be committed
|
||||
zenith commit -- commits the changes and generates a new commit hash
|
||||
zenith branch experimental <hash> -- creates a branch called testdb based on a given commit hash
|
||||
neon log -- returns a list of commits
|
||||
neon status -- returns if there are changes in the catalog that can be committed
|
||||
neon commit -- commits the changes and generates a new commit hash
|
||||
neon branch experimental <hash> -- creates a branch called testdb based on a given commit hash
|
||||
```
|
||||
|
||||
To make changes in the catalog you need to run compute nodes
|
||||
|
||||
```bash
|
||||
-- here is how you a compute node
|
||||
zenith start /home/pipedpiper/northwind:main -- starts a compute instance
|
||||
zenith start zenith://zenith.tech/northwind:main -- starts a compute instance in the cloud
|
||||
neon start /home/pipedpiper/northwind:main -- starts a compute instance
|
||||
neon start neon://neon.tech/northwind:main -- starts a compute instance in the cloud
|
||||
-- you can start a compute node against any hash or branch
|
||||
zenith start /home/pipedpiper/northwind:experimental --port 8008 -- start another compute instance (on different port)
|
||||
neon start /home/pipedpiper/northwind:experimental --port 8008 -- start another compute instance (on different port)
|
||||
-- you can start a compute node against any hash or branch
|
||||
zenith start /home/pipedpiper/northwind:<hash> --port 8009 -- start another compute instance (on different port)
|
||||
neon start /home/pipedpiper/northwind:<hash> --port 8009 -- start another compute instance (on different port)
|
||||
|
||||
-- After running some DML you can run
|
||||
-- zenith status and see how there are two WAL streams one on top of
|
||||
-- neon status and see how there are two WAL streams one on top of
|
||||
-- the main branch
|
||||
zenith status
|
||||
neon status
|
||||
-- and another on top of the experimental branch
|
||||
zenith status -b experimental
|
||||
neon status -b experimental
|
||||
|
||||
-- you can commit each branch separately
|
||||
zenith commit main
|
||||
neon commit main
|
||||
-- or
|
||||
zenith commit -c /home/pipedpiper/northwind:experimental
|
||||
neon commit -c /home/pipedpiper/northwind:experimental
|
||||
```
|
||||
|
||||
Starting compute instances against cloud environments
|
||||
@@ -47,18 +47,18 @@ Starting compute instances against cloud environments
|
||||
```bash
|
||||
-- you can start a compute instance against the cloud environment
|
||||
-- in this case all of the changes will be streamed into the cloud
|
||||
zenith start https://zenith:tech/pipedpiper/northwind:main
|
||||
zenith start https://zenith:tech/pipedpiper/northwind:main
|
||||
zenith status -c https://zenith:tech/pipedpiper/northwind:main
|
||||
zenith commit -c https://zenith:tech/pipedpiper/northwind:main
|
||||
zenith branch -c https://zenith:tech/pipedpiper/northwind:<hash> experimental
|
||||
neon start https://neon:tecj/pipedpiper/northwind:main
|
||||
neon start https://neon:tecj/pipedpiper/northwind:main
|
||||
neon status -c https://neon:tecj/pipedpiper/northwind:main
|
||||
neon commit -c https://neon:tecj/pipedpiper/northwind:main
|
||||
neon branch -c https://neon:tecj/pipedpiper/northwind:<hash> experimental
|
||||
```
|
||||
|
||||
Pushing data into the cloud
|
||||
|
||||
```bash
|
||||
-- pull all the commits from the cloud
|
||||
zenith pull
|
||||
neon pull
|
||||
-- push all the commits to the cloud
|
||||
zenith push
|
||||
neon push
|
||||
```
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
# Repository format
|
||||
|
||||
A Zenith repository is similar to a traditional PostgreSQL backup
|
||||
A Neon repository is similar to a traditional PostgreSQL backup
|
||||
archive, like a WAL-G bucket or pgbarman backup catalogue. It holds
|
||||
multiple versions of a PostgreSQL database cluster.
|
||||
|
||||
The distinguishing feature is that you can launch a Zenith Postgres
|
||||
The distinguishing feature is that you can launch a Neon Postgres
|
||||
server directly against a branch in the repository, without having to
|
||||
"restore" it first. Also, Zenith manages the storage automatically,
|
||||
"restore" it first. Also, Neon manages the storage automatically,
|
||||
there is no separation between full and incremental backups nor WAL
|
||||
archive. Zenith relies heavily on the WAL, and uses concepts similar
|
||||
archive. Neon relies heavily on the WAL, and uses concepts similar
|
||||
to incremental backups and WAL archiving internally, but it is hidden
|
||||
from the user.
|
||||
|
||||
@@ -19,15 +19,15 @@ efficient. Just something to get us started.
|
||||
|
||||
The repository directory looks like this:
|
||||
|
||||
.zenith/timelines/4543be3daeab2ed4e58a285cbb8dd1fce6970f8c/wal/
|
||||
.zenith/timelines/4543be3daeab2ed4e58a285cbb8dd1fce6970f8c/snapshots/<lsn>/
|
||||
.zenith/timelines/4543be3daeab2ed4e58a285cbb8dd1fce6970f8c/history
|
||||
.neon/timelines/4543be3daeab2ed4e58a285cbb8dd1fce6970f8c/wal/
|
||||
.neon/timelines/4543be3daeab2ed4e58a285cbb8dd1fce6970f8c/snapshots/<lsn>/
|
||||
.neon/timelines/4543be3daeab2ed4e58a285cbb8dd1fce6970f8c/history
|
||||
|
||||
.zenith/refs/branches/mybranch
|
||||
.zenith/refs/tags/foo
|
||||
.zenith/refs/tags/bar
|
||||
.neon/refs/branches/mybranch
|
||||
.neon/refs/tags/foo
|
||||
.neon/refs/tags/bar
|
||||
|
||||
.zenith/datadirs/<timeline uuid>
|
||||
.neon/datadirs/<timeline uuid>
|
||||
|
||||
### Timelines
|
||||
|
||||
@@ -39,7 +39,7 @@ All WAL is generated on a timeline. You can launch a read-only node
|
||||
against a tag or arbitrary LSN on a timeline, but in order to write,
|
||||
you need to create a timeline.
|
||||
|
||||
Each timeline is stored in a directory under .zenith/timelines. It
|
||||
Each timeline is stored in a directory under .neon/timelines. It
|
||||
consists of a WAL archive, containing all the WAL in the standard
|
||||
PostgreSQL format, under the wal/ subdirectory.
|
||||
|
||||
@@ -66,18 +66,18 @@ contains the UUID of the timeline (and LSN, for tags).
|
||||
|
||||
### Datadirs
|
||||
|
||||
.zenith/datadirs contains PostgreSQL data directories. You can launch
|
||||
.neon/datadirs contains PostgreSQL data directories. You can launch
|
||||
a Postgres instance on one of them with:
|
||||
|
||||
```
|
||||
postgres -D .zenith/datadirs/4543be3daeab2ed4e58a285cbb8dd1fce6970f8c
|
||||
postgres -D .neon/datadirs/4543be3daeab2ed4e58a285cbb8dd1fce6970f8c
|
||||
```
|
||||
|
||||
All the actual data is kept in the timeline directories, under
|
||||
.zenith/timelines. The data directories are only needed for active
|
||||
.neon/timelines. The data directories are only needed for active
|
||||
PostgreQSL instances. After an instance is stopped, the data directory
|
||||
can be safely removed. "zenith start" will recreate it quickly from
|
||||
the data in .zenith/timelines, if it's missing.
|
||||
can be safely removed. "neon start" will recreate it quickly from
|
||||
the data in .neon/timelines, if it's missing.
|
||||
|
||||
## Version 2
|
||||
|
||||
@@ -103,14 +103,14 @@ more advanced. The exact format is TODO. But it should support:
|
||||
|
||||
### Garbage collection
|
||||
|
||||
When you run "zenith gc", old timelines that are no longer needed are
|
||||
When you run "neon gc", old timelines that are no longer needed are
|
||||
removed. That involves collecting the list of "unreachable" objects,
|
||||
starting from the named branches and tags.
|
||||
|
||||
Also, if enough WAL has been generated on a timeline since last
|
||||
snapshot, a new snapshot or delta is created.
|
||||
|
||||
### zenith push/pull
|
||||
### neon push/pull
|
||||
|
||||
Compare the tags and branches on both servers, and copy missing ones.
|
||||
For each branch, compare the timeline it points to in both servers. If
|
||||
@@ -123,7 +123,7 @@ every time you start up an instance? Then you would detect that the
|
||||
timelines have diverged. That would match with the "epoch" concept
|
||||
that we have in the WAL safekeeper
|
||||
|
||||
### zenith checkout/commit
|
||||
### neon checkout/commit
|
||||
|
||||
In this format, there is no concept of a "working tree", and hence no
|
||||
concept of checking out or committing. All modifications are done on
|
||||
@@ -134,7 +134,7 @@ You can easily fork off a temporary timeline to emulate a "working tree".
|
||||
You can later remove it and have it garbage collected, or to "commit",
|
||||
re-point the branch to the new timeline.
|
||||
|
||||
If we want to have a worktree and "zenith checkout/commit" concept, we can
|
||||
If we want to have a worktree and "neon checkout/commit" concept, we can
|
||||
emulate that with a temporary timeline. Create the temporary timeline at
|
||||
"zenith checkout", and have "zenith commit" modify the branch to point to
|
||||
"neon checkout", and have "neon commit" modify the branch to point to
|
||||
the new timeline.
|
||||
|
||||
@@ -4,27 +4,27 @@ How it works now
|
||||
1. Create repository, start page server on it
|
||||
|
||||
```
|
||||
$ zenith init
|
||||
$ neon init
|
||||
...
|
||||
created main branch
|
||||
new zenith repository was created in .zenith
|
||||
new neon repository was created in .neon
|
||||
|
||||
$ zenith pageserver start
|
||||
Starting pageserver at '127.0.0.1:64000' in .zenith
|
||||
$ neon pageserver start
|
||||
Starting pageserver at '127.0.0.1:64000' in .neon
|
||||
Page server started
|
||||
```
|
||||
|
||||
2. Create a branch, and start a Postgres instance on it
|
||||
|
||||
```
|
||||
$ zenith branch heikki main
|
||||
$ neon branch heikki main
|
||||
branching at end of WAL: 0/15ECF68
|
||||
|
||||
$ zenith pg create heikki
|
||||
$ neon pg create heikki
|
||||
Initializing Postgres on timeline 76cf9279915be7797095241638e64644...
|
||||
Extracting base backup to create postgres instance: path=.zenith/pgdatadirs/pg1 port=55432
|
||||
Extracting base backup to create postgres instance: path=.neon/pgdatadirs/pg1 port=55432
|
||||
|
||||
$ zenith pg start pg1
|
||||
$ neon pg start pg1
|
||||
Starting postgres node at 'host=127.0.0.1 port=55432 user=heikki'
|
||||
waiting for server to start.... done
|
||||
server started
|
||||
@@ -52,20 +52,20 @@ serverless on your laptop, so that the workflow becomes just:
|
||||
1. Create repository, start page server on it (same as before)
|
||||
|
||||
```
|
||||
$ zenith init
|
||||
$ neon init
|
||||
...
|
||||
created main branch
|
||||
new zenith repository was created in .zenith
|
||||
new neon repository was created in .neon
|
||||
|
||||
$ zenith pageserver start
|
||||
Starting pageserver at '127.0.0.1:64000' in .zenith
|
||||
$ neon pageserver start
|
||||
Starting pageserver at '127.0.0.1:64000' in .neon
|
||||
Page server started
|
||||
```
|
||||
|
||||
2. Create branch
|
||||
|
||||
```
|
||||
$ zenith branch heikki main
|
||||
$ neon branch heikki main
|
||||
branching at end of WAL: 0/15ECF68
|
||||
```
|
||||
|
||||
|
||||
@@ -7,22 +7,22 @@ Here is a proposal about implementing push/pull mechanics between pageservers. W
|
||||
The origin represents connection info for some remote pageserver. Let's use here same commands as git uses except using explicit list subcommand (git uses `origin -v` for that).
|
||||
|
||||
```
|
||||
zenith origin add <name> <connection_uri>
|
||||
zenith origin list
|
||||
zenith origin remove <name>
|
||||
neon origin add <name> <connection_uri>
|
||||
neon origin list
|
||||
neon origin remove <name>
|
||||
```
|
||||
|
||||
Connection URI a string of form `postgresql://user:pass@hostname:port` (https://www.postgresql.org/docs/13/libpq-connect.html#id-1.7.3.8.3.6). We can start with libpq password auth and later add support for client certs or require ssh as transport or invent some other kind of transport.
|
||||
|
||||
Behind the scenes, this commands may update toml file inside .zenith directory.
|
||||
Behind the scenes, this commands may update toml file inside .neon directory.
|
||||
|
||||
## Push
|
||||
|
||||
### Pushing branch
|
||||
|
||||
```
|
||||
zenith push mybranch cloudserver # push to eponymous branch in cloudserver
|
||||
zenith push mybranch cloudserver:otherbranch # push to a different branch in cloudserver
|
||||
neon push mybranch cloudserver # push to eponymous branch in cloudserver
|
||||
neon push mybranch cloudserver:otherbranch # push to a different branch in cloudserver
|
||||
```
|
||||
|
||||
Exact mechanics would be slightly different in the following situations:
|
||||
|
||||
@@ -2,7 +2,7 @@ While working on export/import commands, I understood that they fit really well
|
||||
|
||||
We may think about backups as snapshots in a different format (i.e plain pgdata format, basebackup tar format, WAL-G format (if they want to support it) and so on). They use same storage API, the only difference is the code that packs/unpacks files.
|
||||
|
||||
Even if zenith aims to maintains durability using it's own snapshots, backups will be useful for uploading data from postgres to zenith.
|
||||
Even if neon aims to maintains durability using it's own snapshots, backups will be useful for uploading data from postgres to neon.
|
||||
|
||||
So here is an attempt to design consistent CLI for different usage scenarios:
|
||||
|
||||
@@ -16,8 +16,8 @@ Save`storage_dest` and other parameters in config.
|
||||
Push snapshots to `storage_dest` in background.
|
||||
|
||||
```
|
||||
zenith init --storage_dest=S3_PREFIX
|
||||
zenith start
|
||||
neon init --storage_dest=S3_PREFIX
|
||||
neon start
|
||||
```
|
||||
|
||||
#### 2. Restart pageserver (manually or crash-recovery).
|
||||
@@ -25,7 +25,7 @@ Take `storage_dest` from pageserver config, start pageserver from latest snapsho
|
||||
Push snapshots to `storage_dest` in background.
|
||||
|
||||
```
|
||||
zenith start
|
||||
neon start
|
||||
```
|
||||
|
||||
#### 3. Import.
|
||||
@@ -35,22 +35,22 @@ Do not save `snapshot_path` and `snapshot_format` in config, as it is a one-time
|
||||
Save`storage_dest` parameters in config.
|
||||
Push snapshots to `storage_dest` in background.
|
||||
```
|
||||
//I.e. we want to start zenith on top of existing $PGDATA and use s3 as a persistent storage.
|
||||
zenith init --snapshot_path=FILE_PREFIX --snapshot_format=pgdata --storage_dest=S3_PREFIX
|
||||
zenith start
|
||||
//I.e. we want to start neon on top of existing $PGDATA and use s3 as a persistent storage.
|
||||
neon init --snapshot_path=FILE_PREFIX --snapshot_format=pgdata --storage_dest=S3_PREFIX
|
||||
neon start
|
||||
```
|
||||
How to pass credentials needed for `snapshot_path`?
|
||||
|
||||
#### 4. Export.
|
||||
Manually push snapshot to `snapshot_path` which differs from `storage_dest`
|
||||
Optionally set `snapshot_format`, which can be plain pgdata format or zenith format.
|
||||
Optionally set `snapshot_format`, which can be plain pgdata format or neon format.
|
||||
```
|
||||
zenith export --snapshot_path=FILE_PREFIX --snapshot_format=pgdata
|
||||
neon export --snapshot_path=FILE_PREFIX --snapshot_format=pgdata
|
||||
```
|
||||
|
||||
#### Notes and questions
|
||||
- safekeeper s3_offload should use same (similar) syntax for storage. How to set it in UI?
|
||||
- Why do we need `zenith init` as a separate command? Can't we init everything at first start?
|
||||
- Why do we need `neon init` as a separate command? Can't we init everything at first start?
|
||||
- We can think of better names for all options.
|
||||
- Export to plain postgres format will be useless, if we are not 100% compatible on page level.
|
||||
I can recall at least one such difference - PD_WAL_LOGGED flag in pages.
|
||||
|
||||
@@ -9,7 +9,7 @@ receival and this might lag behind `term`; safekeeper switches to epoch `n` when
|
||||
it has received all committed log records from all `< n` terms. This roughly
|
||||
corresponds to proposed in
|
||||
|
||||
https://github.com/zenithdb/rfcs/pull/3/files
|
||||
https://github.com/neondatabase/rfcs/pull/3/files
|
||||
|
||||
|
||||
This makes our biggest our difference from Raft. In Raft, every log record is
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Safekeeper gossip
|
||||
|
||||
Extracted from this [PR](https://github.com/zenithdb/rfcs/pull/13)
|
||||
Extracted from this [PR](https://github.com/neondatabase/rfcs/pull/13)
|
||||
|
||||
## Motivation
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
Created on 19.01.22
|
||||
|
||||
Initially created [here](https://github.com/zenithdb/rfcs/pull/16) by @kelvich.
|
||||
Initially created [here](https://github.com/neondatabase/rfcs/pull/16) by @kelvich.
|
||||
|
||||
That it is an alternative to (014-safekeeper-gossip)[]
|
||||
|
||||
@@ -292,4 +292,4 @@ But with an etcd we are in a bit different situation:
|
||||
1. We don't need persistency and strong consistency guarantees for the data we store in the etcd
|
||||
2. etcd uses Grpc as a protocol, and messages are pretty simple
|
||||
|
||||
So it looks like implementing in-mem store with etcd interface is straightforward thing _if we will want that in future_. At the same time, we can avoid implementing it right now, and we will be able to run local zenith installation with etcd running somewhere in the background (as opposed to building and running console, which in turn requires Postgres).
|
||||
So it looks like implementing in-mem store with etcd interface is straightforward thing _if we will want that in future_. At the same time, we can avoid implementing it right now, and we will be able to run local neon installation with etcd running somewhere in the background (as opposed to building and running console, which in turn requires Postgres).
|
||||
|
||||
@@ -14,7 +14,6 @@ use byteorder::{BigEndian, ReadBytesExt};
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::serde_as;
|
||||
use strum_macros;
|
||||
use utils::{
|
||||
completion,
|
||||
history_buffer::HistoryBufferWithDropCounter,
|
||||
@@ -1077,7 +1076,6 @@ impl PagestreamBeMessage {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use bytes::Buf;
|
||||
use serde_json::json;
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -6,7 +6,6 @@ use crate::{
|
||||
};
|
||||
use hex::FromHex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use thiserror;
|
||||
use utils::id::TenantId;
|
||||
|
||||
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
|
||||
@@ -656,10 +655,7 @@ fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Ke
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::str::FromStr;
|
||||
|
||||
use bincode;
|
||||
use utils::{id::TenantId, Hex};
|
||||
use utils::Hex;
|
||||
|
||||
use super::*;
|
||||
|
||||
|
||||
@@ -623,9 +623,7 @@ fn file_exists(file_path: &Utf8Path) -> anyhow::Result<bool> {
|
||||
mod fs_tests {
|
||||
use super::*;
|
||||
|
||||
use bytes::Bytes;
|
||||
use camino_tempfile::tempdir;
|
||||
use futures_util::Stream;
|
||||
use std::{collections::HashMap, io::Write};
|
||||
|
||||
async fn read_and_check_metadata(
|
||||
|
||||
@@ -1040,7 +1040,7 @@ mod tests {
|
||||
Some("test/prefix/"),
|
||||
Some("/test/prefix/"),
|
||||
];
|
||||
let expected_outputs = vec![
|
||||
let expected_outputs = [
|
||||
vec!["", "some/path", "some/path"],
|
||||
vec!["/", "/some/path", "/some/path"],
|
||||
vec![
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
// For details about authentication see docs/authentication.md
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use serde;
|
||||
use std::{borrow::Cow, fmt::Display, fs, sync::Arc};
|
||||
|
||||
use anyhow::Result;
|
||||
@@ -206,12 +205,11 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
|
||||
// "scope": "tenant",
|
||||
// "tenant_id": "3d1f7595b468230304e0b73cecbcb081",
|
||||
// "iss": "neon.controlplane",
|
||||
// "exp": 1709200879,
|
||||
// "iat": 1678442479
|
||||
// }
|
||||
// ```
|
||||
//
|
||||
let encoded_eddsa = "eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9.eyJzY29wZSI6InRlbmFudCIsInRlbmFudF9pZCI6IjNkMWY3NTk1YjQ2ODIzMDMwNGUwYjczY2VjYmNiMDgxIiwiaXNzIjoibmVvbi5jb250cm9scGxhbmUiLCJleHAiOjE3MDkyMDA4NzksImlhdCI6MTY3ODQ0MjQ3OX0.U3eA8j-uU-JnhzeO3EDHRuXLwkAUFCPxtGHEgw6p7Ccc3YRbFs2tmCdbD9PZEXP-XsxSeBQi1FY0YPcT3NXADw";
|
||||
let encoded_eddsa = "eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9.eyJzY29wZSI6InRlbmFudCIsInRlbmFudF9pZCI6IjNkMWY3NTk1YjQ2ODIzMDMwNGUwYjczY2VjYmNiMDgxIiwiaXNzIjoibmVvbi5jb250cm9scGxhbmUiLCJpYXQiOjE2Nzg0NDI0Nzl9.rNheBnluMJNgXzSTTJoTNIGy4P_qe0JUHl_nVEGuDCTgHOThPVr552EnmKccrCKquPeW3c2YUk0Y9Oh4KyASAw";
|
||||
|
||||
// Check it can be validated with the public key
|
||||
let auth = JwtAuth::new(vec![DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap()]);
|
||||
|
||||
@@ -4,7 +4,9 @@ use tokio_util::task::{task_tracker::TaskTrackerToken, TaskTracker};
|
||||
///
|
||||
/// Can be cloned, moved and kept around in futures as "guard objects".
|
||||
#[derive(Clone)]
|
||||
pub struct Completion(TaskTrackerToken);
|
||||
pub struct Completion {
|
||||
_token: TaskTrackerToken,
|
||||
}
|
||||
|
||||
/// Barrier will wait until all clones of [`Completion`] have been dropped.
|
||||
#[derive(Clone)]
|
||||
@@ -49,5 +51,5 @@ pub fn channel() -> (Completion, Barrier) {
|
||||
tracker.close();
|
||||
|
||||
let token = tracker.token();
|
||||
(Completion(token), Barrier(tracker))
|
||||
(Completion { _token: token }, Barrier(tracker))
|
||||
}
|
||||
|
||||
@@ -45,7 +45,7 @@ impl Generation {
|
||||
Self::Broken
|
||||
}
|
||||
|
||||
pub fn new(v: u32) -> Self {
|
||||
pub const fn new(v: u32) -> Self {
|
||||
Self::Valid(v)
|
||||
}
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
|
||||
use once_cell::sync::Lazy;
|
||||
use routerify::ext::RequestExt;
|
||||
use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
|
||||
use tracing::{self, debug, info, info_span, warn, Instrument};
|
||||
use tracing::{debug, info, info_span, warn, Instrument};
|
||||
|
||||
use std::future::Future;
|
||||
use std::str::FromStr;
|
||||
@@ -156,6 +156,10 @@ pub struct ChannelWriter {
|
||||
buffer: BytesMut,
|
||||
pub tx: mpsc::Sender<std::io::Result<Bytes>>,
|
||||
written: usize,
|
||||
/// Time spent waiting for the channel to make progress. It is not the same as time to upload a
|
||||
/// buffer because we cannot know anything about that, but this should allow us to understand
|
||||
/// the actual time taken without the time spent `std::thread::park`ed.
|
||||
wait_time: std::time::Duration,
|
||||
}
|
||||
|
||||
impl ChannelWriter {
|
||||
@@ -168,6 +172,7 @@ impl ChannelWriter {
|
||||
buffer: BytesMut::with_capacity(buf_len).split_off(buf_len / 2),
|
||||
tx,
|
||||
written: 0,
|
||||
wait_time: std::time::Duration::ZERO,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -180,6 +185,8 @@ impl ChannelWriter {
|
||||
tracing::trace!(n, "flushing");
|
||||
let ready = self.buffer.split().freeze();
|
||||
|
||||
let wait_started_at = std::time::Instant::now();
|
||||
|
||||
// not ideal to call from blocking code to block_on, but we are sure that this
|
||||
// operation does not spawn_blocking other tasks
|
||||
let res: Result<(), ()> = tokio::runtime::Handle::current().block_on(async {
|
||||
@@ -192,6 +199,9 @@ impl ChannelWriter {
|
||||
// sending it to the client.
|
||||
Ok(())
|
||||
});
|
||||
|
||||
self.wait_time += wait_started_at.elapsed();
|
||||
|
||||
if res.is_err() {
|
||||
return Err(std::io::ErrorKind::BrokenPipe.into());
|
||||
}
|
||||
@@ -202,6 +212,10 @@ impl ChannelWriter {
|
||||
pub fn flushed_bytes(&self) -> usize {
|
||||
self.written
|
||||
}
|
||||
|
||||
pub fn wait_time(&self) -> std::time::Duration {
|
||||
self.wait_time
|
||||
}
|
||||
}
|
||||
|
||||
impl std::io::Write for ChannelWriter {
|
||||
@@ -252,22 +266,52 @@ async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body
|
||||
|
||||
let span = info_span!("blocking");
|
||||
tokio::task::spawn_blocking(move || {
|
||||
// there are situations where we lose scraped metrics under load, try to gather some clues
|
||||
// since all nodes are queried this, keep the message count low.
|
||||
let spawned_at = std::time::Instant::now();
|
||||
|
||||
let _span = span.entered();
|
||||
|
||||
let metrics = metrics::gather();
|
||||
|
||||
let gathered_at = std::time::Instant::now();
|
||||
|
||||
let res = encoder
|
||||
.encode(&metrics, &mut writer)
|
||||
.and_then(|_| writer.flush().map_err(|e| e.into()));
|
||||
|
||||
// this instant is not when we finally got the full response sent, sending is done by hyper
|
||||
// in another task.
|
||||
let encoded_at = std::time::Instant::now();
|
||||
|
||||
let spawned_in = spawned_at - started_at;
|
||||
let collected_in = gathered_at - spawned_at;
|
||||
// remove the wait time here in case the tcp connection was clogged
|
||||
let encoded_in = encoded_at - gathered_at - writer.wait_time();
|
||||
let total = encoded_at - started_at;
|
||||
|
||||
match res {
|
||||
Ok(()) => {
|
||||
tracing::info!(
|
||||
bytes = writer.flushed_bytes(),
|
||||
elapsed_ms = started_at.elapsed().as_millis(),
|
||||
total_ms = total.as_millis(),
|
||||
spawning_ms = spawned_in.as_millis(),
|
||||
collection_ms = collected_in.as_millis(),
|
||||
encoding_ms = encoded_in.as_millis(),
|
||||
"responded /metrics"
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("failed to write out /metrics response: {e:#}");
|
||||
// there is a chance that this error is not the BrokenPipe we generate in the writer
|
||||
// for "closed connection", but it is highly unlikely.
|
||||
tracing::warn!(
|
||||
after_bytes = writer.flushed_bytes(),
|
||||
total_ms = total.as_millis(),
|
||||
spawning_ms = spawned_in.as_millis(),
|
||||
collection_ms = collected_in.as_millis(),
|
||||
encoding_ms = encoded_in.as_millis(),
|
||||
"failed to write out /metrics response: {e:?}"
|
||||
);
|
||||
// semantics of this error are quite... unclear. we want to error the stream out to
|
||||
// abort the response to somehow notify the client that we failed.
|
||||
//
|
||||
|
||||
@@ -415,7 +415,6 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
|
||||
use serde::ser::Serialize;
|
||||
use serde_assert::{Deserializer, Serializer, Token, Tokens};
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#![warn(missing_docs)]
|
||||
|
||||
use std::cmp::{Eq, Ordering, PartialOrd};
|
||||
use std::cmp::{Eq, Ordering};
|
||||
use std::collections::BinaryHeap;
|
||||
use std::fmt::Debug;
|
||||
use std::mem;
|
||||
@@ -249,7 +249,6 @@ where
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
impl MonotonicCounter<i32> for i32 {
|
||||
fn cnt_advance(&mut self, val: i32) {
|
||||
|
||||
@@ -221,7 +221,7 @@ impl RcuWaitList {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -239,7 +239,6 @@ mod tests {
|
||||
use std::{
|
||||
convert::Infallible,
|
||||
pin::{pin, Pin},
|
||||
sync::atomic::{AtomicUsize, Ordering},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
|
||||
@@ -6,7 +6,6 @@ use futures::future::BoxFuture;
|
||||
use futures::{Stream, StreamExt};
|
||||
use itertools::Itertools;
|
||||
use pin_project_lite::pin_project;
|
||||
use std::cmp::Ord;
|
||||
use std::collections::BinaryHeap;
|
||||
use std::collections::VecDeque;
|
||||
use std::future::Future;
|
||||
|
||||
@@ -20,7 +20,6 @@ use std::num::NonZeroUsize;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use toml_edit;
|
||||
use toml_edit::{Document, Item};
|
||||
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
@@ -212,9 +211,9 @@ pub struct PageServerConf {
|
||||
|
||||
pub log_format: LogFormat,
|
||||
|
||||
/// Number of tenants which will be concurrently loaded from remote storage proactively on startup,
|
||||
/// does not limit tenants loaded in response to client I/O. A lower value implicitly deprioritizes
|
||||
/// loading such tenants, vs. other work in the system.
|
||||
/// Number of tenants which will be concurrently loaded from remote storage proactively on startup or attach.
|
||||
///
|
||||
/// A lower value implicitly deprioritizes loading such tenants, vs. other work in the system.
|
||||
pub concurrent_tenant_warmup: ConfigurableSemaphore,
|
||||
|
||||
/// Number of concurrent [`Tenant::gather_size_inputs`](crate::tenant::Tenant::gather_size_inputs) allowed.
|
||||
@@ -1203,10 +1202,7 @@ impl ConfigurableSemaphore {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{
|
||||
fs,
|
||||
num::{NonZeroU32, NonZeroUsize},
|
||||
};
|
||||
use std::{fs, num::NonZeroU32};
|
||||
|
||||
use camino_tempfile::{tempdir, Utf8TempDir};
|
||||
use pageserver_api::models::EvictionPolicy;
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
use super::*;
|
||||
use std::collections::HashMap;
|
||||
use std::time::SystemTime;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
#[test]
|
||||
fn startup_collected_timeline_metrics_before_advancing() {
|
||||
|
||||
@@ -20,10 +20,9 @@ use remote_storage::{GenericRemoteStorage, RemotePath};
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use thiserror::Error;
|
||||
use tokio;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Instrument;
|
||||
use tracing::{self, debug, error};
|
||||
use tracing::{debug, error};
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::generation::Generation;
|
||||
use utils::id::TimelineId;
|
||||
@@ -726,7 +725,7 @@ mod test {
|
||||
use camino::Utf8Path;
|
||||
use hex_literal::hex;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use std::{io::ErrorKind, time::Duration};
|
||||
use std::io::ErrorKind;
|
||||
use tracing::info;
|
||||
|
||||
use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
|
||||
@@ -735,10 +734,7 @@ mod test {
|
||||
use crate::{
|
||||
control_plane_client::RetryForeverError,
|
||||
repository::Key,
|
||||
tenant::{
|
||||
harness::TenantHarness, remote_timeline_client::remote_timeline_path,
|
||||
storage_layer::DeltaFileName,
|
||||
},
|
||||
tenant::{harness::TenantHarness, storage_layer::DeltaFileName},
|
||||
};
|
||||
|
||||
use super::*;
|
||||
@@ -1161,13 +1157,8 @@ mod test {
|
||||
pub(crate) mod mock {
|
||||
use tracing::info;
|
||||
|
||||
use crate::tenant::remote_timeline_client::remote_layer_path;
|
||||
|
||||
use super::*;
|
||||
use std::sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
};
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
pub struct ConsumerState {
|
||||
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
|
||||
|
||||
@@ -58,6 +58,7 @@ use utils::{completion, id::TimelineId};
|
||||
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
metrics::disk_usage_based_eviction::METRICS,
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
tenant::{
|
||||
self,
|
||||
@@ -65,7 +66,6 @@ use crate::{
|
||||
remote_timeline_client::LayerFileMetadata,
|
||||
secondary::SecondaryTenant,
|
||||
storage_layer::{AsLayerDesc, EvictionError, Layer, LayerFileName},
|
||||
Timeline,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -409,13 +409,23 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
"running disk usage based eviction due to pressure"
|
||||
);
|
||||
|
||||
let candidates =
|
||||
let (candidates, collection_time) = {
|
||||
let started_at = std::time::Instant::now();
|
||||
match collect_eviction_candidates(tenant_manager, eviction_order, cancel).await? {
|
||||
EvictionCandidates::Cancelled => {
|
||||
return Ok(IterationOutcome::Cancelled);
|
||||
}
|
||||
EvictionCandidates::Finished(partitioned) => partitioned,
|
||||
};
|
||||
EvictionCandidates::Finished(partitioned) => (partitioned, started_at.elapsed()),
|
||||
}
|
||||
};
|
||||
|
||||
METRICS.layers_collected.inc_by(candidates.len() as u64);
|
||||
|
||||
tracing::info!(
|
||||
elapsed_ms = collection_time.as_millis(),
|
||||
total_layers = candidates.len(),
|
||||
"collection completed"
|
||||
);
|
||||
|
||||
// Debug-log the list of candidates
|
||||
let now = SystemTime::now();
|
||||
@@ -446,9 +456,10 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
// the tenant's min-resident-size threshold, print a warning, and memorize the disk
|
||||
// usage at that point, in 'usage_planned_min_resident_size_respecting'.
|
||||
|
||||
let selection = select_victims(&candidates, usage_pre);
|
||||
let (evicted_amount, usage_planned) =
|
||||
select_victims(&candidates, usage_pre).into_amount_and_planned();
|
||||
|
||||
let (evicted_amount, usage_planned) = selection.into_amount_and_planned();
|
||||
METRICS.layers_selected.inc_by(evicted_amount as u64);
|
||||
|
||||
// phase2: evict layers
|
||||
|
||||
@@ -477,9 +488,15 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
if let Some(next) = next {
|
||||
match next {
|
||||
Ok(Ok(file_size)) => {
|
||||
METRICS.layers_evicted.inc();
|
||||
usage_assumed.add_available_bytes(file_size);
|
||||
}
|
||||
Ok(Err((file_size, EvictionError::NotFound | EvictionError::Downloaded))) => {
|
||||
Ok(Err((
|
||||
file_size,
|
||||
EvictionError::NotFound
|
||||
| EvictionError::Downloaded
|
||||
| EvictionError::Timeout,
|
||||
))) => {
|
||||
evictions_failed.file_sizes += file_size;
|
||||
evictions_failed.count += 1;
|
||||
}
|
||||
@@ -495,7 +512,10 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
|
||||
// calling again when consumed_all is fine as evicted is fused.
|
||||
let Some((_partition, candidate)) = evicted.next() else {
|
||||
consumed_all = true;
|
||||
if !consumed_all {
|
||||
tracing::info!("all evictions started, waiting");
|
||||
consumed_all = true;
|
||||
}
|
||||
continue;
|
||||
};
|
||||
|
||||
@@ -503,11 +523,15 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
EvictionLayer::Attached(layer) => {
|
||||
let file_size = layer.layer_desc().file_size;
|
||||
js.spawn(async move {
|
||||
layer
|
||||
.evict_and_wait()
|
||||
.await
|
||||
.map(|()| file_size)
|
||||
.map_err(|e| (file_size, e))
|
||||
// have a low eviction waiting timeout because our LRU calculations go stale fast;
|
||||
// also individual layer evictions could hang because of bugs and we do not want to
|
||||
// pause disk_usage_based_eviction for such.
|
||||
let timeout = std::time::Duration::from_secs(5);
|
||||
|
||||
match layer.evict_and_wait(timeout).await {
|
||||
Ok(()) => Ok(file_size),
|
||||
Err(e) => Err((file_size, e)),
|
||||
}
|
||||
});
|
||||
}
|
||||
EvictionLayer::Secondary(layer) => {
|
||||
@@ -529,6 +553,30 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
(usage_assumed, evictions_failed)
|
||||
};
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
let evict_layers = async move {
|
||||
let mut evict_layers = std::pin::pin!(evict_layers);
|
||||
|
||||
let maximum_expected = std::time::Duration::from_secs(10);
|
||||
|
||||
let res = tokio::time::timeout(maximum_expected, &mut evict_layers).await;
|
||||
let tuple = if let Ok(tuple) = res {
|
||||
tuple
|
||||
} else {
|
||||
let elapsed = started_at.elapsed();
|
||||
tracing::info!(elapsed_ms = elapsed.as_millis(), "still ongoing");
|
||||
evict_layers.await
|
||||
};
|
||||
|
||||
let elapsed = started_at.elapsed();
|
||||
tracing::info!(elapsed_ms = elapsed.as_millis(), "completed");
|
||||
tuple
|
||||
};
|
||||
|
||||
let evict_layers =
|
||||
evict_layers.instrument(tracing::info_span!("evict_layers", layers=%evicted_amount));
|
||||
|
||||
let (usage_assumed, evictions_failed) = tokio::select! {
|
||||
tuple = evict_layers => { tuple },
|
||||
_ = cancel.cancelled() => {
|
||||
@@ -763,6 +811,8 @@ async fn collect_eviction_candidates(
|
||||
eviction_order: EvictionOrder,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<EvictionCandidates> {
|
||||
const LOG_DURATION_THRESHOLD: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
|
||||
// get a snapshot of the list of tenants
|
||||
let tenants = tenant::mgr::list_tenants()
|
||||
.await
|
||||
@@ -791,6 +841,8 @@ async fn collect_eviction_candidates(
|
||||
continue;
|
||||
}
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
// collect layers from all timelines in this tenant
|
||||
//
|
||||
// If one of the timelines becomes `!is_active()` during the iteration,
|
||||
@@ -805,6 +857,7 @@ async fn collect_eviction_candidates(
|
||||
}
|
||||
let info = tl.get_local_layers_for_disk_usage_eviction().await;
|
||||
debug!(tenant_id=%tl.tenant_shard_id.tenant_id, shard_id=%tl.tenant_shard_id.shard_slug(), timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
|
||||
|
||||
tenant_candidates.extend(info.resident_layers.into_iter());
|
||||
max_layer_size = max_layer_size.max(info.max_layer_size.unwrap_or(0));
|
||||
|
||||
@@ -870,7 +923,25 @@ async fn collect_eviction_candidates(
|
||||
(partition, candidate)
|
||||
});
|
||||
|
||||
METRICS
|
||||
.tenant_layer_count
|
||||
.observe(tenant_candidates.len() as f64);
|
||||
|
||||
candidates.extend(tenant_candidates);
|
||||
|
||||
let elapsed = started_at.elapsed();
|
||||
METRICS
|
||||
.tenant_collection_time
|
||||
.observe(elapsed.as_secs_f64());
|
||||
|
||||
if elapsed > LOG_DURATION_THRESHOLD {
|
||||
tracing::info!(
|
||||
tenant_id=%tenant.tenant_shard_id().tenant_id,
|
||||
shard_id=%tenant.tenant_shard_id().shard_slug(),
|
||||
elapsed_ms = elapsed.as_millis(),
|
||||
"collection took longer than threshold"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Note: the same tenant ID might be hit twice, if it transitions from attached to
|
||||
@@ -885,11 +956,11 @@ async fn collect_eviction_candidates(
|
||||
},
|
||||
);
|
||||
|
||||
for secondary_tenant in secondary_tenants {
|
||||
for tenant in secondary_tenants {
|
||||
// for secondary tenants we use a sum of on_disk layers and already evicted layers. this is
|
||||
// to prevent repeated disk usage based evictions from completely draining less often
|
||||
// updating secondaries.
|
||||
let (mut layer_info, total_layers) = secondary_tenant.get_layers_for_eviction();
|
||||
let (mut layer_info, total_layers) = tenant.get_layers_for_eviction();
|
||||
|
||||
debug_assert!(
|
||||
total_layers >= layer_info.resident_layers.len(),
|
||||
@@ -897,6 +968,8 @@ async fn collect_eviction_candidates(
|
||||
layer_info.resident_layers.len()
|
||||
);
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
layer_info
|
||||
.resident_layers
|
||||
.sort_unstable_by_key(|layer_info| std::cmp::Reverse(layer_info.last_activity_ts));
|
||||
@@ -918,9 +991,27 @@ async fn collect_eviction_candidates(
|
||||
)
|
||||
});
|
||||
|
||||
METRICS
|
||||
.tenant_layer_count
|
||||
.observe(tenant_candidates.len() as f64);
|
||||
candidates.extend(tenant_candidates);
|
||||
|
||||
tokio::task::yield_now().await;
|
||||
|
||||
let elapsed = started_at.elapsed();
|
||||
|
||||
METRICS
|
||||
.tenant_collection_time
|
||||
.observe(elapsed.as_secs_f64());
|
||||
|
||||
if elapsed > LOG_DURATION_THRESHOLD {
|
||||
tracing::info!(
|
||||
tenant_id=%tenant.tenant_shard_id().tenant_id,
|
||||
shard_id=%tenant.tenant_shard_id().shard_slug(),
|
||||
elapsed_ms = elapsed.as_millis(),
|
||||
"collection took longer than threshold"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below,
|
||||
@@ -997,30 +1088,6 @@ impl<U: Usage> VictimSelection<U> {
|
||||
}
|
||||
}
|
||||
|
||||
struct TimelineKey(Arc<Timeline>);
|
||||
|
||||
impl PartialEq for TimelineKey {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
Arc::ptr_eq(&self.0, &other.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for TimelineKey {}
|
||||
|
||||
impl std::hash::Hash for TimelineKey {
|
||||
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
|
||||
Arc::as_ptr(&self.0).hash(state);
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for TimelineKey {
|
||||
type Target = Timeline;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.0.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
/// A totally ordered f32 subset we can use with sorting functions.
|
||||
pub(crate) mod finite_f32 {
|
||||
|
||||
|
||||
@@ -579,6 +579,12 @@ paths:
|
||||
required: false
|
||||
schema:
|
||||
type: integer
|
||||
- name: lazy
|
||||
in: query
|
||||
required: false
|
||||
schema:
|
||||
type: boolean
|
||||
description: Set to true for attaches to queue up until activated by compute. Eager (false) is the default.
|
||||
put:
|
||||
description: |
|
||||
Configures a _tenant location_, that is how a particular pageserver handles
|
||||
|
||||
@@ -816,13 +816,7 @@ async fn tenant_attach_handler(
|
||||
|
||||
let tenant = state
|
||||
.tenant_manager
|
||||
.upsert_location(
|
||||
tenant_shard_id,
|
||||
location_conf,
|
||||
None,
|
||||
SpawnMode::Normal,
|
||||
&ctx,
|
||||
)
|
||||
.upsert_location(tenant_shard_id, location_conf, None, SpawnMode::Eager, &ctx)
|
||||
.await?;
|
||||
|
||||
let Some(tenant) = tenant else {
|
||||
@@ -1418,6 +1412,7 @@ async fn put_tenant_location_config_handler(
|
||||
|
||||
let request_data: TenantLocationConfigRequest = json_request(&mut request).await?;
|
||||
let flush = parse_query_param(&request, "flush_ms")?.map(Duration::from_millis);
|
||||
let lazy = parse_query_param(&request, "lazy")?.unwrap_or(false);
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
|
||||
@@ -1448,15 +1443,17 @@ async fn put_tenant_location_config_handler(
|
||||
let location_conf =
|
||||
LocationConf::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
|
||||
|
||||
// lazy==true queues up for activation or jumps the queue like normal when a compute connects,
|
||||
// similar to at startup ordering.
|
||||
let spawn_mode = if lazy {
|
||||
tenant::SpawnMode::Lazy
|
||||
} else {
|
||||
tenant::SpawnMode::Eager
|
||||
};
|
||||
|
||||
let attached = state
|
||||
.tenant_manager
|
||||
.upsert_location(
|
||||
tenant_shard_id,
|
||||
location_conf,
|
||||
flush,
|
||||
tenant::SpawnMode::Normal,
|
||||
&ctx,
|
||||
)
|
||||
.upsert_location(tenant_shard_id, location_conf, flush, spawn_mode, &ctx)
|
||||
.await?
|
||||
.is_some();
|
||||
|
||||
|
||||
@@ -1915,17 +1915,16 @@ impl Drop for TimelineMetrics {
|
||||
let tenant_id = &self.tenant_id;
|
||||
let timeline_id = &self.timeline_id;
|
||||
let shard_id = &self.shard_id;
|
||||
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, &shard_id, timeline_id]);
|
||||
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
|
||||
{
|
||||
RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get());
|
||||
let _ =
|
||||
RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, &shard_id, timeline_id]);
|
||||
let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
|
||||
}
|
||||
let _ = CURRENT_LOGICAL_SIZE.remove_label_values(&[tenant_id, &shard_id, timeline_id]);
|
||||
let _ = CURRENT_LOGICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
|
||||
if let Some(metric) = Lazy::get(&DIRECTORY_ENTRIES_COUNT) {
|
||||
let _ = metric.remove_label_values(&[tenant_id, &shard_id, timeline_id]);
|
||||
let _ = metric.remove_label_values(&[tenant_id, shard_id, timeline_id]);
|
||||
}
|
||||
let _ = EVICTIONS.remove_label_values(&[tenant_id, &shard_id, timeline_id]);
|
||||
let _ = EVICTIONS.remove_label_values(&[tenant_id, shard_id, timeline_id]);
|
||||
|
||||
self.evictions_with_low_residence_duration
|
||||
.write()
|
||||
@@ -2474,6 +2473,64 @@ pub(crate) mod tenant_throttling {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) mod disk_usage_based_eviction {
|
||||
use super::*;
|
||||
|
||||
pub(crate) struct Metrics {
|
||||
pub(crate) tenant_collection_time: Histogram,
|
||||
pub(crate) tenant_layer_count: Histogram,
|
||||
pub(crate) layers_collected: IntCounter,
|
||||
pub(crate) layers_selected: IntCounter,
|
||||
pub(crate) layers_evicted: IntCounter,
|
||||
}
|
||||
|
||||
impl Default for Metrics {
|
||||
fn default() -> Self {
|
||||
let tenant_collection_time = register_histogram!(
|
||||
"pageserver_disk_usage_based_eviction_tenant_collection_seconds",
|
||||
"Time spent collecting layers from a tenant -- not normalized by collected layer amount",
|
||||
vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0]
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let tenant_layer_count = register_histogram!(
|
||||
"pageserver_disk_usage_based_eviction_tenant_collected_layers",
|
||||
"Amount of layers gathered from a tenant",
|
||||
vec![5.0, 50.0, 500.0, 5000.0, 50000.0]
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let layers_collected = register_int_counter!(
|
||||
"pageserver_disk_usage_based_eviction_collected_layers_total",
|
||||
"Amount of layers collected"
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let layers_selected = register_int_counter!(
|
||||
"pageserver_disk_usage_based_eviction_select_layers_total",
|
||||
"Amount of layers selected"
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let layers_evicted = register_int_counter!(
|
||||
"pageserver_disk_usage_based_eviction_evicted_layers_total",
|
||||
"Amount of layers successfully evicted"
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
Self {
|
||||
tenant_collection_time,
|
||||
tenant_layer_count,
|
||||
layers_collected,
|
||||
layers_selected,
|
||||
layers_evicted,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) static METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
|
||||
}
|
||||
|
||||
pub fn preinitialize_metrics() {
|
||||
// Python tests need these and on some we do alerting.
|
||||
//
|
||||
@@ -2508,6 +2565,7 @@ pub fn preinitialize_metrics() {
|
||||
Lazy::force(&TENANT_MANAGER);
|
||||
|
||||
Lazy::force(&crate::tenant::storage_layer::layer::LAYER_IMPL_METRICS);
|
||||
Lazy::force(&disk_usage_based_eviction::METRICS);
|
||||
|
||||
// countervecs
|
||||
[&BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT]
|
||||
|
||||
@@ -73,7 +73,6 @@
|
||||
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
convert::TryInto,
|
||||
sync::{
|
||||
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
|
||||
Arc, Weak,
|
||||
@@ -262,7 +261,9 @@ pub struct PageCache {
|
||||
size_metrics: &'static PageCacheSizeMetrics,
|
||||
}
|
||||
|
||||
struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
|
||||
struct PinnedSlotsPermit {
|
||||
_permit: tokio::sync::OwnedSemaphorePermit,
|
||||
}
|
||||
|
||||
///
|
||||
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
|
||||
@@ -558,9 +559,9 @@ impl PageCache {
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => Ok(PinnedSlotsPermit(
|
||||
res.expect("this semaphore is never closed"),
|
||||
)),
|
||||
Ok(res) => Ok(PinnedSlotsPermit {
|
||||
_permit: res.expect("this semaphore is never closed"),
|
||||
}),
|
||||
Err(_timeout) => {
|
||||
crate::metrics::page_cache_errors_inc(
|
||||
crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
|
||||
|
||||
@@ -27,7 +27,7 @@ use pageserver_api::models::{
|
||||
};
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use pageserver_api::shard::ShardNumber;
|
||||
use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError};
|
||||
use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
|
||||
use pq_proto::framed::ConnectionError;
|
||||
use pq_proto::FeStartupPacket;
|
||||
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
|
||||
@@ -44,7 +44,6 @@ use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_util::io::StreamReader;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::field;
|
||||
use tracing::*;
|
||||
use utils::id::ConnectionId;
|
||||
use utils::sync::gate::GateGuard;
|
||||
@@ -1115,7 +1114,10 @@ impl PageServerHandler {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let timeline = match self.get_cached_timeline_for_page(req) {
|
||||
Ok(tl) => tl,
|
||||
Ok(tl) => {
|
||||
set_tracing_field_shard_id(tl);
|
||||
tl
|
||||
}
|
||||
Err(key) => {
|
||||
match self
|
||||
.load_timeline_for_page(tenant_id, timeline_id, key)
|
||||
@@ -1140,9 +1142,6 @@ impl PageServerHandler {
|
||||
}
|
||||
};
|
||||
|
||||
// load_timeline_for_page sets shard_id, but get_cached_timeline_for_page doesn't
|
||||
set_tracing_field_shard_id(timeline);
|
||||
|
||||
let _timer = timeline
|
||||
.query_metrics
|
||||
.start_timer(metrics::SmgrQueryType::GetPageAtLsn);
|
||||
|
||||
@@ -37,7 +37,6 @@ impl Value {
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
use bytes::Bytes;
|
||||
use utils::bin_ser::BeSer;
|
||||
|
||||
macro_rules! roundtrip {
|
||||
|
||||
@@ -109,7 +109,6 @@ pub use pageserver_api::models::TenantState;
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
static INIT_DB_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(8));
|
||||
use toml_edit;
|
||||
use utils::{
|
||||
crashsafe,
|
||||
generation::Generation,
|
||||
@@ -152,7 +151,6 @@ pub(crate) mod ephemeral_file;
|
||||
pub mod layer_map;
|
||||
|
||||
pub mod metadata;
|
||||
mod par_fsync;
|
||||
pub mod remote_timeline_client;
|
||||
pub mod storage_layer;
|
||||
|
||||
@@ -227,7 +225,11 @@ pub(crate) struct TenantPreload {
|
||||
/// When we spawn a tenant, there is a special mode for tenant creation that
|
||||
/// avoids trying to read anything from remote storage.
|
||||
pub(crate) enum SpawnMode {
|
||||
Normal,
|
||||
/// Activate as soon as possible
|
||||
Eager,
|
||||
/// Lazy activation in the background, with the option to skip the queue if the need comes up
|
||||
Lazy,
|
||||
/// Tenant has been created during the lifetime of this process
|
||||
Create,
|
||||
}
|
||||
|
||||
@@ -700,41 +702,37 @@ impl Tenant {
|
||||
.and_then(|x| x.initial_tenant_load_remote.take());
|
||||
|
||||
enum AttachType<'a> {
|
||||
// During pageserver startup, we are attaching this tenant lazily in the background
|
||||
Warmup(tokio::sync::SemaphorePermit<'a>),
|
||||
// During pageserver startup, we are attaching this tenant as soon as we can,
|
||||
// because a client tried to access it.
|
||||
/// We are attaching this tenant lazily in the background.
|
||||
Warmup {
|
||||
_permit: tokio::sync::SemaphorePermit<'a>,
|
||||
during_startup: bool
|
||||
},
|
||||
/// We are attaching this tenant as soon as we can, because for example an
|
||||
/// endpoint tried to access it.
|
||||
OnDemand,
|
||||
// During normal operations after startup, we are attaching a tenant.
|
||||
/// During normal operations after startup, we are attaching a tenant, and
|
||||
/// eager attach was requested.
|
||||
Normal,
|
||||
}
|
||||
|
||||
// Before doing any I/O, wait for either or:
|
||||
// - A client to attempt to access to this tenant (on-demand loading)
|
||||
// - A permit to become available in the warmup semaphore (background warmup)
|
||||
//
|
||||
// Some-ness of init_order is how we know if we're attaching during startup or later
|
||||
// in process lifetime.
|
||||
let attach_type = if init_order.is_some() {
|
||||
let attach_type = if matches!(mode, SpawnMode::Lazy) {
|
||||
// Before doing any I/O, wait for at least one of:
|
||||
// - A client attempting to access to this tenant (on-demand loading)
|
||||
// - A permit becoming available in the warmup semaphore (background warmup)
|
||||
|
||||
tokio::select!(
|
||||
_ = tenant_clone.activate_now_sem.acquire() => {
|
||||
permit = tenant_clone.activate_now_sem.acquire() => {
|
||||
let _ = permit.expect("activate_now_sem is never closed");
|
||||
tracing::info!("Activating tenant (on-demand)");
|
||||
AttachType::OnDemand
|
||||
},
|
||||
permit_result = conf.concurrent_tenant_warmup.inner().acquire() => {
|
||||
match permit_result {
|
||||
Ok(p) => {
|
||||
tracing::info!("Activating tenant (warmup)");
|
||||
AttachType::Warmup(p)
|
||||
}
|
||||
Err(_) => {
|
||||
// This is unexpected: the warmup semaphore should stay alive
|
||||
// for the lifetime of init_order. Log a warning and proceed.
|
||||
tracing::warn!("warmup_limit semaphore unexpectedly closed");
|
||||
AttachType::Normal
|
||||
}
|
||||
permit = conf.concurrent_tenant_warmup.inner().acquire() => {
|
||||
let _permit = permit.expect("concurrent_tenant_warmup semaphore is never closed");
|
||||
tracing::info!("Activating tenant (warmup)");
|
||||
AttachType::Warmup {
|
||||
_permit,
|
||||
during_startup: init_order.is_some()
|
||||
}
|
||||
|
||||
}
|
||||
_ = tenant_clone.cancel.cancelled() => {
|
||||
// This is safe, but should be pretty rare: it is interesting if a tenant
|
||||
@@ -749,6 +747,8 @@ impl Tenant {
|
||||
},
|
||||
)
|
||||
} else {
|
||||
// SpawnMode::{Create,Eager} always cause jumping ahead of the
|
||||
// concurrent_tenant_warmup queue
|
||||
AttachType::Normal
|
||||
};
|
||||
|
||||
@@ -756,7 +756,7 @@ impl Tenant {
|
||||
(SpawnMode::Create, _) => {
|
||||
None
|
||||
},
|
||||
(SpawnMode::Normal, Some(remote_storage)) => {
|
||||
(SpawnMode::Eager | SpawnMode::Lazy, Some(remote_storage)) => {
|
||||
let _preload_timer = TENANT.preload.start_timer();
|
||||
let res = tenant_clone
|
||||
.preload(remote_storage, task_mgr::shutdown_token())
|
||||
@@ -769,7 +769,7 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
}
|
||||
(SpawnMode::Normal, None) => {
|
||||
(_, None) => {
|
||||
let _preload_timer = TENANT.preload.start_timer();
|
||||
None
|
||||
}
|
||||
@@ -828,7 +828,7 @@ impl Tenant {
|
||||
let attached = {
|
||||
let _attach_timer = match mode {
|
||||
SpawnMode::Create => None,
|
||||
SpawnMode::Normal => {Some(TENANT.attach.start_timer())}
|
||||
SpawnMode::Eager | SpawnMode::Lazy => Some(TENANT.attach.start_timer()),
|
||||
};
|
||||
tenant_clone.attach(preload, mode, &ctx).await
|
||||
};
|
||||
@@ -850,7 +850,7 @@ impl Tenant {
|
||||
// It also prevents the warmup proccess competing with the concurrency limit on
|
||||
// logical size calculations: if logical size calculation semaphore is saturated,
|
||||
// then warmup will wait for that before proceeding to the next tenant.
|
||||
if let AttachType::Warmup(_permit) = attach_type {
|
||||
if matches!(attach_type, AttachType::Warmup { during_startup: true, .. }) {
|
||||
let mut futs: FuturesUnordered<_> = tenant_clone.timelines.lock().unwrap().values().cloned().map(|t| t.await_initial_logical_size()).collect();
|
||||
tracing::info!("Waiting for initial logical sizes while warming up...");
|
||||
while futs.next().await.is_some() {}
|
||||
@@ -923,7 +923,7 @@ impl Tenant {
|
||||
deleting: false,
|
||||
timelines: HashMap::new(),
|
||||
},
|
||||
(None, SpawnMode::Normal) => {
|
||||
(None, _) => {
|
||||
anyhow::bail!("local-only deployment is no longer supported, https://github.com/neondatabase/neon/issues/5624");
|
||||
}
|
||||
};
|
||||
@@ -2382,7 +2382,7 @@ impl Tenant {
|
||||
self.tenant_shard_id,
|
||||
self.generation,
|
||||
self.shard_identity,
|
||||
self.walredo_mgr.as_ref().map(Arc::clone),
|
||||
self.walredo_mgr.clone(),
|
||||
resources,
|
||||
pg_version,
|
||||
state,
|
||||
@@ -3591,25 +3591,18 @@ pub async fn dump_layerfile_from_path(
|
||||
#[cfg(test)]
|
||||
pub(crate) mod harness {
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::Utf8PathBuf;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::models::ShardParameters;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use std::fs;
|
||||
use std::sync::Arc;
|
||||
use utils::logging;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::deletion_queue::mock::MockDeletionQueue;
|
||||
use crate::walredo::apply_neon;
|
||||
use crate::{
|
||||
config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
|
||||
};
|
||||
use crate::{repository::Key, walrecord::NeonWalRecord};
|
||||
|
||||
use super::*;
|
||||
use crate::tenant::config::{TenantConf, TenantConfOpt};
|
||||
use hex_literal::hex;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::id::TenantId;
|
||||
|
||||
pub const TIMELINE_ID: TimelineId =
|
||||
TimelineId::from_array(hex!("11223344556677881122334455667788"));
|
||||
@@ -3769,7 +3762,7 @@ pub(crate) mod harness {
|
||||
let preload = tenant
|
||||
.preload(&self.remote_storage, CancellationToken::new())
|
||||
.await?;
|
||||
tenant.attach(Some(preload), SpawnMode::Normal, ctx).await?;
|
||||
tenant.attach(Some(preload), SpawnMode::Eager, ctx).await?;
|
||||
|
||||
tenant.state.send_replace(TenantState::Active);
|
||||
for timeline in tenant.timelines.lock().unwrap().values() {
|
||||
@@ -3838,10 +3831,8 @@ mod tests {
|
||||
use crate::DEFAULT_PG_VERSION;
|
||||
use bytes::BytesMut;
|
||||
use hex_literal::hex;
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use rand::{thread_rng, Rng};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
static TEST_KEY: Lazy<Key> =
|
||||
Lazy::new(|| Key::from_slice(&hex!("010000000033333333444444445500000001")));
|
||||
|
||||
@@ -52,7 +52,10 @@ pub mod defaults {
|
||||
pub const DEFAULT_PITR_INTERVAL: &str = "7 days";
|
||||
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "10 seconds";
|
||||
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
|
||||
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
|
||||
// The default limit on WAL lag should be set to avoid causing disconnects under high throughput
|
||||
// scenarios: since the broker stats are updated ~1/s, a value of 1GiB should be sufficient for
|
||||
// throughputs up to 1GiB/s per timeline.
|
||||
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 1024 * 1024 * 1024;
|
||||
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
|
||||
|
||||
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
|
||||
|
||||
@@ -420,7 +420,7 @@ impl DeleteTenantFlow {
|
||||
.expect("cant be stopping or broken");
|
||||
|
||||
tenant
|
||||
.attach(preload, super::SpawnMode::Normal, ctx)
|
||||
.attach(preload, super::SpawnMode::Eager, ctx)
|
||||
.await
|
||||
.context("attach")?;
|
||||
|
||||
|
||||
@@ -21,7 +21,6 @@
|
||||
use byteorder::{ReadBytesExt, BE};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use either::Either;
|
||||
use hex;
|
||||
use std::{cmp::Ordering, io, result};
|
||||
use thiserror::Error;
|
||||
use tracing::error;
|
||||
@@ -700,8 +699,6 @@ impl<const L: usize> BuildNode<L> {
|
||||
#[cfg(test)]
|
||||
pub(crate) mod tests {
|
||||
use super::*;
|
||||
use crate::context::DownloadBehavior;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReaderRef};
|
||||
use rand::Rng;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
@@ -300,7 +300,7 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::context::DownloadBehavior;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::block_io::{BlockCursor, BlockReaderRef};
|
||||
use crate::tenant::block_io::BlockReaderRef;
|
||||
use rand::{thread_rng, RngCore};
|
||||
use std::fs;
|
||||
use std::str::FromStr;
|
||||
|
||||
@@ -595,7 +595,7 @@ pub async fn init_tenant_mgr(
|
||||
shard_identity,
|
||||
Some(init_order.clone()),
|
||||
&TENANTS,
|
||||
SpawnMode::Normal,
|
||||
SpawnMode::Lazy,
|
||||
&ctx,
|
||||
) {
|
||||
Ok(tenant) => {
|
||||
@@ -1106,9 +1106,9 @@ impl TenantManager {
|
||||
|
||||
// Edge case: if we were called with SpawnMode::Create, but a Tenant already existed, then
|
||||
// the caller thinks they're creating but the tenant already existed. We must switch to
|
||||
// Normal mode so that when starting this Tenant we properly probe remote storage for timelines,
|
||||
// Eager mode so that when starting this Tenant we properly probe remote storage for timelines,
|
||||
// rather than assuming it to be empty.
|
||||
spawn_mode = SpawnMode::Normal;
|
||||
spawn_mode = SpawnMode::Eager;
|
||||
}
|
||||
Some(TenantSlot::Secondary(state)) => {
|
||||
info!("Shutting down secondary tenant");
|
||||
@@ -1300,7 +1300,7 @@ impl TenantManager {
|
||||
shard_identity,
|
||||
None,
|
||||
self.tenants,
|
||||
SpawnMode::Normal,
|
||||
SpawnMode::Eager,
|
||||
ctx,
|
||||
)?;
|
||||
|
||||
@@ -1521,7 +1521,7 @@ impl TenantManager {
|
||||
*child_shard,
|
||||
child_location_conf,
|
||||
None,
|
||||
SpawnMode::Normal,
|
||||
SpawnMode::Eager,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -2064,7 +2064,7 @@ pub(crate) async fn load_tenant(
|
||||
shard_identity,
|
||||
None,
|
||||
&TENANTS,
|
||||
SpawnMode::Normal,
|
||||
SpawnMode::Eager,
|
||||
ctx,
|
||||
)
|
||||
.with_context(|| format!("Failed to schedule tenant processing in path {tenant_path:?}"))?;
|
||||
@@ -2648,7 +2648,7 @@ pub(crate) async fn immediate_gc(
|
||||
|
||||
let tenant = guard
|
||||
.get(&tenant_shard_id)
|
||||
.map(Arc::clone)
|
||||
.cloned()
|
||||
.with_context(|| format!("tenant {tenant_shard_id}"))
|
||||
.map_err(|e| ApiError::NotFound(e.into()))?;
|
||||
|
||||
|
||||
@@ -1,84 +0,0 @@
|
||||
use std::{
|
||||
io,
|
||||
sync::atomic::{AtomicUsize, Ordering},
|
||||
};
|
||||
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
|
||||
fn fsync_path(path: &Utf8Path) -> io::Result<()> {
|
||||
// TODO use VirtualFile::fsync_all once we fully go async.
|
||||
let file = std::fs::File::open(path)?;
|
||||
file.sync_all()
|
||||
}
|
||||
|
||||
fn parallel_worker(paths: &[Utf8PathBuf], next_path_idx: &AtomicUsize) -> io::Result<()> {
|
||||
while let Some(path) = paths.get(next_path_idx.fetch_add(1, Ordering::Relaxed)) {
|
||||
fsync_path(path)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn fsync_in_thread_pool(paths: &[Utf8PathBuf]) -> io::Result<()> {
|
||||
// TODO: remove this function in favor of `par_fsync_async` once we asyncify everything.
|
||||
|
||||
/// Use at most this number of threads.
|
||||
/// Increasing this limit will
|
||||
/// - use more memory
|
||||
/// - increase the cost of spawn/join latency
|
||||
const MAX_NUM_THREADS: usize = 64;
|
||||
let num_threads = paths.len().min(MAX_NUM_THREADS);
|
||||
let next_path_idx = AtomicUsize::new(0);
|
||||
|
||||
std::thread::scope(|s| -> io::Result<()> {
|
||||
let mut handles = vec![];
|
||||
// Spawn `num_threads - 1`, as the current thread is also a worker.
|
||||
for _ in 1..num_threads {
|
||||
handles.push(s.spawn(|| parallel_worker(paths, &next_path_idx)));
|
||||
}
|
||||
|
||||
parallel_worker(paths, &next_path_idx)?;
|
||||
|
||||
for handle in handles {
|
||||
handle.join().unwrap()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
/// Parallel fsync all files. Can be used in non-async context as it is using rayon thread pool.
|
||||
pub fn par_fsync(paths: &[Utf8PathBuf]) -> io::Result<()> {
|
||||
if paths.len() == 1 {
|
||||
fsync_path(&paths[0])?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
fsync_in_thread_pool(paths)
|
||||
}
|
||||
|
||||
/// Parallel fsync asynchronously.
|
||||
pub async fn par_fsync_async(paths: &[Utf8PathBuf]) -> io::Result<()> {
|
||||
const MAX_CONCURRENT_FSYNC: usize = 64;
|
||||
let mut next = paths.iter().peekable();
|
||||
let mut js = tokio::task::JoinSet::new();
|
||||
loop {
|
||||
while js.len() < MAX_CONCURRENT_FSYNC && next.peek().is_some() {
|
||||
let next = next.next().expect("just peeked");
|
||||
let next = next.to_owned();
|
||||
js.spawn_blocking(move || fsync_path(&next));
|
||||
}
|
||||
|
||||
// now the joinset has been filled up, wait for next to complete
|
||||
if let Some(res) = js.join_next().await {
|
||||
res??;
|
||||
} else {
|
||||
// last item had already completed
|
||||
assert!(
|
||||
next.peek().is_none(),
|
||||
"joinset emptied, we shouldn't have more work"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1791,14 +1791,12 @@ mod tests {
|
||||
context::RequestContext,
|
||||
tenant::{
|
||||
harness::{TenantHarness, TIMELINE_ID},
|
||||
storage_layer::Layer,
|
||||
Generation, Tenant, Timeline,
|
||||
Tenant, Timeline,
|
||||
},
|
||||
DEFAULT_PG_VERSION,
|
||||
};
|
||||
|
||||
use std::collections::HashSet;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
|
||||
format!("contents for {name}").into()
|
||||
|
||||
@@ -161,7 +161,7 @@ pub async fn download_layer_file<'a>(
|
||||
|
||||
const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
|
||||
|
||||
pub fn is_temp_download_file(path: &Utf8Path) -> bool {
|
||||
pub(crate) fn is_temp_download_file(path: &Utf8Path) -> bool {
|
||||
let extension = path.extension();
|
||||
match extension {
|
||||
Some(TEMP_DOWNLOAD_EXTENSION) => true,
|
||||
|
||||
@@ -32,7 +32,7 @@ use remote_storage::GenericRemoteStorage;
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::instrument;
|
||||
use utils::{completion::Barrier, fs_ext, id::TimelineId, sync::gate::Gate};
|
||||
use utils::{completion::Barrier, id::TimelineId, sync::gate::Gate};
|
||||
|
||||
enum DownloadCommand {
|
||||
Download(TenantShardId),
|
||||
@@ -121,6 +121,10 @@ impl SecondaryTenant {
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn tenant_shard_id(&self) -> TenantShardId {
|
||||
self.tenant_shard_id
|
||||
}
|
||||
|
||||
pub(crate) async fn shutdown(&self) {
|
||||
self.cancel.cancel();
|
||||
|
||||
@@ -164,16 +168,17 @@ impl SecondaryTenant {
|
||||
self.detail.lock().unwrap().get_layers_for_eviction(self)
|
||||
}
|
||||
|
||||
/// Cancellation safe, but on cancellation the eviction will go through
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline_id, name=%name))]
|
||||
pub(crate) async fn evict_layer(
|
||||
&self,
|
||||
self: &Arc<Self>,
|
||||
conf: &PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
name: LayerFileName,
|
||||
) {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let _guard = match self.gate.enter() {
|
||||
let guard = match self.gate.enter() {
|
||||
Ok(g) => g,
|
||||
Err(_) => {
|
||||
tracing::debug!("Dropping layer evictions, secondary tenant shutting down",);
|
||||
@@ -187,35 +192,57 @@ impl SecondaryTenant {
|
||||
.timeline_path(&self.tenant_shard_id, &timeline_id)
|
||||
.join(name.file_name());
|
||||
|
||||
// We tolerate ENOENT, because between planning eviction and executing
|
||||
// it, the secondary downloader could have seen an updated heatmap that
|
||||
// resulted in a layer being deleted.
|
||||
// Other local I/O errors are process-fatal: these should never happen.
|
||||
tokio::fs::remove_file(path)
|
||||
.await
|
||||
.or_else(fs_ext::ignore_not_found)
|
||||
.fatal_err("Deleting layer during eviction");
|
||||
let this = self.clone();
|
||||
|
||||
// Update the timeline's state. This does not have to be synchronized with
|
||||
// the download process, because:
|
||||
// - If downloader is racing with us to remove a file (e.g. because it is
|
||||
// removed from heatmap), then our mutual .remove() operations will both
|
||||
// succeed.
|
||||
// - If downloader is racing with us to download the object (this would require
|
||||
// multiple eviction iterations to race with multiple download iterations), then
|
||||
// if we remove it from the state, the worst that happens is the downloader
|
||||
// downloads it again before re-inserting, or we delete the file but it remains
|
||||
// in the state map (in which case it will be downloaded if this secondary
|
||||
// tenant transitions to attached and tries to access it)
|
||||
//
|
||||
// The important assumption here is that the secondary timeline state does not
|
||||
// have to 100% match what is on disk, because it's a best-effort warming
|
||||
// of the cache.
|
||||
let mut detail = self.detail.lock().unwrap();
|
||||
if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) {
|
||||
timeline_detail.on_disk_layers.remove(&name);
|
||||
timeline_detail.evicted_at.insert(name, now);
|
||||
}
|
||||
// spawn it to be cancellation safe
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let _guard = guard;
|
||||
// We tolerate ENOENT, because between planning eviction and executing
|
||||
// it, the secondary downloader could have seen an updated heatmap that
|
||||
// resulted in a layer being deleted.
|
||||
// Other local I/O errors are process-fatal: these should never happen.
|
||||
let deleted = std::fs::remove_file(path);
|
||||
|
||||
let not_found = deleted
|
||||
.as_ref()
|
||||
.is_err_and(|x| x.kind() == std::io::ErrorKind::NotFound);
|
||||
|
||||
let deleted = if not_found {
|
||||
false
|
||||
} else {
|
||||
deleted
|
||||
.map(|()| true)
|
||||
.fatal_err("Deleting layer during eviction")
|
||||
};
|
||||
|
||||
if !deleted {
|
||||
// skip updating accounting and putting perhaps later timestamp
|
||||
return;
|
||||
}
|
||||
|
||||
// Update the timeline's state. This does not have to be synchronized with
|
||||
// the download process, because:
|
||||
// - If downloader is racing with us to remove a file (e.g. because it is
|
||||
// removed from heatmap), then our mutual .remove() operations will both
|
||||
// succeed.
|
||||
// - If downloader is racing with us to download the object (this would require
|
||||
// multiple eviction iterations to race with multiple download iterations), then
|
||||
// if we remove it from the state, the worst that happens is the downloader
|
||||
// downloads it again before re-inserting, or we delete the file but it remains
|
||||
// in the state map (in which case it will be downloaded if this secondary
|
||||
// tenant transitions to attached and tries to access it)
|
||||
//
|
||||
// The important assumption here is that the secondary timeline state does not
|
||||
// have to 100% match what is on disk, because it's a best-effort warming
|
||||
// of the cache.
|
||||
let mut detail = this.detail.lock().unwrap();
|
||||
if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) {
|
||||
timeline_detail.on_disk_layers.remove(&name);
|
||||
timeline_detail.evicted_at.insert(name, now);
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("secondary eviction should not have panicked");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -16,7 +16,8 @@ use crate::{
|
||||
config::SecondaryLocationConfig,
|
||||
debug_assert_current_span_has_tenant_and_timeline_id,
|
||||
remote_timeline_client::{
|
||||
index::LayerFileMetadata, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES,
|
||||
index::LayerFileMetadata, is_temp_download_file, FAILED_DOWNLOAD_WARN_THRESHOLD,
|
||||
FAILED_REMOTE_OP_RETRIES,
|
||||
},
|
||||
span::debug_assert_current_span_has_tenant_id,
|
||||
storage_layer::LayerFileName,
|
||||
@@ -788,7 +789,7 @@ async fn init_timeline_state(
|
||||
// Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
|
||||
warn!(path=?dentry.path(), "found legacy metadata file, these should have been removed in load_tenant_config");
|
||||
continue;
|
||||
} else if crate::is_temporary(&file_path) {
|
||||
} else if crate::is_temporary(&file_path) || is_temp_download_file(&file_path) {
|
||||
// Temporary files are frequently left behind from restarting during downloads
|
||||
tracing::info!("Cleaning up temporary file {file_path}");
|
||||
if let Err(e) = tokio::fs::remove_file(&file_path)
|
||||
|
||||
@@ -18,7 +18,6 @@ use crate::{
|
||||
};
|
||||
|
||||
use futures::Future;
|
||||
use md5;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use rand::Rng;
|
||||
use remote_storage::{GenericRemoteStorage, TimeoutOrCancel};
|
||||
|
||||
@@ -72,7 +72,7 @@ where
|
||||
/// the same ValueReconstructState struct in the next 'get_value_reconstruct_data'
|
||||
/// call, to collect more records.
|
||||
///
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ValueReconstructState {
|
||||
pub records: Vec<(Lsn, NeonWalRecord)>,
|
||||
pub img: Option<(Lsn, Bytes)>,
|
||||
|
||||
@@ -43,7 +43,6 @@ use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use hex;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
|
||||
@@ -8,7 +8,7 @@ use pageserver_api::shard::ShardIndex;
|
||||
use std::ops::Range;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::SystemTime;
|
||||
use std::time::{Duration, SystemTime};
|
||||
use tracing::Instrument;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::sync::heavier_once_cell;
|
||||
@@ -208,10 +208,15 @@ impl Layer {
|
||||
/// If for a bad luck or blocking of the executor, we miss the actual eviction and the layer is
|
||||
/// re-downloaded, [`EvictionError::Downloaded`] is returned.
|
||||
///
|
||||
/// Timeout is mandatory, because waiting for eviction is only needed for our tests; eviction
|
||||
/// will happen regardless the future returned by this method completing unless there is a
|
||||
/// read access (currently including [`Layer::keep_resident`]) before eviction gets to
|
||||
/// complete.
|
||||
///
|
||||
/// Technically cancellation safe, but cancelling might shift the viewpoint of what generation
|
||||
/// of download-evict cycle on retry.
|
||||
pub(crate) async fn evict_and_wait(&self) -> Result<(), EvictionError> {
|
||||
self.0.evict_and_wait().await
|
||||
pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
|
||||
self.0.evict_and_wait(timeout).await
|
||||
}
|
||||
|
||||
/// Delete the layer file when the `self` gets dropped, also try to schedule a remote index upload
|
||||
@@ -363,7 +368,7 @@ impl Layer {
|
||||
///
|
||||
/// Does not start local deletion, use [`Self::delete_on_drop`] for that
|
||||
/// separatedly.
|
||||
#[cfg(feature = "testing")]
|
||||
#[cfg(any(feature = "testing", test))]
|
||||
pub(crate) fn wait_drop(&self) -> impl std::future::Future<Output = ()> + 'static {
|
||||
let mut rx = self.0.status.subscribe();
|
||||
|
||||
@@ -632,7 +637,7 @@ impl LayerInner {
|
||||
|
||||
/// Cancellation safe, however dropping the future and calling this method again might result
|
||||
/// in a new attempt to evict OR join the previously started attempt.
|
||||
pub(crate) async fn evict_and_wait(&self) -> Result<(), EvictionError> {
|
||||
pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
|
||||
assert!(self.have_remote_client);
|
||||
@@ -652,16 +657,22 @@ impl LayerInner {
|
||||
if strong.is_some() {
|
||||
// drop the DownloadedLayer outside of the holding the guard
|
||||
drop(strong);
|
||||
|
||||
// idea here is that only one evicter should ever get to witness a strong reference,
|
||||
// which means whenever get_or_maybe_download upgrades a weak, it must mark up a
|
||||
// cancelled eviction and signal us, like it currently does.
|
||||
//
|
||||
// a second concurrent evict_and_wait will not see a strong reference.
|
||||
LAYER_IMPL_METRICS.inc_started_evictions();
|
||||
}
|
||||
|
||||
match rx.recv().await {
|
||||
Ok(Status::Evicted) => Ok(()),
|
||||
Ok(Status::Downloaded) => Err(EvictionError::Downloaded),
|
||||
Err(RecvError::Closed) => {
|
||||
match tokio::time::timeout(timeout, rx.recv()).await {
|
||||
Ok(Ok(Status::Evicted)) => Ok(()),
|
||||
Ok(Ok(Status::Downloaded)) => Err(EvictionError::Downloaded),
|
||||
Ok(Err(RecvError::Closed)) => {
|
||||
unreachable!("sender cannot be dropped while we are in &self method")
|
||||
}
|
||||
Err(RecvError::Lagged(_)) => {
|
||||
Ok(Err(RecvError::Lagged(_))) => {
|
||||
// this is quite unlikely, but we are blocking a lot in the async context, so
|
||||
// we might be missing this because we are stuck on a LIFO slot on a thread
|
||||
// which is busy blocking for a 1TB database create_image_layers.
|
||||
@@ -674,6 +685,7 @@ impl LayerInner {
|
||||
None => Ok(()),
|
||||
}
|
||||
}
|
||||
Err(_timeout) => Err(EvictionError::Timeout),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1195,6 +1207,9 @@ pub(crate) enum EvictionError {
|
||||
/// Evictions must always lose to downloads in races, and this time it happened.
|
||||
#[error("layer was downloaded instead")]
|
||||
Downloaded,
|
||||
|
||||
#[error("eviction did not happen within timeout")]
|
||||
Timeout,
|
||||
}
|
||||
|
||||
/// Error internal to the [`LayerInner::get_or_maybe_download`]
|
||||
|
||||
@@ -1,13 +1,173 @@
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::key::CONTROLFILE_KEY;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::Instrument;
|
||||
use utils::{
|
||||
completion::{self, Completion},
|
||||
id::TimelineId,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
use crate::task_mgr::BACKGROUND_RUNTIME;
|
||||
use crate::tenant::harness::TenantHarness;
|
||||
use crate::{context::DownloadBehavior, task_mgr::BACKGROUND_RUNTIME};
|
||||
use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness};
|
||||
|
||||
/// Used in tests to advance a future to wanted await point, and not futher.
|
||||
const ADVANCE: std::time::Duration = std::time::Duration::from_secs(3600);
|
||||
|
||||
/// Used in tests to indicate forever long timeout; has to be longer than the amount of ADVANCE
|
||||
/// timeout uses to advance futures.
|
||||
const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_secs() * 24 * 7);
|
||||
|
||||
/// Demonstrate the API and resident -> evicted -> resident -> deleted transitions.
|
||||
#[tokio::test]
|
||||
async fn smoke_test() {
|
||||
let handle = BACKGROUND_RUNTIME.handle();
|
||||
|
||||
let h = TenantHarness::create("smoke_test").unwrap();
|
||||
let span = h.span();
|
||||
let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
|
||||
let (tenant, _) = h.load().await;
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download);
|
||||
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let layer = {
|
||||
let mut layers = {
|
||||
let layers = timeline.layers.read().await;
|
||||
layers.resident_layers().collect::<Vec<_>>().await
|
||||
};
|
||||
|
||||
assert_eq!(layers.len(), 1);
|
||||
|
||||
layers.swap_remove(0)
|
||||
};
|
||||
|
||||
// all layers created at pageserver are like `layer`, initialized with strong
|
||||
// Arc<DownloadedLayer>.
|
||||
|
||||
let img_before = {
|
||||
let mut data = ValueReconstructState::default();
|
||||
layer
|
||||
.get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
data.img
|
||||
.take()
|
||||
.expect("tenant harness writes the control file")
|
||||
};
|
||||
|
||||
// important part is evicting the layer, which can be done when there are no more ResidentLayer
|
||||
// instances -- there currently are none, only two `Layer` values, one in the layermap and on
|
||||
// in scope.
|
||||
layer.evict_and_wait(FOREVER).await.unwrap();
|
||||
|
||||
// double-evict returns an error, which is valid if both eviction_task and disk usage based
|
||||
// eviction would both evict the same layer at the same time.
|
||||
|
||||
let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
|
||||
assert!(matches!(e, EvictionError::NotFound));
|
||||
|
||||
// on accesses when the layer is evicted, it will automatically be downloaded.
|
||||
let img_after = {
|
||||
let mut data = ValueReconstructState::default();
|
||||
layer
|
||||
.get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx)
|
||||
.instrument(download_span.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
data.img.take().unwrap()
|
||||
};
|
||||
|
||||
assert_eq!(img_before, img_after);
|
||||
|
||||
// evict_and_wait can timeout, but it doesn't cancel the evicting itself
|
||||
//
|
||||
// ZERO for timeout does not work reliably, so first take up all spawn_blocking slots to
|
||||
// artificially slow it down.
|
||||
let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(handle).await;
|
||||
|
||||
match layer
|
||||
.evict_and_wait(std::time::Duration::ZERO)
|
||||
.await
|
||||
.unwrap_err()
|
||||
{
|
||||
EvictionError::Timeout => {
|
||||
// expected, but note that the eviction is "still ongoing"
|
||||
helper.release().await;
|
||||
// exhaust spawn_blocking pool to ensure it is now complete
|
||||
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(handle)
|
||||
.await;
|
||||
}
|
||||
other => unreachable!("{other:?}"),
|
||||
}
|
||||
|
||||
// only way to query if a layer is resident is to acquire a ResidentLayer instance.
|
||||
// Layer::keep_resident never downloads, but it might initialize if the layer file is found
|
||||
// downloaded locally.
|
||||
let none = layer.keep_resident().await.unwrap();
|
||||
assert!(
|
||||
none.is_none(),
|
||||
"Expected none, because eviction removed the local file, found: {none:?}"
|
||||
);
|
||||
|
||||
// plain downloading is rarely needed
|
||||
layer
|
||||
.download_and_keep_resident()
|
||||
.instrument(download_span)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// last important part is deletion on drop: gc and compaction use it for compacted L0 layers
|
||||
// or fully garbage collected layers. deletion means deleting the local file, and scheduling a
|
||||
// deletion of the already unlinked from index_part.json remote file.
|
||||
//
|
||||
// marking a layer to be deleted on drop is irreversible; there is no technical reason against
|
||||
// reversiblity, but currently it is not needed so it is not provided.
|
||||
layer.delete_on_drop();
|
||||
|
||||
let path = layer.local_path().to_owned();
|
||||
|
||||
// wait_drop produces an unconnected to Layer future which will resolve when the
|
||||
// LayerInner::drop has completed.
|
||||
let mut wait_drop = std::pin::pin!(layer.wait_drop());
|
||||
|
||||
// paused time doesn't really work well with timeouts and evict_and_wait, so delay pausing
|
||||
// until here
|
||||
tokio::time::pause();
|
||||
tokio::time::timeout(ADVANCE, &mut wait_drop)
|
||||
.await
|
||||
.expect_err("should had timed out because two strong references exist");
|
||||
|
||||
tokio::fs::metadata(&path)
|
||||
.await
|
||||
.expect("the local layer file still exists");
|
||||
|
||||
let rtc = timeline.remote_client.as_ref().unwrap();
|
||||
|
||||
{
|
||||
let layers = &[layer];
|
||||
let mut g = timeline.layers.write().await;
|
||||
g.finish_gc_timeline(layers);
|
||||
// this just updates the remote_physical_size for demonstration purposes
|
||||
rtc.schedule_gc_update(layers).unwrap();
|
||||
}
|
||||
|
||||
// when strong references are dropped, the file is deleted and remote deletion is scheduled
|
||||
wait_drop.await;
|
||||
|
||||
let e = tokio::fs::metadata(&path)
|
||||
.await
|
||||
.expect_err("the local file is deleted");
|
||||
assert_eq!(e.kind(), std::io::ErrorKind::NotFound);
|
||||
|
||||
rtc.wait_completion().await.unwrap();
|
||||
|
||||
assert_eq!(rtc.get_remote_physical_size(), 0);
|
||||
}
|
||||
|
||||
/// This test demonstrates a previous hang when a eviction and deletion were requested at the same
|
||||
/// time. Now both of them complete per Arc drop semantics.
|
||||
@@ -41,10 +201,10 @@ async fn evict_and_wait_on_wanted_deleted() {
|
||||
let resident = layer.keep_resident().await.unwrap();
|
||||
|
||||
{
|
||||
let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait());
|
||||
let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
|
||||
|
||||
// drive the future to await on the status channel
|
||||
tokio::time::timeout(std::time::Duration::from_secs(3600), &mut evict_and_wait)
|
||||
tokio::time::timeout(ADVANCE, &mut evict_and_wait)
|
||||
.await
|
||||
.expect_err("should had been a timeout since we are holding the layer resident");
|
||||
|
||||
@@ -115,10 +275,10 @@ async fn residency_check_while_evict_and_wait_on_clogged_spawn_blocking() {
|
||||
|
||||
let resident = layer.keep_resident().await.unwrap();
|
||||
|
||||
let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait());
|
||||
let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
|
||||
|
||||
// drive the future to await on the status channel
|
||||
tokio::time::timeout(std::time::Duration::from_secs(3600), &mut evict_and_wait)
|
||||
tokio::time::timeout(ADVANCE, &mut evict_and_wait)
|
||||
.await
|
||||
.expect_err("should had been a timeout since we are holding the layer resident");
|
||||
assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
|
||||
@@ -138,7 +298,7 @@ async fn residency_check_while_evict_and_wait_on_clogged_spawn_blocking() {
|
||||
|
||||
// because the keep_resident check alters wanted evicted without sending a message, we will
|
||||
// never get completed
|
||||
let e = tokio::time::timeout(std::time::Duration::from_secs(3600), &mut evict_and_wait)
|
||||
let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
|
||||
.await
|
||||
.expect("no timeout, because keep_resident re-initialized")
|
||||
.expect_err("eviction should not have succeeded because re-initialized");
|
||||
@@ -158,9 +318,10 @@ async fn residency_check_while_evict_and_wait_on_clogged_spawn_blocking() {
|
||||
.sum::<u64>()
|
||||
);
|
||||
|
||||
let mut second_eviction = std::pin::pin!(layer.evict_and_wait());
|
||||
let mut second_eviction = std::pin::pin!(layer.evict_and_wait(FOREVER));
|
||||
|
||||
tokio::time::timeout(std::time::Duration::from_secs(3600), &mut second_eviction)
|
||||
// advance to the wait on the queue
|
||||
tokio::time::timeout(ADVANCE, &mut second_eviction)
|
||||
.await
|
||||
.expect_err("timeout because spawn_blocking is clogged");
|
||||
|
||||
@@ -171,7 +332,12 @@ async fn residency_check_while_evict_and_wait_on_clogged_spawn_blocking() {
|
||||
|
||||
helper.release().await;
|
||||
|
||||
tokio::time::timeout(std::time::Duration::from_secs(3600), &mut second_eviction)
|
||||
// the second_eviction gets to run here
|
||||
//
|
||||
// synchronize to be *strictly* after the second_eviction spawn_blocking run
|
||||
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(handle).await;
|
||||
|
||||
tokio::time::timeout(ADVANCE, &mut second_eviction)
|
||||
.await
|
||||
.expect("eviction goes through now that spawn_blocking is unclogged")
|
||||
.expect("eviction should succeed, because version matches");
|
||||
@@ -261,3 +427,49 @@ impl SpawnBlockingPoolHelper {
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn spawn_blocking_pool_helper_actually_works() {
|
||||
// create a custom runtime for which we know and control how many blocking threads it has
|
||||
//
|
||||
// because the amount is not configurable for our helper, expect the same amount as
|
||||
// BACKGROUND_RUNTIME using the tokio defaults would have.
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.max_blocking_threads(512)
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let handle = rt.handle();
|
||||
|
||||
rt.block_on(async move {
|
||||
// this will not return until all threads are spun up and actually executing the code
|
||||
// waiting on `consumed` to be `SpawnBlockingPoolHelper::release`'d.
|
||||
let consumed = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(handle).await;
|
||||
|
||||
println!("consumed");
|
||||
|
||||
let mut jh = std::pin::pin!(tokio::task::spawn_blocking(move || {
|
||||
// this will not get to run before we release
|
||||
}));
|
||||
|
||||
println!("spawned");
|
||||
|
||||
tokio::time::timeout(std::time::Duration::from_secs(1), &mut jh)
|
||||
.await
|
||||
.expect_err("the task should not have gotten to run yet");
|
||||
|
||||
println!("tried to join");
|
||||
|
||||
consumed.release().await;
|
||||
|
||||
println!("released");
|
||||
|
||||
tokio::time::timeout(std::time::Duration::from_secs(1), jh)
|
||||
.await
|
||||
.expect("no timeout")
|
||||
.expect("no join error");
|
||||
|
||||
println!("joined");
|
||||
});
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ mod walreceiver;
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use camino::Utf8Path;
|
||||
use enumset::EnumSet;
|
||||
use fail::fail_point;
|
||||
use futures::stream::StreamExt;
|
||||
@@ -50,12 +50,10 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::sync::gate::{Gate, GateGuard};
|
||||
|
||||
use crate::pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind};
|
||||
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
|
||||
use crate::tenant::{
|
||||
layer_map::{LayerMap, SearchResult},
|
||||
metadata::TimelineMetadata,
|
||||
par_fsync,
|
||||
};
|
||||
use crate::{
|
||||
context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder},
|
||||
@@ -75,6 +73,10 @@ use crate::{
|
||||
disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry,
|
||||
};
|
||||
use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
|
||||
use crate::{
|
||||
pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind},
|
||||
virtual_file::{MaybeFatalIo, VirtualFile},
|
||||
};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
@@ -1512,10 +1514,14 @@ impl Timeline {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
match local_layer.evict_and_wait().await {
|
||||
// curl has this by default
|
||||
let timeout = std::time::Duration::from_secs(120);
|
||||
|
||||
match local_layer.evict_and_wait(timeout).await {
|
||||
Ok(()) => Ok(Some(true)),
|
||||
Err(EvictionError::NotFound) => Ok(Some(false)),
|
||||
Err(EvictionError::Downloaded) => Ok(Some(false)),
|
||||
Err(EvictionError::Timeout) => Ok(Some(false)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3410,40 +3416,31 @@ impl Timeline {
|
||||
let frozen_layer = Arc::clone(frozen_layer);
|
||||
let ctx = ctx.attached_child();
|
||||
move || {
|
||||
// Write it out
|
||||
// Keep this inside `spawn_blocking` and `Handle::current`
|
||||
// as long as the write path is still sync and the read impl
|
||||
// is still not fully async. Otherwise executor threads would
|
||||
// be blocked.
|
||||
let _g = span.entered();
|
||||
let new_delta =
|
||||
Handle::current().block_on(frozen_layer.write_to_disk(&self_clone, &ctx))?;
|
||||
let new_delta_path = new_delta.local_path().to_owned();
|
||||
|
||||
// Sync it to disk.
|
||||
//
|
||||
// We must also fsync the timeline dir to ensure the directory entries for
|
||||
// new layer files are durable.
|
||||
//
|
||||
// NB: timeline dir must be synced _after_ the file contents are durable.
|
||||
// So, two separate fsyncs are required, they mustn't be batched.
|
||||
//
|
||||
// TODO: If we're running inside 'flush_frozen_layers' and there are multiple
|
||||
// files to flush, the fsync overhead can be reduces as follows:
|
||||
// 1. write them all to temporary file names
|
||||
// 2. fsync them
|
||||
// 3. rename to the final name
|
||||
// 4. fsync the parent directory.
|
||||
// Note that (1),(2),(3) today happen inside write_to_disk().
|
||||
//
|
||||
// FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
|
||||
par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?;
|
||||
par_fsync::par_fsync(&[self_clone
|
||||
.conf
|
||||
.timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id)])
|
||||
.context("fsync of timeline dir")?;
|
||||
|
||||
anyhow::Ok(new_delta)
|
||||
Handle::current().block_on(
|
||||
async move {
|
||||
let new_delta = frozen_layer.write_to_disk(&self_clone, &ctx).await?;
|
||||
// The write_to_disk() above calls writer.finish() which already did the fsync of the inodes.
|
||||
// We just need to fsync the directory in which these inodes are linked,
|
||||
// which we know to be the timeline directory.
|
||||
//
|
||||
// We use fatal_err() below because the after write_to_disk returns with success,
|
||||
// the in-memory state of the filesystem already has the layer file in its final place,
|
||||
// and subsequent pageserver code could think it's durable while it really isn't.
|
||||
let timeline_dir =
|
||||
VirtualFile::open(&self_clone.conf.timeline_path(
|
||||
&self_clone.tenant_shard_id,
|
||||
&self_clone.timeline_id,
|
||||
))
|
||||
.await
|
||||
.fatal_err("VirtualFile::open for timeline dir fsync");
|
||||
timeline_dir
|
||||
.sync_all()
|
||||
.await
|
||||
.fatal_err("VirtualFile::sync_all timeline dir");
|
||||
anyhow::Ok(new_delta)
|
||||
}
|
||||
.instrument(span),
|
||||
)
|
||||
}
|
||||
})
|
||||
.await
|
||||
@@ -3670,30 +3667,24 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
// Sync the new layer to disk before adding it to the layer map, to make sure
|
||||
// we don't garbage collect something based on the new layer, before it has
|
||||
// reached the disk.
|
||||
//
|
||||
// We must also fsync the timeline dir to ensure the directory entries for
|
||||
// new layer files are durable
|
||||
//
|
||||
// Compaction creates multiple image layers. It would be better to create them all
|
||||
// and fsync them all in parallel.
|
||||
let all_paths = image_layers
|
||||
.iter()
|
||||
.map(|layer| layer.local_path().to_owned())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
par_fsync::par_fsync_async(&all_paths)
|
||||
// The writer.finish() above already did the fsync of the inodes.
|
||||
// We just need to fsync the directory in which these inodes are linked,
|
||||
// which we know to be the timeline directory.
|
||||
if !image_layers.is_empty() {
|
||||
// We use fatal_err() below because the after writer.finish() returns with success,
|
||||
// the in-memory state of the filesystem already has the layer file in its final place,
|
||||
// and subsequent pageserver code could think it's durable while it really isn't.
|
||||
let timeline_dir = VirtualFile::open(
|
||||
&self
|
||||
.conf
|
||||
.timeline_path(&self.tenant_shard_id, &self.timeline_id),
|
||||
)
|
||||
.await
|
||||
.context("fsync of newly created layer files")?;
|
||||
|
||||
if !all_paths.is_empty() {
|
||||
par_fsync::par_fsync_async(&[self
|
||||
.conf
|
||||
.timeline_path(&self.tenant_shard_id, &self.timeline_id)])
|
||||
.await
|
||||
.context("fsync of timeline dir")?;
|
||||
.fatal_err("VirtualFile::open for timeline dir fsync");
|
||||
timeline_dir
|
||||
.sync_all()
|
||||
.await
|
||||
.fatal_err("VirtualFile::sync_all timeline dir");
|
||||
}
|
||||
|
||||
let mut guard = self.layers.write().await;
|
||||
@@ -4275,25 +4266,24 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
|
||||
let layer_paths: Vec<Utf8PathBuf> = new_layers
|
||||
.iter()
|
||||
.map(|l| l.local_path().to_owned())
|
||||
.collect();
|
||||
|
||||
// Fsync all the layer files and directory using multiple threads to
|
||||
// minimize latency.
|
||||
par_fsync::par_fsync_async(&layer_paths)
|
||||
// The writer.finish() above already did the fsync of the inodes.
|
||||
// We just need to fsync the directory in which these inodes are linked,
|
||||
// which we know to be the timeline directory.
|
||||
//
|
||||
// We use fatal_err() below because the after writer.finish() returns with success,
|
||||
// the in-memory state of the filesystem already has the layer file in its final place,
|
||||
// and subsequent pageserver code could think it's durable while it really isn't.
|
||||
let timeline_dir = VirtualFile::open(
|
||||
&self
|
||||
.conf
|
||||
.timeline_path(&self.tenant_shard_id, &self.timeline_id),
|
||||
)
|
||||
.await
|
||||
.fatal_err("VirtualFile::open for timeline dir fsync");
|
||||
timeline_dir
|
||||
.sync_all()
|
||||
.await
|
||||
.context("fsync all new layers")?;
|
||||
|
||||
let timeline_dir = self
|
||||
.conf
|
||||
.timeline_path(&self.tenant_shard_id, &self.timeline_id);
|
||||
|
||||
par_fsync::par_fsync_async(&[timeline_dir])
|
||||
.await
|
||||
.context("fsync of timeline dir")?;
|
||||
.fatal_err("VirtualFile::sync_all timeline dir");
|
||||
}
|
||||
|
||||
stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
|
||||
@@ -5157,8 +5147,7 @@ mod tests {
|
||||
let harness =
|
||||
TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap();
|
||||
|
||||
let ctx = any_context();
|
||||
let tenant = harness.do_try_load(&ctx).await.unwrap();
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
|
||||
.await
|
||||
@@ -5172,8 +5161,10 @@ mod tests {
|
||||
.expect("should had been resident")
|
||||
.drop_eviction_guard();
|
||||
|
||||
let first = async { layer.evict_and_wait().await };
|
||||
let second = async { layer.evict_and_wait().await };
|
||||
let forever = std::time::Duration::from_secs(120);
|
||||
|
||||
let first = layer.evict_and_wait(forever);
|
||||
let second = layer.evict_and_wait(forever);
|
||||
|
||||
let (first, second) = tokio::join!(first, second);
|
||||
|
||||
@@ -5192,12 +5183,6 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
fn any_context() -> crate::context::RequestContext {
|
||||
use crate::context::*;
|
||||
use crate::task_mgr::*;
|
||||
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
|
||||
}
|
||||
|
||||
async fn find_some_layer(timeline: &Timeline) -> Layer {
|
||||
let layers = timeline.layers.read().await;
|
||||
let desc = layers
|
||||
|
||||
@@ -75,14 +75,13 @@ impl Timeline {
|
||||
|
||||
let keyspace = self.collect_keyspace(end_lsn, ctx).await?;
|
||||
let mut adaptor = TimelineAdaptor::new(self, (end_lsn, keyspace));
|
||||
let ctx_adaptor = RequestContextAdaptor(ctx.clone());
|
||||
|
||||
pageserver_compaction::compact_tiered::compact_tiered(
|
||||
&mut adaptor,
|
||||
end_lsn,
|
||||
target_file_size,
|
||||
fanout,
|
||||
&ctx_adaptor,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -143,13 +142,13 @@ impl CompactionJobExecutor for TimelineAdaptor {
|
||||
type DeltaLayer = ResidentDeltaLayer;
|
||||
type ImageLayer = ResidentImageLayer;
|
||||
|
||||
type RequestContext = RequestContextAdaptor;
|
||||
type RequestContext = crate::context::RequestContext;
|
||||
|
||||
async fn get_layers(
|
||||
&mut self,
|
||||
key_range: &Range<Key>,
|
||||
lsn_range: &Range<Lsn>,
|
||||
_ctx: &RequestContextAdaptor,
|
||||
_ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
|
||||
self.flush_updates().await?;
|
||||
|
||||
@@ -170,7 +169,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
|
||||
&mut self,
|
||||
key_range: &Range<Key>,
|
||||
lsn: Lsn,
|
||||
_ctx: &RequestContextAdaptor,
|
||||
_ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<Range<Key>>> {
|
||||
if lsn == self.keyspace.0 {
|
||||
Ok(pageserver_compaction::helpers::intersect_keyspace(
|
||||
@@ -206,7 +205,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
|
||||
&mut self,
|
||||
lsn: Lsn,
|
||||
key_range: &Range<Key>,
|
||||
ctx: &RequestContextAdaptor,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
Ok(self.create_image_impl(lsn, key_range, ctx).await?)
|
||||
}
|
||||
@@ -216,7 +215,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
|
||||
lsn_range: &Range<Lsn>,
|
||||
key_range: &Range<Key>,
|
||||
input_layers: &[ResidentDeltaLayer],
|
||||
ctx: &RequestContextAdaptor,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
|
||||
|
||||
@@ -287,7 +286,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
|
||||
async fn delete_layer(
|
||||
&mut self,
|
||||
layer: &OwnArc<PersistentLayerDesc>,
|
||||
_ctx: &RequestContextAdaptor,
|
||||
_ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
self.layers_to_delete.push(layer.clone().0);
|
||||
Ok(())
|
||||
@@ -299,7 +298,7 @@ impl TimelineAdaptor {
|
||||
&mut self,
|
||||
lsn: Lsn,
|
||||
key_range: &Range<Key>,
|
||||
ctx: &RequestContextAdaptor,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), PageReconstructError> {
|
||||
let timer = self.timeline.metrics.create_images_time_histo.start_timer();
|
||||
|
||||
@@ -361,17 +360,7 @@ impl TimelineAdaptor {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RequestContextAdaptor(pub RequestContext);
|
||||
|
||||
impl std::ops::Deref for RequestContextAdaptor {
|
||||
type Target = RequestContext;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl CompactionRequestContext for RequestContextAdaptor {}
|
||||
impl CompactionRequestContext for crate::context::RequestContext {}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct OwnArc<T>(pub Arc<T>);
|
||||
@@ -449,10 +438,7 @@ impl CompactionLayer<Key> for ResidentDeltaLayer {
|
||||
impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
|
||||
type DeltaEntry<'a> = DeltaEntry<'a>;
|
||||
|
||||
async fn load_keys<'a>(
|
||||
&self,
|
||||
ctx: &RequestContextAdaptor,
|
||||
) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
|
||||
async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
|
||||
self.0.load_keys(ctx).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -204,6 +204,7 @@ impl Timeline {
|
||||
evicted: usize,
|
||||
errors: usize,
|
||||
not_evictable: usize,
|
||||
timeouts: usize,
|
||||
#[allow(dead_code)]
|
||||
skipped_for_shutdown: usize,
|
||||
}
|
||||
@@ -267,7 +268,11 @@ impl Timeline {
|
||||
let layer = guard.drop_eviction_guard();
|
||||
if no_activity_for > p.threshold {
|
||||
// this could cause a lot of allocations in some cases
|
||||
js.spawn(async move { layer.evict_and_wait().await });
|
||||
js.spawn(async move {
|
||||
layer
|
||||
.evict_and_wait(std::time::Duration::from_secs(5))
|
||||
.await
|
||||
});
|
||||
stats.candidates += 1;
|
||||
}
|
||||
}
|
||||
@@ -280,6 +285,9 @@ impl Timeline {
|
||||
Ok(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
|
||||
stats.not_evictable += 1;
|
||||
}
|
||||
Ok(Err(EvictionError::Timeout)) => {
|
||||
stats.timeouts += 1;
|
||||
}
|
||||
Err(je) if je.is_cancelled() => unreachable!("not used"),
|
||||
Err(je) if je.is_panic() => {
|
||||
/* already logged */
|
||||
@@ -295,7 +303,8 @@ impl Timeline {
|
||||
stats = join_all => {
|
||||
if stats.candidates == stats.not_evictable {
|
||||
debug!(stats=?stats, "eviction iteration complete");
|
||||
} else if stats.errors > 0 || stats.not_evictable > 0 {
|
||||
} else if stats.errors > 0 || stats.not_evictable > 0 || stats.timeouts > 0 {
|
||||
// reminder: timeouts are not eviction cancellations
|
||||
warn!(stats=?stats, "eviction iteration complete");
|
||||
} else {
|
||||
info!(stats=?stats, "eviction iteration complete");
|
||||
|
||||
@@ -1667,8 +1667,6 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::tenant::harness::*;
|
||||
use crate::tenant::remote_timeline_client::{remote_initdb_archive_path, INITDB_PATH};
|
||||
use crate::tenant::Timeline;
|
||||
use postgres_ffi::v14::xlog_utils::SIZEOF_CHECKPOINT;
|
||||
use postgres_ffi::RELSEG_SIZE;
|
||||
|
||||
use crate::DEFAULT_PG_VERSION;
|
||||
|
||||
@@ -262,7 +262,7 @@ impl PostgresRedoManager {
|
||||
// next request will launch a new one.
|
||||
if let Err(e) = result.as_ref() {
|
||||
error!(
|
||||
"error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}",
|
||||
"error applying {} WAL records {}..{} ({} bytes) to key {key}, from base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}",
|
||||
records.len(),
|
||||
records.first().map(|p| p.0).unwrap_or(Lsn(0)),
|
||||
records.last().map(|p| p.0).unwrap_or(Lsn(0)),
|
||||
|
||||
@@ -252,8 +252,6 @@ mod test {
|
||||
use super::*;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::{pgdatadir_mapping::AuxFilesDirectory, walrecord::NeonWalRecord};
|
||||
|
||||
/// Test [`apply_in_neon`]'s handling of NeonWalRecord::AuxFile
|
||||
#[test]
|
||||
fn apply_aux_file_deltas() -> anyhow::Result<()> {
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
use tracing;
|
||||
use tracing::error;
|
||||
use tracing::info;
|
||||
use tracing::instrument;
|
||||
use tracing::{error, info};
|
||||
|
||||
use crate::metrics::WalRedoKillCause;
|
||||
use crate::metrics::WAL_REDO_PROCESS_COUNTERS;
|
||||
|
||||
@@ -21,7 +21,7 @@ SHLIB_LINK_INTERNAL = $(libpq)
|
||||
SHLIB_LINK = -lcurl
|
||||
|
||||
EXTENSION = neon
|
||||
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql
|
||||
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql
|
||||
PGFILEDESC = "neon - cloud storage for PostgreSQL"
|
||||
|
||||
EXTRA_CLEAN = \
|
||||
|
||||
@@ -25,6 +25,8 @@
|
||||
#include "funcapi.h"
|
||||
#include "miscadmin.h"
|
||||
#include "pagestore_client.h"
|
||||
#include "common/hashfn.h"
|
||||
#include "lib/hyperloglog.h"
|
||||
#include "pgstat.h"
|
||||
#include "postmaster/bgworker.h"
|
||||
#include RELFILEINFO_HDR
|
||||
@@ -60,6 +62,7 @@
|
||||
#define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */
|
||||
#define MB ((uint64)1024*1024)
|
||||
|
||||
#define HYPER_LOG_LOG_BIT_WIDTH 10
|
||||
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
|
||||
|
||||
typedef struct FileCacheEntry
|
||||
@@ -84,6 +87,8 @@ typedef struct FileCacheControl
|
||||
uint64 writes;
|
||||
dlist_head lru; /* double linked list for LRU replacement
|
||||
* algorithm */
|
||||
hyperLogLogState wss_estimation; /* estimation of wroking set size */
|
||||
uint8_t hyperloglog_hashes[(1 << HYPER_LOG_LOG_BIT_WIDTH) + 1];
|
||||
} FileCacheControl;
|
||||
|
||||
static HTAB *lfc_hash;
|
||||
@@ -232,6 +237,14 @@ lfc_shmem_startup(void)
|
||||
lfc_ctl->writes = 0;
|
||||
dlist_init(&lfc_ctl->lru);
|
||||
|
||||
/* Initialize hyper-log-log structure for estimating working set size */
|
||||
initHyperLogLog(&lfc_ctl->wss_estimation, HYPER_LOG_LOG_BIT_WIDTH);
|
||||
|
||||
/* We need hashes in shared memory */
|
||||
pfree(lfc_ctl->wss_estimation.hashesArr);
|
||||
memset(lfc_ctl->hyperloglog_hashes, 0, sizeof lfc_ctl->hyperloglog_hashes);
|
||||
lfc_ctl->wss_estimation.hashesArr = lfc_ctl->hyperloglog_hashes;
|
||||
|
||||
/* Recreate file cache on restart */
|
||||
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
|
||||
if (fd < 0)
|
||||
@@ -529,6 +542,11 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
}
|
||||
|
||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
|
||||
|
||||
/* Approximate working set */
|
||||
tag.blockNum = blkno;
|
||||
addHyperLogLog(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
|
||||
|
||||
if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0)
|
||||
{
|
||||
/* Page is not cached */
|
||||
@@ -967,3 +985,21 @@ local_cache_pages(PG_FUNCTION_ARGS)
|
||||
else
|
||||
SRF_RETURN_DONE(funcctx);
|
||||
}
|
||||
|
||||
PG_FUNCTION_INFO_V1(approximate_working_set_size);
|
||||
|
||||
Datum
|
||||
approximate_working_set_size(PG_FUNCTION_ARGS)
|
||||
{
|
||||
int32 dc = -1;
|
||||
if (lfc_size_limit != 0)
|
||||
{
|
||||
bool reset = PG_GETARG_BOOL(0);
|
||||
LWLockAcquire(lfc_lock, reset ? LW_EXCLUSIVE : LW_SHARED);
|
||||
dc = (int32) estimateHyperLogLog(&lfc_ctl->wss_estimation);
|
||||
if (reset)
|
||||
memset(lfc_ctl->hyperloglog_hashes, 0, sizeof lfc_ctl->hyperloglog_hashes);
|
||||
LWLockRelease(lfc_lock);
|
||||
}
|
||||
PG_RETURN_INT32(dc);
|
||||
}
|
||||
|
||||
9
pgxn/neon/neon--1.2--1.3.sql
Normal file
9
pgxn/neon/neon--1.2--1.3.sql
Normal file
@@ -0,0 +1,9 @@
|
||||
\echo Use "ALTER EXTENSION neon UPDATE TO '1.3'" to load this file. \quit
|
||||
|
||||
CREATE FUNCTION approximate_working_set_size(reset bool)
|
||||
RETURNS integer
|
||||
AS 'MODULE_PATHNAME', 'approximate_working_set_size'
|
||||
LANGUAGE C PARALLEL SAFE;
|
||||
|
||||
GRANT EXECUTE ON FUNCTION approximate_working_set_size(bool) TO pg_monitor;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# neon extension
|
||||
comment = 'cloud storage for PostgreSQL'
|
||||
default_version = '1.2'
|
||||
default_version = '1.3'
|
||||
module_pathname = '$libdir/neon'
|
||||
relocatable = true
|
||||
trusted = true
|
||||
|
||||
@@ -13,7 +13,7 @@ use proxy::proxy::run_until_cancelled;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context};
|
||||
use clap::{self, Arg};
|
||||
use clap::Arg;
|
||||
use futures::TryFutureExt;
|
||||
use proxy::console::messages::MetricsAuxInfo;
|
||||
use proxy::stream::{PqStream, Stream};
|
||||
|
||||
3
proxy/src/cache/project_info.rs
vendored
3
proxy/src/cache/project_info.rs
vendored
@@ -358,8 +358,7 @@ impl Cache for ProjectInfoCacheImpl {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{console::AuthSecret, scram::ServerSecret};
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use crate::scram::ServerSecret;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_project_info_cache_settings() {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use serde::Deserialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt;
|
||||
|
||||
use crate::auth::IpPattern;
|
||||
@@ -98,7 +98,16 @@ pub struct MetricsAuxInfo {
|
||||
pub endpoint_id: EndpointId,
|
||||
pub project_id: ProjectId,
|
||||
pub branch_id: BranchId,
|
||||
pub is_cold_start: Option<bool>,
|
||||
pub cold_start_info: Option<ColdStartInfo>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ColdStartInfo {
|
||||
Unknown = 0,
|
||||
Warm = 1,
|
||||
PoolHit = 2,
|
||||
PoolMiss = 3,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -111,6 +120,7 @@ mod tests {
|
||||
"endpoint_id": "endpoint",
|
||||
"project_id": "project",
|
||||
"branch_id": "branch",
|
||||
"cold_start_info": "unknown",
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::{
|
||||
};
|
||||
use anyhow::Context;
|
||||
use once_cell::sync::Lazy;
|
||||
use postgres_backend::{self, AuthType, PostgresBackend, PostgresBackendTCP, QueryError};
|
||||
use postgres_backend::{AuthType, PostgresBackend, PostgresBackendTCP, QueryError};
|
||||
use pq_proto::{BeMessage, SINGLE_COL_ROWDESC};
|
||||
use std::{convert::Infallible, future};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
|
||||
@@ -9,7 +9,7 @@ use tracing::{field::display, info_span, Span};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
console::messages::MetricsAuxInfo,
|
||||
console::messages::{ColdStartInfo, MetricsAuxInfo},
|
||||
error::ErrorKind,
|
||||
metrics::{LatencyTimer, ENDPOINT_ERRORS_BY_KIND, ERROR_BY_KIND},
|
||||
BranchId, DbName, EndpointId, ProjectId, RoleName,
|
||||
@@ -42,7 +42,7 @@ pub struct RequestMonitoring {
|
||||
error_kind: Option<ErrorKind>,
|
||||
pub(crate) auth_method: Option<AuthMethod>,
|
||||
success: bool,
|
||||
is_cold_start: Option<bool>,
|
||||
cold_start_info: Option<ColdStartInfo>,
|
||||
|
||||
// extra
|
||||
// This sender is here to keep the request monitoring channel open while requests are taking place.
|
||||
@@ -91,7 +91,7 @@ impl RequestMonitoring {
|
||||
error_kind: None,
|
||||
auth_method: None,
|
||||
success: false,
|
||||
is_cold_start: None,
|
||||
cold_start_info: None,
|
||||
|
||||
sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
|
||||
latency_timer: LatencyTimer::new(protocol),
|
||||
@@ -115,7 +115,7 @@ impl RequestMonitoring {
|
||||
self.set_endpoint_id(x.endpoint_id);
|
||||
self.branch = Some(x.branch_id);
|
||||
self.project = Some(x.project_id);
|
||||
self.is_cold_start = x.is_cold_start;
|
||||
self.cold_start_info = x.cold_start_info;
|
||||
}
|
||||
|
||||
pub fn set_project_id(&mut self, project_id: ProjectId) {
|
||||
|
||||
@@ -93,7 +93,7 @@ struct RequestData {
|
||||
/// Or if we make it to proxy_pass
|
||||
success: bool,
|
||||
/// Indicates if the cplane started the new compute node for this request.
|
||||
is_cold_start: Option<bool>,
|
||||
cold_start_info: Option<String>,
|
||||
/// Tracks time from session start (HTTP request/libpq TCP handshake)
|
||||
/// Through to success/failure
|
||||
duration_us: u64,
|
||||
@@ -121,7 +121,10 @@ impl From<RequestMonitoring> for RequestData {
|
||||
region: value.region,
|
||||
error: value.error_kind.as_ref().map(|e| e.to_metric_label()),
|
||||
success: value.success,
|
||||
is_cold_start: value.is_cold_start,
|
||||
cold_start_info: value
|
||||
.cold_start_info
|
||||
.as_ref()
|
||||
.map(|x| serde_json::to_string(x).unwrap_or_default()),
|
||||
duration_us: SystemTime::from(value.first_packet)
|
||||
.elapsed()
|
||||
.unwrap_or_default()
|
||||
@@ -455,7 +458,7 @@ mod tests {
|
||||
region: "us-east-1",
|
||||
error: None,
|
||||
success: rng.gen(),
|
||||
is_cold_start: Some(true),
|
||||
cold_start_info: Some("no".into()),
|
||||
duration_us: rng.gen_range(0..30_000_000),
|
||||
}
|
||||
}
|
||||
@@ -525,16 +528,16 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1315032, 3, 6000),
|
||||
(1315025, 3, 6000),
|
||||
(1315085, 3, 6000),
|
||||
(1315042, 3, 6000),
|
||||
(1315172, 3, 6000),
|
||||
(1315014, 3, 6000),
|
||||
(1314806, 3, 6000),
|
||||
(1315042, 3, 6000),
|
||||
(438563, 1, 2000)
|
||||
],
|
||||
(1314406, 3, 6000),
|
||||
(1314399, 3, 6000),
|
||||
(1314459, 3, 6000),
|
||||
(1314416, 3, 6000),
|
||||
(1314546, 3, 6000),
|
||||
(1314388, 3, 6000),
|
||||
(1314180, 3, 6000),
|
||||
(1314416, 3, 6000),
|
||||
(438359, 1, 2000)
|
||||
]
|
||||
);
|
||||
|
||||
tmpdir.close().unwrap();
|
||||
@@ -563,12 +566,12 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1220433, 5, 10000),
|
||||
(1226583, 5, 10000),
|
||||
(1228377, 5, 10000),
|
||||
(1227739, 5, 10000),
|
||||
(1219017, 5, 10000)
|
||||
],
|
||||
(1220668, 5, 10000),
|
||||
(1226818, 5, 10000),
|
||||
(1228612, 5, 10000),
|
||||
(1227974, 5, 10000),
|
||||
(1219252, 5, 10000)
|
||||
]
|
||||
);
|
||||
|
||||
tmpdir.close().unwrap();
|
||||
@@ -599,12 +602,12 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1206080, 5, 10000),
|
||||
(1205811, 5, 10000),
|
||||
(1206104, 5, 10000),
|
||||
(1206092, 5, 10000),
|
||||
(1206347, 5, 10000)
|
||||
],
|
||||
(1206315, 5, 10000),
|
||||
(1206046, 5, 10000),
|
||||
(1206339, 5, 10000),
|
||||
(1206327, 5, 10000),
|
||||
(1206582, 5, 10000)
|
||||
]
|
||||
);
|
||||
|
||||
tmpdir.close().unwrap();
|
||||
@@ -628,16 +631,16 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1315032, 3, 6000),
|
||||
(1315025, 3, 6000),
|
||||
(1315085, 3, 6000),
|
||||
(1315042, 3, 6000),
|
||||
(1315172, 3, 6000),
|
||||
(1315014, 3, 6000),
|
||||
(1314806, 3, 6000),
|
||||
(1315042, 3, 6000),
|
||||
(438563, 1, 2000)
|
||||
],
|
||||
(1314406, 3, 6000),
|
||||
(1314399, 3, 6000),
|
||||
(1314459, 3, 6000),
|
||||
(1314416, 3, 6000),
|
||||
(1314546, 3, 6000),
|
||||
(1314388, 3, 6000),
|
||||
(1314180, 3, 6000),
|
||||
(1314416, 3, 6000),
|
||||
(438359, 1, 2000)
|
||||
]
|
||||
);
|
||||
|
||||
tmpdir.close().unwrap();
|
||||
@@ -673,7 +676,7 @@ mod tests {
|
||||
// files are smaller than the size threshold, but they took too long to fill so were flushed early
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[(659129, 2, 3001), (658842, 2, 3000), (658638, 2, 2999)],
|
||||
[(658837, 2, 3001), (658551, 2, 3000), (658347, 2, 2999)]
|
||||
);
|
||||
|
||||
tmpdir.close().unwrap();
|
||||
|
||||
@@ -16,7 +16,7 @@ use crate::console::provider::{CachedAllowedIps, CachedRoleSecret, ConsoleBacken
|
||||
use crate::console::{self, CachedNodeInfo, NodeInfo};
|
||||
use crate::error::ErrorKind;
|
||||
use crate::proxy::retry::{retry_after, NUM_RETRIES_CONNECT};
|
||||
use crate::{auth, http, sasl, scram};
|
||||
use crate::{http, sasl, scram};
|
||||
use anyhow::{bail, Context};
|
||||
use async_trait::async_trait;
|
||||
use rstest::rstest;
|
||||
|
||||
@@ -11,7 +11,6 @@ use bytes::{Bytes, BytesMut};
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use postgres_protocol::message::frontend;
|
||||
use tokio::io::{AsyncReadExt, DuplexStream};
|
||||
use tokio_postgres::config::SslMode;
|
||||
use tokio_postgres::tls::TlsConnect;
|
||||
use tokio_util::codec::{Decoder, Encoder};
|
||||
|
||||
|
||||
@@ -667,7 +667,6 @@ impl<C: ClientInnerExt> Drop for Client<C> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use env_logger;
|
||||
use std::{mem, sync::atomic::AtomicBool};
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -19,8 +19,6 @@ use utils::{bin_ser::LeSer, id::TenantTimelineId};
|
||||
|
||||
use crate::SafeKeeperConf;
|
||||
|
||||
use std::convert::TryInto;
|
||||
|
||||
pub const SK_MAGIC: u32 = 0xcafeceefu32;
|
||||
pub const SK_FORMAT_VERSION: u32 = 7;
|
||||
|
||||
@@ -219,12 +217,9 @@ impl Storage for FileStorage {
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::FileStorage;
|
||||
use super::*;
|
||||
use crate::SafeKeeperConf;
|
||||
use anyhow::Result;
|
||||
use tokio::fs;
|
||||
use utils::{id::TenantTimelineId, lsn::Lsn};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
fn stub_conf() -> SafeKeeperConf {
|
||||
let workdir = camino_tempfile::tempdir().unwrap().into_path();
|
||||
|
||||
@@ -2,8 +2,7 @@
|
||||
//! protocol commands.
|
||||
|
||||
use anyhow::Context;
|
||||
use std::str::FromStr;
|
||||
use std::str::{self};
|
||||
use std::str::{self, FromStr};
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{debug, info, info_span, Instrument};
|
||||
@@ -16,8 +15,8 @@ use crate::safekeeper::Term;
|
||||
use crate::timeline::TimelineError;
|
||||
use crate::wal_service::ConnectionId;
|
||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
use postgres_backend::PostgresBackend;
|
||||
use postgres_backend::QueryError;
|
||||
use postgres_backend::{self, PostgresBackend};
|
||||
use postgres_ffi::PG_TLI;
|
||||
use pq_proto::{BeMessage, FeStartupPacket, RowDescriptor, INT4_OID, TEXT_OID};
|
||||
use regex::Regex;
|
||||
|
||||
@@ -2180,6 +2180,11 @@ class NeonAttachmentService(MetricsGetter):
|
||||
self.stop(immediate=True)
|
||||
|
||||
|
||||
@dataclass
|
||||
class LogCursor:
|
||||
_line_no: int
|
||||
|
||||
|
||||
class NeonPageserver(PgProtocol):
|
||||
"""
|
||||
An object representing a running pageserver.
|
||||
@@ -2343,7 +2348,18 @@ class NeonPageserver(PgProtocol):
|
||||
value = self.http_client().get_metric_value(metric)
|
||||
assert value == 0, f"Nonzero {metric} == {value}"
|
||||
|
||||
def log_contains(self, pattern: str) -> Optional[str]:
|
||||
def assert_log_contains(
|
||||
self, pattern: str, offset: None | LogCursor = None
|
||||
) -> Tuple[str, LogCursor]:
|
||||
"""Convenient for use inside wait_until()"""
|
||||
|
||||
res = self.log_contains(pattern, offset=offset)
|
||||
assert res is not None
|
||||
return res
|
||||
|
||||
def log_contains(
|
||||
self, pattern: str, offset: None | LogCursor = None
|
||||
) -> Optional[Tuple[str, LogCursor]]:
|
||||
"""Check that the pageserver log contains a line that matches the given regex"""
|
||||
logfile = self.workdir / "pageserver.log"
|
||||
if not logfile.exists():
|
||||
@@ -2357,12 +2373,17 @@ class NeonPageserver(PgProtocol):
|
||||
# no guarantee it is already present in the log file. This hasn't
|
||||
# been a problem in practice, our python tests are not fast enough
|
||||
# to hit that race condition.
|
||||
skip_until_line_no = 0 if offset is None else offset._line_no
|
||||
cur_line_no = 0
|
||||
with logfile.open("r") as f:
|
||||
for line in f:
|
||||
if cur_line_no < skip_until_line_no:
|
||||
cur_line_no += 1
|
||||
continue
|
||||
if contains_re.search(line):
|
||||
# found it!
|
||||
return line
|
||||
|
||||
cur_line_no += 1
|
||||
return (line, LogCursor(cur_line_no))
|
||||
return None
|
||||
|
||||
def tenant_attach(
|
||||
|
||||
@@ -286,7 +286,11 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
self.verbose_error(res)
|
||||
|
||||
def tenant_location_conf(
|
||||
self, tenant_id: Union[TenantId, TenantShardId], location_conf=dict[str, Any], flush_ms=None
|
||||
self,
|
||||
tenant_id: Union[TenantId, TenantShardId],
|
||||
location_conf=dict[str, Any],
|
||||
flush_ms=None,
|
||||
lazy: Optional[bool] = None,
|
||||
):
|
||||
body = location_conf.copy()
|
||||
body["tenant_id"] = str(tenant_id)
|
||||
@@ -295,6 +299,9 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
if flush_ms is not None:
|
||||
params["flush_ms"] = str(flush_ms)
|
||||
|
||||
if lazy is not None:
|
||||
params["lazy"] = "true" if lazy else "false"
|
||||
|
||||
res = self.put(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/location_config",
|
||||
json=body,
|
||||
|
||||
@@ -20,7 +20,7 @@ def assert_tenant_state(
|
||||
tenant: TenantId,
|
||||
expected_state: str,
|
||||
message: Optional[str] = None,
|
||||
):
|
||||
) -> None:
|
||||
tenant_status = pageserver_http.tenant_status(tenant)
|
||||
log.info(f"tenant_status: {tenant_status}")
|
||||
assert tenant_status["state"]["slug"] == expected_state, message or tenant_status
|
||||
@@ -206,8 +206,8 @@ def wait_for_last_record_lsn(
|
||||
return current_lsn
|
||||
if i % 10 == 0:
|
||||
log.info(
|
||||
"waiting for last_record_lsn to reach {}, now {}, iteration {}".format(
|
||||
lsn, current_lsn, i + 1
|
||||
"{}/{} waiting for last_record_lsn to reach {}, now {}, iteration {}".format(
|
||||
tenant, timeline, lsn, current_lsn, i + 1
|
||||
)
|
||||
)
|
||||
time.sleep(0.1)
|
||||
@@ -292,7 +292,7 @@ def timeline_delete_wait_completed(
|
||||
iterations: int = 20,
|
||||
interval: Optional[float] = None,
|
||||
**delete_args,
|
||||
):
|
||||
) -> None:
|
||||
pageserver_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id, **delete_args)
|
||||
wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id, iterations, interval)
|
||||
|
||||
@@ -302,7 +302,7 @@ def assert_prefix_empty(
|
||||
remote_storage: Optional[RemoteStorage],
|
||||
prefix: Optional[str] = None,
|
||||
allowed_postfix: Optional[str] = None,
|
||||
):
|
||||
) -> None:
|
||||
assert remote_storage is not None
|
||||
response = list_prefix(remote_storage, prefix)
|
||||
keys = response["KeyCount"]
|
||||
|
||||
@@ -252,6 +252,16 @@ class S3Storage:
|
||||
|
||||
log.info(f"deleted {cnt} objects from remote storage")
|
||||
|
||||
def tenant_path(self, tenant_id: TenantId) -> str:
|
||||
return f"{self.prefix_in_bucket}/tenants/{tenant_id}"
|
||||
|
||||
def heatmap_key(self, tenant_id: TenantId) -> str:
|
||||
return f"{self.tenant_path(tenant_id)}/{TENANT_HEATMAP_FILE_NAME}"
|
||||
|
||||
def heatmap_content(self, tenant_id: TenantId):
|
||||
r = self.client.get_object(Bucket=self.bucket_name, Key=self.heatmap_key(tenant_id))
|
||||
return json.loads(r["Body"].read().decode("utf-8"))
|
||||
|
||||
|
||||
RemoteStorage = Union[LocalFsStorage, S3Storage]
|
||||
|
||||
|
||||
@@ -369,7 +369,12 @@ def start_in_background(
|
||||
return spawned_process
|
||||
|
||||
|
||||
def wait_until(number_of_iterations: int, interval: float, func: Fn):
|
||||
WaitUntilRet = TypeVar("WaitUntilRet")
|
||||
|
||||
|
||||
def wait_until(
|
||||
number_of_iterations: int, interval: float, func: Callable[[], WaitUntilRet]
|
||||
) -> WaitUntilRet:
|
||||
"""
|
||||
Wait until 'func' returns successfully, without exception. Returns the
|
||||
last return value from the function.
|
||||
@@ -387,6 +392,18 @@ def wait_until(number_of_iterations: int, interval: float, func: Fn):
|
||||
raise Exception("timed out while waiting for %s" % func) from last_exception
|
||||
|
||||
|
||||
def assert_eq(a, b) -> None:
|
||||
assert a == b
|
||||
|
||||
|
||||
def assert_gt(a, b) -> None:
|
||||
assert a > b
|
||||
|
||||
|
||||
def assert_ge(a, b) -> None:
|
||||
assert a >= b
|
||||
|
||||
|
||||
def run_pg_bench_small(pg_bin: "PgBin", connstr: str):
|
||||
"""
|
||||
Fast way to populate data.
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user